diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java index 51af9bd3ce..257ff96ad0 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java @@ -6,6 +6,10 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxDownstreamSubscr import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttDownstreamHandler; import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager; @@ -49,7 +53,7 @@ public class IotGatewayConfiguration { @Configuration @ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.emqx", name = "enabled", havingValue = "true") @Slf4j - public static class MqttProtocolConfiguration { + public static class EmqxProtocolConfiguration { @Bean(destroyMethod = "close") public Vertx emqxVertx() { @@ -110,4 +114,42 @@ public class IotGatewayConfiguration { } + /** + * IoT 网关 MQTT 协议配置类 + */ + @Configuration + @ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.mqtt", name = "enabled", havingValue = "true") + @Slf4j + public static class MqttProtocolConfiguration { + + @Bean(destroyMethod = "close") + public Vertx mqttVertx() { + return Vertx.vertx(); + } + + @Bean + public IotMqttUpstreamProtocol iotMqttUpstreamProtocol(IotGatewayProperties gatewayProperties, + IotDeviceService deviceService, + IotDeviceMessageService messageService, + IotMqttConnectionManager connectionManager, + Vertx mqttVertx) { + return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getMqtt(), + deviceService, messageService, connectionManager, mqttVertx); + } + + @Bean + public IotMqttDownstreamHandler iotMqttDownstreamHandler(IotDeviceMessageService messageService, + IotMqttConnectionManager connectionManager) { + return new IotMqttDownstreamHandler(messageService, connectionManager); + } + + @Bean + public IotMqttDownstreamSubscriber iotMqttDownstreamSubscriber(IotMqttUpstreamProtocol mqttUpstreamProtocol, + IotMqttDownstreamHandler downstreamHandler, + IotMessageBus messageBus) { + return new IotMqttDownstreamSubscriber(mqttUpstreamProtocol, downstreamHandler, messageBus); + } + + } + } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java index e4886df07a..7684972d29 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java @@ -83,6 +83,11 @@ public class IotGatewayProperties { */ private TcpProperties tcp; + /** + * MQTT 组件配置 + */ + private MqttProperties mqtt; + } @Data @@ -325,4 +330,83 @@ public class IotGatewayProperties { } + @Data + public static class MqttProperties { + + /** + * 是否开启 + */ + @NotNull(message = "是否开启不能为空") + private Boolean enabled; + + /** + * 服务器端口 + */ + private Integer port = 1883; + + /** + * 最大消息大小(字节) + */ + private Integer maxMessageSize = 8192; + + /** + * 连接超时时间(秒) + */ + private Integer connectTimeoutSeconds = 60; + + /** + * 保持连接超时时间(秒) + */ + private Integer keepAliveTimeoutSeconds = 300; + + /** + * 是否启用 SSL + */ + private Boolean sslEnabled = false; + + /** + * SSL 配置 + */ + private SslOptions sslOptions = new SslOptions(); + + /** + * SSL 配置选项 + */ + @Data + public static class SslOptions { + + /** + * 密钥证书选项 + */ + private io.vertx.core.net.KeyCertOptions keyCertOptions; + + /** + * 信任选项 + */ + private io.vertx.core.net.TrustOptions trustOptions; + + /** + * SSL 证书路径 + */ + private String certPath; + + /** + * SSL 私钥路径 + */ + private String keyPath; + + /** + * 信任存储路径 + */ + private String trustStorePath; + + /** + * 信任存储密码 + */ + private String trustStorePassword; + + } + + } + } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java new file mode 100644 index 0000000000..3b62368fd9 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java @@ -0,0 +1,79 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt; + +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttDownstreamHandler; +import jakarta.annotation.PostConstruct; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT 网关 MQTT 协议:下行消息订阅器 + *

+ * 负责接收来自消息总线的下行消息,并委托给下行处理器进行业务处理 + * + * @author 芋道源码 + */ +@Slf4j +public class IotMqttDownstreamSubscriber implements IotMessageSubscriber { + + private final IotMqttUpstreamProtocol upstreamProtocol; + + private final IotMqttDownstreamHandler downstreamHandler; + + private final IotMessageBus messageBus; + + public IotMqttDownstreamSubscriber(IotMqttUpstreamProtocol upstreamProtocol, + IotMqttDownstreamHandler downstreamHandler, + IotMessageBus messageBus) { + this.upstreamProtocol = upstreamProtocol; + this.downstreamHandler = downstreamHandler; + this.messageBus = messageBus; + } + + @PostConstruct + public void subscribe() { + messageBus.register(this); + log.info("[subscribe][MQTT 协议下行消息订阅成功,主题:{}]", getTopic()); + } + + @Override + public String getTopic() { + return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(upstreamProtocol.getServerId()); + } + + @Override + public String getGroup() { + // 保证点对点消费,需要保证独立的 Group,所以使用 Topic 作为 Group + return getTopic(); + } + + @Override + public void onMessage(IotDeviceMessage message) { + log.debug("[onMessage][接收到下行消息, messageId: {}, method: {}, deviceId: {}]", + message.getId(), message.getMethod(), message.getDeviceId()); + try { + // 1. 校验 + String method = message.getMethod(); + if (method == null) { + log.warn("[onMessage][消息方法为空, messageId: {}, deviceId: {}]", + message.getId(), message.getDeviceId()); + return; + } + + // 2. 委托给下行处理器处理业务逻辑 + boolean success = downstreamHandler.handleDownstreamMessage(message); + if (success) { + log.debug("[onMessage][下行消息处理成功, messageId: {}, method: {}, deviceId: {}]", + message.getId(), message.getMethod(), message.getDeviceId()); + } else { + log.warn("[onMessage][下行消息处理失败, messageId: {}, method: {}, deviceId: {}]", + message.getId(), message.getMethod(), message.getDeviceId()); + } + } catch (Exception e) { + log.error("[onMessage][处理下行消息失败, messageId: {}, method: {}, deviceId: {}]", + message.getId(), message.getMethod(), message.getDeviceId(), e); + } + } +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java new file mode 100644 index 0000000000..92ffaedd4b --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java @@ -0,0 +1,96 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt; + +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttUpstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.vertx.core.Vertx; +import io.vertx.mqtt.MqttServer; +import io.vertx.mqtt.MqttServerOptions; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT 网关 MQTT 协议:接收设备上行消息 + * + * @author 芋道源码 + */ +@Slf4j +public class IotMqttUpstreamProtocol { + + private final IotGatewayProperties.MqttProperties mqttProperties; + + private final IotDeviceService deviceService; + + private final IotDeviceMessageService messageService; + + private final IotMqttConnectionManager connectionManager; + + private final Vertx vertx; + + @Getter + private final String serverId; + + private MqttServer mqttServer; + + public IotMqttUpstreamProtocol(IotGatewayProperties.MqttProperties mqttProperties, + IotDeviceService deviceService, + IotDeviceMessageService messageService, + IotMqttConnectionManager connectionManager, + Vertx vertx) { + this.mqttProperties = mqttProperties; + this.deviceService = deviceService; + this.messageService = messageService; + this.connectionManager = connectionManager; + this.vertx = vertx; + this.serverId = IotDeviceMessageUtils.generateServerId(mqttProperties.getPort()); + } + + @PostConstruct + public void start() { + // 创建服务器选项 + MqttServerOptions options = new MqttServerOptions(); + options.setPort(mqttProperties.getPort()); + options.setMaxMessageSize(mqttProperties.getMaxMessageSize()); + options.setTimeoutOnConnect(mqttProperties.getConnectTimeoutSeconds()); + + // 配置 SSL(如果启用) + if (Boolean.TRUE.equals(mqttProperties.getSslEnabled())) { + options.setSsl(true).setKeyCertOptions(mqttProperties.getSslOptions().getKeyCertOptions()) + .setTrustOptions(mqttProperties.getSslOptions().getTrustOptions()); + } + + // 创建服务器并设置连接处理器 + mqttServer = MqttServer.create(vertx, options); + mqttServer.endpointHandler(endpoint -> { + IotMqttUpstreamHandler handler = new IotMqttUpstreamHandler(this, messageService, deviceService, + connectionManager); + handler.handle(endpoint); + }); + + // 启动服务器 + try { + mqttServer.listen().result(); + log.info("[start][IoT 网关 MQTT 协议启动成功,端口:{}]", mqttProperties.getPort()); + } catch (Exception e) { + log.error("[start][IoT 网关 MQTT 协议启动失败]", e); + throw e; + } + } + + @PreDestroy + public void stop() { + if (mqttServer != null) { + try { + mqttServer.close().result(); + log.info("[stop][IoT 网关 MQTT 协议已停止]"); + } catch (Exception e) { + log.error("[stop][IoT 网关 MQTT 协议停止失败]", e); + } + } + } +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java new file mode 100644 index 0000000000..11432bc248 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java @@ -0,0 +1,197 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager; + +import io.netty.handler.codec.mqtt.MqttQoS; +import io.vertx.mqtt.MqttEndpoint; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * IoT 网关 MQTT 连接管理器 + *

+ * 统一管理 MQTT 连接的认证状态、设备会话和消息发送功能: + * 1. 管理 MQTT 连接的认证状态 + * 2. 管理设备会话和在线状态 + * 3. 管理消息发送到设备 + * + * @author 芋道源码 + */ +@Slf4j +@Component +public class IotMqttConnectionManager { + + /** + * 连接信息映射:MqttEndpoint -> 连接信息 + */ + private final Map connectionMap = new ConcurrentHashMap<>(); + + /** + * 设备 ID -> MqttEndpoint 的映射 + */ + private final Map deviceEndpointMap = new ConcurrentHashMap<>(); + + /** + * 安全获取 endpoint 地址 + * + * @param endpoint MQTT 连接端点 + * @return 地址字符串,如果获取失败则返回 "unknown" + */ + private String getEndpointAddress(MqttEndpoint endpoint) { + try { + if (endpoint != null) { + return endpoint.remoteAddress().toString(); + } + } catch (Exception e) { + // 忽略异常,返回默认值 + } + return "unknown"; + } + + /** + * 注册设备连接(包含认证信息) + * + * @param endpoint MQTT 连接端点 + * @param deviceId 设备 ID + * @param connectionInfo 连接信息 + */ + public void registerConnection(MqttEndpoint endpoint, Long deviceId, ConnectionInfo connectionInfo) { + // 如果设备已有其他连接,先清理旧连接 + MqttEndpoint oldEndpoint = deviceEndpointMap.get(deviceId); + if (oldEndpoint != null && oldEndpoint != endpoint) { + log.info("[registerConnection][设备已有其他连接,断开旧连接,设备 ID: {},旧连接: {}]", + deviceId, getEndpointAddress(oldEndpoint)); + oldEndpoint.close(); + // 清理旧连接的映射 + connectionMap.remove(oldEndpoint); + } + + connectionMap.put(endpoint, connectionInfo); + deviceEndpointMap.put(deviceId, endpoint); + + log.info("[registerConnection][注册设备连接,设备 ID: {},连接: {},product key: {},device name: {}]", + deviceId, getEndpointAddress(endpoint), connectionInfo.getProductKey(), connectionInfo.getDeviceName()); + } + + /** + * 注销设备连接 + * + * @param endpoint MQTT 连接端点 + */ + public void unregisterConnection(MqttEndpoint endpoint) { + ConnectionInfo connectionInfo = connectionMap.remove(endpoint); + if (connectionInfo != null) { + Long deviceId = connectionInfo.getDeviceId(); + deviceEndpointMap.remove(deviceId); + log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]", + deviceId, getEndpointAddress(endpoint)); + } + } + + /** + * 获取连接信息 + */ + public ConnectionInfo getConnectionInfo(MqttEndpoint endpoint) { + return connectionMap.get(endpoint); + } + + /** + * 根据设备 ID 获取连接信息 + * + * @param deviceId 设备 ID + * @return 连接信息 + */ + public IotMqttConnectionManager.ConnectionInfo getConnectionInfoByDeviceId(Long deviceId) { + // 通过设备 ID 获取连接端点 + var endpoint = getDeviceEndpoint(deviceId); + if (endpoint == null) { + return null; + } + + // 获取连接信息 + return getConnectionInfo(endpoint); + } + + /** + * 检查设备是否在线 + */ + public boolean isDeviceOnline(Long deviceId) { + return deviceEndpointMap.containsKey(deviceId); + } + + /** + * 检查设备是否离线 + */ + public boolean isDeviceOffline(Long deviceId) { + return !isDeviceOnline(deviceId); + } + + /** + * 发送消息到设备 + * + * @param deviceId 设备 ID + * @param topic 主题 + * @param payload 消息内容 + * @param qos 服务质量 + * @param retain 是否保留消息 + * @return 是否发送成功 + */ + public boolean sendToDevice(Long deviceId, String topic, byte[] payload, int qos, boolean retain) { + MqttEndpoint endpoint = deviceEndpointMap.get(deviceId); + if (endpoint == null) { + log.warn("[sendToDevice][设备离线,无法发送消息,设备 ID: {},主题: {}]", deviceId, topic); + return false; + } + + try { + endpoint.publish(topic, io.vertx.core.buffer.Buffer.buffer(payload), MqttQoS.valueOf(qos), false, retain); + log.debug("[sendToDevice][发送消息成功,设备 ID: {},主题: {},QoS: {}]", deviceId, topic, qos); + return true; + } catch (Exception e) { + log.error("[sendToDevice][发送消息失败,设备 ID: {},主题: {},错误: {}]", deviceId, topic, e.getMessage()); + return false; + } + } + + /** + * 获取设备连接端点 + */ + public MqttEndpoint getDeviceEndpoint(Long deviceId) { + return deviceEndpointMap.get(deviceId); + } + + /** + * 连接信息 + */ + @Data + public static class ConnectionInfo { + + /** + * 设备 ID + */ + private Long deviceId; + + /** + * 产品 Key + */ + private String productKey; + + /** + * 设备名称 + */ + private String deviceName; + + /** + * 客户端 ID + */ + private String clientId; + + /** + * 是否已认证 + */ + private boolean authenticated; + + } +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/package-info.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/package-info.java new file mode 100644 index 0000000000..fabe79466e --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/package-info.java @@ -0,0 +1,6 @@ +/** + * MQTT 协议实现包 + *

+ * 提供基于 Vert.x MQTT Server 的 IoT 设备连接和消息处理功能 + */ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java new file mode 100644 index 0000000000..6714c64115 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java @@ -0,0 +1,133 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; + +import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils; +import io.netty.handler.codec.mqtt.MqttQoS; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT 网关 MQTT 协议:下行消息处理器 + *

+ * 专门处理下行消息的业务逻辑,包括: + * 1. 消息编码 + * 2. 主题构建 + * 3. 消息发送 + * + * @author 芋道源码 + */ +@Slf4j +public class IotMqttDownstreamHandler { + + private final IotDeviceMessageService deviceMessageService; + + private final IotMqttConnectionManager connectionManager; + + public IotMqttDownstreamHandler(IotDeviceMessageService deviceMessageService, + IotMqttConnectionManager connectionManager) { + this.deviceMessageService = deviceMessageService; + this.connectionManager = connectionManager; + } + + /** + * 处理下行消息 + * + * @param message 设备消息 + * @return 是否处理成功 + */ + public boolean handleDownstreamMessage(IotDeviceMessage message) { + try { + // 1. 基础校验 + if (message == null || message.getDeviceId() == null) { + log.warn("[handleDownstreamMessage][消息或设备 ID 为空,忽略处理]"); + return false; + } + + // 2. 检查设备是否在线 + if (connectionManager.isDeviceOffline(message.getDeviceId())) { + log.warn("[handleDownstreamMessage][设备离线,无法发送消息,设备 ID:{}]", message.getDeviceId()); + return false; + } + + // 3. 获取连接信息 + IotMqttConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfoByDeviceId(message.getDeviceId()); + if (connectionInfo == null) { + log.warn("[handleDownstreamMessage][连接信息不存在,设备 ID:{}]", message.getDeviceId()); + return false; + } + + // 4. 编码消息 + byte[] payload = deviceMessageService.encodeDeviceMessage(message, connectionInfo.getProductKey(), + connectionInfo.getDeviceName()); + if (payload == null || payload.length == 0) { + log.warn("[handleDownstreamMessage][消息编码失败,设备 ID:{}]", message.getDeviceId()); + return false; + } + + // 5. 发送消息到设备 + return sendMessageToDevice(message, connectionInfo, payload); + + } catch (Exception e) { + if (message != null) { + log.error("[handleDownstreamMessage][处理下行消息异常,设备 ID:{},错误:{}]", + message.getDeviceId(), e.getMessage(), e); + } + return false; + } + } + + /** + * 发送消息到设备 + * + * @param message 设备消息 + * @param connectionInfo 连接信息 + * @param payload 消息负载 + * @return 是否发送成功 + */ + private boolean sendMessageToDevice(IotDeviceMessage message, + IotMqttConnectionManager.ConnectionInfo connectionInfo, + byte[] payload) { + // 1. 构建主题 + String topic = buildDownstreamTopic(message, connectionInfo); + if (StrUtil.isBlank(topic)) { + log.warn("[sendMessageToDevice][主题构建失败,设备 ID:{},方法:{}]", + message.getDeviceId(), message.getMethod()); + return false; + } + + // 2. 发送消息 + boolean success = connectionManager.sendToDevice(message.getDeviceId(), topic, payload, MqttQoS.AT_LEAST_ONCE.value(), false); + if (success) { + log.debug("[sendMessageToDevice][消息发送成功,设备 ID:{},主题:{},方法:{}]", + message.getDeviceId(), topic, message.getMethod()); + } else { + log.warn("[sendMessageToDevice][消息发送失败,设备 ID:{},主题:{},方法:{}]", + message.getDeviceId(), topic, message.getMethod()); + } + return success; + } + + /** + * 构建下行消息主题 + * + * @param message 设备消息 + * @param connectionInfo 连接信息 + * @return 主题 + */ + private String buildDownstreamTopic(IotDeviceMessage message, + IotMqttConnectionManager.ConnectionInfo connectionInfo) { + String method = message.getMethod(); + if (StrUtil.isBlank(method)) { + return null; + } + + // 使用工具类构建主题,支持回复消息处理 + boolean isReply = IotDeviceMessageUtils.isReplyMessage(message); + return IotMqttTopicUtils.buildTopicByMethod(method, connectionInfo.getProductKey(), + connectionInfo.getDeviceName(), isReply); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java new file mode 100644 index 0000000000..5ce691293c --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java @@ -0,0 +1,298 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; + +import cn.hutool.core.util.BooleanUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.extra.spring.SpringUtil; +import cn.iocoder.yudao.framework.common.pojo.CommonResult; +import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceGetReqDTO; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.netty.handler.codec.mqtt.MqttConnectReturnCode; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.vertx.mqtt.MqttEndpoint; +import io.vertx.mqtt.MqttTopicSubscription; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +/** + * MQTT 上行消息处理器 + * + * @author 芋道源码 + */ +@Slf4j +public class IotMqttUpstreamHandler { + + private final IotDeviceMessageService deviceMessageService; + + private final IotMqttConnectionManager connectionManager; + + private final IotDeviceCommonApi deviceApi; + + private final String serverId; + + public IotMqttUpstreamHandler(IotMqttUpstreamProtocol protocol, + IotDeviceMessageService deviceMessageService, + IotDeviceService deviceService, + IotMqttConnectionManager connectionManager) { + this.deviceMessageService = deviceMessageService; + this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); + this.connectionManager = connectionManager; + this.serverId = protocol.getServerId(); + } + + /** + * 处理 MQTT 连接 + * + * @param endpoint MQTT 连接端点 + */ + public void handle(MqttEndpoint endpoint) { + String clientId = endpoint.clientIdentifier(); + String username = endpoint.auth() != null ? endpoint.auth().getUsername() : null; + String password = endpoint.auth() != null ? endpoint.auth().getPassword() : null; + + log.debug("[handle][设备连接请求,客户端 ID: {},用户名: {},地址: {}]", + clientId, username, getEndpointAddress(endpoint)); + + // 1. 先进行认证 + if (!authenticateDevice(clientId, username, password, endpoint)) { + log.warn("[handle][设备认证失败,拒绝连接,客户端 ID: {},用户名: {}]", clientId, username); + endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD); + return; + } + + log.info("[handle][设备认证成功,建立连接,客户端 ID: {},用户名: {}]", clientId, username); + + // 设置异常和关闭处理器 + endpoint.exceptionHandler(ex -> { + log.warn("[handle][连接异常,客户端 ID: {},地址: {}]", clientId, getEndpointAddress(endpoint)); + cleanupConnection(endpoint); + }); + endpoint.closeHandler(v -> { + log.debug("[handle][连接关闭,客户端 ID: {},地址: {}]", clientId, getEndpointAddress(endpoint)); + cleanupConnection(endpoint); + }); + + // 设置消息处理器 + endpoint.publishHandler(message -> { + try { + processMessage(clientId, message.topicName(), message.payload().getBytes(), endpoint); + } catch (Exception e) { + log.error("[handle][消息解码失败,断开连接,客户端 ID: {},地址: {},错误: {}]", + clientId, getEndpointAddress(endpoint), e.getMessage()); + cleanupConnection(endpoint); + endpoint.close(); + } + }); + + // 设置订阅处理器 + endpoint.subscribeHandler(subscribe -> { + log.debug("[handle][设备订阅,客户端 ID: {},主题: {}]", clientId, subscribe.topicSubscriptions()); + // 提取 QoS 列表 + List grantedQoSLevels = subscribe.topicSubscriptions().stream() + .map(MqttTopicSubscription::qualityOfService) + .collect(java.util.stream.Collectors.toList()); + endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQoSLevels); + }); + + // 设置取消订阅处理器 + endpoint.unsubscribeHandler(unsubscribe -> { + log.debug("[handle][设备取消订阅,客户端 ID: {},主题: {}]", clientId, unsubscribe.topics()); + endpoint.unsubscribeAcknowledge(unsubscribe.messageId()); + }); + + // 设置断开连接处理器 + endpoint.disconnectHandler(v -> { + log.debug("[handle][设备断开连接,客户端 ID: {}]", clientId); + cleanupConnection(endpoint); + }); + + // 接受连接 + endpoint.accept(false); + } + + /** + * 处理消息 + * + * @param clientId 客户端 ID + * @param topic 主题 + * @param payload 消息内容 + * @param endpoint MQTT 连接端点 + * @throws Exception 消息解码失败时抛出异常 + */ + private void processMessage(String clientId, String topic, byte[] payload, MqttEndpoint endpoint) throws Exception { + // 1. 基础检查 + if (payload == null || payload.length == 0) { + return; + } + + // 2. 解析主题,获取 productKey 和 deviceName + String[] topicParts = topic.split("/"); + if (topicParts.length < 4 || StrUtil.hasBlank(topicParts[2], topicParts[3])) { + log.warn("[processMessage][topic({}) 格式不正确,无法解析有效的 productKey 和 deviceName]", topic); + return; + } + + String productKey = topicParts[2]; + String deviceName = topicParts[3]; + + // 3. 解码消息(使用从 topic 解析的 productKey 和 deviceName) + IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName); + if (message == null) { + log.warn("[processMessage][消息解码失败,客户端 ID: {},主题: {}]", clientId, topic); + return; + } + + // 4. 处理业务消息(认证已在连接时完成) + handleBusinessRequest(clientId, message, productKey, deviceName, endpoint); + } + + /** + * 在 MQTT 连接时进行设备认证 + * + * @param clientId 客户端 ID + * @param username 用户名 + * @param password 密码 + * @param endpoint MQTT 连接端点 + * @return 认证是否成功 + */ + private boolean authenticateDevice(String clientId, String username, String password, MqttEndpoint endpoint) { + try { + // 1. 参数校验 + if (StrUtil.hasEmpty(clientId, username, password)) { + log.warn("[authenticateDevice][认证参数不完整,客户端 ID: {},用户名: {}]", clientId, username); + return false; + } + + // 2. 构建认证参数 + IotDeviceAuthReqDTO authParams = new IotDeviceAuthReqDTO() + .setClientId(clientId) + .setUsername(username) + .setPassword(password); + + // 3. 调用设备认证 API + CommonResult authResult = deviceApi.authDevice(authParams); + if (!authResult.isSuccess() || !BooleanUtil.isTrue(authResult.getData())) { + log.warn("[authenticateDevice][设备认证失败,客户端 ID: {},用户名: {},错误: {}]", + clientId, username, authResult.getMsg()); + return false; + } + + // 4. 获取设备信息 + IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(username); + if (deviceInfo == null) { + log.warn("[authenticateDevice][用户名格式不正确,客户端 ID: {},用户名: {}]", clientId, username); + return false; + } + + IotDeviceGetReqDTO getReqDTO = new IotDeviceGetReqDTO(); + getReqDTO.setProductKey(deviceInfo.getProductKey()); + getReqDTO.setDeviceName(deviceInfo.getDeviceName()); + CommonResult deviceResult = deviceApi.getDevice(getReqDTO); + if (!deviceResult.isSuccess() || deviceResult.getData() == null) { + log.warn("[authenticateDevice][获取设备信息失败,客户端 ID: {},用户名: {},错误: {}]", + clientId, username, deviceResult.getMsg()); + return false; + } + + // 5. 注册连接 + IotDeviceRespDTO device = deviceResult.getData(); + registerConnection(endpoint, device, clientId); + + // 6. 发送设备上线消息 + sendOnlineMessage(device); + + return true; + } catch (Exception e) { + log.error("[authenticateDevice][设备认证异常,客户端 ID: {},用户名: {}]", clientId, username, e); + return false; + } + } + + /** + * 处理业务请求 + */ + private void handleBusinessRequest(String clientId, IotDeviceMessage message, String productKey, String deviceName, + MqttEndpoint endpoint) { + // 发送消息到消息总线 + message.setServerId(serverId); + deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId); + } + + /** + * 注册连接 + */ + private void registerConnection(MqttEndpoint endpoint, IotDeviceRespDTO device, + String clientId) { + IotMqttConnectionManager.ConnectionInfo connectionInfo = new IotMqttConnectionManager.ConnectionInfo(); + connectionInfo.setDeviceId(device.getId()); + connectionInfo.setProductKey(device.getProductKey()); + connectionInfo.setDeviceName(device.getDeviceName()); + connectionInfo.setClientId(clientId); + connectionInfo.setAuthenticated(true); + + connectionManager.registerConnection(endpoint, device.getId(), connectionInfo); + } + + /** + * 发送设备上线消息 + */ + private void sendOnlineMessage(IotDeviceRespDTO device) { + try { + IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline(); + deviceMessageService.sendDeviceMessage(onlineMessage, device.getProductKey(), + device.getDeviceName(), serverId); + log.info("[sendOnlineMessage][设备上线,设备 ID: {},设备名称: {}]", device.getId(), device.getDeviceName()); + } catch (Exception e) { + log.error("[sendOnlineMessage][发送设备上线消息失败,设备 ID: {},错误: {}]", device.getId(), e.getMessage()); + } + } + + /** + * 安全获取 endpoint 地址 + * + * @param endpoint MQTT 连接端点 + * @return 地址字符串,如果获取失败则返回 "unknown" + */ + private String getEndpointAddress(MqttEndpoint endpoint) { + try { + if (endpoint != null) { + return endpoint.remoteAddress().toString(); + } + } catch (Exception e) { + // 忽略异常,返回默认值 + } + return "unknown"; + } + + /** + * 清理连接 + */ + private void cleanupConnection(MqttEndpoint endpoint) { + try { + IotMqttConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(endpoint); + if (connectionInfo != null) { + // 发送设备离线消息 + IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline(); + deviceMessageService.sendDeviceMessage(offlineMessage, connectionInfo.getProductKey(), + connectionInfo.getDeviceName(), serverId); + log.info("[cleanupConnection][设备离线,设备 ID: {},设备名称: {}]", + connectionInfo.getDeviceId(), connectionInfo.getDeviceName()); + } + + // 注销连接 + connectionManager.unregisterConnection(endpoint); + } catch (Exception e) { + log.error("[cleanupConnection][清理连接失败,客户端 ID: {},错误: {}]", + endpoint.clientIdentifier(), e.getMessage()); + } + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml index b306f0588c..5f5cfbd559 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml @@ -48,13 +48,13 @@ yudao: # 针对引入的 HTTP 组件的配置 # ==================================== http: - enabled: true + enabled: false server-port: 8092 # ==================================== # 针对引入的 EMQX 组件的配置 # ==================================== emqx: - enabled: true + enabled: false http-port: 8090 # MQTT HTTP 服务端口 mqtt-host: 127.0.0.1 # MQTT Broker 地址 mqtt-port: 1883 # MQTT Broker 端口 @@ -95,6 +95,16 @@ yudao: ssl-enabled: false ssl-cert-path: "classpath:certs/client.jks" ssl-key-path: "classpath:certs/client.jks" + # ==================================== + # 针对引入的 MQTT 组件的配置 + # ==================================== + mqtt: + enabled: true + port: 1883 + max-message-size: 8192 + connect-timeout-seconds: 60 + keep-alive-timeout-seconds: 300 + ssl-enabled: false --- #################### 日志相关配置 #################### @@ -113,6 +123,7 @@ logging: # 开发环境详细日志 cn.iocoder.yudao.module.iot.gateway.protocol.emqx: DEBUG cn.iocoder.yudao.module.iot.gateway.protocol.http: DEBUG + cn.iocoder.yudao.module.iot.gateway.protocol.mqtt: DEBUG # 根日志级别 root: INFO