diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java index e9b0001e13..11cbfca270 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java @@ -6,6 +6,7 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxDownstreamSubscr import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol; +import io.vertx.core.Vertx; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -45,14 +46,21 @@ public class IotGatewayConfiguration { @Slf4j public static class MqttProtocolConfiguration { - @Bean - public IotEmqxAuthEventProtocol iotEmqxAuthEventProtocol(IotGatewayProperties gatewayProperties) { - return new IotEmqxAuthEventProtocol(gatewayProperties.getProtocol().getEmqx()); + @Bean(destroyMethod = "close") + public Vertx emqxVertx() { + return Vertx.vertx(); } @Bean - public IotEmqxUpstreamProtocol iotEmqxUpstreamProtocol(IotGatewayProperties gatewayProperties) { - return new IotEmqxUpstreamProtocol(gatewayProperties.getProtocol().getEmqx()); + public IotEmqxAuthEventProtocol iotEmqxAuthEventProtocol(IotGatewayProperties gatewayProperties, + Vertx emqxVertx) { + return new IotEmqxAuthEventProtocol(gatewayProperties.getProtocol().getEmqx(), emqxVertx); + } + + @Bean + public IotEmqxUpstreamProtocol iotEmqxUpstreamProtocol(IotGatewayProperties gatewayProperties, + Vertx emqxVertx) { + return new IotEmqxUpstreamProtocol(gatewayProperties.getProtocol().getEmqx(), emqxVertx); } @Bean diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxAuthEventProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxAuthEventProtocol.java index 2ba902c5c5..059479b89d 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxAuthEventProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxAuthEventProtocol.java @@ -28,20 +28,20 @@ public class IotEmqxAuthEventProtocol { private final String serverId; - private Vertx vertx; + private final Vertx vertx; private HttpServer httpServer; - public IotEmqxAuthEventProtocol(IotGatewayProperties.EmqxProperties emqxProperties) { + public IotEmqxAuthEventProtocol(IotGatewayProperties.EmqxProperties emqxProperties, + Vertx vertx) { this.emqxProperties = emqxProperties; + this.vertx = vertx; this.serverId = IotDeviceMessageUtils.generateServerId(emqxProperties.getMqttPort()); } @PostConstruct public void start() { try { - // 创建 Vertx 实例 - this.vertx = Vertx.vertx(); startHttpServer(); log.info("[start][IoT 网关 EMQX 认证事件协议服务启动成功, 端口: {}]", emqxProperties.getHttpPort()); } catch (Exception e) { @@ -53,17 +53,6 @@ public class IotEmqxAuthEventProtocol { @PreDestroy public void stop() { stopHttpServer(); - - // 关闭 Vertx 实例 - if (vertx != null) { - try { - vertx.close(); - log.debug("[stop][Vertx 实例已关闭]"); - } catch (Exception e) { - log.warn("[stop][关闭 Vertx 实例失败]", e); - } - } - log.info("[stop][IoT 网关 EMQX 认证事件协议服务已停止]"); } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java index a02aa17da0..fef3ce1723 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java @@ -10,7 +10,6 @@ import io.vertx.mqtt.MqttClient; import io.vertx.mqtt.MqttClientOptions; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; -import jodd.util.ThreadUtil; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -41,9 +40,11 @@ public class IotEmqxUpstreamProtocol { private IotEmqxUpstreamHandler upstreamHandler; - public IotEmqxUpstreamProtocol(IotGatewayProperties.EmqxProperties emqxProperties) { + public IotEmqxUpstreamProtocol(IotGatewayProperties.EmqxProperties emqxProperties, + Vertx vertx) { this.emqxProperties = emqxProperties; this.serverId = IotDeviceMessageUtils.generateServerId(emqxProperties.getMqttPort()); + this.vertx = vertx; } @PostConstruct @@ -53,13 +54,10 @@ public class IotEmqxUpstreamProtocol { } try { - // 1. 初始化 Vertx 实例 - this.vertx = Vertx.vertx(); - - // 2. 启动 MQTT 客户端 + // 1. 启动 MQTT 客户端 startMqttClient(); - // 3. 标记服务为运行状态 + // 2. 标记服务为运行状态 isRunning = true; log.info("[start][IoT 网关 EMQX 协议启动成功]"); } catch (Exception e) { @@ -67,10 +65,16 @@ public class IotEmqxUpstreamProtocol { stop(); // 异步关闭应用 - // TODO haohao:是不是不用 sleep 也行哈? Thread shutdownThread = new Thread(() -> { - ThreadUtil.sleep(1000); - log.error("[start][由于 MQTT 连接失败,正在关闭应用]"); + try { + // 确保日志输出完成,使用更优雅的方式 + log.error("[start][由于 MQTT 连接失败,正在关闭应用]"); + // 等待日志输出完成 + Thread.sleep(1000); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + log.warn("[start][应用关闭被中断]"); + } System.exit(1); }); shutdownThread.setDaemon(true); @@ -90,16 +94,7 @@ public class IotEmqxUpstreamProtocol { // 1. 停止 MQTT 客户端 stopMqttClient(); - // 2. 关闭 Vertx 实例 - if (vertx != null) { - try { - vertx.close(); - } catch (Exception e) { - log.warn("[stop][关闭 Vertx 实例失败]", e); - } - } - - // 3. 标记服务为停止状态 + // 2. 标记服务为停止状态 isRunning = false; log.info("[stop][IoT 网关 MQTT 协议服务已停止]"); } @@ -147,7 +142,7 @@ public class IotEmqxUpstreamProtocol { // 2. 等待连接结果 try { - // TODO @haohao:想了下,timeout 可以不设置,全靠 mqttclient 的超时时间? + // 应用层超时控制:防止启动过程无限阻塞,与MQTT客户端的网络超时是不同层次的控制 boolean awaitResult = latch.await(10, java.util.concurrent.TimeUnit.SECONDS); if (!awaitResult) { log.error("[connectMqttSync][等待连接结果超时]"); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java index b1ecfde58d..da500835d0 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java @@ -54,7 +54,8 @@ public class IotEmqxDownstreamHandler { return; } // 2.2 构建载荷 - byte[] payload = deviceMessageService.encodeDeviceMessage(message, deviceInfo.getProductKey(), deviceInfo.getDeviceName()); + byte[] payload = deviceMessageService.encodeDeviceMessage(message, deviceInfo.getProductKey(), + deviceInfo.getDeviceName()); // 2.3 发布消息 protocol.publishMessage(topic, payload); } @@ -78,20 +79,54 @@ public class IotEmqxDownstreamHandler { // 2. 根据消息方法和回复状态,构建 topic boolean isReply = IotDeviceMessageUtils.isReplyMessage(message); - // TODO @芋艿:需要添加对应的 Topic,所以需要先判断消息方法类型 - // TODO @haohao:基于 method,然后逆推对应的 topic,可以哇?约定好~ - // 根据消息方法和回复状态构建对应的主题 + // 3. 根据消息方法类型构建对应的主题 switch (methodEnum) { case PROPERTY_POST: + // 属性上报:只支持回复消息(下行) if (isReply) { return IotMqttTopicUtils.buildPropertyPostReplyTopic(productKey, deviceName); } break; + case PROPERTY_SET: + // 属性设置:只支持非回复消息(下行) if (!isReply) { return IotMqttTopicUtils.buildPropertySetTopic(productKey, deviceName); } break; + + case EVENT_POST: + // 事件上报:只支持回复消息(下行) + if (isReply) { + String identifier = IotDeviceMessageUtils.getIdentifier(message); + if (StrUtil.isNotBlank(identifier)) { + return IotMqttTopicUtils.buildEventPostReplyTopic(productKey, deviceName, identifier); + } + } + break; + + case SERVICE_INVOKE: + // 服务调用:支持请求和回复 + String serviceIdentifier = IotDeviceMessageUtils.getIdentifier(message); + if (StrUtil.isNotBlank(serviceIdentifier)) { + if (isReply) { + return IotMqttTopicUtils.buildServiceReplyTopic(productKey, deviceName, serviceIdentifier); + } else { + return IotMqttTopicUtils.buildServiceTopic(productKey, deviceName, serviceIdentifier); + } + } + break; + + case CONFIG_PUSH: + // 配置推送:平台向设备推送配置(下行请求),设备回复确认(上行回复) + if (!isReply) { + return IotMqttTopicUtils.buildConfigPushTopic(productKey, deviceName); + } + break; + + default: + log.warn("[buildTopicByMethod][未处理的消息方法: {}]", methodEnum); + break; } log.warn("[buildTopicByMethod][暂时不支持的下行消息: method={}, isReply={}]", diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java index 270e2717ab..1faf6aeeb8 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java @@ -91,4 +91,39 @@ public final class IotMqttTopicUtils { return buildDeviceTopicPrefix(productKey, deviceName) + SERVICE_TOPIC_PREFIX + serviceIdentifier; } + /** + * 构建设备服务调用回复主题 + * + * @param productKey 产品 Key + * @param deviceName 设备名称 + * @param serviceIdentifier 服务标识符 + * @return 完整的主题路径 + */ + public static String buildServiceReplyTopic(String productKey, String deviceName, String serviceIdentifier) { + return buildDeviceTopicPrefix(productKey, deviceName) + SERVICE_TOPIC_PREFIX + serviceIdentifier + "_reply"; + } + + /** + * 构建设备事件上报回复主题 + * + * @param productKey 产品 Key + * @param deviceName 设备名称 + * @param eventIdentifier 事件标识符 + * @return 完整的主题路径 + */ + public static String buildEventPostReplyTopic(String productKey, String deviceName, String eventIdentifier) { + return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/event/" + eventIdentifier + "_reply"; + } + + /** + * 构建设备配置推送主题 + * + * @param productKey 产品 Key + * @param deviceName 设备名称 + * @return 完整的主题路径 + */ + public static String buildConfigPushTopic(String productKey, String deviceName) { + return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/config/push"; + } + } \ No newline at end of file