一、该问题的重现步骤是什么?
1. 按照教程构建新的topic
2. 在test 处订阅发布消息
3. 新写的
CustomHoistPostFunction
方法报错
2025-01-02 15:47:04.380 INFO 27784 --- [ biz-worker-9] o.s.m.b.d.f.c.CustomHoistPostFunction : 自定义topic上报数据:{currentLevel=2, targetLevel=5}
2025-01-02 15:47:04.380 ERROR 27784 --- [ biz-worker-9] o.s.m.b.d.f.c.CustomHoistPostFunction : req id:1e42776050ee34856173a1f8b51e6dc0 property post error
java.lang.NullPointerException: Cannot invoke "org.springblade.iot.core.pojo.BaseDevice.getProductKey()" because "deviceInfo" is null
at org.springblade.iot.tsdb.tdengine.impl.DeviceDataServiceImpl.saveDeviceProperty(DeviceDataServiceImpl.java:31)
at org.springblade.mqtt.broker.data.function.custom.CustomHoistPostFunction.execute(CustomHoistPostFunction.java:63)
at org.springblade.mqtt.broker.data.function.custom.CustomHoistPostFunction.execute(CustomHoistPostFunction.java:27)
at org.springblade.mqtt.broker.data.listener.BladeMqttFunctionMessageListener.execDeviceFunction(BladeMqttFunctionMessageListener.java:104)
at org.springblade.mqtt.broker.data.listener.BladeMqttFunctionMessageListener.execMqttFunction(BladeMqttFunctionMessageListener.java:79)
at org.springblade.mqtt.broker.data.listener.BladeMqttFunctionMessageListener.lambda$onMessage$0(BladeMqttFunctionMessageListener.java:69)
at org.springblade.core.tenant.TenantUtil.use(TenantUtil.java:93)
at org.springblade.mqtt.broker.data.listener.BladeMqttFunctionMessageListener.onMessage(BladeMqttFunctionMessageListener.java:68)
at net.dreamlu.iot.mqtt.core.server.support.DefaultMqttServerProcessor.lambda$invokeListenerForPublish$4(DefaultMqttServerProcessor.java:503)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
我是按照教程写的,附加代码:
org.springblade.mqtt.broker.mqtt;
lombok.extern.slf4j.;
MqttClientTest {
(String[] args) {
String productKey = ;
productKey = ;
DeviceSimulator simulator4 = DeviceSimulator(productKey, , );
simulator4.start();
}
}org.springblade.mqtt.broker.mqtt;
lombok.;
lombok.;
lombok.extern.slf4j.;
net.dreamlu.iot.mqtt.core.client.MqttClient;
net.dreamlu.iot.mqtt.core.client.MqttClientCreator;
org.springblade.base.client.ClientIdInfo;
org.springblade.base.client.ClientSign;
org.springblade.core.tool.support.Kv;
org.springblade.core.tool.utils.StringUtil;
org.springblade.core.tool.utils.TemplateUtil;
org.springblade.mqtt.broker.data.protocol.core.DataReq;
org.springblade.mqtt.broker.data.protocol.ntp.NtpReq;
org.tio.utils.buffer.ByteBufferUtil;
org.tio.utils.json.JsonUtil;
java.util.HashMap;
java.util.Map;
java.util.function.Consumer;
DeviceSimulator {
String ;
String ;
String ;
;
= ;
(String productKey, String deviceName, String deviceSecret) {
(productKey, deviceName, deviceSecret, );
}
() {
start(mqttClientCreator -> {});
}
(Consumer consumer) {
ClientSign clientSign = createClientSign();
String username = clientSign.getUsername();
String password = clientSign.getPassword();
String clientId = clientSign.getClientIdInfo().getMqttClientId();
.info(, username, password, clientId);
MqttClientCreator creator = MqttClient.()
.username(username)
.password(password)
.clientId(clientId);
consumer.accept(creator);
MqttClient client = creator.connectSync();
() {
subscribeTopics(client);
scheduleTasks(client);
}
}
ClientSign () {
ClientIdInfo clientIdInfo = ClientIdInfo();
clientIdInfo.setClientId();
ClientSign clientSign = ClientSign();
clientSign.setClientIdInfo(clientIdInfo);
clientSign.setProductKey();
clientSign.setDeviceName();
clientSign.setDeviceSecret();
clientSign;
}
(MqttClient client) {
client.subQos0(+ + + + , (context, topic, message, payload) -> {
.info(, topic, ByteBufferUtil.(payload));
});
}
(MqttClient client) {
publishTask(client, , createHoistData());
}
(MqttClient client, String endpoint, Object data) {
String topic = TemplateUtil.(endpoint, Kv.().set(, ).set(, ));
client.schedule(() -> .publish(, JsonUtil.()), );
}
DataReq org.springblade.mqtt.broker.data.function.custom; com.fasterxml.jackson.databind.JavaType; lombok.extern.slf4j.; org.springblade.core.tool.jackson.JsonUtil; org.springblade.iot.tsdb.api.ITsDBDeviceDataService; org.springblade.mqtt.broker.data.function.core.; org.springblade.mqtt.broker.data.function.core.FuncMethods; org.springblade.mqtt.broker.data.function.core.IDeviceFunction; org.springblade.mqtt.broker.data.protocol.code.ApiCode; org.springblade.mqtt.broker.data.protocol.core.DataReq; org.springblade.mqtt.broker.data.service.IMqttSender; org.springframework.beans.factory.annotation.; org.tio.core.ChannelContext; java.util.Map; ( topic = , method = FuncMethods.) CustomHoistPostFunction IDeviceFunction>> { IMqttSender ; ITsDBDeviceDataService ; JavaType () { JsonUtil.(DataReq., JsonUtil.(Object.)); } IMqttSender () { ; } (ChannelContext context, String topic, Map topicVariables, DataReq > req) { String replyTopic = topic + ; Map params = req.getParams(); (params == || params.isEmpty()) { .error(, req.getId()); fail(topicVariables, replyTopic, req, ApiCode.); ; } { .info(, params); .saveDeviceProperty(topicVariables, params); success(topicVariables, replyTopic, req); } (Throwable throwable) { .error(, req.getId(), throwable); fail(topicVariables, replyTopic, req, ApiCode.); } } }
出错是在入时序库这里
deviceDataService.saveDeviceProperty(topicVariables, params);
这一句报错
产品物模型先执行一下发布操作,然后到这里用设备模拟器模拟上报,看看是否能成功:https://iot.bladex.cn/feature/device/simulator/simulator.html
设备模拟器是可以的啊,但走的是不同的topic
topic:/blade/sys/e8qyh6l6izuq/v65kQlrmXZYD41Ec/thing/event/property/post
用默认的代码,测试数据库里默认的数据,看看是否能上报。如果能上报,就在MqttClientTest和DeviceSimulator这两个类里修改成你的属性和字段来进行测试上报。如果默认的成功了,你修改完还报错,就把这两个类的代码放到txt里,发到帖子里给我们看下


改成默认的以后就这样了。。。我看了好久也不知道为啥,那边也没有接收到消息
mqtt服务器没连接成功,检查ip和端口是否能连通,如果mqtt不是本地启动的,检查端口是否开放,网络是否能通。


你录个视频吧,看看所有的配置,以及具体的报错。不然你只截图一个局部的错误,完整日志看不到,很难帮忙判断。
再一个就是我们让你去操作的步骤,是为了排除其他可能性。你操作完后要给我们一个回复,这样才能让我们排除其他因素帮你缩小问题范围。
现在其他的代码都不要去动,就用现成的代码,和sql执行后数据库里现成的数据来测试上报。
录制完毕后视频发我们邮箱:bladejava@qq.com
提问技巧推荐看一下,这样可以更好地提供解决问题所需的信息:https://sns.bladex.cn/article-14934.html
已发送邮件,请尽快回复,谢谢
仔细看我们说的提议,请先用系统的原版代码原版数据跑通了你再去自己改造。都不知道你在原版测试类是否跑通的情况,怎么来帮你缩小问题范围

扫一扫访问 Blade技术社区 移动端