diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java index d730e92782..e9b0001e13 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java @@ -1,10 +1,11 @@ package cn.iocoder.yudao.module.iot.gateway.config; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxAuthEventProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttDownstreamSubscriber; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -37,7 +38,7 @@ public class IotGatewayConfiguration { } /** - * IoT 网关 MQTT 协议配置类 + * IoT 网关 EMQX 协议配置类 */ @Configuration @ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.emqx", name = "enabled", havingValue = "true") @@ -45,14 +46,19 @@ public class IotGatewayConfiguration { public static class MqttProtocolConfiguration { @Bean - public IotMqttUpstreamProtocol iotMqttUpstreamProtocol(IotGatewayProperties gatewayProperties) { - return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getEmqx()); + public IotEmqxAuthEventProtocol iotEmqxAuthEventProtocol(IotGatewayProperties gatewayProperties) { + return new IotEmqxAuthEventProtocol(gatewayProperties.getProtocol().getEmqx()); } @Bean - public IotMqttDownstreamSubscriber iotMqttDownstreamSubscriber(IotMqttUpstreamProtocol mqttUpstreamProtocol, + public IotEmqxUpstreamProtocol iotEmqxUpstreamProtocol(IotGatewayProperties gatewayProperties) { + return new IotEmqxUpstreamProtocol(gatewayProperties.getProtocol().getEmqx()); + } + + @Bean + public IotEmqxDownstreamSubscriber iotEmqxDownstreamSubscriber(IotEmqxUpstreamProtocol mqttUpstreamProtocol, IotMessageBus messageBus) { - return new IotMqttDownstreamSubscriber(mqttUpstreamProtocol, messageBus); + return new IotEmqxDownstreamSubscriber(mqttUpstreamProtocol, messageBus); } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxAuthEventProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxAuthEventProtocol.java new file mode 100644 index 0000000000..2ba902c5c5 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxAuthEventProtocol.java @@ -0,0 +1,113 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.emqx; + +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; +import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.router.IotEmqxAuthEventHandler; +import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpServer; +import io.vertx.ext.web.Router; +import io.vertx.ext.web.handler.BodyHandler; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT 网关 EMQX 认证事件协议服务 + *

+ * 为 EMQX 提供 HTTP 接口服务,包括: + * 1. 设备认证接口 - 对应 EMQX HTTP 认证插件 + * 2. 设备事件处理接口 - 对应 EMQX Webhook 事件通知 + * + * @author 芋道源码 + */ +@Slf4j +public class IotEmqxAuthEventProtocol { + + private final IotGatewayProperties.EmqxProperties emqxProperties; + + private final String serverId; + + private Vertx vertx; + + private HttpServer httpServer; + + public IotEmqxAuthEventProtocol(IotGatewayProperties.EmqxProperties emqxProperties) { + this.emqxProperties = emqxProperties; + this.serverId = IotDeviceMessageUtils.generateServerId(emqxProperties.getMqttPort()); + } + + @PostConstruct + public void start() { + try { + // 创建 Vertx 实例 + this.vertx = Vertx.vertx(); + startHttpServer(); + log.info("[start][IoT 网关 EMQX 认证事件协议服务启动成功, 端口: {}]", emqxProperties.getHttpPort()); + } catch (Exception e) { + log.error("[start][IoT 网关 EMQX 认证事件协议服务启动失败]", e); + throw e; + } + } + + @PreDestroy + public void stop() { + stopHttpServer(); + + // 关闭 Vertx 实例 + if (vertx != null) { + try { + vertx.close(); + log.debug("[stop][Vertx 实例已关闭]"); + } catch (Exception e) { + log.warn("[stop][关闭 Vertx 实例失败]", e); + } + } + + log.info("[stop][IoT 网关 EMQX 认证事件协议服务已停止]"); + } + + /** + * 启动 HTTP 服务器 + */ + private void startHttpServer() { + int port = emqxProperties.getHttpPort(); + + // 1. 创建路由 + Router router = Router.router(vertx); + router.route().handler(BodyHandler.create()); + + // 2. 创建处理器,传入 serverId + IotEmqxAuthEventHandler handler = new IotEmqxAuthEventHandler(serverId); + router.post(IotMqttTopicUtils.MQTT_AUTH_PATH).handler(handler::handleAuth); + router.post(IotMqttTopicUtils.MQTT_EVENT_PATH).handler(handler::handleEvent); + + // 3. 启动 HTTP 服务器 + try { + httpServer = vertx.createHttpServer() + .requestHandler(router) + .listen(port) + .result(); + } catch (Exception e) { + log.error("[startHttpServer][HTTP 服务器启动失败, 端口: {}]", port, e); + throw e; + } + } + + /** + * 停止 HTTP 服务器 + */ + private void stopHttpServer() { + if (httpServer == null) { + return; + } + + try { + httpServer.close().result(); + log.info("[stopHttpServer][HTTP 服务器已停止]"); + } catch (Exception e) { + log.error("[stopHttpServer][HTTP 服务器停止失败]", e); + } + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxDownstreamSubscriber.java similarity index 78% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxDownstreamSubscriber.java index 861c3a5496..61bf12376b 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxDownstreamSubscriber.java @@ -1,29 +1,31 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt; +package cn.iocoder.yudao.module.iot.gateway.protocol.emqx; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttDownstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.router.IotEmqxDownstreamHandler; import jakarta.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; /** - * IoT 网关 MQTT 订阅者:接收下行给设备的消息 + * IoT 网关 EMQX 订阅者:接收下行给设备的消息 * * @author 芋道源码 */ @Slf4j -public class IotMqttDownstreamSubscriber implements IotMessageSubscriber { +public class IotEmqxDownstreamSubscriber implements IotMessageSubscriber { + + private final IotEmqxDownstreamHandler downstreamHandler; - private final IotMqttDownstreamHandler downstreamHandler; private final IotMessageBus messageBus; - private final IotMqttUpstreamProtocol protocol; - public IotMqttDownstreamSubscriber(IotMqttUpstreamProtocol protocol, IotMessageBus messageBus) { + private final IotEmqxUpstreamProtocol protocol; + + public IotEmqxDownstreamSubscriber(IotEmqxUpstreamProtocol protocol, IotMessageBus messageBus) { this.protocol = protocol; this.messageBus = messageBus; - this.downstreamHandler = new IotMqttDownstreamHandler(protocol); + this.downstreamHandler = new IotEmqxDownstreamHandler(protocol); } @PostConstruct diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java new file mode 100644 index 0000000000..17431dad34 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java @@ -0,0 +1,328 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.emqx; + +import cn.hutool.core.collection.CollUtil; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; +import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.router.IotEmqxUpstreamHandler; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.mqtt.MqttClient; +import io.vertx.mqtt.MqttClientOptions; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * IoT 网关 EMQX 协议:接收设备上行消息 + * + * @author 芋道源码 + */ +@Slf4j +public class IotEmqxUpstreamProtocol { + + private final IotGatewayProperties.EmqxProperties emqxProperties; + + private volatile boolean isRunning = false; + + private Vertx vertx; + + @Getter + private final String serverId; + + private MqttClient mqttClient; + + private IotEmqxUpstreamHandler upstreamHandler; + + public IotEmqxUpstreamProtocol(IotGatewayProperties.EmqxProperties emqxProperties) { + this.emqxProperties = emqxProperties; + this.serverId = IotDeviceMessageUtils.generateServerId(emqxProperties.getMqttPort()); + } + + @PostConstruct + public void start() { + if (isRunning) { + return; + } + + try { + // 1. 初始化 Vertx 实例 + this.vertx = Vertx.vertx(); + + // 2. 启动 MQTT 客户端 + startMqttClient(); + + // 3. 标记服务为运行状态 + isRunning = true; + log.info("[start][IoT 网关 EMQX 协议启动成功]"); + } catch (Exception e) { + log.error("[start][IoT 网关 EMQX 协议服务启动失败,应用将关闭]", e); + stop(); + + // 异步关闭应用,避免阻塞当前线程 + new Thread(() -> { + try { + Thread.sleep(1000); // 等待1秒让日志输出完成 + log.error("[start][由于 MQTT 连接失败,正在关闭应用]"); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } finally { + System.exit(1); // 直接关闭 JVM + } + }).start(); + + throw e; + } + } + + @PreDestroy + public void stop() { + if (!isRunning) { + return; + } + + // 1. 停止 MQTT 客户端 + stopMqttClient(); + + // 2. 关闭 Vertx 实例 + if (vertx != null) { + try { + vertx.close(); + } catch (Exception e) { + log.warn("[stop][关闭 Vertx 实例失败]", e); + } + } + + // 3. 标记服务为停止状态 + isRunning = false; + log.info("[stop][IoT 网关 MQTT 协议服务已停止]"); + } + + /** + * 启动 MQTT 客户端 + */ + private void startMqttClient() { + try { + // 2.1. 初始化消息处理器 + this.upstreamHandler = new IotEmqxUpstreamHandler(this); + + // 2.2. 创建 MQTT 客户端 + createMqttClient(); + + // 2.3. 连接 MQTT Broker(同步等待首次连接结果) + boolean connected = connectMqttSync(); + if (!connected) { + throw new RuntimeException("首次连接 MQTT Broker 失败"); + } + } catch (Exception e) { + log.error("[startMqttClient][MQTT 客户端启动失败]", e); + throw new RuntimeException("MQTT 客户端启动失败", e); + } + } + + /** + * 连接 MQTT Broker + * + * @param isReconnect 是否为重连 + * @param isSync 是否同步等待连接结果 + * @return 当 isSync 为 true 时返回连接是否成功,否则返回 null + */ + private Boolean connectMqtt(boolean isReconnect, boolean isSync) { + String host = emqxProperties.getMqttHost(); + Integer port = emqxProperties.getMqttPort(); + + // 2.3.1. 如果是重连,则需要重新创建 MQTT 客户端 + if (isReconnect) { + createMqttClient(); + } + + // 2.3.2. 连接 MQTT Broker + CountDownLatch latch = isSync ? new CountDownLatch(1) : null; + AtomicBoolean success = isSync + ? new AtomicBoolean(false) + : null; + + mqttClient.connect(port, host, connectResult -> { + if (connectResult.succeeded()) { + if (isReconnect) { + log.info("[connectMqtt][MQTT 客户端重连成功]"); + } else { + log.info("[connectMqtt][MQTT 客户端连接成功, host: {}, port: {}]", host, port); + } + // 设置处理器和订阅主题 + setupMqttHandlers(); + subscribeToTopics(); + if (success != null) { + success.set(true); + } + } else { + log.error("[connectMqtt][连接 MQTT Broker 失败, host: {}, port: {}]", host, port, connectResult.cause()); + if (isReconnect) { + reconnectWithDelay(); + } else { + log.error("[connectMqtt][首次连接失败,连接终止]"); + } + } + + if (latch != null) { + latch.countDown(); + } + }); + + // 2.3.3. 如果需要同步等待连接结果,则等待 + if (isSync) { + try { + latch.await(10, java.util.concurrent.TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("[connectMqtt][等待连接结果被中断]", e); + } + return success.get(); + } + + return null; + } + + /** + * 同步连接 MQTT Broker + * + * @return 是否连接成功 + */ + private boolean connectMqttSync() { + Boolean result = connectMqtt(false, true); + return result != null ? result : false; + } + + /** + * 停止 MQTT 客户端 + */ + private void stopMqttClient() { + // 1.1. 取消订阅所有主题 + if (mqttClient != null && mqttClient.isConnected()) { + List topicList = emqxProperties.getMqttTopics(); + if (CollUtil.isNotEmpty(topicList)) { + for (String topic : topicList) { + try { + mqttClient.unsubscribe(topic); + } catch (Exception e) { + log.warn("[stopMqttClient][取消订阅主题({})异常]", topic, e); + } + } + } + } + + // 1.2. 断开 MQTT 客户端连接 + if (mqttClient != null && mqttClient.isConnected()) { + try { + mqttClient.disconnect(); + } catch (Exception e) { + log.warn("[stopMqttClient][关闭 MQTT 客户端异常]", e); + } + } + } + + /** + * 创建 MQTT 客户端 + */ + private void createMqttClient() { + MqttClientOptions options = new MqttClientOptions() + .setClientId(emqxProperties.getMqttClientId()) + .setUsername(emqxProperties.getMqttUsername()) + .setPassword(emqxProperties.getMqttPassword()) + .setSsl(emqxProperties.getMqttSsl()); + this.mqttClient = MqttClient.create(vertx, options); + } + + /** + * 设置 MQTT 处理器 + */ + private void setupMqttHandlers() { + // 1. 设置断开重连监听器 + mqttClient.closeHandler(closeEvent -> { + if (isRunning) { + log.warn("[closeHandler][MQTT 连接已断开, 准备重连]"); + reconnectWithDelay(); + } + }); + + // 2. 设置异常处理器 + mqttClient.exceptionHandler(exception -> log.error("[exceptionHandler][MQTT 客户端异常]", exception)); + + // 3. 设置消息处理器 + mqttClient.publishHandler(upstreamHandler::handle); + } + + /** + * 订阅设备上行消息主题 + */ + private void subscribeToTopics() { + // 1. 校验 MQTT 客户端是否连接 + List topicList = emqxProperties.getMqttTopics(); + if (mqttClient == null || !mqttClient.isConnected()) { + log.warn("[subscribeToTopics][MQTT 客户端未连接, 跳过订阅]"); + return; + } + + int qos = emqxProperties.getMqttQos(); + + // 2. 构建主题-QoS 映射,批量订阅 + Map topicQosMap = new HashMap<>(); + for (String topic : topicList) { + topicQosMap.put(topic, qos); + } + + // 3. 批量订阅所有主题 + mqttClient.subscribe(topicQosMap, subscribeResult -> { + if (subscribeResult.succeeded()) { + log.info("[subscribeToTopics][订阅主题成功, 共 {} 个主题]", topicList.size()); + } else { + log.error("[subscribeToTopics][订阅主题失败, 共 {} 个主题, 原因: {}]", + topicList.size(), subscribeResult.cause().getMessage(), subscribeResult.cause()); + } + }); + } + + /** + * 延迟重连 + */ + private void reconnectWithDelay() { + long delay = emqxProperties.getReconnectDelayMs(); + vertx.setTimer(delay, timerId -> { + if (!isRunning) { + return; + } + if (mqttClient != null && mqttClient.isConnected()) { + return; + } + log.info("[reconnectWithDelay][开始重连 MQTT Broker]"); + try { + connectMqtt(true, false); + } catch (Exception e) { + log.error("[reconnectWithDelay][重连失败, 将继续尝试]", e); + } + }); + } + + /** + * 发布消息到 MQTT Broker + * + * @param topic 主题 + * @param payload 消息内容 + */ + public void publishMessage(String topic, String payload) { + if (mqttClient == null || !mqttClient.isConnected()) { + log.warn("[publishMessage][MQTT 客户端未连接, 无法发布消息]"); + return; + } + MqttQoS qos = MqttQoS.valueOf(emqxProperties.getMqttQos()); + mqttClient.publish(topic, Buffer.buffer(payload), qos, false, false); + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxAuthEventHandler.java similarity index 94% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxAuthEventHandler.java index 3410fb36a5..df22f988fe 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxAuthEventHandler.java @@ -1,4 +1,4 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; +package cn.iocoder.yudao.module.iot.gateway.protocol.emqx.router; import cn.hutool.core.util.BooleanUtil; import cn.hutool.core.util.StrUtil; @@ -8,21 +8,23 @@ import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.RoutingContext; import lombok.extern.slf4j.Slf4j; /** - * IoT 网关 MQTT HTTP 认证处理器 + * IoT 网关 EMQX 认证事件处理器 *

- * 处理 EMQX 的认证请求和事件钩子,提供统一的错误处理和参数校验 + * 为 EMQX 提供 HTTP 接口服务,包括: + * 1. 设备认证接口 - 对应 EMQX HTTP 认证插件 + * 2. 设备事件处理接口 - 对应 EMQX Webhook 事件通知 + * 提供统一的错误处理和参数校验 * * @author 芋道源码 */ @Slf4j -public class IotMqttHttpAuthHandler { +public class IotEmqxAuthEventHandler { /** * HTTP 成功状态码(EMQX 要求固定使用 200) @@ -48,14 +50,14 @@ public class IotMqttHttpAuthHandler { private static final String EVENT_CLIENT_CONNECTED = "client.connected"; private static final String EVENT_CLIENT_DISCONNECTED = "client.disconnected"; - private final IotMqttUpstreamProtocol protocol; + private final String serverId; private final IotDeviceMessageService deviceMessageService; private final IotDeviceCommonApi deviceApi; - public IotMqttHttpAuthHandler(IotMqttUpstreamProtocol protocol) { - this.protocol = protocol; + public IotEmqxAuthEventHandler(String serverId) { + this.serverId = serverId; this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); } @@ -216,7 +218,7 @@ public class IotMqttHttpAuthHandler { // 3. 发送设备状态消息 deviceMessageService.sendDeviceMessage(message, - deviceInfo.getProductKey(), deviceInfo.getDeviceName(), protocol.getServerId()); + deviceInfo.getProductKey(), deviceInfo.getDeviceName(), serverId); } catch (Exception e) { log.error("[handleDeviceStateChange][发送设备状态消息失败: {}]", username, e); } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java similarity index 91% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java index 372184a41c..43ec9a2977 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java @@ -1,4 +1,4 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; +package cn.iocoder.yudao.module.iot.gateway.protocol.emqx.router; import cn.hutool.core.util.StrUtil; import cn.hutool.extra.spring.SpringUtil; @@ -6,29 +6,29 @@ import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils; import lombok.extern.slf4j.Slf4j; /** - * IoT 网关 MQTT 下行消息处理器 + * IoT 网关 EMQX 下行消息处理器 *

* 从消息总线接收到下行消息,然后发布到 MQTT Broker,从而被设备所接收 * * @author 芋道源码 */ @Slf4j -public class IotMqttDownstreamHandler { +public class IotEmqxDownstreamHandler { - private final IotMqttUpstreamProtocol protocol; + private final IotEmqxUpstreamProtocol protocol; private final IotDeviceService deviceService; private final IotDeviceMessageService deviceMessageService; - public IotMqttDownstreamHandler(IotMqttUpstreamProtocol protocol) { + public IotEmqxDownstreamHandler(IotEmqxUpstreamProtocol protocol) { this.protocol = protocol; this.deviceService = SpringUtil.getBean(IotDeviceService.class); this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxUpstreamHandler.java similarity index 88% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxUpstreamHandler.java index 47d0a2f4a6..81d8cbb13a 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxUpstreamHandler.java @@ -1,26 +1,26 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; +package cn.iocoder.yudao.module.iot.gateway.protocol.emqx.router; import cn.hutool.core.util.StrUtil; import cn.hutool.extra.spring.SpringUtil; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.mqtt.messages.MqttPublishMessage; import lombok.extern.slf4j.Slf4j; /** - * IoT 网关 MQTT 上行消息处理器 + * IoT 网关 EMQX 上行消息处理器 * * @author 芋道源码 */ @Slf4j -public class IotMqttUpstreamHandler { +public class IotEmqxUpstreamHandler { private final IotDeviceMessageService deviceMessageService; private final String serverId; - public IotMqttUpstreamHandler(IotMqttUpstreamProtocol protocol) { + public IotEmqxUpstreamHandler(IotEmqxUpstreamProtocol protocol) { this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); this.serverId = protocol.getServerId(); } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java deleted file mode 100644 index 3cdfa08e4c..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java +++ /dev/null @@ -1,430 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt; - -import cn.hutool.core.collection.CollUtil; -import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; -import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttHttpAuthHandler; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttUpstreamHandler; -import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils; -import io.netty.handler.codec.mqtt.MqttQoS; -import io.vertx.core.Vertx; -import io.vertx.core.buffer.Buffer; -import io.vertx.core.http.HttpServer; -import io.vertx.ext.web.Router; -import io.vertx.ext.web.handler.BodyHandler; -import io.vertx.mqtt.MqttClient; -import io.vertx.mqtt.MqttClientOptions; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * IoT 网关 MQTT 协议:接收设备上行消息 - *

- * 1. MQTT 客户端:连接 EMQX,消费处理设备上行和下行消息 - * 2. HTTP 认证服务:为 EMQX 提供设备认证、连接、断开接口 - * - * @author 芋道源码 - */ -@Slf4j -public class IotMqttUpstreamProtocol { - - private final IotGatewayProperties.EmqxProperties emqxProperties; - - /** - * 服务运行状态标志 - */ - private volatile boolean isRunning = false; - - private Vertx vertx; - - @Getter - private final String serverId; - - // MQTT 客户端相关 - private MqttClient mqttClient; - private IotMqttUpstreamHandler upstreamHandler; - - // HTTP 认证服务相关 - private HttpServer httpAuthServer; - - public IotMqttUpstreamProtocol(IotGatewayProperties.EmqxProperties emqxProperties) { - this.emqxProperties = emqxProperties; - this.serverId = IotDeviceMessageUtils.generateServerId(emqxProperties.getMqttPort()); - } - - @PostConstruct - public void start() { - if (isRunning) { - log.warn("[start][MQTT 协议服务已经在运行中,请勿重复启动]"); - return; - } - log.info("[start][启动 MQTT 协议服务]"); - - try { - this.vertx = Vertx.vertx(); - - // 1. 启动 HTTP 认证服务 - startHttpAuthServer(); - - // 2. 启动 MQTT 客户端 - startMqttClient(); - - isRunning = true; - log.info("[start][MQTT 协议服务启动完成]"); - } catch (Exception e) { - log.error("[start][MQTT 协议服务启动失败]", e); - // 启动失败时清理资源 - stop(); - throw e; - } - } - - @PreDestroy - public void stop() { - if (!isRunning) { - log.warn("[stop][MQTT 协议服务已经停止,无需再次停止]"); - return; - } - log.info("[stop][停止 MQTT 协议服务]"); - - // 1. 停止 MQTT 客户端 - stopMqttClient(); - - // 2. 停止 HTTP 认证服务 - stopHttpAuthServer(); - - // 3. 关闭 Vertx 实例 - if (vertx != null) { - try { - vertx.close(); - log.debug("[stop][Vertx 实例已关闭]"); - } catch (Exception e) { - log.warn("[stop][关闭 Vertx 实例失败]", e); - } - } - - isRunning = false; - log.info("[stop][MQTT 协议服务已停止]"); - } - - /** - * 启动 HTTP 认证服务 - */ - private void startHttpAuthServer() { - log.info("[startHttpAuthServer][启动 HTTP 认证服务]"); - - // 1.1 创建路由 - Router router = Router.router(vertx); - router.route().handler(BodyHandler.create()); - // 1.2 创建认证处理器 - IotMqttHttpAuthHandler authHandler = new IotMqttHttpAuthHandler(this); - router.post(IotMqttTopicUtils.MQTT_AUTH_PATH).handler(authHandler::handleAuth); - router.post(IotMqttTopicUtils.MQTT_EVENT_PATH).handler(authHandler::handleEvent); - - // 2. 启动 HTTP 服务器 - int authPort = emqxProperties.getHttpPort(); - try { - httpAuthServer = vertx.createHttpServer() - .requestHandler(router) - .listen(authPort) - .result(); - log.info("[startHttpAuthServer][HTTP 认证服务启动成功, 端口: {}]", authPort); - } catch (Exception e) { - log.error("[startHttpAuthServer][HTTP 认证服务启动失败]", e); - throw e; - } - } - - /** - * 停止 HTTP 认证服务 - */ - private void stopHttpAuthServer() { - if (httpAuthServer == null) { - return; - } - try { - httpAuthServer.close().result(); - log.info("[stopHttpAuthServer][HTTP 认证服务已停止]"); - } catch (Exception e) { - log.error("[stopHttpAuthServer][HTTP 认证服务停止失败]", e); - } - } - - /** - * 启动 MQTT 客户端 - */ - private void startMqttClient() { - log.info("[startMqttClient][启动 MQTT 客户端]"); - - try { - // 1. 初始化消息处理器 - this.upstreamHandler = new IotMqttUpstreamHandler(this); - - // 2. 创建 MQTT 客户端 - log.info("[startMqttClient][使用 MQTT 客户端 ID: {}]", emqxProperties.getMqttClientId()); - createMqttClient(); - - // 3. 连接 MQTT Broker(同步等待首次连接结果) - boolean connected = connectMqttSync(); - if (!connected) { - throw new RuntimeException("首次连接 MQTT Broker 失败"); - } - - log.info("[startMqttClient][MQTT 客户端启动完成]"); - } catch (Exception e) { - log.error("[startMqttClient][MQTT 客户端启动失败]", e); - throw new RuntimeException("MQTT 客户端启动失败", e); - } - } - - /** - * 同步连接 MQTT Broker - * - * @return 是否连接成功 - */ - private boolean connectMqttSync() { - String host = emqxProperties.getMqttHost(); - Integer port = emqxProperties.getMqttPort(); - log.info("[connectMqttSync][开始连接 MQTT Broker, host: {}, port: {}]", host, port); - - // 使用计数器实现同步等待 - java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1); - java.util.concurrent.atomic.AtomicBoolean success = new java.util.concurrent.atomic.AtomicBoolean(false); - - mqttClient.connect(port, host, connectResult -> { - if (connectResult.succeeded()) { - log.info("[connectMqttSync][MQTT 客户端连接成功, host: {}, port: {}]", host, port); - // 设置处理器 - setupMqttHandlers(); - // 订阅主题 - subscribeToTopics(); - success.set(true); - } else { - log.error("[connectMqttSync][连接 MQTT Broker 失败, host: {}, port: {}]", - host, port, connectResult.cause()); - // 首次连接失败,启动重连机制 - reconnectWithDelay(); - } - latch.countDown(); - }); - - try { - // 等待连接结果,最多等待10秒 - latch.await(10, java.util.concurrent.TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.error("[connectMqttSync][等待连接结果被中断]", e); - } - - return success.get(); - } - - /** - * 停止 MQTT 客户端 - */ - private void stopMqttClient() { - // 1. 取消 MQTT 主题订阅 - if (mqttClient != null && mqttClient.isConnected()) { - List topicList = emqxProperties.getMqttTopics(); - if (CollUtil.isNotEmpty(topicList)) { - for (String topic : topicList) { - try { - mqttClient.unsubscribe(topic); - log.debug("[stopMqttClient][取消订阅主题: {}]", topic); - } catch (Exception e) { - log.warn("[stopMqttClient][取消订阅主题({})异常]", topic, e); - } - } - } - } - - // 2. 关闭 MQTT 客户端 - if (mqttClient != null && mqttClient.isConnected()) { - try { - mqttClient.disconnect(); - log.info("[stopMqttClient][MQTT 客户端已断开]"); - } catch (Exception e) { - log.warn("[stopMqttClient][关闭 MQTT 客户端异常]", e); - } - } - } - - /** - * 连接 MQTT Broker 并订阅主题 - * - * @param isReconnect 是否为重连 - */ - private void connectMqtt(boolean isReconnect) { - // 1. 参数校验 - String host = emqxProperties.getMqttHost(); - Integer port = emqxProperties.getMqttPort(); - - if (isReconnect) { - log.info("[connectMqtt][开始重连 MQTT Broker, host: {}, port: {}]", host, port); - // 重连时重新创建客户端实例 - createMqttClient(); - } else { - log.info("[connectMqtt][开始连接 MQTT Broker, host: {}, port: {}]", host, port); - } - - // 2. 异步连接 - mqttClient.connect(port, host, connectResult -> { - if (!connectResult.succeeded()) { - log.error("[connectMqtt][连接 MQTT Broker 失败, host: {}, port: {}, isReconnect: {}]", - host, port, isReconnect, connectResult.cause()); - - // 首次连接失败或重连失败时,尝试重连 - if (!isReconnect) { - log.warn("[connectMqtt][首次连接失败,将开始重连机制]"); - } - reconnectWithDelay(); - return; - } - - if (isReconnect) { - log.info("[connectMqtt][MQTT 客户端重连成功, host: {}, port: {}]", host, port); - } else { - log.info("[connectMqtt][MQTT 客户端连接成功, host: {}, port: {}]", host, port); - } - - // 设置处理器 - setupMqttHandlers(); - // 订阅主题 - subscribeToTopics(); - }); - } - - /** - * 创建 MQTT 客户端 - */ - private void createMqttClient() { - MqttClientOptions options = new MqttClientOptions() - .setClientId(emqxProperties.getMqttClientId()) - .setUsername(emqxProperties.getMqttUsername()) - .setPassword(emqxProperties.getMqttPassword()) - .setSsl(emqxProperties.getMqttSsl()); - this.mqttClient = MqttClient.create(vertx, options); - } - - /** - * 设置 MQTT 处理器 - */ - private void setupMqttHandlers() { - // 由于 mqttClient 在 createMqttClient() 方法中已初始化,此处无需检查 - // 设置断开重连监听器 - mqttClient.closeHandler(closeEvent -> { - log.warn("[closeHandler][MQTT 连接已断开, 准备重连]"); - reconnectWithDelay(); - }); - - // 设置异常处理器 - mqttClient.exceptionHandler(exception -> { - log.error("[exceptionHandler][MQTT 客户端异常]", exception); - }); - - // 设置消息处理器 - // upstreamHandler 在 startMqttClient() 方法中已初始化,此处无需检查 - mqttClient.publishHandler(upstreamHandler::handle); - log.debug("[setupMqttHandlers][MQTT 消息处理器设置完成]"); - } - - /** - * 订阅设备上行消息主题 - */ - private void subscribeToTopics() { - List topicList = emqxProperties.getMqttTopics(); - if (CollUtil.isEmpty(topicList)) { - log.warn("[subscribeToTopics][订阅主题列表为空, 跳过订阅]"); - return; - } - if (mqttClient == null || !mqttClient.isConnected()) { - log.warn("[subscribeToTopics][MQTT 客户端未连接, 跳过订阅]"); - return; - } - - int qos = emqxProperties.getMqttQos(); - log.info("[subscribeToTopics][开始订阅主题, 共 {} 个, QoS: {}]", topicList.size(), qos); - - // 使用 AtomicInteger 替代数组,线程安全且更简洁 - AtomicInteger successCount = new AtomicInteger(0); - AtomicInteger failCount = new AtomicInteger(0); - - // 构建主题-QoS 映射,批量订阅 - Map topicQosMap = new HashMap<>(); - for (String topic : topicList) { - topicQosMap.put(topic, qos); - } - - // 批量订阅所有主题 - mqttClient.subscribe(topicQosMap, subscribeResult -> { - if (subscribeResult.succeeded()) { - // 批量订阅成功,记录所有主题为成功 - int successful = successCount.addAndGet(topicList.size()); - log.info("[subscribeToTopics][批量订阅主题成功, 共 {} 个主题, QoS: {}]", successful, qos); - for (String topic : topicList) { - log.debug("[subscribeToTopics][订阅主题成功, topic: {}, qos: {}]", topic, qos); - } - } else { - // 批量订阅失败,记录所有主题为失败 - int failed = failCount.addAndGet(topicList.size()); - log.error("[subscribeToTopics][批量订阅主题失败, 共 {} 个主题, 原因: {}]", - failed, subscribeResult.cause().getMessage(), subscribeResult.cause()); - for (String topic : topicList) { - log.error("[subscribeToTopics][订阅主题失败, topic: {}, qos: {}]", topic, qos); - } - } - - // 记录汇总日志 - log.info("[subscribeToTopics][主题订阅完成, 成功: {}, 失败: {}, 总计: {}]", - successCount.get(), failCount.get(), topicList.size()); - }); - } - - /** - * 延迟重连 - */ - private void reconnectWithDelay() { - long delay = emqxProperties.getReconnectDelayMs(); - vertx.setTimer(delay, timerId -> { - if (!isRunning) { - log.debug("[reconnectWithDelay][服务已停止, 取消重连]"); - return; - } - // 检查连接状态,如果已连接则无需重连 - if (mqttClient != null && mqttClient.isConnected()) { - log.debug("[reconnectWithDelay][MQTT 客户端已连接, 无需重连]"); - return; - } - log.info("[reconnectWithDelay][开始重连 MQTT Broker, 延迟: {} ms]", delay); - try { - connectMqtt(true); // 标记为重连 - } catch (Exception e) { - log.error("[reconnectWithDelay][重连失败, 将继续尝试]", e); - // 重连失败时,不需要重复调用,因为 connectMqtt(true) 内部已经处理了重连逻辑 - } - }); - } - - /** - * 发布消息到 MQTT Broker - * - * @param topic 主题 - * @param payload 消息内容 - */ - public void publishMessage(String topic, String payload) { - if (mqttClient == null || !mqttClient.isConnected()) { - log.warn("[publishMessage][MQTT 客户端未连接, 无法发布消息到 topic({})]", topic); - return; - } - MqttQoS qos = MqttQoS.valueOf(emqxProperties.getMqttQos()); - mqttClient.publish(topic, Buffer.buffer(payload), qos, false, false); - } - -} \ No newline at end of file