From 50ac2ca5f6fcd3e5685622ca1ef29c13bf44ef9c Mon Sep 17 00:00:00 2001 From: haohao <1036606149@qq.com> Date: Thu, 14 Aug 2025 19:40:20 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E3=80=90IoT=20=E7=89=A9=E8=81=94=E7=BD=91?= =?UTF-8?q?=E3=80=91=E4=BC=98=E5=8C=96=20MQTT=20=E5=8D=8F=E8=AE=AE?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/IotGatewayConfiguration.java | 5 +- .../mqtt/IotMqttUpstreamProtocol.java | 19 ++- .../manager/IotMqttConnectionManager.java | 49 ++++++-- .../mqtt/router/IotMqttUpstreamHandler.java | 113 +++++++++--------- 4 files changed, 104 insertions(+), 82 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java index 257ff96ad0..4b9c3af32c 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java @@ -129,12 +129,11 @@ public class IotGatewayConfiguration { @Bean public IotMqttUpstreamProtocol iotMqttUpstreamProtocol(IotGatewayProperties gatewayProperties, - IotDeviceService deviceService, IotDeviceMessageService messageService, IotMqttConnectionManager connectionManager, Vertx mqttVertx) { - return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getMqtt(), - deviceService, messageService, connectionManager, mqttVertx); + return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getMqtt(), messageService, + connectionManager, mqttVertx); } @Bean diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java index 0d1203abe9..fc0b6672c1 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java @@ -4,7 +4,6 @@ import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttUpstreamHandler; -import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.core.Vertx; import io.vertx.mqtt.MqttServer; @@ -24,8 +23,6 @@ public class IotMqttUpstreamProtocol { private final IotGatewayProperties.MqttProperties mqttProperties; - private final IotDeviceService deviceService; - private final IotDeviceMessageService messageService; private final IotMqttConnectionManager connectionManager; @@ -38,12 +35,10 @@ public class IotMqttUpstreamProtocol { private MqttServer mqttServer; public IotMqttUpstreamProtocol(IotGatewayProperties.MqttProperties mqttProperties, - IotDeviceService deviceService, IotDeviceMessageService messageService, IotMqttConnectionManager connectionManager, Vertx vertx) { this.mqttProperties = mqttProperties; - this.deviceService = deviceService; this.messageService = messageService; this.connectionManager = connectionManager; this.vertx = vertx; @@ -54,22 +49,22 @@ public class IotMqttUpstreamProtocol { @PostConstruct public void start() { // 创建服务器选项 - MqttServerOptions options = new MqttServerOptions(); - options.setPort(mqttProperties.getPort()); - options.setMaxMessageSize(mqttProperties.getMaxMessageSize()); - options.setTimeoutOnConnect(mqttProperties.getConnectTimeoutSeconds()); + MqttServerOptions options = new MqttServerOptions() + .setPort(mqttProperties.getPort()) + .setMaxMessageSize(mqttProperties.getMaxMessageSize()) + .setTimeoutOnConnect(mqttProperties.getConnectTimeoutSeconds()); // 配置 SSL(如果启用) if (Boolean.TRUE.equals(mqttProperties.getSslEnabled())) { - options.setSsl(true).setKeyCertOptions(mqttProperties.getSslOptions().getKeyCertOptions()) + options.setSsl(true) + .setKeyCertOptions(mqttProperties.getSslOptions().getKeyCertOptions()) .setTrustOptions(mqttProperties.getSslOptions().getTrustOptions()); } // 创建服务器并设置连接处理器 mqttServer = MqttServer.create(vertx, options); mqttServer.endpointHandler(endpoint -> { - IotMqttUpstreamHandler handler = new IotMqttUpstreamHandler(this, messageService, deviceService, - connectionManager); + IotMqttUpstreamHandler handler = new IotMqttUpstreamHandler(this, messageService, connectionManager); handler.handle(endpoint); }); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java index eed377535a..3fd1a3a041 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java @@ -1,5 +1,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager; +import cn.hutool.core.util.StrUtil; import io.netty.handler.codec.mqtt.MqttQoS; import io.vertx.mqtt.MqttEndpoint; import lombok.Data; @@ -23,6 +24,11 @@ import java.util.concurrent.ConcurrentHashMap; @Component public class IotMqttConnectionManager { + /** + * 未知地址常量(当获取端点地址失败时使用) + */ + private static final String UNKNOWN_ADDRESS = "unknown"; + /** * 连接信息映射:MqttEndpoint -> 连接信息 */ @@ -35,21 +41,32 @@ public class IotMqttConnectionManager { /** * 安全获取 endpoint 地址 + *

+ * 优先从缓存获取地址,缓存为空时再尝试实时获取 * * @param endpoint MQTT 连接端点 - * @return 地址字符串,如果获取失败则返回 "unknown" + * @return 地址字符串,获取失败时返回 "unknown" */ - private String getEndpointAddress(MqttEndpoint endpoint) { - try { - if (endpoint != null) { - return endpoint.remoteAddress().toString(); - } - } catch (Exception e) { - // 忽略异常,返回默认值 - // TODO @haohao:这个比较稳定会出现哇? + public String getEndpointAddress(MqttEndpoint endpoint) { + String realTimeAddress = UNKNOWN_ADDRESS; + if (endpoint == null) { + return realTimeAddress; } - // TODO @haohao:这个要枚举下么? - return "unknown"; + + // 1. 优先从缓存获取(避免连接关闭时的异常) + ConnectionInfo connectionInfo = connectionMap.get(endpoint); + if (connectionInfo != null && StrUtil.isNotBlank(connectionInfo.getRemoteAddress())) { + return connectionInfo.getRemoteAddress(); + } + + // 2. 缓存为空时尝试实时获取 + try { + realTimeAddress = endpoint.remoteAddress().toString(); + } catch (Exception ignored) { + // 连接已关闭,忽略异常 + } + + return realTimeAddress; } /** @@ -87,8 +104,9 @@ public class IotMqttConnectionManager { if (connectionInfo != null) { Long deviceId = connectionInfo.getDeviceId(); deviceEndpointMap.remove(deviceId); - log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]", - deviceId, getEndpointAddress(endpoint)); + + log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]", deviceId, + getEndpointAddress(endpoint)); } } @@ -195,6 +213,11 @@ public class IotMqttConnectionManager { */ private boolean authenticated; + /** + * 连接地址 + */ + private String remoteAddress; + } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java index 9714e9104f..c19053f144 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java @@ -12,7 +12,6 @@ import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager; -import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.netty.handler.codec.mqtt.MqttConnectReturnCode; import io.netty.handler.codec.mqtt.MqttQoS; @@ -40,8 +39,6 @@ public class IotMqttUpstreamHandler { public IotMqttUpstreamHandler(IotMqttUpstreamProtocol protocol, IotDeviceMessageService deviceMessageService, - // TODO @haohao:用不到的 deviceService 可以删除哈; - IotDeviceService deviceService, IotMqttConnectionManager connectionManager) { this.deviceMessageService = deviceMessageService; this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); @@ -60,7 +57,7 @@ public class IotMqttUpstreamHandler { String password = endpoint.auth() != null ? endpoint.auth().getPassword() : null; log.debug("[handle][设备连接请求,客户端 ID: {},用户名: {},地址: {}]", - clientId, username, getEndpointAddress(endpoint)); + clientId, username, connectionManager.getEndpointAddress(endpoint)); // 1. 先进行认证 if (!authenticateDevice(clientId, username, password, endpoint)) { @@ -71,32 +68,46 @@ public class IotMqttUpstreamHandler { log.info("[handle][设备认证成功,建立连接,客户端 ID: {},用户名: {}]", clientId, username); - // TODO @haohao:这里是不是少了序号哈? - // 设置异常和关闭处理器 + // 2. 设置异常和关闭处理器 endpoint.exceptionHandler(ex -> { - log.warn("[handle][连接异常,客户端 ID: {},地址: {}]", clientId, getEndpointAddress(endpoint)); + log.warn("[handle][连接异常,客户端 ID: {},地址: {}]", clientId, connectionManager.getEndpointAddress(endpoint)); cleanupConnection(endpoint); }); endpoint.closeHandler(v -> { - log.debug("[handle][连接关闭,客户端 ID: {},地址: {}]", clientId, getEndpointAddress(endpoint)); cleanupConnection(endpoint); }); - // 设置消息处理器 + // 3. 设置消息处理器 endpoint.publishHandler(message -> { try { - processMessage(clientId, message.topicName(), message.payload().getBytes(), endpoint); + processMessage(clientId, message.topicName(), message.payload().getBytes()); + + // 根据 QoS 级别发送相应的确认消息 + if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) { + // QoS 1: 发送 PUBACK 确认 + endpoint.publishAcknowledge(message.messageId()); + } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) { + // QoS 2: 发送 PUBREC 确认 + endpoint.publishReceived(message.messageId()); + } + // QoS 0 无需确认 + } catch (Exception e) { log.error("[handle][消息解码失败,断开连接,客户端 ID: {},地址: {},错误: {}]", - clientId, getEndpointAddress(endpoint), e.getMessage()); + clientId, connectionManager.getEndpointAddress(endpoint), e.getMessage()); cleanupConnection(endpoint); endpoint.close(); } }); - // 设置订阅处理器 + // 4. 设置订阅处理器 endpoint.subscribeHandler(subscribe -> { - log.debug("[handle][设备订阅,客户端 ID: {},主题: {}]", clientId, subscribe.topicSubscriptions()); + // 提取主题名称列表用于日志显示 + List topicNames = subscribe.topicSubscriptions().stream() + .map(MqttTopicSubscription::topicName) + .collect(java.util.stream.Collectors.toList()); + log.debug("[handle][设备订阅,客户端 ID: {},主题: {}]", clientId, topicNames); + // 提取 QoS 列表 List grantedQoSLevels = subscribe.topicSubscriptions().stream() .map(MqttTopicSubscription::qualityOfService) @@ -104,19 +115,22 @@ public class IotMqttUpstreamHandler { endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQoSLevels); }); - // 设置取消订阅处理器 + // 5. 设置取消订阅处理器 endpoint.unsubscribeHandler(unsubscribe -> { log.debug("[handle][设备取消订阅,客户端 ID: {},主题: {}]", clientId, unsubscribe.topics()); endpoint.unsubscribeAcknowledge(unsubscribe.messageId()); }); - // 设置断开连接处理器 + // 6. 设置 QoS 2消息的 PUBREL 处理器 + endpoint.publishReleaseHandler(endpoint::publishComplete); + + // 7. 设置断开连接处理器 endpoint.disconnectHandler(v -> { log.debug("[handle][设备断开连接,客户端 ID: {}]", clientId); cleanupConnection(endpoint); }); - // 接受连接 + // 8. 接受连接 endpoint.accept(false); } @@ -126,10 +140,8 @@ public class IotMqttUpstreamHandler { * @param clientId 客户端 ID * @param topic 主题 * @param payload 消息内容 - * @param endpoint MQTT 连接端点 - * @throws Exception 消息解码失败时抛出异常 */ - private void processMessage(String clientId, String topic, byte[] payload, MqttEndpoint endpoint) throws Exception { + private void processMessage(String clientId, String topic, byte[] payload) { // 1. 基础检查 if (payload == null || payload.length == 0) { return; @@ -146,14 +158,22 @@ public class IotMqttUpstreamHandler { String deviceName = topicParts[3]; // 3. 解码消息(使用从 topic 解析的 productKey 和 deviceName) - IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName); - if (message == null) { - log.warn("[processMessage][消息解码失败,客户端 ID: {},主题: {}]", clientId, topic); - return; - } + try { + IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName); + if (message == null) { + log.warn("[processMessage][消息解码失败,客户端 ID: {},主题: {}]", clientId, topic); + return; + } - // 4. 处理业务消息(认证已在连接时完成) - handleBusinessRequest(clientId, message, productKey, deviceName, endpoint); + log.info("[processMessage][收到设备消息,设备: {}.{}, 方法: {}]", + productKey, deviceName, message.getMethod()); + + // 4. 处理业务消息(认证已在连接时完成) + handleBusinessRequest(message, productKey, deviceName); + } catch (Exception e) { + log.error("[processMessage][消息处理异常,客户端 ID: {},主题: {},错误: {}]", + clientId, topic, e.getMessage(), e); + } } /** @@ -194,9 +214,10 @@ public class IotMqttUpstreamHandler { return false; } - IotDeviceGetReqDTO getReqDTO = new IotDeviceGetReqDTO(); - getReqDTO.setProductKey(deviceInfo.getProductKey()); - getReqDTO.setDeviceName(deviceInfo.getDeviceName()); + IotDeviceGetReqDTO getReqDTO = new IotDeviceGetReqDTO() + .setProductKey(deviceInfo.getProductKey()) + .setDeviceName(deviceInfo.getDeviceName()); + CommonResult deviceResult = deviceApi.getDevice(getReqDTO); if (!deviceResult.isSuccess() || deviceResult.getData() == null) { log.warn("[authenticateDevice][获取设备信息失败,客户端 ID: {},用户名: {},错误: {}]", @@ -221,8 +242,7 @@ public class IotMqttUpstreamHandler { /** * 处理业务请求 */ - private void handleBusinessRequest(String clientId, IotDeviceMessage message, String productKey, String deviceName, - MqttEndpoint endpoint) { + private void handleBusinessRequest(IotDeviceMessage message, String productKey, String deviceName) { // 发送消息到消息总线 message.setServerId(serverId); deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId); @@ -233,12 +253,14 @@ public class IotMqttUpstreamHandler { */ private void registerConnection(MqttEndpoint endpoint, IotDeviceRespDTO device, String clientId) { - IotMqttConnectionManager.ConnectionInfo connectionInfo = new IotMqttConnectionManager.ConnectionInfo(); - connectionInfo.setDeviceId(device.getId()); - connectionInfo.setProductKey(device.getProductKey()); - connectionInfo.setDeviceName(device.getDeviceName()); - connectionInfo.setClientId(clientId); - connectionInfo.setAuthenticated(true); + + IotMqttConnectionManager.ConnectionInfo connectionInfo = new IotMqttConnectionManager.ConnectionInfo() + .setDeviceId(device.getId()) + .setProductKey(device.getProductKey()) + .setDeviceName(device.getDeviceName()) + .setClientId(clientId) + .setAuthenticated(true) + .setRemoteAddress(connectionManager.getEndpointAddress(endpoint)); connectionManager.registerConnection(endpoint, device.getId(), connectionInfo); } @@ -257,23 +279,6 @@ public class IotMqttUpstreamHandler { } } - /** - * 安全获取 endpoint 地址 - * - * @param endpoint MQTT 连接端点 - * @return 地址字符串,如果获取失败则返回 "unknown" - */ - private String getEndpointAddress(MqttEndpoint endpoint) { - try { - if (endpoint != null) { - return endpoint.remoteAddress().toString(); - } - } catch (Exception e) { - // 忽略异常,返回默认值 - } - return "unknown"; - } - /** * 清理连接 */