review:【IoT 物联网】MqTT 协议

This commit is contained in:
YunaiV
2025-06-14 22:29:58 +08:00
parent 05ac902dc9
commit 1328d91987
3 changed files with 38 additions and 50 deletions

View File

@@ -1,6 +1,5 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.emqx; package cn.iocoder.yudao.module.iot.gateway.protocol.emqx;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.router.IotEmqxUpstreamHandler; import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.router.IotEmqxUpstreamHandler;
@@ -67,9 +66,11 @@ public class IotEmqxUpstreamProtocol {
stop(); stop();
// 异步关闭应用,避免阻塞当前线程 // 异步关闭应用,避免阻塞当前线程
// TODO @haohao是不是阻塞也没关系哈
new Thread(() -> { new Thread(() -> {
try { try {
Thread.sleep(1000); // 等待1秒让日志输出完成 // TODO @haohao可以考虑用 ThreadUtil.sleep 更简洁
Thread.sleep(1000); // 等待 1 秒让日志输出完成
log.error("[start][由于 MQTT 连接失败,正在关闭应用]"); log.error("[start][由于 MQTT 连接失败,正在关闭应用]");
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@@ -110,13 +111,10 @@ public class IotEmqxUpstreamProtocol {
*/ */
private void startMqttClient() { private void startMqttClient() {
try { try {
// 2.1. 初始化消息处理器 // 1. 初始化消息处理器
this.upstreamHandler = new IotEmqxUpstreamHandler(this); this.upstreamHandler = new IotEmqxUpstreamHandler(this);
// 2.2. 创建 MQTT 客户端 // 2. 创建 MQTT 客户端,连接 MQTT Broker
createMqttClient();
// 2.3. 连接 MQTT Broker同步等待首次连接结果
boolean connected = connectMqttSync(); boolean connected = connectMqttSync();
if (!connected) { if (!connected) {
throw new RuntimeException("首次连接 MQTT Broker 失败"); throw new RuntimeException("首次连接 MQTT Broker 失败");
@@ -134,11 +132,14 @@ public class IotEmqxUpstreamProtocol {
* @param isSync 是否同步等待连接结果 * @param isSync 是否同步等待连接结果
* @return 当 isSync 为 true 时返回连接是否成功,否则返回 null * @return 当 isSync 为 true 时返回连接是否成功,否则返回 null
*/ */
// TODO @haohao是不是不用结果结束后直接判断 this.mqttClient.isConnected()
private Boolean connectMqtt(boolean isReconnect, boolean isSync) { private Boolean connectMqtt(boolean isReconnect, boolean isSync) {
// TODO @haohao这块代码是不是放到
String host = emqxProperties.getMqttHost(); String host = emqxProperties.getMqttHost();
Integer port = emqxProperties.getMqttPort(); Integer port = emqxProperties.getMqttPort();
// 2.3.1. 如果是重连,则需要重新创建 MQTT 客户端 // 2.3.1. 如果是重连,则需要重新创建 MQTT 客户端
// TODO @hoahao疑惑为啥这里要重新创建对象呀另外创建是不是拿到 reconnectWithDelay 会更合适?这样和 startMqttClient 一样呢;
if (isReconnect) { if (isReconnect) {
createMqttClient(); createMqttClient();
} }
@@ -148,7 +149,6 @@ public class IotEmqxUpstreamProtocol {
AtomicBoolean success = isSync AtomicBoolean success = isSync
? new AtomicBoolean(false) ? new AtomicBoolean(false)
: null; : null;
mqttClient.connect(port, host, connectResult -> { mqttClient.connect(port, host, connectResult -> {
if (connectResult.succeeded()) { if (connectResult.succeeded()) {
if (isReconnect) { if (isReconnect) {
@@ -204,21 +204,18 @@ public class IotEmqxUpstreamProtocol {
* 停止 MQTT 客户端 * 停止 MQTT 客户端
*/ */
private void stopMqttClient() { private void stopMqttClient() {
// 1.1. 取消订阅所有主题 // 1. 取消订阅所有主题
if (mqttClient != null && mqttClient.isConnected()) { if (mqttClient != null && mqttClient.isConnected()) {
List<String> topicList = emqxProperties.getMqttTopics(); List<String> topicList = emqxProperties.getMqttTopics();
if (CollUtil.isNotEmpty(topicList)) { for (String topic : topicList) {
for (String topic : topicList) { try {
try { mqttClient.unsubscribe(topic);
mqttClient.unsubscribe(topic); } catch (Exception e) {
} catch (Exception e) { log.warn("[stopMqttClient][取消订阅主题({})异常]", topic, e);
log.warn("[stopMqttClient][取消订阅主题({})异常]", topic, e);
}
} }
} }
} }
// 2. 断开 MQTT 客户端连接
// 1.2. 断开 MQTT 客户端连接
if (mqttClient != null && mqttClient.isConnected()) { if (mqttClient != null && mqttClient.isConnected()) {
try { try {
mqttClient.disconnect(); mqttClient.disconnect();
@@ -244,18 +241,18 @@ public class IotEmqxUpstreamProtocol {
* 设置 MQTT 处理器 * 设置 MQTT 处理器
*/ */
private void setupMqttHandlers() { private void setupMqttHandlers() {
// 1. 设置断开重连监听器 // 1.1 设置断开重连监听器
mqttClient.closeHandler(closeEvent -> { mqttClient.closeHandler(closeEvent -> {
if (isRunning) { if (!isRunning) {
log.warn("[closeHandler][MQTT 连接已断开, 准备重连]"); return;
reconnectWithDelay();
} }
log.warn("[closeHandler][MQTT 连接已断开, 准备重连]");
reconnectWithDelay();
}); });
// 1.2 设置异常处理器
// 2. 设置异常处理器
mqttClient.exceptionHandler(exception -> log.error("[exceptionHandler][MQTT 客户端异常]", exception)); mqttClient.exceptionHandler(exception -> log.error("[exceptionHandler][MQTT 客户端异常]", exception));
// 3. 设置消息处理器 // 2. 设置消息处理器
mqttClient.publishHandler(upstreamHandler::handle); mqttClient.publishHandler(upstreamHandler::handle);
} }
@@ -270,16 +267,13 @@ public class IotEmqxUpstreamProtocol {
return; return;
} }
// 2. 批量订阅所有主题
Map<String, Integer> topics = new HashMap<>();
int qos = emqxProperties.getMqttQos(); int qos = emqxProperties.getMqttQos();
// 2. 构建主题-QoS 映射,批量订阅
Map<String, Integer> topicQosMap = new HashMap<>();
for (String topic : topicList) { for (String topic : topicList) {
topicQosMap.put(topic, qos); topics.put(topic, qos);
} }
mqttClient.subscribe(topics, subscribeResult -> {
// 3. 批量订阅所有主题
mqttClient.subscribe(topicQosMap, subscribeResult -> {
if (subscribeResult.succeeded()) { if (subscribeResult.succeeded()) {
log.info("[subscribeToTopics][订阅主题成功, 共 {} 个主题]", topicList.size()); log.info("[subscribeToTopics][订阅主题成功, 共 {} 个主题]", topicList.size());
} else { } else {
@@ -307,6 +301,7 @@ public class IotEmqxUpstreamProtocol {
} catch (Exception e) { } catch (Exception e) {
log.error("[reconnectWithDelay][重连失败, 将继续尝试]", e); log.error("[reconnectWithDelay][重连失败, 将继续尝试]", e);
} }
// TODO @haohao是不是把如果连接失败放到这里处理继续发起。。。这样connect 逻辑更简单纯粹1首次连接失败就退出2重新连接如果失败继续重试
}); });
} }

View File

@@ -19,7 +19,6 @@ import lombok.extern.slf4j.Slf4j;
* 为 EMQX 提供 HTTP 接口服务,包括: * 为 EMQX 提供 HTTP 接口服务,包括:
* 1. 设备认证接口 - 对应 EMQX HTTP 认证插件 * 1. 设备认证接口 - 对应 EMQX HTTP 认证插件
* 2. 设备事件处理接口 - 对应 EMQX Webhook 事件通知 * 2. 设备事件处理接口 - 对应 EMQX Webhook 事件通知
* 提供统一的错误处理和参数校验
* *
* @author 芋道源码 * @author 芋道源码
*/ */
@@ -83,7 +82,7 @@ public class IotEmqxAuthEventHandler {
} }
// 2. 执行认证 // 2. 执行认证
boolean authResult = performDeviceAuth(clientId, username, password); boolean authResult = handleDeviceAuth(clientId, username, password);
log.info("[handleAuth][设备认证结果: {} -> {}]", username, authResult); log.info("[handleAuth][设备认证结果: {} -> {}]", username, authResult);
if (authResult) { if (authResult) {
sendAuthResponse(context, RESULT_ALLOW); sendAuthResponse(context, RESULT_ALLOW);
@@ -97,8 +96,7 @@ public class IotEmqxAuthEventHandler {
} }
/** /**
* EMQX 统一事件处理接口 * EMQX 统一事件处理接口:根据 EMQX 官方 Webhook 设计,统一处理所有客户端事件
* 根据 EMQX 官方 Webhook 设计,统一处理所有客户端事件
* 支持的事件类型client.connected、client.disconnected 等 * 支持的事件类型client.connected、client.disconnected 等
*/ */
public void handleEvent(RoutingContext context) { public void handleEvent(RoutingContext context) {
@@ -160,18 +158,16 @@ public class IotEmqxAuthEventHandler {
* @return 请求体JSON对象解析失败时返回null * @return 请求体JSON对象解析失败时返回null
*/ */
private JsonObject parseRequestBody(RoutingContext context) { private JsonObject parseRequestBody(RoutingContext context) {
String rawBody = null;
try { try {
rawBody = context.body().asString();
JsonObject body = context.body().asJsonObject(); JsonObject body = context.body().asJsonObject();
if (body == null) { if (body == null) {
log.info("[parseRequestBody][请求体为空][rawBody={}]", rawBody); log.info("[parseRequestBody][请求体为空]");
sendAuthResponse(context, RESULT_IGNORE); sendAuthResponse(context, RESULT_IGNORE);
return null; return null;
} }
return body; return body;
} catch (Exception e) { } catch (Exception e) {
log.error("[parseRequestBody][解析请求体失败][rawBody={}]", rawBody, e); log.error("[parseRequestBody][body({}) 解析请求体失败]", context.body().asString(), e);
sendAuthResponse(context, RESULT_IGNORE); sendAuthResponse(context, RESULT_IGNORE);
return null; return null;
} }
@@ -185,14 +181,14 @@ public class IotEmqxAuthEventHandler {
* @param password 密码 * @param password 密码
* @return 认证是否成功 * @return 认证是否成功
*/ */
private boolean performDeviceAuth(String clientId, String username, String password) { private boolean handleDeviceAuth(String clientId, String username, String password) {
try { try {
CommonResult<Boolean> result = deviceApi.authDevice(new IotDeviceAuthReqDTO() CommonResult<Boolean> result = deviceApi.authDevice(new IotDeviceAuthReqDTO()
.setClientId(clientId).setUsername(username).setPassword(password)); .setClientId(clientId).setUsername(username).setPassword(password));
result.checkError(); result.checkError();
return BooleanUtil.isTrue(result.getData()); return BooleanUtil.isTrue(result.getData());
} catch (Exception e) { } catch (Exception e) {
log.error("[performDeviceAuth][认证接口调用失败: {}]", username, e); log.error("[handleDeviceAuth][设备({}) 认证接口调用失败]", username, e);
throw e; throw e;
} }
} }
@@ -207,7 +203,7 @@ public class IotEmqxAuthEventHandler {
// 1. 解析设备信息 // 1. 解析设备信息
IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(username); IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(username);
if (deviceInfo == null) { if (deviceInfo == null) {
log.debug("[handleDeviceStateChange][跳过非设备连接: {}]", username); log.debug("[handleDeviceStateChange][跳过非设备({})连接]", username);
return; return;
} }

View File

@@ -40,7 +40,7 @@ public class IotEmqxDownstreamHandler {
* @param message 设备消息 * @param message 设备消息
*/ */
public void handle(IotDeviceMessage message) { public void handle(IotDeviceMessage message) {
// 1. 获取设备信息(使用缓存) // 1. 获取设备信息
IotDeviceRespDTO deviceInfo = deviceService.getDeviceFromCache(message.getDeviceId()); IotDeviceRespDTO deviceInfo = deviceService.getDeviceFromCache(message.getDeviceId());
if (deviceInfo == null) { if (deviceInfo == null) {
log.error("[handle][设备信息({})不存在]", message.getDeviceId()); log.error("[handle][设备信息({})不存在]", message.getDeviceId());
@@ -53,11 +53,10 @@ public class IotEmqxDownstreamHandler {
log.warn("[handle][未知的消息方法: {}]", message.getMethod()); log.warn("[handle][未知的消息方法: {}]", message.getMethod());
return; return;
} }
// 2.2 构建载荷 // 2.2 构建载荷
byte[] payload = deviceMessageService.encodeDeviceMessage(message, deviceInfo.getProductKey(), deviceInfo.getDeviceName()); byte[] payload = deviceMessageService.encodeDeviceMessage(message, deviceInfo.getProductKey(), deviceInfo.getDeviceName());
// 2.3 发布消息 // 2.3 发布消息
// TODO @haohao可以直接使用 bytes 作为 payload 么?
protocol.publishMessage(topic, new String(payload)); protocol.publishMessage(topic, new String(payload));
} }
@@ -77,17 +76,15 @@ public class IotEmqxDownstreamHandler {
return null; return null;
} }
// 2. 判断是否回复消息 // 2. 根据消息方法和回复状态,构建 topic
boolean isReply = IotDeviceMessageUtils.isReplyMessage(message); boolean isReply = IotDeviceMessageUtils.isReplyMessage(message);
// TODO @haohao这里判断要不去掉 methodEnum == IotDeviceMessageMethodEnum.PROPERTY_POST直接构建如果未来有不回复的在特殊搞开关
// 3. 根据消息方法和回复状态,构建主题
if (methodEnum == IotDeviceMessageMethodEnum.PROPERTY_POST && isReply) { if (methodEnum == IotDeviceMessageMethodEnum.PROPERTY_POST && isReply) {
return IotMqttTopicUtils.buildPropertyPostReplyTopic(productKey, deviceName); return IotMqttTopicUtils.buildPropertyPostReplyTopic(productKey, deviceName);
} }
if (methodEnum == IotDeviceMessageMethodEnum.PROPERTY_SET && !isReply) { if (methodEnum == IotDeviceMessageMethodEnum.PROPERTY_SET && !isReply) {
return IotMqttTopicUtils.buildPropertySetTopic(productKey, deviceName); return IotMqttTopicUtils.buildPropertySetTopic(productKey, deviceName);
} }
log.warn("[buildTopicByMethod][暂时不支持的下行消息: method={}, isReply={}]", log.warn("[buildTopicByMethod][暂时不支持的下行消息: method={}, isReply={}]",
message.getMethod(), isReply); message.getMethod(), isReply);
return null; return null;