diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java index 0101f32aaa..852b2e67b4 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java @@ -1,6 +1,5 @@ package cn.iocoder.yudao.module.iot.gateway.config; -import cn.hutool.core.util.StrUtil; import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; import lombok.Data; @@ -105,46 +104,53 @@ public class IotGatewayProperties { @NotNull(message = "是否开启不能为空") private Boolean enabled; - // TODO @haohao:是不是改成 httpPort?不只认证,目前看。 /** - * HTTP 认证端口(默认:8090) + * HTTP 服务端口(默认:8090) */ - private Integer httpAuthPort = 8090; + private Integer httpPort = 8090; /** * MQTT 服务器地址 */ @NotEmpty(message = "MQTT 服务器地址不能为空") private String mqttHost; + /** * MQTT 服务器端口(默认:1883) */ @NotNull(message = "MQTT 服务器端口不能为空") private Integer mqttPort = 1883; + /** * MQTT 用户名 */ @NotEmpty(message = "MQTT 用户名不能为空") private String mqttUsername; + /** * MQTT 密码 */ @NotEmpty(message = "MQTT 密码不能为空") private String mqttPassword; + /** * MQTT 客户端的 SSL 开关 */ @NotNull(message = "MQTT 是否开启 SSL 不能为空") private Boolean mqttSsl = false; + /** * MQTT 客户端 ID(如果为空,系统将自动生成) */ + @NotEmpty(message = "MQTT 客户端 ID 不能为空") private String mqttClientId; + /** * MQTT 订阅的主题 */ @NotEmpty(message = "MQTT 主题不能为空") private List<@NotEmpty(message = "MQTT 主题不能为空") String> mqttTopics; + /** * 默认 QoS 级别 *

@@ -158,24 +164,12 @@ public class IotGatewayProperties { * 连接超时时间(秒) */ private Integer connectTimeoutSeconds = 10; + /** * 重连延迟时间(毫秒) */ private Long reconnectDelayMs = 5000L; - // TODO @haohao:貌似可以通过配置文件 + el 表达式;尽量还是配置文件; - /** - * 获取 MQTT 客户端 ID,如果未配置则自动生成 - * - * @return MQTT 客户端 ID - */ - public String getMqttClientId() { - if (StrUtil.isBlank(mqttClientId)) { - mqttClientId = "iot-gateway-mqtt-" + System.currentTimeMillis(); - } - return mqttClientId; - } - } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java index 29dc0b59aa..3cdfa08e4c 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java @@ -1,7 +1,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt; import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttHttpAuthHandler; @@ -20,7 +19,10 @@ import jakarta.annotation.PreDestroy; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; /** * IoT 网关 MQTT 协议:接收设备上行消息 @@ -127,7 +129,7 @@ public class IotMqttUpstreamProtocol { router.post(IotMqttTopicUtils.MQTT_EVENT_PATH).handler(authHandler::handleEvent); // 2. 启动 HTTP 服务器 - int authPort = emqxProperties.getHttpAuthPort(); + int authPort = emqxProperties.getHttpPort(); try { httpAuthServer = vertx.createHttpServer() .requestHandler(router) @@ -169,16 +171,61 @@ public class IotMqttUpstreamProtocol { log.info("[startMqttClient][使用 MQTT 客户端 ID: {}]", emqxProperties.getMqttClientId()); createMqttClient(); - // 3. 连接 MQTT Broker(异步连接,不会抛出异常) - connectMqtt(false); + // 3. 连接 MQTT Broker(同步等待首次连接结果) + boolean connected = connectMqttSync(); + if (!connected) { + throw new RuntimeException("首次连接 MQTT Broker 失败"); + } - log.info("[startMqttClient][MQTT 客户端启动完成,正在异步连接中...]"); + log.info("[startMqttClient][MQTT 客户端启动完成]"); } catch (Exception e) { log.error("[startMqttClient][MQTT 客户端启动失败]", e); throw new RuntimeException("MQTT 客户端启动失败", e); } } + /** + * 同步连接 MQTT Broker + * + * @return 是否连接成功 + */ + private boolean connectMqttSync() { + String host = emqxProperties.getMqttHost(); + Integer port = emqxProperties.getMqttPort(); + log.info("[connectMqttSync][开始连接 MQTT Broker, host: {}, port: {}]", host, port); + + // 使用计数器实现同步等待 + java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1); + java.util.concurrent.atomic.AtomicBoolean success = new java.util.concurrent.atomic.AtomicBoolean(false); + + mqttClient.connect(port, host, connectResult -> { + if (connectResult.succeeded()) { + log.info("[connectMqttSync][MQTT 客户端连接成功, host: {}, port: {}]", host, port); + // 设置处理器 + setupMqttHandlers(); + // 订阅主题 + subscribeToTopics(); + success.set(true); + } else { + log.error("[connectMqttSync][连接 MQTT Broker 失败, host: {}, port: {}]", + host, port, connectResult.cause()); + // 首次连接失败,启动重连机制 + reconnectWithDelay(); + } + latch.countDown(); + }); + + try { + // 等待连接结果,最多等待10秒 + latch.await(10, java.util.concurrent.TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("[connectMqttSync][等待连接结果被中断]", e); + } + + return success.get(); + } + /** * 停止 MQTT 客户端 */ @@ -218,15 +265,6 @@ public class IotMqttUpstreamProtocol { // 1. 参数校验 String host = emqxProperties.getMqttHost(); Integer port = emqxProperties.getMqttPort(); - // TODO @haohao:这些参数校验,交给 validator; - if (StrUtil.isBlank(host)) { - log.error("[connectMqtt][MQTT Host 为空, 无法连接]"); - throw new IllegalArgumentException("MQTT Host 不能为空"); - } - if (port == null || port <= 0) { - log.error("[connectMqtt][MQTT Port({}) 无效]", port); - throw new IllegalArgumentException("MQTT Port 必须为正整数"); - } if (isReconnect) { log.info("[connectMqtt][开始重连 MQTT Broker, host: {}, port: {}]", host, port); @@ -238,32 +276,28 @@ public class IotMqttUpstreamProtocol { // 2. 异步连接 mqttClient.connect(port, host, connectResult -> { - // TODO @haohao:if return,减少括号哈; - if (connectResult.succeeded()) { - if (isReconnect) { - log.info("[connectMqtt][MQTT 客户端重连成功, host: {}, port: {}]", host, port); - } else { - log.info("[connectMqtt][MQTT 客户端连接成功, host: {}, port: {}]", host, port); - } - - // 设置处理器 - setupMqttHandlers(); - // 订阅主题 - subscribeToTopics(); - } else { + if (!connectResult.succeeded()) { log.error("[connectMqtt][连接 MQTT Broker 失败, host: {}, port: {}, isReconnect: {}]", host, port, isReconnect, connectResult.cause()); - // TODO @haohao:体感上,是不是首次必须连接成功?类似 mysql;首次要连接上,然后后续可以重连; + // 首次连接失败或重连失败时,尝试重连 if (!isReconnect) { - // 首次连接失败时,也要尝试重连 log.warn("[connectMqtt][首次连接失败,将开始重连机制]"); - reconnectWithDelay(); - } else { - // 重连失败时,继续尝试重连 - reconnectWithDelay(); } + reconnectWithDelay(); + return; } + + if (isReconnect) { + log.info("[connectMqtt][MQTT 客户端重连成功, host: {}, port: {}]", host, port); + } else { + log.info("[connectMqtt][MQTT 客户端连接成功, host: {}, port: {}]", host, port); + } + + // 设置处理器 + setupMqttHandlers(); + // 订阅主题 + subscribeToTopics(); }); } @@ -283,12 +317,7 @@ public class IotMqttUpstreamProtocol { * 设置 MQTT 处理器 */ private void setupMqttHandlers() { - // TODO @haohao:mqttClient 一定非空; - if (mqttClient == null) { - log.warn("[setupMqttHandlers][MQTT 客户端为空,跳过处理器设置]"); - return; - } - + // 由于 mqttClient 在 createMqttClient() 方法中已初始化,此处无需检查 // 设置断开重连监听器 mqttClient.closeHandler(closeEvent -> { log.warn("[closeHandler][MQTT 连接已断开, 准备重连]"); @@ -301,13 +330,9 @@ public class IotMqttUpstreamProtocol { }); // 设置消息处理器 - // TODO @haohao:upstreamHandler 一定非空; - if (upstreamHandler != null) { - mqttClient.publishHandler(upstreamHandler::handle); - log.debug("[setupMqttHandlers][MQTT 消息处理器设置完成]"); - } else { - log.warn("[setupMqttHandlers][上行消息处理器为空,跳过设置]"); - } + // upstreamHandler 在 startMqttClient() 方法中已初始化,此处无需检查 + mqttClient.publishHandler(upstreamHandler::handle); + log.debug("[setupMqttHandlers][MQTT 消息处理器设置完成]"); } /** @@ -327,35 +352,39 @@ public class IotMqttUpstreamProtocol { int qos = emqxProperties.getMqttQos(); log.info("[subscribeToTopics][开始订阅主题, 共 {} 个, QoS: {}]", topicList.size(), qos); - // TODO @haohao:使用 atomicinteger 会更合适; - int[] successCount = { 0 }; // 使用数组以便在 lambda 中修改 - int[] failCount = { 0 }; + // 使用 AtomicInteger 替代数组,线程安全且更简洁 + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger failCount = new AtomicInteger(0); + // 构建主题-QoS 映射,批量订阅 + Map topicQosMap = new HashMap<>(); for (String topic : topicList) { - // TODO @haohao:MqttClient subscribe(Map topics, 是不是更简洁哈; - mqttClient.subscribe(topic, qos, subscribeResult -> { - if (subscribeResult.succeeded()) { - successCount[0]++; - log.debug("[subscribeToTopics][订阅主题成功, topic: {}, qos: {}]", topic, qos); - - // 当所有主题都处理完成时,记录汇总日志 - if (successCount[0] + failCount[0] == topicList.size()) { - log.info("[subscribeToTopics][主题订阅完成, 成功: {}, 失败: {}, 总计: {}]", - successCount[0], failCount[0], topicList.size()); - } - } else { - failCount[0]++; - log.error("[subscribeToTopics][订阅主题失败, topic: {}, qos: {}, 原因: {}]", - topic, qos, subscribeResult.cause().getMessage(), subscribeResult.cause()); - - // 当所有主题都处理完成时,记录汇总日志 - if (successCount[0] + failCount[0] == topicList.size()) { - log.info("[subscribeToTopics][主题订阅完成, 成功: {}, 失败: {}, 总计: {}]", - successCount[0], failCount[0], topicList.size()); - } - } - }); + topicQosMap.put(topic, qos); } + + // 批量订阅所有主题 + mqttClient.subscribe(topicQosMap, subscribeResult -> { + if (subscribeResult.succeeded()) { + // 批量订阅成功,记录所有主题为成功 + int successful = successCount.addAndGet(topicList.size()); + log.info("[subscribeToTopics][批量订阅主题成功, 共 {} 个主题, QoS: {}]", successful, qos); + for (String topic : topicList) { + log.debug("[subscribeToTopics][订阅主题成功, topic: {}, qos: {}]", topic, qos); + } + } else { + // 批量订阅失败,记录所有主题为失败 + int failed = failCount.addAndGet(topicList.size()); + log.error("[subscribeToTopics][批量订阅主题失败, 共 {} 个主题, 原因: {}]", + failed, subscribeResult.cause().getMessage(), subscribeResult.cause()); + for (String topic : topicList) { + log.error("[subscribeToTopics][订阅主题失败, topic: {}, qos: {}]", topic, qos); + } + } + + // 记录汇总日志 + log.info("[subscribeToTopics][主题订阅完成, 成功: {}, 失败: {}, 总计: {}]", + successCount.get(), failCount.get(), topicList.size()); + }); } /** diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java index 4599e1f071..372184a41c 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java @@ -2,10 +2,10 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; import cn.hutool.core.util.StrUtil; import cn.hutool.extra.spring.SpringUtil; -import cn.hutool.json.JSONObject; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; @@ -48,52 +48,49 @@ public class IotMqttDownstreamHandler { } // 2.1 根据方法构建主题 - String topic = buildTopicByMethod(message.getMethod(), deviceInfo.getProductKey(), deviceInfo.getDeviceName()); + String topic = buildTopicByMethod(message, deviceInfo.getProductKey(), deviceInfo.getDeviceName()); if (StrUtil.isBlank(topic)) { log.warn("[handle][未知的消息方法: {}]", message.getMethod()); return; } + // 2.2 构建载荷 - // TODO @haohao:这里是不是 encode 就可以发拉?因为本身就 json 化了。 - JSONObject payload = buildDownstreamPayload(message); + byte[] payload = deviceMessageService.encodeDeviceMessage(message, deviceInfo.getProductKey(), deviceInfo.getDeviceName()); + // 2.3 发布消息 - protocol.publishMessage(topic, payload.toString()); + protocol.publishMessage(topic, new String(payload)); } - // TODO @haohao:这个是不是也可以计算;IotDeviceMessageUtils 的 isReplyMessage;这样就直接生成了; /** - * 根据方法构建主题 + * 根据消息方法和回复状态构建主题 * - * @param method 消息方法 + * @param message 设备消息 * @param productKey 产品标识 * @param deviceName 设备名称 * @return 构建的主题,如果方法不支持返回 null */ - private String buildTopicByMethod(String method, String productKey, String deviceName) { - IotDeviceMessageMethodEnum methodEnum = IotDeviceMessageMethodEnum.of(method); + private String buildTopicByMethod(IotDeviceMessage message, String productKey, String deviceName) { + // 1. 解析消息方法 + IotDeviceMessageMethodEnum methodEnum = IotDeviceMessageMethodEnum.of(message.getMethod()); if (methodEnum == null) { + log.warn("[buildTopicByMethod][未知的消息方法: {}]", message.getMethod()); return null; } - return switch (methodEnum) { - case PROPERTY_POST -> IotMqttTopicUtils.buildPropertyPostReplyTopic(productKey, deviceName); - case PROPERTY_SET -> IotMqttTopicUtils.buildPropertySetTopic(productKey, deviceName); - default -> null; - }; - } + // 2. 判断是否回复消息 + boolean isReply = IotDeviceMessageUtils.isReplyMessage(message); - /** - * 构建下行消息载荷 - * - * @param message 设备消息 - * @return JSON 载荷 - */ - private JSONObject buildDownstreamPayload(IotDeviceMessage message) { - // 使用 IotDeviceMessageService 进行消息编码 - IotDeviceRespDTO device = deviceService.getDeviceFromCache(message.getDeviceId()); - byte[] encodedBytes = deviceMessageService.encodeDeviceMessage(message, device.getProductKey(), - device.getDeviceName()); - return new JSONObject(new String(encodedBytes)); + // 3. 根据消息方法和回复状态,构建主题 + if (methodEnum == IotDeviceMessageMethodEnum.PROPERTY_POST && isReply) { + return IotMqttTopicUtils.buildPropertyPostReplyTopic(productKey, deviceName); + } + if (methodEnum == IotDeviceMessageMethodEnum.PROPERTY_SET && !isReply) { + return IotMqttTopicUtils.buildPropertySetTopic(productKey, deviceName); + } + + log.warn("[buildTopicByMethod][暂时不支持的下行消息: method={}, isReply={}]", + message.getMethod(), isReply); + return null; } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java index b9dcdb5cce..3410fb36a5 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java @@ -14,8 +14,6 @@ import io.vertx.core.json.JsonObject; import io.vertx.ext.web.RoutingContext; import lombok.extern.slf4j.Slf4j; -import static cn.iocoder.yudao.module.iot.gateway.enums.ErrorCodeConstants.DEVICE_AUTH_FAIL; - /** * IoT 网关 MQTT HTTP 认证处理器 *

@@ -67,7 +65,7 @@ public class IotMqttHttpAuthHandler { */ public void handleAuth(RoutingContext context) { try { - // 参数校验 + // 1. 参数校验 JsonObject body = parseRequestBody(context); if (body == null) { return; @@ -78,23 +76,21 @@ public class IotMqttHttpAuthHandler { log.debug("[handleAuth][设备认证请求: clientId={}, username={}]", clientId, username); if (StrUtil.hasEmpty(clientId, username, password)) { log.info("[handleAuth][认证参数不完整: clientId={}, username={}]", clientId, username); - sendAuthResponse(context, RESULT_DENY, false, "认证参数不完整"); + sendAuthResponse(context, RESULT_DENY); return; } - // 执行设备认证 + // 2. 执行认证 boolean authResult = performDeviceAuth(clientId, username, password); + log.info("[handleAuth][设备认证结果: {} -> {}]", username, authResult); if (authResult) { - // TODO @haohao:是不是两条 info,直接打认证结果:authResult - log.info("[handleAuth][设备认证成功: {}]", username); - sendAuthResponse(context, RESULT_ALLOW, false, null); + sendAuthResponse(context, RESULT_ALLOW); } else { - log.info("[handleAuth][设备认证失败: {}]", username); - sendAuthResponse(context, RESULT_DENY, false, DEVICE_AUTH_FAIL.getMsg()); + sendAuthResponse(context, RESULT_DENY); } } catch (Exception e) { log.error("[handleAuth][设备认证异常]", e); - sendAuthResponse(context, RESULT_IGNORE, false, "认证服务异常"); + sendAuthResponse(context, RESULT_IGNORE); } } @@ -104,9 +100,10 @@ public class IotMqttHttpAuthHandler { * 支持的事件类型:client.connected、client.disconnected 等 */ public void handleEvent(RoutingContext context) { + JsonObject body = null; try { - // 解析请求体 - JsonObject body = parseRequestBody(context); + // 1. 解析请求体 + body = parseRequestBody(context); if (body == null) { return; } @@ -114,7 +111,7 @@ public class IotMqttHttpAuthHandler { String username = body.getString("username"); log.debug("[handleEvent][收到事件: {} - {}]", event, username); - // 根据事件类型进行分发处理 + // 2. 根据事件类型进行分发处理 switch (event) { case EVENT_CLIENT_CONNECTED: handleClientConnected(body); @@ -123,15 +120,13 @@ public class IotMqttHttpAuthHandler { handleClientDisconnected(body); break; default: - log.debug("[handleEvent][忽略事件: {}]", event); break; } // EMQX Webhook 只需要 200 状态码,无需响应体 context.response().setStatusCode(SUCCESS_STATUS_CODE).end(); } catch (Exception e) { - // TODO @haohao:body 可以打印出来 - log.error("[handleEvent][事件处理失败]", e); + log.error("[handleEvent][事件处理失败][body={}]", body != null ? body.encode() : "null", e); // 即使处理失败,也返回 200 避免EMQX重试 context.response().setStatusCode(SUCCESS_STATUS_CODE).end(); } @@ -163,18 +158,19 @@ public class IotMqttHttpAuthHandler { * @return 请求体JSON对象,解析失败时返回null */ private JsonObject parseRequestBody(RoutingContext context) { + String rawBody = null; try { + rawBody = context.body().asString(); JsonObject body = context.body().asJsonObject(); if (body == null) { - log.info("[parseRequestBody][请求体为空]"); - sendAuthResponse(context, RESULT_IGNORE, false, "请求体不能为空"); + log.info("[parseRequestBody][请求体为空][rawBody={}]", rawBody); + sendAuthResponse(context, RESULT_IGNORE); return null; } return body; } catch (Exception e) { - // TODO @haohao:最好把 body 打印出来; - log.error("[parseRequestBody][解析请求体失败]", e); - sendAuthResponse(context, RESULT_IGNORE, false, "请求体格式错误"); + log.error("[parseRequestBody][解析请求体失败][rawBody={}]", rawBody, e); + sendAuthResponse(context, RESULT_IGNORE); return null; } } @@ -203,13 +199,10 @@ public class IotMqttHttpAuthHandler { * 处理设备状态变化 * * @param username 用户名 - * @param online 是否在线 + * @param online 是否在线 true 在线 false 离线 */ private void handleDeviceStateChange(String username, boolean online) { - // 解析设备信息 - if (StrUtil.isEmpty(username) || "undefined".equals(username)) { - return; - } + // 1. 解析设备信息 IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(username); if (deviceInfo == null) { log.debug("[handleDeviceStateChange][跳过非设备连接: {}]", username); @@ -217,24 +210,13 @@ public class IotMqttHttpAuthHandler { } try { - // TODO @haohao:serverId 获取非空,可以忽略掉; - String serverId = protocol.getServerId(); - if (StrUtil.isEmpty(serverId)) { - log.error("[handleDeviceStateChange][获取服务器ID失败]"); - return; - } - - // 构建设备状态消息 + // 2. 构建设备状态消息 IotDeviceMessage message = online ? IotDeviceMessage.buildStateOnline() : IotDeviceMessage.buildStateOffline(); - // 发送消息到消息总线 - deviceMessageService.sendDeviceMessage(message, - deviceInfo.getProductKey(), deviceInfo.getDeviceName(), serverId); - // TODO @haohao:online 不用翻译 - log.info("[handleDeviceStateChange][设备状态更新: {}/{} -> {}]", - deviceInfo.getProductKey(), deviceInfo.getDeviceName(), - online ? "在线" : "离线"); + // 3. 发送设备状态消息 + deviceMessageService.sendDeviceMessage(message, + deviceInfo.getProductKey(), deviceInfo.getDeviceName(), protocol.getServerId()); } catch (Exception e) { log.error("[handleDeviceStateChange][发送设备状态消息失败: {}]", username, e); } @@ -244,16 +226,14 @@ public class IotMqttHttpAuthHandler { * 发送 EMQX 认证响应 * 根据 EMQX 官方文档要求,必须返回 JSON 格式响应 * - * @param context 路由上下文 - * @param result 认证结果:allow、deny、ignore - * @param isSuperuser 是否超级用户 - * @param message 日志消息(仅用于日志记录,不返回给EMQX) + * @param context 路由上下文 + * @param result 认证结果:allow、deny、ignore */ - private void sendAuthResponse(RoutingContext context, String result, boolean isSuperuser, String message) { + private void sendAuthResponse(RoutingContext context, String result) { // 构建符合 EMQX 官方规范的响应 JsonObject response = new JsonObject() .put("result", result) - .put("is_superuser", isSuperuser); + .put("is_superuser", false); // 可以根据业务需求添加客户端属性 // response.put("client_attrs", new JsonObject().put("role", "device")); @@ -261,7 +241,6 @@ public class IotMqttHttpAuthHandler { // 可以添加认证过期时间(可选) // response.put("expire_at", System.currentTimeMillis() / 1000 + 3600); - // 记录详细的响应日志(message仅用于日志,不返回给EMQX) context.response() .setStatusCode(SUCCESS_STATUS_CODE) .putHeader("Content-Type", "application/json; charset=utf-8") diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java index 8098f54427..47d0a2f4a6 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java @@ -7,9 +7,6 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.mqtt.messages.MqttPublishMessage; import lombok.extern.slf4j.Slf4j; -import org.springframework.util.Assert; - -import java.util.Arrays; /** * IoT 网关 MQTT 上行消息处理器 @@ -32,38 +29,24 @@ public class IotMqttUpstreamHandler { * 处理 MQTT 发布消息 */ public void handle(MqttPublishMessage mqttMessage) { + log.info("[handle][收到 MQTT 消息, topic: {}, payload: {}]", mqttMessage.topicName(), mqttMessage.payload()); String topic = mqttMessage.topicName(); byte[] payload = mqttMessage.payload().getBytes(); try { - // 1. 前置校验 - if (StrUtil.isBlank(topic)) { - log.warn("[handle][主题为空, 忽略消息]"); + // 1. 解析主题,一次性获取所有信息 + String[] topicParts = topic.split("/"); + if (topicParts.length < 4 || StrUtil.hasBlank(topicParts[2], topicParts[3])) { + log.warn("[handle][topic({}) 格式不正确,无法解析有效的 productKey 和 deviceName]", topic); return; } - // 2.1 识别并验证消息类型 - String messageType = getMessageType(topic); - // TODO @haohao:可以使用 hutool 的,它的字符串拼接更简单; - Assert.notNull(messageType, String.format("未知的消息类型, topic(%s)", topic)); - // 2.2 解析主题,获取 productKey 和 deviceName - // TODO @haohao:体感 getMessageType 和下面,都 split;是不是一次就 ok 拉;1)split 掉;2)2、3 位置是 productKey、deviceName;3)4 开始还是 method - String[] topicParts = topic.split("/"); - if (topicParts.length < 4) { - log.warn("[handle][topic({}) 格式不正确,无法解析 productKey 和 deviceName]", topic); - return; - } String productKey = topicParts[2]; String deviceName = topicParts[3]; - // TODO @haohao:是不是要判断,部分为空,就不行呀; - if (StrUtil.isAllBlank(productKey, deviceName)) { - log.warn("[handle][topic({}) 格式不正确,productKey 和 deviceName 部分为空]", topic); - return; - } // 3. 解码消息 IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName); if (message == null) { - log.warn("[handle][topic({}) payload({}) 消息解码失败", topic, new String(payload)); + log.warn("[handle][topic({}) payload({}) 消息解码失败]", topic, new String(payload)); return; } @@ -74,22 +57,4 @@ public class IotMqttUpstreamHandler { } } - // TODO @haohao:是不是 getMethodFromTopic? - /** - * 从主题中,获得消息类型 - * - * @param topic 主题 - * @return 消息类型 - */ - private String getMessageType(String topic) { - String[] topicParts = topic.split("/"); - // 约定:topic 第 4 个部分开始为消息类型 - // 例如:/sys/{productKey}/{deviceName}/thing/property/post -> thing/property/post - if (topicParts.length > 4) { - // TODO @haohao:是不是 subString 前 3 个,性能更好; - return String.join("/", Arrays.copyOfRange(topicParts, 4, topicParts.length)); - } - return topicParts[topicParts.length - 1]; - } - } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application-local.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application-local.yaml index 21514ddabd..384799eebf 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application-local.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application-local.yaml @@ -32,13 +32,13 @@ yudao: # ==================================== emqx: enabled: true - http-auth-port: 8090 # MQTT HTTP 认证服务端口 - mqtt-host: 127.0.0.1 # MQTT Broker 地址 - mqtt-port: 1883 # MQTT Broker 端口 - mqtt-username: admin # MQTT 用户名 - mqtt-password: public # MQTT 密码 - mqtt-client-id: iot-gateway-mqtt # MQTT 客户端 ID - mqtt-ssl: false # 是否开启 SSL + http-port: 8090 # MQTT HTTP 服务端口 + mqtt-host: 127.0.0.1 # MQTT Broker 地址 + mqtt-port: 1883 # MQTT Broker 端口 + mqtt-username: admin # MQTT 用户名 + mqtt-password: public # MQTT 密码 + mqtt-client-id: iot-gateway-mqtt # MQTT 客户端 ID + mqtt-ssl: false # 是否开启 SSL mqtt-topics: - "/sys/#" # 系统主题 @@ -55,4 +55,4 @@ logging: cn.iocoder.yudao.module.iot.gateway.protocol.mqtt: DEBUG cn.iocoder.yudao.module.iot.gateway.protocol.http: DEBUG # MQTT 客户端日志 - io.vertx.mqtt: DEBUG \ No newline at end of file +# io.vertx.mqtt: DEBUG \ No newline at end of file