From a3fc0730e9ddf951a94bf22d846860062ac193a5 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Mon, 16 Jun 2025 12:22:58 +0800 Subject: [PATCH] =?UTF-8?q?review=EF=BC=9A=E3=80=90IoT=20=E7=89=A9?= =?UTF-8?q?=E8=81=94=E7=BD=91=E3=80=91mqtt=20=E5=8D=8F=E8=AE=AE=E7=9A=84?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0=EF=BC=88=E6=95=B4=E4=BD=93=E6=B2=A1=E9=97=AE?= =?UTF-8?q?=E9=A2=98=E4=BA=86=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../protocol/emqx/IotEmqxUpstreamProtocol.java | 15 +++++---------- .../emqx/router/IotEmqxDownstreamHandler.java | 1 + 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java index 23f0447ea4..a02aa17da0 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java @@ -67,6 +67,7 @@ public class IotEmqxUpstreamProtocol { stop(); // 异步关闭应用 + // TODO haohao:是不是不用 sleep 也行哈? Thread shutdownThread = new Thread(() -> { ThreadUtil.sleep(1000); log.error("[start][由于 MQTT 连接失败,正在关闭应用]"); @@ -128,12 +129,9 @@ public class IotEmqxUpstreamProtocol { private void connectMqttSync() { String host = emqxProperties.getMqttHost(); int port = emqxProperties.getMqttPort(); - - // 1. 创建同步等待对象 + // 1. 连接 MQTT Broker CountDownLatch latch = new CountDownLatch(1); AtomicBoolean success = new AtomicBoolean(false); - - // 2. 连接 MQTT Broker mqttClient.connect(port, host, connectResult -> { if (connectResult.succeeded()) { log.info("[connectMqttSync][MQTT 客户端连接成功, host: {}, port: {}]", host, port); @@ -147,16 +145,16 @@ public class IotEmqxUpstreamProtocol { latch.countDown(); }); - // 3. 等待连接结果 + // 2. 等待连接结果 try { + // TODO @haohao:想了下,timeout 可以不设置,全靠 mqttclient 的超时时间? boolean awaitResult = latch.await(10, java.util.concurrent.TimeUnit.SECONDS); if (!awaitResult) { log.error("[connectMqttSync][等待连接结果超时]"); throw new RuntimeException("连接 MQTT Broker 超时"); } if (!success.get()) { - throw new RuntimeException(String.format("首次连接 MQTT Broker 失败,地址: %s, 端口: %d", - host, port)); + throw new RuntimeException(String.format("首次连接 MQTT Broker 失败,地址: %s, 端口: %d", host, port)); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -171,7 +169,6 @@ public class IotEmqxUpstreamProtocol { private void connectMqttAsync() { String host = emqxProperties.getMqttHost(); int port = emqxProperties.getMqttPort(); - mqttClient.connect(port, host, connectResult -> { if (connectResult.succeeded()) { log.info("[connectMqttAsync][MQTT 客户端重连成功]"); @@ -199,7 +196,6 @@ public class IotEmqxUpstreamProtocol { long delay = emqxProperties.getReconnectDelayMs(); log.info("[reconnectWithDelay][将在 {} 毫秒后尝试重连 MQTT Broker]", delay); - vertx.setTimer(delay, timerId -> { if (!isRunning) { return; @@ -226,7 +222,6 @@ public class IotEmqxUpstreamProtocol { if (mqttClient == null) { return; } - try { if (mqttClient.isConnected()) { // 1. 取消订阅所有主题 diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java index 55e34c8a15..b1ecfde58d 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java @@ -79,6 +79,7 @@ public class IotEmqxDownstreamHandler { boolean isReply = IotDeviceMessageUtils.isReplyMessage(message); // TODO @芋艿:需要添加对应的 Topic,所以需要先判断消息方法类型 + // TODO @haohao:基于 method,然后逆推对应的 topic,可以哇?约定好~ // 根据消息方法和回复状态构建对应的主题 switch (methodEnum) { case PROPERTY_POST: