From 69e25eeaaccfa6aa81b252ea4de7c7159fe5aec0 Mon Sep 17 00:00:00 2001 From: haohao <1036606149@qq.com> Date: Sat, 14 Jun 2025 20:44:55 +0800 Subject: [PATCH] =?UTF-8?q?reactor=EF=BC=9A=E3=80=90IoT=20=E7=89=A9?= =?UTF-8?q?=E8=81=94=E7=BD=91=E3=80=91=E9=87=8D=E6=9E=84=20EMQX=20?= =?UTF-8?q?=E5=8D=8F=E8=AE=AE=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91=EF=BC=8C?= =?UTF-8?q?=E6=96=B0=E5=A2=9E=E8=AE=A4=E8=AF=81=E5=92=8C=E4=B8=8B=E8=A1=8C?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E5=A4=84=E7=90=86=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/IotGatewayConfiguration.java | 20 +- .../emqx/IotEmqxAuthEventProtocol.java | 113 +++++ .../IotEmqxDownstreamSubscriber.java} | 18 +- .../emqx/IotEmqxUpstreamProtocol.java | 328 +++++++++++++ .../router/IotEmqxAuthEventHandler.java} | 20 +- .../router/IotEmqxDownstreamHandler.java} | 12 +- .../router/IotEmqxUpstreamHandler.java} | 10 +- .../mqtt/IotMqttUpstreamProtocol.java | 430 ------------------ 8 files changed, 486 insertions(+), 465 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxAuthEventProtocol.java rename yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/{mqtt/IotMqttDownstreamSubscriber.java => emqx/IotEmqxDownstreamSubscriber.java} (78%) create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java rename yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/{mqtt/router/IotMqttHttpAuthHandler.java => emqx/router/IotEmqxAuthEventHandler.java} (94%) rename yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/{mqtt/router/IotMqttDownstreamHandler.java => emqx/router/IotEmqxDownstreamHandler.java} (91%) rename yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/{mqtt/router/IotMqttUpstreamHandler.java => emqx/router/IotEmqxUpstreamHandler.java} (88%) delete mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java index d730e92782..e9b0001e13 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java @@ -1,10 +1,11 @@ package cn.iocoder.yudao.module.iot.gateway.config; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxAuthEventProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttDownstreamSubscriber; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -37,7 +38,7 @@ public class IotGatewayConfiguration { } /** - * IoT 网关 MQTT 协议配置类 + * IoT 网关 EMQX 协议配置类 */ @Configuration @ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.emqx", name = "enabled", havingValue = "true") @@ -45,14 +46,19 @@ public class IotGatewayConfiguration { public static class MqttProtocolConfiguration { @Bean - public IotMqttUpstreamProtocol iotMqttUpstreamProtocol(IotGatewayProperties gatewayProperties) { - return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getEmqx()); + public IotEmqxAuthEventProtocol iotEmqxAuthEventProtocol(IotGatewayProperties gatewayProperties) { + return new IotEmqxAuthEventProtocol(gatewayProperties.getProtocol().getEmqx()); } @Bean - public IotMqttDownstreamSubscriber iotMqttDownstreamSubscriber(IotMqttUpstreamProtocol mqttUpstreamProtocol, + public IotEmqxUpstreamProtocol iotEmqxUpstreamProtocol(IotGatewayProperties gatewayProperties) { + return new IotEmqxUpstreamProtocol(gatewayProperties.getProtocol().getEmqx()); + } + + @Bean + public IotEmqxDownstreamSubscriber iotEmqxDownstreamSubscriber(IotEmqxUpstreamProtocol mqttUpstreamProtocol, IotMessageBus messageBus) { - return new IotMqttDownstreamSubscriber(mqttUpstreamProtocol, messageBus); + return new IotEmqxDownstreamSubscriber(mqttUpstreamProtocol, messageBus); } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxAuthEventProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxAuthEventProtocol.java new file mode 100644 index 0000000000..2ba902c5c5 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxAuthEventProtocol.java @@ -0,0 +1,113 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.emqx; + +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; +import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.router.IotEmqxAuthEventHandler; +import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpServer; +import io.vertx.ext.web.Router; +import io.vertx.ext.web.handler.BodyHandler; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT 网关 EMQX 认证事件协议服务 + *
+ * 为 EMQX 提供 HTTP 接口服务,包括:
+ * 1. 设备认证接口 - 对应 EMQX HTTP 认证插件
+ * 2. 设备事件处理接口 - 对应 EMQX Webhook 事件通知
+ *
+ * @author 芋道源码
+ */
+@Slf4j
+public class IotEmqxAuthEventProtocol {
+
+ private final IotGatewayProperties.EmqxProperties emqxProperties;
+
+ private final String serverId;
+
+ private Vertx vertx;
+
+ private HttpServer httpServer;
+
+ public IotEmqxAuthEventProtocol(IotGatewayProperties.EmqxProperties emqxProperties) {
+ this.emqxProperties = emqxProperties;
+ this.serverId = IotDeviceMessageUtils.generateServerId(emqxProperties.getMqttPort());
+ }
+
+ @PostConstruct
+ public void start() {
+ try {
+ // 创建 Vertx 实例
+ this.vertx = Vertx.vertx();
+ startHttpServer();
+ log.info("[start][IoT 网关 EMQX 认证事件协议服务启动成功, 端口: {}]", emqxProperties.getHttpPort());
+ } catch (Exception e) {
+ log.error("[start][IoT 网关 EMQX 认证事件协议服务启动失败]", e);
+ throw e;
+ }
+ }
+
+ @PreDestroy
+ public void stop() {
+ stopHttpServer();
+
+ // 关闭 Vertx 实例
+ if (vertx != null) {
+ try {
+ vertx.close();
+ log.debug("[stop][Vertx 实例已关闭]");
+ } catch (Exception e) {
+ log.warn("[stop][关闭 Vertx 实例失败]", e);
+ }
+ }
+
+ log.info("[stop][IoT 网关 EMQX 认证事件协议服务已停止]");
+ }
+
+ /**
+ * 启动 HTTP 服务器
+ */
+ private void startHttpServer() {
+ int port = emqxProperties.getHttpPort();
+
+ // 1. 创建路由
+ Router router = Router.router(vertx);
+ router.route().handler(BodyHandler.create());
+
+ // 2. 创建处理器,传入 serverId
+ IotEmqxAuthEventHandler handler = new IotEmqxAuthEventHandler(serverId);
+ router.post(IotMqttTopicUtils.MQTT_AUTH_PATH).handler(handler::handleAuth);
+ router.post(IotMqttTopicUtils.MQTT_EVENT_PATH).handler(handler::handleEvent);
+
+ // 3. 启动 HTTP 服务器
+ try {
+ httpServer = vertx.createHttpServer()
+ .requestHandler(router)
+ .listen(port)
+ .result();
+ } catch (Exception e) {
+ log.error("[startHttpServer][HTTP 服务器启动失败, 端口: {}]", port, e);
+ throw e;
+ }
+ }
+
+ /**
+ * 停止 HTTP 服务器
+ */
+ private void stopHttpServer() {
+ if (httpServer == null) {
+ return;
+ }
+
+ try {
+ httpServer.close().result();
+ log.info("[stopHttpServer][HTTP 服务器已停止]");
+ } catch (Exception e) {
+ log.error("[stopHttpServer][HTTP 服务器停止失败]", e);
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxDownstreamSubscriber.java
similarity index 78%
rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java
rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxDownstreamSubscriber.java
index 861c3a5496..61bf12376b 100644
--- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxDownstreamSubscriber.java
@@ -1,29 +1,31 @@
-package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
+package cn.iocoder.yudao.module.iot.gateway.protocol.emqx;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
-import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttDownstreamHandler;
+import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.router.IotEmqxDownstreamHandler;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
/**
- * IoT 网关 MQTT 订阅者:接收下行给设备的消息
+ * IoT 网关 EMQX 订阅者:接收下行给设备的消息
*
* @author 芋道源码
*/
@Slf4j
-public class IotMqttDownstreamSubscriber implements IotMessageSubscriber
- * 处理 EMQX 的认证请求和事件钩子,提供统一的错误处理和参数校验
+ * 为 EMQX 提供 HTTP 接口服务,包括:
+ * 1. 设备认证接口 - 对应 EMQX HTTP 认证插件
+ * 2. 设备事件处理接口 - 对应 EMQX Webhook 事件通知
+ * 提供统一的错误处理和参数校验
*
* @author 芋道源码
*/
@Slf4j
-public class IotMqttHttpAuthHandler {
+public class IotEmqxAuthEventHandler {
/**
* HTTP 成功状态码(EMQX 要求固定使用 200)
@@ -48,14 +50,14 @@ public class IotMqttHttpAuthHandler {
private static final String EVENT_CLIENT_CONNECTED = "client.connected";
private static final String EVENT_CLIENT_DISCONNECTED = "client.disconnected";
- private final IotMqttUpstreamProtocol protocol;
+ private final String serverId;
private final IotDeviceMessageService deviceMessageService;
private final IotDeviceCommonApi deviceApi;
- public IotMqttHttpAuthHandler(IotMqttUpstreamProtocol protocol) {
- this.protocol = protocol;
+ public IotEmqxAuthEventHandler(String serverId) {
+ this.serverId = serverId;
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
}
@@ -216,7 +218,7 @@ public class IotMqttHttpAuthHandler {
// 3. 发送设备状态消息
deviceMessageService.sendDeviceMessage(message,
- deviceInfo.getProductKey(), deviceInfo.getDeviceName(), protocol.getServerId());
+ deviceInfo.getProductKey(), deviceInfo.getDeviceName(), serverId);
} catch (Exception e) {
log.error("[handleDeviceStateChange][发送设备状态消息失败: {}]", username, e);
}
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java
similarity index 91%
rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java
rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java
index 372184a41c..43ec9a2977 100644
--- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java
@@ -1,4 +1,4 @@
-package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
+package cn.iocoder.yudao.module.iot.gateway.protocol.emqx.router;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
@@ -6,29 +6,29 @@ import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
-import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
+import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils;
import lombok.extern.slf4j.Slf4j;
/**
- * IoT 网关 MQTT 下行消息处理器
+ * IoT 网关 EMQX 下行消息处理器
*
* 从消息总线接收到下行消息,然后发布到 MQTT Broker,从而被设备所接收
*
* @author 芋道源码
*/
@Slf4j
-public class IotMqttDownstreamHandler {
+public class IotEmqxDownstreamHandler {
- private final IotMqttUpstreamProtocol protocol;
+ private final IotEmqxUpstreamProtocol protocol;
private final IotDeviceService deviceService;
private final IotDeviceMessageService deviceMessageService;
- public IotMqttDownstreamHandler(IotMqttUpstreamProtocol protocol) {
+ public IotEmqxDownstreamHandler(IotEmqxUpstreamProtocol protocol) {
this.protocol = protocol;
this.deviceService = SpringUtil.getBean(IotDeviceService.class);
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxUpstreamHandler.java
similarity index 88%
rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java
rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxUpstreamHandler.java
index 47d0a2f4a6..81d8cbb13a 100644
--- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxUpstreamHandler.java
@@ -1,26 +1,26 @@
-package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
+package cn.iocoder.yudao.module.iot.gateway.protocol.emqx.router;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
-import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
+import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.mqtt.messages.MqttPublishMessage;
import lombok.extern.slf4j.Slf4j;
/**
- * IoT 网关 MQTT 上行消息处理器
+ * IoT 网关 EMQX 上行消息处理器
*
* @author 芋道源码
*/
@Slf4j
-public class IotMqttUpstreamHandler {
+public class IotEmqxUpstreamHandler {
private final IotDeviceMessageService deviceMessageService;
private final String serverId;
- public IotMqttUpstreamHandler(IotMqttUpstreamProtocol protocol) {
+ public IotEmqxUpstreamHandler(IotEmqxUpstreamProtocol protocol) {
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
this.serverId = protocol.getServerId();
}
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java
deleted file mode 100644
index 3cdfa08e4c..0000000000
--- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java
+++ /dev/null
@@ -1,430 +0,0 @@
-package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
-
-import cn.hutool.core.collection.CollUtil;
-import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
-import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
-import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttHttpAuthHandler;
-import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttUpstreamHandler;
-import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils;
-import io.netty.handler.codec.mqtt.MqttQoS;
-import io.vertx.core.Vertx;
-import io.vertx.core.buffer.Buffer;
-import io.vertx.core.http.HttpServer;
-import io.vertx.ext.web.Router;
-import io.vertx.ext.web.handler.BodyHandler;
-import io.vertx.mqtt.MqttClient;
-import io.vertx.mqtt.MqttClientOptions;
-import jakarta.annotation.PostConstruct;
-import jakarta.annotation.PreDestroy;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * IoT 网关 MQTT 协议:接收设备上行消息
- *
- * 1. MQTT 客户端:连接 EMQX,消费处理设备上行和下行消息
- * 2. HTTP 认证服务:为 EMQX 提供设备认证、连接、断开接口
- *
- * @author 芋道源码
- */
-@Slf4j
-public class IotMqttUpstreamProtocol {
-
- private final IotGatewayProperties.EmqxProperties emqxProperties;
-
- /**
- * 服务运行状态标志
- */
- private volatile boolean isRunning = false;
-
- private Vertx vertx;
-
- @Getter
- private final String serverId;
-
- // MQTT 客户端相关
- private MqttClient mqttClient;
- private IotMqttUpstreamHandler upstreamHandler;
-
- // HTTP 认证服务相关
- private HttpServer httpAuthServer;
-
- public IotMqttUpstreamProtocol(IotGatewayProperties.EmqxProperties emqxProperties) {
- this.emqxProperties = emqxProperties;
- this.serverId = IotDeviceMessageUtils.generateServerId(emqxProperties.getMqttPort());
- }
-
- @PostConstruct
- public void start() {
- if (isRunning) {
- log.warn("[start][MQTT 协议服务已经在运行中,请勿重复启动]");
- return;
- }
- log.info("[start][启动 MQTT 协议服务]");
-
- try {
- this.vertx = Vertx.vertx();
-
- // 1. 启动 HTTP 认证服务
- startHttpAuthServer();
-
- // 2. 启动 MQTT 客户端
- startMqttClient();
-
- isRunning = true;
- log.info("[start][MQTT 协议服务启动完成]");
- } catch (Exception e) {
- log.error("[start][MQTT 协议服务启动失败]", e);
- // 启动失败时清理资源
- stop();
- throw e;
- }
- }
-
- @PreDestroy
- public void stop() {
- if (!isRunning) {
- log.warn("[stop][MQTT 协议服务已经停止,无需再次停止]");
- return;
- }
- log.info("[stop][停止 MQTT 协议服务]");
-
- // 1. 停止 MQTT 客户端
- stopMqttClient();
-
- // 2. 停止 HTTP 认证服务
- stopHttpAuthServer();
-
- // 3. 关闭 Vertx 实例
- if (vertx != null) {
- try {
- vertx.close();
- log.debug("[stop][Vertx 实例已关闭]");
- } catch (Exception e) {
- log.warn("[stop][关闭 Vertx 实例失败]", e);
- }
- }
-
- isRunning = false;
- log.info("[stop][MQTT 协议服务已停止]");
- }
-
- /**
- * 启动 HTTP 认证服务
- */
- private void startHttpAuthServer() {
- log.info("[startHttpAuthServer][启动 HTTP 认证服务]");
-
- // 1.1 创建路由
- Router router = Router.router(vertx);
- router.route().handler(BodyHandler.create());
- // 1.2 创建认证处理器
- IotMqttHttpAuthHandler authHandler = new IotMqttHttpAuthHandler(this);
- router.post(IotMqttTopicUtils.MQTT_AUTH_PATH).handler(authHandler::handleAuth);
- router.post(IotMqttTopicUtils.MQTT_EVENT_PATH).handler(authHandler::handleEvent);
-
- // 2. 启动 HTTP 服务器
- int authPort = emqxProperties.getHttpPort();
- try {
- httpAuthServer = vertx.createHttpServer()
- .requestHandler(router)
- .listen(authPort)
- .result();
- log.info("[startHttpAuthServer][HTTP 认证服务启动成功, 端口: {}]", authPort);
- } catch (Exception e) {
- log.error("[startHttpAuthServer][HTTP 认证服务启动失败]", e);
- throw e;
- }
- }
-
- /**
- * 停止 HTTP 认证服务
- */
- private void stopHttpAuthServer() {
- if (httpAuthServer == null) {
- return;
- }
- try {
- httpAuthServer.close().result();
- log.info("[stopHttpAuthServer][HTTP 认证服务已停止]");
- } catch (Exception e) {
- log.error("[stopHttpAuthServer][HTTP 认证服务停止失败]", e);
- }
- }
-
- /**
- * 启动 MQTT 客户端
- */
- private void startMqttClient() {
- log.info("[startMqttClient][启动 MQTT 客户端]");
-
- try {
- // 1. 初始化消息处理器
- this.upstreamHandler = new IotMqttUpstreamHandler(this);
-
- // 2. 创建 MQTT 客户端
- log.info("[startMqttClient][使用 MQTT 客户端 ID: {}]", emqxProperties.getMqttClientId());
- createMqttClient();
-
- // 3. 连接 MQTT Broker(同步等待首次连接结果)
- boolean connected = connectMqttSync();
- if (!connected) {
- throw new RuntimeException("首次连接 MQTT Broker 失败");
- }
-
- log.info("[startMqttClient][MQTT 客户端启动完成]");
- } catch (Exception e) {
- log.error("[startMqttClient][MQTT 客户端启动失败]", e);
- throw new RuntimeException("MQTT 客户端启动失败", e);
- }
- }
-
- /**
- * 同步连接 MQTT Broker
- *
- * @return 是否连接成功
- */
- private boolean connectMqttSync() {
- String host = emqxProperties.getMqttHost();
- Integer port = emqxProperties.getMqttPort();
- log.info("[connectMqttSync][开始连接 MQTT Broker, host: {}, port: {}]", host, port);
-
- // 使用计数器实现同步等待
- java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1);
- java.util.concurrent.atomic.AtomicBoolean success = new java.util.concurrent.atomic.AtomicBoolean(false);
-
- mqttClient.connect(port, host, connectResult -> {
- if (connectResult.succeeded()) {
- log.info("[connectMqttSync][MQTT 客户端连接成功, host: {}, port: {}]", host, port);
- // 设置处理器
- setupMqttHandlers();
- // 订阅主题
- subscribeToTopics();
- success.set(true);
- } else {
- log.error("[connectMqttSync][连接 MQTT Broker 失败, host: {}, port: {}]",
- host, port, connectResult.cause());
- // 首次连接失败,启动重连机制
- reconnectWithDelay();
- }
- latch.countDown();
- });
-
- try {
- // 等待连接结果,最多等待10秒
- latch.await(10, java.util.concurrent.TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.error("[connectMqttSync][等待连接结果被中断]", e);
- }
-
- return success.get();
- }
-
- /**
- * 停止 MQTT 客户端
- */
- private void stopMqttClient() {
- // 1. 取消 MQTT 主题订阅
- if (mqttClient != null && mqttClient.isConnected()) {
- List