diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java index 23f0447ea4..a02aa17da0 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java @@ -67,6 +67,7 @@ public class IotEmqxUpstreamProtocol { stop(); // 异步关闭应用 + // TODO haohao:是不是不用 sleep 也行哈? Thread shutdownThread = new Thread(() -> { ThreadUtil.sleep(1000); log.error("[start][由于 MQTT 连接失败,正在关闭应用]"); @@ -128,12 +129,9 @@ public class IotEmqxUpstreamProtocol { private void connectMqttSync() { String host = emqxProperties.getMqttHost(); int port = emqxProperties.getMqttPort(); - - // 1. 创建同步等待对象 + // 1. 连接 MQTT Broker CountDownLatch latch = new CountDownLatch(1); AtomicBoolean success = new AtomicBoolean(false); - - // 2. 连接 MQTT Broker mqttClient.connect(port, host, connectResult -> { if (connectResult.succeeded()) { log.info("[connectMqttSync][MQTT 客户端连接成功, host: {}, port: {}]", host, port); @@ -147,16 +145,16 @@ public class IotEmqxUpstreamProtocol { latch.countDown(); }); - // 3. 等待连接结果 + // 2. 等待连接结果 try { + // TODO @haohao:想了下,timeout 可以不设置,全靠 mqttclient 的超时时间? boolean awaitResult = latch.await(10, java.util.concurrent.TimeUnit.SECONDS); if (!awaitResult) { log.error("[connectMqttSync][等待连接结果超时]"); throw new RuntimeException("连接 MQTT Broker 超时"); } if (!success.get()) { - throw new RuntimeException(String.format("首次连接 MQTT Broker 失败,地址: %s, 端口: %d", - host, port)); + throw new RuntimeException(String.format("首次连接 MQTT Broker 失败,地址: %s, 端口: %d", host, port)); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -171,7 +169,6 @@ public class IotEmqxUpstreamProtocol { private void connectMqttAsync() { String host = emqxProperties.getMqttHost(); int port = emqxProperties.getMqttPort(); - mqttClient.connect(port, host, connectResult -> { if (connectResult.succeeded()) { log.info("[connectMqttAsync][MQTT 客户端重连成功]"); @@ -199,7 +196,6 @@ public class IotEmqxUpstreamProtocol { long delay = emqxProperties.getReconnectDelayMs(); log.info("[reconnectWithDelay][将在 {} 毫秒后尝试重连 MQTT Broker]", delay); - vertx.setTimer(delay, timerId -> { if (!isRunning) { return; @@ -226,7 +222,6 @@ public class IotEmqxUpstreamProtocol { if (mqttClient == null) { return; } - try { if (mqttClient.isConnected()) { // 1. 取消订阅所有主题 diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java index 55e34c8a15..b1ecfde58d 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java @@ -79,6 +79,7 @@ public class IotEmqxDownstreamHandler { boolean isReply = IotDeviceMessageUtils.isReplyMessage(message); // TODO @芋艿:需要添加对应的 Topic,所以需要先判断消息方法类型 + // TODO @haohao:基于 method,然后逆推对应的 topic,可以哇?约定好~ // 根据消息方法和回复状态构建对应的主题 switch (methodEnum) { case PROPERTY_POST: