自定义topic上报数据报错

Blade 未结 2 410
hellow111
hellow111 剑侠 2025-01-02 15:55

一、该问题的重现步骤是什么?

1.  按照教程构建新的topic

2.  在test 处订阅发布消息

3.  新写的

CustomHoistPostFunction

方法报错

2025-01-02 15:47:04.380  INFO 27784 --- [   biz-worker-9] o.s.m.b.d.f.c.CustomHoistPostFunction    : 自定义topic上报数据:{currentLevel=2, targetLevel=5}

2025-01-02 15:47:04.380 ERROR 27784 --- [   biz-worker-9] o.s.m.b.d.f.c.CustomHoistPostFunction    : req id:1e42776050ee34856173a1f8b51e6dc0 property post error


java.lang.NullPointerException: Cannot invoke "org.springblade.iot.core.pojo.BaseDevice.getProductKey()" because "deviceInfo" is null

at org.springblade.iot.tsdb.tdengine.impl.DeviceDataServiceImpl.saveDeviceProperty(DeviceDataServiceImpl.java:31)

at org.springblade.mqtt.broker.data.function.custom.CustomHoistPostFunction.execute(CustomHoistPostFunction.java:63)

at org.springblade.mqtt.broker.data.function.custom.CustomHoistPostFunction.execute(CustomHoistPostFunction.java:27)

at org.springblade.mqtt.broker.data.listener.BladeMqttFunctionMessageListener.execDeviceFunction(BladeMqttFunctionMessageListener.java:104)

at org.springblade.mqtt.broker.data.listener.BladeMqttFunctionMessageListener.execMqttFunction(BladeMqttFunctionMessageListener.java:79)

at org.springblade.mqtt.broker.data.listener.BladeMqttFunctionMessageListener.lambda$onMessage$0(BladeMqttFunctionMessageListener.java:69)

at org.springblade.core.tenant.TenantUtil.use(TenantUtil.java:93)

at org.springblade.mqtt.broker.data.listener.BladeMqttFunctionMessageListener.onMessage(BladeMqttFunctionMessageListener.java:68)

at net.dreamlu.iot.mqtt.core.server.support.DefaultMqttServerProcessor.lambda$invokeListenerForPublish$4(DefaultMqttServerProcessor.java:503)

at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)

at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)

at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)

at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)

at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)

at java.base/java.lang.Thread.run(Thread.java:840)


2条回答
  • 2025-01-02 16:01

    产品物模型先执行一下发布操作,然后到这里用设备模拟器模拟上报,看看是否能成功:https://iot.bladex.cn/feature/device/simulator/simulator.html


    如果模拟器能成功,你自己写的不行,就是你代码的问题了,需要详细列一下你怎么写的

    0 讨论(0)
  • 2025-01-02 16:55

    我是按照教程写的,附加代码:

    org.springblade.mqtt.broker.mqtt;
    
    lombok.extern.slf4j.;
    
    MqttClientTest {
    
        (String[] args) {
           String productKey = ;
           productKey = ;
    
           DeviceSimulator simulator4 = DeviceSimulator(productKey, , );
    
           simulator4.start();
        }
    
    }


    org.springblade.mqtt.broker.mqtt;
    
    lombok.;
    lombok.;
    lombok.extern.slf4j.;
    net.dreamlu.iot.mqtt.core.client.MqttClient;
    net.dreamlu.iot.mqtt.core.client.MqttClientCreator;
    org.springblade.base.client.ClientIdInfo;
    org.springblade.base.client.ClientSign;
    org.springblade.core.tool.support.Kv;
    org.springblade.core.tool.utils.StringUtil;
    org.springblade.core.tool.utils.TemplateUtil;
    org.springblade.mqtt.broker.data.protocol.core.DataReq;
    org.springblade.mqtt.broker.data.protocol.ntp.NtpReq;
    org.tio.utils.buffer.ByteBufferUtil;
    org.tio.utils.json.JsonUtil;
    
    java.util.HashMap;
    java.util.Map;
    java.util.function.Consumer;
    
    DeviceSimulator {
        String ;
        String ;
        String ;
        ;
        = ;
    
        (String productKey, String deviceName, String deviceSecret) {
           (productKey, deviceName, deviceSecret, );
        }
    
        () {
           start(mqttClientCreator -> {});
        }
    
        (Consumer<MqttClientCreator> consumer) {
           ClientSign clientSign = createClientSign();
           String username = clientSign.getUsername();
           String password = clientSign.getPassword();
           String clientId = clientSign.getClientIdInfo().getMqttClientId();
    
           .info(, username, password, clientId);
    
           MqttClientCreator creator = MqttClient.()
              .username(username)
              .password(password)
              .clientId(clientId);
           consumer.accept(creator);
           MqttClient client = creator.connectSync();
    
           () {
              subscribeTopics(client);
              scheduleTasks(client);
           }
        }
    
        ClientSign () {
           ClientIdInfo clientIdInfo = ClientIdInfo();
           clientIdInfo.setClientId();
    
           ClientSign clientSign = ClientSign();
           clientSign.setClientIdInfo(clientIdInfo);
           clientSign.setProductKey();
           clientSign.setDeviceName();
           clientSign.setDeviceSecret();
           clientSign;
        }
    
        (MqttClient client) {
    client.subQos0(+ + + + , (context, topic, message, payload) -> {
              .info(, topic, ByteBufferUtil.(payload));
           });
        }
    
        (MqttClient client) {
           publishTask(client, , createHoistData());
        }
    
        (MqttClient client, String endpoint, Object data) {
           String topic = TemplateUtil.(endpoint, Kv.().set(, ).set(, ));
           client.schedule(() -> .publish(, JsonUtil.()), );
        }
    
        DataReq<Map<String, Object>> () {
           DataReq<Map<String, Object>> req = DataReq<>();
           req.setId(StringUtil.());
           req.setVersion();
           Map<String, Object> params = HashMap<>();
           params.put(, );
           params.put(, );
           req.setParams(params);
           req;
        }
    }


    org.springblade.mqtt.broker.data.function.custom;
    
    com.fasterxml.jackson.databind.JavaType;
    lombok.extern.slf4j.;
    org.springblade.core.tool.jackson.JsonUtil;
    org.springblade.iot.tsdb.api.ITsDBDeviceDataService;
    org.springblade.mqtt.broker.data.function.core.;
    org.springblade.mqtt.broker.data.function.core.FuncMethods;
    org.springblade.mqtt.broker.data.function.core.IDeviceFunction;
    org.springblade.mqtt.broker.data.protocol.code.ApiCode;
    org.springblade.mqtt.broker.data.protocol.core.DataReq;
    org.springblade.mqtt.broker.data.service.IMqttSender;
    org.springframework.beans.factory.annotation.;
    org.tio.core.ChannelContext;
    
    java.util.Map;
    
    (
        topic = ,
        method = FuncMethods.)
    CustomHoistPostFunction IDeviceFunction<DataReq<Map<String, Object>>> {
    
        IMqttSender ;
    
        ITsDBDeviceDataService ;
    
        JavaType () {
           JsonUtil.(DataReq., JsonUtil.(Object.));
        }
    
        IMqttSender () {
           ;
        }
    
        (ChannelContext context, String topic, Map<String, String> topicVariables, DataReq<Map<String, Object>> req) {
           String replyTopic = topic + ;
           Map<String, Object> params = req.getParams();
           (params == || params.isEmpty()) {
              .error(, req.getId());
              fail(topicVariables, replyTopic, req, ApiCode.);
              ;
           }
           {
              .info(, params);
              .saveDeviceProperty(topicVariables, params);
              success(topicVariables, replyTopic, req);
           } (Throwable throwable) {
              .error(, req.getId(), throwable);
              fail(topicVariables, replyTopic, req, ApiCode.);
           }
        }
    }

    出错是在入时序库这里

    deviceDataService.saveDeviceProperty(topicVariables, params);

    这一句报错

    作者追问:2025-01-02 16:56

    产品物模型先执行一下发布操作,然后到这里用设备模拟器模拟上报,看看是否能成功:https://iot.bladex.cn/feature/device/simulator/simulator.html

    回答: 2025-01-02 17:03

    设备模拟器是可以的啊,但走的是不同的topic

    topic:/blade/sys/e8qyh6l6izuq/v65kQlrmXZYD41Ec/thing/event/property/post

    作者追问:2025-01-02 17:09

    用默认的代码,测试数据库里默认的数据,看看是否能上报。如果能上报,就在MqttClientTest和DeviceSimulator这两个类里修改成你的属性和字段来进行测试上报。如果默认的成功了,你修改完还报错,就把这两个类的代码放到txt里,发到帖子里给我们看下

    CleanShot20250102170511@2x.png

    回答: 2025-01-02 17:36

    1735810524745.png

    改成默认的以后就这样了。。。我看了好久也不知道为啥,那边也没有接收到消息

    作者追问:2025-01-02 17:38

    mqtt服务器没连接成功,检查ip和端口是否能连通,如果mqtt不是本地启动的,检查端口是否开放,网络是否能通。


    CleanShot20250102173704@2x.png

    回答: 2025-01-02 17:48

    1735811194828.png

    作者追问:2025-01-02 18:04

    你录个视频吧,看看所有的配置,以及具体的报错。不然你只截图一个局部的错误,完整日志看不到,很难帮忙判断。


    再一个就是我们让你去操作的步骤,是为了排除其他可能性。你操作完后要给我们一个回复,这样才能让我们排除其他因素帮你缩小问题范围。


    现在其他的代码都不要去动,就用现成的代码,和sql执行后数据库里现成的数据来测试上报。

    录制完毕后视频发我们邮箱:bladejava@qq.com


    提问技巧推荐看一下,这样可以更好地提供解决问题所需的信息:https://sns.bladex.cn/article-14934.html


    回答: 2025-01-02 18:12

    已发送邮件,请尽快回复,谢谢

    作者追问:2025-01-02 18:14

    仔细看我们说的提议,请先用系统的原版代码原版数据跑通了你再去自己改造。都不知道你在原版测试类是否跑通的情况,怎么来帮你缩小问题范围

    CleanShot20250102181354@2x.png

    0 讨论(0)
提交回复