diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java index 17431dad34..79dcf1d890 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java @@ -1,6 +1,5 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.emqx; -import cn.hutool.core.collection.CollUtil; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.router.IotEmqxUpstreamHandler; @@ -67,9 +66,11 @@ public class IotEmqxUpstreamProtocol { stop(); // 异步关闭应用,避免阻塞当前线程 + // TODO @haohao:是不是阻塞,也没关系哈? new Thread(() -> { try { - Thread.sleep(1000); // 等待1秒让日志输出完成 + // TODO @haohao:可以考虑用 ThreadUtil.sleep 更简洁 + Thread.sleep(1000); // 等待 1 秒让日志输出完成 log.error("[start][由于 MQTT 连接失败,正在关闭应用]"); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); @@ -110,13 +111,10 @@ public class IotEmqxUpstreamProtocol { */ private void startMqttClient() { try { - // 2.1. 初始化消息处理器 + // 1. 初始化消息处理器 this.upstreamHandler = new IotEmqxUpstreamHandler(this); - // 2.2. 创建 MQTT 客户端 - createMqttClient(); - - // 2.3. 连接 MQTT Broker(同步等待首次连接结果) + // 2. 创建 MQTT 客户端,连接 MQTT Broker boolean connected = connectMqttSync(); if (!connected) { throw new RuntimeException("首次连接 MQTT Broker 失败"); @@ -134,11 +132,14 @@ public class IotEmqxUpstreamProtocol { * @param isSync 是否同步等待连接结果 * @return 当 isSync 为 true 时返回连接是否成功,否则返回 null */ + // TODO @haohao:是不是不用结果;结束后,直接判断 this.mqttClient.isConnected(); private Boolean connectMqtt(boolean isReconnect, boolean isSync) { + // TODO @haohao:这块代码,是不是放到 String host = emqxProperties.getMqttHost(); Integer port = emqxProperties.getMqttPort(); // 2.3.1. 如果是重连,则需要重新创建 MQTT 客户端 + // TODO @hoahao:疑惑,为啥这里要重新创建对象呀?另外,创建是不是拿到 reconnectWithDelay 会更合适?这样和 startMqttClient 一样呢; if (isReconnect) { createMqttClient(); } @@ -148,7 +149,6 @@ public class IotEmqxUpstreamProtocol { AtomicBoolean success = isSync ? new AtomicBoolean(false) : null; - mqttClient.connect(port, host, connectResult -> { if (connectResult.succeeded()) { if (isReconnect) { @@ -204,21 +204,18 @@ public class IotEmqxUpstreamProtocol { * 停止 MQTT 客户端 */ private void stopMqttClient() { - // 1.1. 取消订阅所有主题 + // 1. 取消订阅所有主题 if (mqttClient != null && mqttClient.isConnected()) { List topicList = emqxProperties.getMqttTopics(); - if (CollUtil.isNotEmpty(topicList)) { - for (String topic : topicList) { - try { - mqttClient.unsubscribe(topic); - } catch (Exception e) { - log.warn("[stopMqttClient][取消订阅主题({})异常]", topic, e); - } + for (String topic : topicList) { + try { + mqttClient.unsubscribe(topic); + } catch (Exception e) { + log.warn("[stopMqttClient][取消订阅主题({})异常]", topic, e); } } } - - // 1.2. 断开 MQTT 客户端连接 + // 2. 断开 MQTT 客户端连接 if (mqttClient != null && mqttClient.isConnected()) { try { mqttClient.disconnect(); @@ -244,18 +241,18 @@ public class IotEmqxUpstreamProtocol { * 设置 MQTT 处理器 */ private void setupMqttHandlers() { - // 1. 设置断开重连监听器 + // 1.1 设置断开重连监听器 mqttClient.closeHandler(closeEvent -> { - if (isRunning) { - log.warn("[closeHandler][MQTT 连接已断开, 准备重连]"); - reconnectWithDelay(); + if (!isRunning) { + return; } + log.warn("[closeHandler][MQTT 连接已断开, 准备重连]"); + reconnectWithDelay(); }); - - // 2. 设置异常处理器 + // 1.2 设置异常处理器 mqttClient.exceptionHandler(exception -> log.error("[exceptionHandler][MQTT 客户端异常]", exception)); - // 3. 设置消息处理器 + // 2. 设置消息处理器 mqttClient.publishHandler(upstreamHandler::handle); } @@ -270,16 +267,13 @@ public class IotEmqxUpstreamProtocol { return; } + // 2. 批量订阅所有主题 + Map topics = new HashMap<>(); int qos = emqxProperties.getMqttQos(); - - // 2. 构建主题-QoS 映射,批量订阅 - Map topicQosMap = new HashMap<>(); for (String topic : topicList) { - topicQosMap.put(topic, qos); + topics.put(topic, qos); } - - // 3. 批量订阅所有主题 - mqttClient.subscribe(topicQosMap, subscribeResult -> { + mqttClient.subscribe(topics, subscribeResult -> { if (subscribeResult.succeeded()) { log.info("[subscribeToTopics][订阅主题成功, 共 {} 个主题]", topicList.size()); } else { @@ -307,6 +301,7 @@ public class IotEmqxUpstreamProtocol { } catch (Exception e) { log.error("[reconnectWithDelay][重连失败, 将继续尝试]", e); } + // TODO @haohao:是不是把如果连接失败,放到这里处理?继续发起。。。这样,connect 逻辑更简单纯粹;1)首次连接,失败就退出;2)重新连接,如果失败,继续重试! }); } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxAuthEventHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxAuthEventHandler.java index df22f988fe..6bf33e2b76 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxAuthEventHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxAuthEventHandler.java @@ -19,7 +19,6 @@ import lombok.extern.slf4j.Slf4j; * 为 EMQX 提供 HTTP 接口服务,包括: * 1. 设备认证接口 - 对应 EMQX HTTP 认证插件 * 2. 设备事件处理接口 - 对应 EMQX Webhook 事件通知 - * 提供统一的错误处理和参数校验 * * @author 芋道源码 */ @@ -83,7 +82,7 @@ public class IotEmqxAuthEventHandler { } // 2. 执行认证 - boolean authResult = performDeviceAuth(clientId, username, password); + boolean authResult = handleDeviceAuth(clientId, username, password); log.info("[handleAuth][设备认证结果: {} -> {}]", username, authResult); if (authResult) { sendAuthResponse(context, RESULT_ALLOW); @@ -97,8 +96,7 @@ public class IotEmqxAuthEventHandler { } /** - * EMQX 统一事件处理接口 - * 根据 EMQX 官方 Webhook 设计,统一处理所有客户端事件 + * EMQX 统一事件处理接口:根据 EMQX 官方 Webhook 设计,统一处理所有客户端事件 * 支持的事件类型:client.connected、client.disconnected 等 */ public void handleEvent(RoutingContext context) { @@ -160,18 +158,16 @@ public class IotEmqxAuthEventHandler { * @return 请求体JSON对象,解析失败时返回null */ private JsonObject parseRequestBody(RoutingContext context) { - String rawBody = null; try { - rawBody = context.body().asString(); JsonObject body = context.body().asJsonObject(); if (body == null) { - log.info("[parseRequestBody][请求体为空][rawBody={}]", rawBody); + log.info("[parseRequestBody][请求体为空]"); sendAuthResponse(context, RESULT_IGNORE); return null; } return body; } catch (Exception e) { - log.error("[parseRequestBody][解析请求体失败][rawBody={}]", rawBody, e); + log.error("[parseRequestBody][body({}) 解析请求体失败]", context.body().asString(), e); sendAuthResponse(context, RESULT_IGNORE); return null; } @@ -185,14 +181,14 @@ public class IotEmqxAuthEventHandler { * @param password 密码 * @return 认证是否成功 */ - private boolean performDeviceAuth(String clientId, String username, String password) { + private boolean handleDeviceAuth(String clientId, String username, String password) { try { CommonResult result = deviceApi.authDevice(new IotDeviceAuthReqDTO() .setClientId(clientId).setUsername(username).setPassword(password)); result.checkError(); return BooleanUtil.isTrue(result.getData()); } catch (Exception e) { - log.error("[performDeviceAuth][认证接口调用失败: {}]", username, e); + log.error("[handleDeviceAuth][设备({}) 认证接口调用失败]", username, e); throw e; } } @@ -207,7 +203,7 @@ public class IotEmqxAuthEventHandler { // 1. 解析设备信息 IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(username); if (deviceInfo == null) { - log.debug("[handleDeviceStateChange][跳过非设备连接: {}]", username); + log.debug("[handleDeviceStateChange][跳过非设备({})连接]", username); return; } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java index 43ec9a2977..f0bbdb954b 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java @@ -40,7 +40,7 @@ public class IotEmqxDownstreamHandler { * @param message 设备消息 */ public void handle(IotDeviceMessage message) { - // 1. 获取设备信息(使用缓存) + // 1. 获取设备信息 IotDeviceRespDTO deviceInfo = deviceService.getDeviceFromCache(message.getDeviceId()); if (deviceInfo == null) { log.error("[handle][设备信息({})不存在]", message.getDeviceId()); @@ -53,11 +53,10 @@ public class IotEmqxDownstreamHandler { log.warn("[handle][未知的消息方法: {}]", message.getMethod()); return; } - // 2.2 构建载荷 byte[] payload = deviceMessageService.encodeDeviceMessage(message, deviceInfo.getProductKey(), deviceInfo.getDeviceName()); - // 2.3 发布消息 + // TODO @haohao:可以直接使用 bytes 作为 payload 么? protocol.publishMessage(topic, new String(payload)); } @@ -77,17 +76,15 @@ public class IotEmqxDownstreamHandler { return null; } - // 2. 判断是否回复消息 + // 2. 根据消息方法和回复状态,构建 topic boolean isReply = IotDeviceMessageUtils.isReplyMessage(message); - - // 3. 根据消息方法和回复状态,构建主题 + // TODO @haohao:这里判断,要不去掉 methodEnum == IotDeviceMessageMethodEnum.PROPERTY_POST?直接构建??如果未来有不回复的,在特殊搞开关; if (methodEnum == IotDeviceMessageMethodEnum.PROPERTY_POST && isReply) { return IotMqttTopicUtils.buildPropertyPostReplyTopic(productKey, deviceName); } if (methodEnum == IotDeviceMessageMethodEnum.PROPERTY_SET && !isReply) { return IotMqttTopicUtils.buildPropertySetTopic(productKey, deviceName); } - log.warn("[buildTopicByMethod][暂时不支持的下行消息: method={}, isReply={}]", message.getMethod(), isReply); return null;