diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java index 79dcf1d890..23f0447ea4 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java @@ -10,6 +10,7 @@ import io.vertx.mqtt.MqttClient; import io.vertx.mqtt.MqttClientOptions; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; +import jodd.util.ThreadUtil; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -65,19 +66,15 @@ public class IotEmqxUpstreamProtocol { log.error("[start][IoT 网关 EMQX 协议服务启动失败,应用将关闭]", e); stop(); - // 异步关闭应用,避免阻塞当前线程 - // TODO @haohao:是不是阻塞,也没关系哈? - new Thread(() -> { - try { - // TODO @haohao:可以考虑用 ThreadUtil.sleep 更简洁 - Thread.sleep(1000); // 等待 1 秒让日志输出完成 - log.error("[start][由于 MQTT 连接失败,正在关闭应用]"); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } finally { - System.exit(1); // 直接关闭 JVM - } - }).start(); + // 异步关闭应用 + Thread shutdownThread = new Thread(() -> { + ThreadUtil.sleep(1000); + log.error("[start][由于 MQTT 连接失败,正在关闭应用]"); + System.exit(1); + }); + shutdownThread.setDaemon(true); + shutdownThread.setName("emergency-shutdown"); + shutdownThread.start(); throw e; } @@ -114,114 +111,149 @@ public class IotEmqxUpstreamProtocol { // 1. 初始化消息处理器 this.upstreamHandler = new IotEmqxUpstreamHandler(this); - // 2. 创建 MQTT 客户端,连接 MQTT Broker - boolean connected = connectMqttSync(); - if (!connected) { - throw new RuntimeException("首次连接 MQTT Broker 失败"); - } + // 2. 创建 MQTT 客户端 + createMqttClient(); + + // 3. 同步连接 MQTT Broker + connectMqttSync(); } catch (Exception e) { log.error("[startMqttClient][MQTT 客户端启动失败]", e); - throw new RuntimeException("MQTT 客户端启动失败", e); + throw new RuntimeException("MQTT 客户端启动失败: " + e.getMessage(), e); } } - /** - * 连接 MQTT Broker - * - * @param isReconnect 是否为重连 - * @param isSync 是否同步等待连接结果 - * @return 当 isSync 为 true 时返回连接是否成功,否则返回 null - */ - // TODO @haohao:是不是不用结果;结束后,直接判断 this.mqttClient.isConnected(); - private Boolean connectMqtt(boolean isReconnect, boolean isSync) { - // TODO @haohao:这块代码,是不是放到 - String host = emqxProperties.getMqttHost(); - Integer port = emqxProperties.getMqttPort(); - - // 2.3.1. 如果是重连,则需要重新创建 MQTT 客户端 - // TODO @hoahao:疑惑,为啥这里要重新创建对象呀?另外,创建是不是拿到 reconnectWithDelay 会更合适?这样和 startMqttClient 一样呢; - if (isReconnect) { - createMqttClient(); - } - - // 2.3.2. 连接 MQTT Broker - CountDownLatch latch = isSync ? new CountDownLatch(1) : null; - AtomicBoolean success = isSync - ? new AtomicBoolean(false) - : null; - mqttClient.connect(port, host, connectResult -> { - if (connectResult.succeeded()) { - if (isReconnect) { - log.info("[connectMqtt][MQTT 客户端重连成功]"); - } else { - log.info("[connectMqtt][MQTT 客户端连接成功, host: {}, port: {}]", host, port); - } - // 设置处理器和订阅主题 - setupMqttHandlers(); - subscribeToTopics(); - if (success != null) { - success.set(true); - } - } else { - log.error("[connectMqtt][连接 MQTT Broker 失败, host: {}, port: {}]", host, port, connectResult.cause()); - if (isReconnect) { - reconnectWithDelay(); - } else { - log.error("[connectMqtt][首次连接失败,连接终止]"); - } - } - - if (latch != null) { - latch.countDown(); - } - }); - - // 2.3.3. 如果需要同步等待连接结果,则等待 - if (isSync) { - try { - latch.await(10, java.util.concurrent.TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.error("[connectMqtt][等待连接结果被中断]", e); - } - return success.get(); - } - - return null; - } - /** * 同步连接 MQTT Broker - * - * @return 是否连接成功 */ - private boolean connectMqttSync() { - Boolean result = connectMqtt(false, true); - return result != null ? result : false; + private void connectMqttSync() { + String host = emqxProperties.getMqttHost(); + int port = emqxProperties.getMqttPort(); + + // 1. 创建同步等待对象 + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean success = new AtomicBoolean(false); + + // 2. 连接 MQTT Broker + mqttClient.connect(port, host, connectResult -> { + if (connectResult.succeeded()) { + log.info("[connectMqttSync][MQTT 客户端连接成功, host: {}, port: {}]", host, port); + setupMqttHandlers(); + subscribeToTopics(); + success.set(true); + } else { + log.error("[connectMqttSync][连接 MQTT Broker 失败, host: {}, port: {}]", + host, port, connectResult.cause()); + } + latch.countDown(); + }); + + // 3. 等待连接结果 + try { + boolean awaitResult = latch.await(10, java.util.concurrent.TimeUnit.SECONDS); + if (!awaitResult) { + log.error("[connectMqttSync][等待连接结果超时]"); + throw new RuntimeException("连接 MQTT Broker 超时"); + } + if (!success.get()) { + throw new RuntimeException(String.format("首次连接 MQTT Broker 失败,地址: %s, 端口: %d", + host, port)); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("[connectMqttSync][等待连接结果被中断]", e); + throw new RuntimeException("连接 MQTT Broker 被中断", e); + } + } + + /** + * 异步连接 MQTT Broker + */ + private void connectMqttAsync() { + String host = emqxProperties.getMqttHost(); + int port = emqxProperties.getMqttPort(); + + mqttClient.connect(port, host, connectResult -> { + if (connectResult.succeeded()) { + log.info("[connectMqttAsync][MQTT 客户端重连成功]"); + setupMqttHandlers(); + subscribeToTopics(); + } else { + log.error("[connectMqttAsync][连接 MQTT Broker 失败, host: {}, port: {}]", + host, port, connectResult.cause()); + log.warn("[connectMqttAsync][重连失败,将再次尝试]"); + reconnectWithDelay(); + } + }); + } + + /** + * 延迟重连 + */ + private void reconnectWithDelay() { + if (!isRunning) { + return; + } + if (mqttClient != null && mqttClient.isConnected()) { + return; + } + + long delay = emqxProperties.getReconnectDelayMs(); + log.info("[reconnectWithDelay][将在 {} 毫秒后尝试重连 MQTT Broker]", delay); + + vertx.setTimer(delay, timerId -> { + if (!isRunning) { + return; + } + if (mqttClient != null && mqttClient.isConnected()) { + return; + } + + log.info("[reconnectWithDelay][开始重连 MQTT Broker]"); + try { + createMqttClient(); + connectMqttAsync(); + } catch (Exception e) { + log.error("[reconnectWithDelay][重连过程中发生异常]", e); + vertx.setTimer(delay, t -> reconnectWithDelay()); + } + }); } /** * 停止 MQTT 客户端 */ private void stopMqttClient() { - // 1. 取消订阅所有主题 - if (mqttClient != null && mqttClient.isConnected()) { - List topicList = emqxProperties.getMqttTopics(); - for (String topic : topicList) { + if (mqttClient == null) { + return; + } + + try { + if (mqttClient.isConnected()) { + // 1. 取消订阅所有主题 + List topicList = emqxProperties.getMqttTopics(); + for (String topic : topicList) { + try { + mqttClient.unsubscribe(topic); + } catch (Exception e) { + log.warn("[stopMqttClient][取消订阅主题({})异常]", topic, e); + } + } + + // 2. 断开 MQTT 客户端连接 try { - mqttClient.unsubscribe(topic); + CountDownLatch disconnectLatch = new CountDownLatch(1); + mqttClient.disconnect(ar -> disconnectLatch.countDown()); + if (!disconnectLatch.await(5, java.util.concurrent.TimeUnit.SECONDS)) { + log.warn("[stopMqttClient][断开 MQTT 连接超时]"); + } } catch (Exception e) { - log.warn("[stopMqttClient][取消订阅主题({})异常]", topic, e); + log.warn("[stopMqttClient][关闭 MQTT 客户端异常]", e); } } - } - // 2. 断开 MQTT 客户端连接 - if (mqttClient != null && mqttClient.isConnected()) { - try { - mqttClient.disconnect(); - } catch (Exception e) { - log.warn("[stopMqttClient][关闭 MQTT 客户端异常]", e); - } + } catch (Exception e) { + log.warn("[stopMqttClient][停止 MQTT 客户端过程中发生异常]", e); + } finally { + mqttClient = null; } } @@ -241,7 +273,7 @@ public class IotEmqxUpstreamProtocol { * 设置 MQTT 处理器 */ private void setupMqttHandlers() { - // 1.1 设置断开重连监听器 + // 1. 设置断开重连监听器 mqttClient.closeHandler(closeEvent -> { if (!isRunning) { return; @@ -249,10 +281,12 @@ public class IotEmqxUpstreamProtocol { log.warn("[closeHandler][MQTT 连接已断开, 准备重连]"); reconnectWithDelay(); }); - // 1.2 设置异常处理器 - mqttClient.exceptionHandler(exception -> log.error("[exceptionHandler][MQTT 客户端异常]", exception)); - // 2. 设置消息处理器 + // 2. 设置异常处理器 + mqttClient.exceptionHandler(exception -> + log.error("[exceptionHandler][MQTT 客户端异常]", exception)); + + // 3. 设置消息处理器 mqttClient.publishHandler(upstreamHandler::handle); } @@ -283,35 +317,13 @@ public class IotEmqxUpstreamProtocol { }); } - /** - * 延迟重连 - */ - private void reconnectWithDelay() { - long delay = emqxProperties.getReconnectDelayMs(); - vertx.setTimer(delay, timerId -> { - if (!isRunning) { - return; - } - if (mqttClient != null && mqttClient.isConnected()) { - return; - } - log.info("[reconnectWithDelay][开始重连 MQTT Broker]"); - try { - connectMqtt(true, false); - } catch (Exception e) { - log.error("[reconnectWithDelay][重连失败, 将继续尝试]", e); - } - // TODO @haohao:是不是把如果连接失败,放到这里处理?继续发起。。。这样,connect 逻辑更简单纯粹;1)首次连接,失败就退出;2)重新连接,如果失败,继续重试! - }); - } - /** * 发布消息到 MQTT Broker * * @param topic 主题 * @param payload 消息内容 */ - public void publishMessage(String topic, String payload) { + public void publishMessage(String topic, byte[] payload) { if (mqttClient == null || !mqttClient.isConnected()) { log.warn("[publishMessage][MQTT 客户端未连接, 无法发布消息]"); return; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java index f0bbdb954b..55e34c8a15 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java @@ -56,8 +56,7 @@ public class IotEmqxDownstreamHandler { // 2.2 构建载荷 byte[] payload = deviceMessageService.encodeDeviceMessage(message, deviceInfo.getProductKey(), deviceInfo.getDeviceName()); // 2.3 发布消息 - // TODO @haohao:可以直接使用 bytes 作为 payload 么? - protocol.publishMessage(topic, new String(payload)); + protocol.publishMessage(topic, payload); } /** @@ -78,13 +77,22 @@ public class IotEmqxDownstreamHandler { // 2. 根据消息方法和回复状态,构建 topic boolean isReply = IotDeviceMessageUtils.isReplyMessage(message); - // TODO @haohao:这里判断,要不去掉 methodEnum == IotDeviceMessageMethodEnum.PROPERTY_POST?直接构建??如果未来有不回复的,在特殊搞开关; - if (methodEnum == IotDeviceMessageMethodEnum.PROPERTY_POST && isReply) { - return IotMqttTopicUtils.buildPropertyPostReplyTopic(productKey, deviceName); - } - if (methodEnum == IotDeviceMessageMethodEnum.PROPERTY_SET && !isReply) { - return IotMqttTopicUtils.buildPropertySetTopic(productKey, deviceName); + + // TODO @芋艿:需要添加对应的 Topic,所以需要先判断消息方法类型 + // 根据消息方法和回复状态构建对应的主题 + switch (methodEnum) { + case PROPERTY_POST: + if (isReply) { + return IotMqttTopicUtils.buildPropertyPostReplyTopic(productKey, deviceName); + } + break; + case PROPERTY_SET: + if (!isReply) { + return IotMqttTopicUtils.buildPropertySetTopic(productKey, deviceName); + } + break; } + log.warn("[buildTopicByMethod][暂时不支持的下行消息: method={}, isReply={}]", message.getMethod(), isReply); return null;