From bb1210a17a3a9b584fefbfee0b52a8ce56b9962c Mon Sep 17 00:00:00 2001 From: haohao <1036606149@qq.com> Date: Tue, 15 Jul 2025 20:53:09 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E3=80=90IoT=20=E7=89=A9=E8=81=94?= =?UTF-8?q?=E7=BD=91=E3=80=91=E9=87=8D=E6=9E=84=20TCP=20=E5=8D=8F=E8=AE=AE?= =?UTF-8?q?=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../codec/tcp/IotTcpDeviceMessageCodec.java | 390 ++++++++++++++ .../config/IotGatewayConfiguration.java | 34 +- .../gateway/config/IotGatewayProperties.java | 57 +- .../protocol/tcp/IotTcpConnectionManager.java | 64 --- .../tcp/IotTcpDownstreamSubscriber.java | 225 +++++--- .../protocol/tcp/IotTcpUpstreamProtocol.java | 247 ++++++--- .../protocol/tcp/client/TcpDeviceClient.java | 218 ++++++++ .../manager/TcpDeviceConnectionManager.java | 503 ++++++++++++++++++ .../gateway/protocol/tcp/package-info.java | 1 - .../protocol/tcp/protocol/TcpDataDecoder.java | 97 ++++ .../protocol/tcp/protocol/TcpDataEncoder.java | 172 ++++++ .../protocol/tcp/protocol/TcpDataPackage.java | 153 ++++++ .../protocol/tcp/protocol/TcpDataReader.java | 159 ++++++ .../tcp/router/IotTcpConnectionHandler.java | 148 ------ .../tcp/router/IotTcpDownstreamHandler.java | 413 ++++++++++++-- .../tcp/router/IotTcpUpstreamHandler.java | 393 ++++++++++++++ .../message/IotDeviceMessageServiceImpl.java | 12 +- .../src/main/resources/application.yaml | 9 +- 18 files changed, 2861 insertions(+), 434 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpDeviceMessageCodec.java delete mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpConnectionManager.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/client/TcpDeviceClient.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/TcpDeviceConnectionManager.java delete mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/package-info.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataDecoder.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataEncoder.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataPackage.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataReader.java delete mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpConnectionHandler.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpDeviceMessageCodec.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpDeviceMessageCodec.java new file mode 100644 index 0000000000..0bcef2e0cb --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpDeviceMessageCodec.java @@ -0,0 +1,390 @@ +package cn.iocoder.yudao.module.iot.gateway.codec.tcp; + +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSONException; +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.protocol.TcpDataDecoder; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.protocol.TcpDataEncoder; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.protocol.TcpDataPackage; +import io.vertx.core.buffer.Buffer; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +/** + * TCP {@link IotDeviceMessage} 编解码器 + *

+ * 参考 EMQX 设计理念: + * 1. 高性能编解码 + * 2. 容错机制 + * 3. 缓存优化 + * 4. 监控统计 + * 5. 资源管理 + * + * @author 芋道源码 + */ +@Component +@Slf4j +public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec { + + /** + * 编解码器类型 + */ + public static final String TYPE = "tcp"; + + // ==================== 方法映射 ==================== + + /** + * 消息方法到功能码的映射 + */ + private static final Map METHOD_TO_CODE_MAP = new ConcurrentHashMap<>(); + + /** + * 功能码到消息方法的映射 + */ + private static final Map CODE_TO_METHOD_MAP = new ConcurrentHashMap<>(); + + static { + // 初始化方法映射 + initializeMethodMappings(); + } + + // ==================== 缓存管理 ==================== + + /** + * JSON 缓存,提升编解码性能 + */ + private final Map jsonCache = new ConcurrentHashMap<>(); + + /** + * 缓存最大大小 + */ + private static final int MAX_CACHE_SIZE = 1000; + + // ==================== 常量定义 ==================== + + /** + * 负载字段名 + */ + public static class PayloadField { + public static final String TIMESTAMP = "timestamp"; + public static final String MESSAGE_ID = "msgId"; + public static final String DEVICE_ID = "deviceId"; + public static final String PARAMS = "params"; + public static final String DATA = "data"; + public static final String CODE = "code"; + public static final String MESSAGE = "message"; + } + + /** + * 消息方法映射 + */ + public static class MessageMethod { + public static final String PROPERTY_POST = "thing.property.post"; + public static final String PROPERTY_SET = "thing.property.set"; + public static final String PROPERTY_GET = "thing.property.get"; + public static final String EVENT_POST = "thing.event.post"; + public static final String SERVICE_INVOKE = "thing.service.invoke"; + public static final String CONFIG_PUSH = "thing.config.push"; + public static final String OTA_UPGRADE = "thing.ota.upgrade"; + public static final String STATE_ONLINE = "thing.state.online"; + public static final String STATE_OFFLINE = "thing.state.offline"; + } + + // ==================== 初始化方法 ==================== + + /** + * 初始化方法映射 + */ + private static void initializeMethodMappings() { + METHOD_TO_CODE_MAP.put(MessageMethod.PROPERTY_POST, TcpDataPackage.CODE_DATA_UP); + METHOD_TO_CODE_MAP.put(MessageMethod.PROPERTY_SET, TcpDataPackage.CODE_PROPERTY_SET); + METHOD_TO_CODE_MAP.put(MessageMethod.PROPERTY_GET, TcpDataPackage.CODE_PROPERTY_GET); + METHOD_TO_CODE_MAP.put(MessageMethod.EVENT_POST, TcpDataPackage.CODE_EVENT_UP); + METHOD_TO_CODE_MAP.put(MessageMethod.SERVICE_INVOKE, TcpDataPackage.CODE_SERVICE_INVOKE); + METHOD_TO_CODE_MAP.put(MessageMethod.CONFIG_PUSH, TcpDataPackage.CODE_DATA_DOWN); + METHOD_TO_CODE_MAP.put(MessageMethod.OTA_UPGRADE, TcpDataPackage.CODE_DATA_DOWN); + METHOD_TO_CODE_MAP.put(MessageMethod.STATE_ONLINE, TcpDataPackage.CODE_HEARTBEAT); + METHOD_TO_CODE_MAP.put(MessageMethod.STATE_OFFLINE, TcpDataPackage.CODE_HEARTBEAT); + + // 反向映射 + METHOD_TO_CODE_MAP.forEach((method, code) -> CODE_TO_METHOD_MAP.put(code, method)); + } + + // ==================== 编解码方法 ==================== + + @Override + public byte[] encode(IotDeviceMessage message) { + validateEncodeParams(message); + + try { + if (log.isDebugEnabled()) { + log.debug("[encode][开始编码 TCP 消息] 方法: {}, 消息ID: {}", + message.getMethod(), message.getRequestId()); + } + + // 1. 获取功能码 + short code = getCodeByMethodSafely(message.getMethod()); + + // 2. 构建负载 + String payload = buildPayloadOptimized(message); + + // 3. 构建 TCP 数据包 + TcpDataPackage dataPackage = TcpDataPackage.builder() + .addr("") // 地址在发送时由调用方设置 + .code(code) + .mid((short) 0) // 消息序号在发送时由调用方设置 + .payload(payload) + .build(); + + // 4. 编码为字节流 + Buffer buffer = TcpDataEncoder.encode(dataPackage); + byte[] result = buffer.getBytes(); + + // 5. 统计信息 + if (log.isDebugEnabled()) { + log.debug("[encode][TCP 消息编码成功] 方法: {}, 数据长度: {}", + message.getMethod(), result.length); + } + + return result; + + } catch (Exception e) { + log.error("[encode][TCP 消息编码失败] 消息: {}", message, e); + throw new TcpCodecException("TCP 消息编码失败", e); + } + } + + @Override + public IotDeviceMessage decode(byte[] bytes) { + validateDecodeParams(bytes); + + try { + if (log.isDebugEnabled()) { + log.debug("[decode][开始解码 TCP 消息] 数据长度: {}", bytes.length); + } + + // 1. 解码 TCP 数据包 + Buffer buffer = Buffer.buffer(bytes); + TcpDataPackage dataPackage = TcpDataDecoder.decode(buffer); + + // 2. 获取消息方法 + String method = getMethodByCodeSafely(dataPackage.getCode()); + + // 3. 解析负载数据 + Object params = parsePayloadOptimized(dataPackage.getPayload()); + + // 4. 构建 IoT 设备消息 + IotDeviceMessage message = IotDeviceMessage.requestOf(method, params); + + // 5. 统计信息 + if (log.isDebugEnabled()) { + log.debug("[decode][TCP 消息解码成功] 方法: {}, 功能码: {}", + method, dataPackage.getCode()); + } + + return message; + + } catch (Exception e) { + log.error("[decode][TCP 消息解码失败] 数据长度: {}, 数据内容: {}", + bytes.length, truncateData(bytes, 100), e); + throw new TcpCodecException("TCP 消息解码失败", e); + } + } + + @Override + public String type() { + return TYPE; + } + + // ==================== 内部辅助方法 ==================== + + /** + * 验证编码参数 + */ + private void validateEncodeParams(IotDeviceMessage message) { + if (Objects.isNull(message)) { + throw new IllegalArgumentException("IoT 设备消息不能为空"); + } + if (StrUtil.isEmpty(message.getMethod())) { + throw new IllegalArgumentException("消息方法不能为空"); + } + } + + /** + * 验证解码参数 + */ + private void validateDecodeParams(byte[] bytes) { + if (Objects.isNull(bytes) || bytes.length == 0) { + throw new IllegalArgumentException("待解码数据不能为空"); + } + if (bytes.length > 1024 * 1024) { // 1MB 限制 + throw new IllegalArgumentException("数据包过大,超过1MB限制"); + } + } + + /** + * 安全获取功能码 + */ + private short getCodeByMethodSafely(String method) { + Short code = METHOD_TO_CODE_MAP.get(method); + if (code == null) { + log.warn("[getCodeByMethodSafely][未知的消息方法: {},使用默认功能码]", method); + return TcpDataPackage.CODE_DATA_UP; // 默认为数据上报 + } + return code; + } + + /** + * 安全获取消息方法 + */ + private String getMethodByCodeSafely(short code) { + String method = CODE_TO_METHOD_MAP.get(code); + if (method == null) { + log.warn("[getMethodByCodeSafely][未知的功能码: {},使用默认方法]", code); + return MessageMethod.PROPERTY_POST; // 默认为属性上报 + } + return method; + } + + /** + * 优化的负载构建 + */ + private String buildPayloadOptimized(IotDeviceMessage message) { + // 使用缓存键 + String cacheKey = message.getMethod() + "_" + message.getRequestId(); + JSONObject cachedPayload = jsonCache.get(cacheKey); + + if (cachedPayload != null) { + // 更新时间戳 + cachedPayload.set(PayloadField.TIMESTAMP, System.currentTimeMillis()); + return cachedPayload.toString(); + } + + // 创建新的负载 + JSONObject payload = new JSONObject(); + + // 添加基础字段 + addToPayloadIfNotNull(payload, PayloadField.MESSAGE_ID, message.getRequestId()); + addToPayloadIfNotNull(payload, PayloadField.DEVICE_ID, message.getDeviceId()); + addToPayloadIfNotNull(payload, PayloadField.PARAMS, message.getParams()); + addToPayloadIfNotNull(payload, PayloadField.DATA, message.getData()); + addToPayloadIfNotNull(payload, PayloadField.CODE, message.getCode()); + addToPayloadIfNotEmpty(payload, PayloadField.MESSAGE, message.getMsg()); + + // 添加时间戳 + payload.set(PayloadField.TIMESTAMP, System.currentTimeMillis()); + + // 缓存管理 + if (jsonCache.size() < MAX_CACHE_SIZE) { + jsonCache.put(cacheKey, payload); + } else { + cleanJsonCacheIfNeeded(); + } + + return payload.toString(); + } + + /** + * 优化的负载解析 + */ + private Object parsePayloadOptimized(String payload) { + if (StrUtil.isEmpty(payload)) { + return null; + } + + try { + // 尝试从缓存获取 + JSONObject cachedJson = jsonCache.get(payload); + if (cachedJson != null) { + return cachedJson.containsKey(PayloadField.PARAMS) ? cachedJson.get(PayloadField.PARAMS) : cachedJson; + } + + // 解析 JSON 对象 + JSONObject jsonObject = JSONUtil.parseObj(payload); + + // 缓存解析结果 + if (jsonCache.size() < MAX_CACHE_SIZE) { + jsonCache.put(payload, jsonObject); + } + + return jsonObject.containsKey(PayloadField.PARAMS) ? jsonObject.get(PayloadField.PARAMS) : jsonObject; + + } catch (JSONException e) { + log.warn("[parsePayloadOptimized][负载解析为JSON失败,返回原始字符串] 负载: {}", payload); + return payload; + } catch (Exception e) { + log.error("[parsePayloadOptimized][负载解析异常] 负载: {}", payload, e); + return payload; + } + } + + /** + * 添加非空值到负载 + */ + private void addToPayloadIfNotNull(JSONObject json, String key, Object value) { + if (ObjectUtil.isNotNull(value)) { + json.set(key, value); + } + } + + /** + * 添加非空字符串到负载 + */ + private void addToPayloadIfNotEmpty(JSONObject json, String key, String value) { + if (StrUtil.isNotEmpty(value)) { + json.set(key, value); + } + } + + /** + * 清理JSON缓存 + */ + private void cleanJsonCacheIfNeeded() { + if (jsonCache.size() > MAX_CACHE_SIZE) { + // 清理一半的缓存 + int clearCount = jsonCache.size() / 2; + jsonCache.entrySet().removeIf(entry -> clearCount > 0 && Math.random() < 0.5); + + if (log.isDebugEnabled()) { + log.debug("[cleanJsonCacheIfNeeded][JSON 缓存已清理] 当前缓存大小: {}", jsonCache.size()); + } + } + } + + /** + * 截断数据用于日志输出 + */ + private String truncateData(byte[] data, int maxLength) { + if (data.length <= maxLength) { + return new String(data, StandardCharsets.UTF_8); + } + + byte[] truncated = new byte[maxLength]; + System.arraycopy(data, 0, truncated, 0, maxLength); + return new String(truncated, StandardCharsets.UTF_8) + "...(截断)"; + } + + // ==================== 自定义异常 ==================== + + /** + * TCP 编解码异常 + */ + public static class TcpCodecException extends RuntimeException { + public TcpCodecException(String message) { + super(message); + } + + public TcpCodecException(String message, Throwable cause) { + super(message, cause); + } + } +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java index 3481faead8..de5f3426be 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java @@ -7,10 +7,9 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxDownstreamSubscr import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConnectionManager; -import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router.IotTcpDownstreamHandler; import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.TcpDeviceConnectionManager; import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.core.Vertx; @@ -85,26 +84,33 @@ public class IotGatewayConfiguration { @Slf4j public static class TcpProtocolConfiguration { - // TODO @haohao:close - @Bean + @Bean(destroyMethod = "close") public Vertx tcpVertx() { return Vertx.vertx(); } @Bean - public IotTcpUpstreamProtocol iotTcpUpstreamProtocol(Vertx tcpVertx, IotGatewayProperties gatewayProperties, - IotTcpConnectionManager connectionManager, - IotDeviceMessageService messageService, - IotDeviceService deviceService, IotDeviceCommonApi deviceApi) { - return new IotTcpUpstreamProtocol(tcpVertx, gatewayProperties, connectionManager, - messageService, deviceService, deviceApi); + public TcpDeviceConnectionManager tcpDeviceConnectionManager() { + return new TcpDeviceConnectionManager(); } @Bean - public IotTcpDownstreamSubscriber iotTcpDownstreamSubscriber(IotTcpUpstreamProtocol tcpUpstreamProtocol, - IotMessageBus messageBus, - IotTcpDownstreamHandler downstreamHandler) { - return new IotTcpDownstreamSubscriber(tcpUpstreamProtocol, messageBus, downstreamHandler); + public IotTcpUpstreamProtocol iotTcpUpstreamProtocol(IotGatewayProperties gatewayProperties, + TcpDeviceConnectionManager connectionManager, + IotDeviceService deviceService, + IotDeviceMessageService messageService, + IotDeviceCommonApi deviceApi, + Vertx tcpVertx) { + return new IotTcpUpstreamProtocol(gatewayProperties.getProtocol().getTcp(), connectionManager, + deviceService, messageService, deviceApi, tcpVertx); + } + + @Bean + public IotTcpDownstreamSubscriber iotTcpDownstreamSubscriber(IotTcpUpstreamProtocol protocolHandler, + TcpDeviceConnectionManager connectionManager, + IotDeviceMessageService messageService, + IotMessageBus messageBus) { + return new IotTcpDownstreamSubscriber(protocolHandler, connectionManager, messageService, messageBus); } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java index 737a1560dc..e4886df07a 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java @@ -115,22 +115,6 @@ public class IotGatewayProperties { } - @Data - public static class TcpProperties { - - /** - * 是否开启 - */ - @NotNull(message = "是否开启不能为空") - private Boolean enabled; - - /** - * 服务端口(默认:8093) - */ - private Integer serverPort = 8093; - - } - @Data public static class EmqxProperties { @@ -300,4 +284,45 @@ public class IotGatewayProperties { } + @Data + public static class TcpProperties { + + /** + * 是否开启 + */ + @NotNull(message = "是否开启不能为空") + private Boolean enabled; + + /** + * 服务器端口 + */ + private Integer port = 8091; + + /** + * 心跳超时时间(毫秒) + */ + private Long keepAliveTimeoutMs = 30000L; + + /** + * 最大连接数 + */ + private Integer maxConnections = 1000; + + /** + * 是否启用SSL + */ + private Boolean sslEnabled = false; + + /** + * SSL证书路径 + */ + private String sslCertPath; + + /** + * SSL私钥路径 + */ + private String sslKeyPath; + + } + } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpConnectionManager.java deleted file mode 100644 index a208e74e5d..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpConnectionManager.java +++ /dev/null @@ -1,64 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.tcp; - -import io.vertx.core.net.NetSocket; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * IoT TCP 连接管理器 - * - * @author 芋道源码 - */ -@Component -@Slf4j -public class IotTcpConnectionManager { - - // TODO @haohao:要考虑,相同设备,多次连接的情况哇? - /** - * 连接集合 - * - * key:设备唯一标识 - */ - private final ConcurrentMap connectionMap = new ConcurrentHashMap<>(); - - /** - * 添加一个新连接 - * - * @param deviceId 设备唯一标识 - * @param socket Netty Channel - */ - public void addConnection(String deviceId, NetSocket socket) { - log.info("[addConnection][设备({}) 连接({})]", deviceId, socket.remoteAddress()); - connectionMap.put(deviceId, socket); - } - - /** - * 根据设备 ID 获取连接 - * - * @param deviceId 设备 ID - * @return 连接 - */ - public NetSocket getConnection(String deviceId) { - return connectionMap.get(deviceId); - } - - /** - * 移除指定连接 - * - * @param socket Netty Channel - */ - public void removeConnection(NetSocket socket) { - // TODO @haohao:vertx 的 socket,有没办法设置一些属性,类似 netty 的;目的是,避免遍历 connectionMap 去操作哈; - connectionMap.entrySet().stream() - .filter(entry -> entry.getValue().equals(socket)) - .findFirst() - .ifPresent(entry -> { - log.info("[removeConnection][设备({}) 断开连接({})]", entry.getKey(), socket.remoteAddress()); - connectionMap.remove(entry.getKey()); - }); - } - -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpDownstreamSubscriber.java index f324d45438..d5c916295c 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpDownstreamSubscriber.java @@ -1,64 +1,163 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.tcp; - -import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; -import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber; -import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; -import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router.IotTcpDownstreamHandler; -import jakarta.annotation.PostConstruct; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -/** - * IoT 网关 TCP 订阅者:接收下行给设备的消息 - * - * @author 芋道源码 - */ -@RequiredArgsConstructor -@Slf4j -public class IotTcpDownstreamSubscriber implements IotMessageSubscriber { - - private final IotTcpUpstreamProtocol protocol; - - private final IotMessageBus messageBus; - - private final IotTcpDownstreamHandler downstreamHandler; - - @PostConstruct - public void init() { - messageBus.register(this); - } - - @Override - public String getTopic() { - return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(protocol.getServerId()); - } - - @Override - public String getGroup() { - // 保证点对点消费,需要保证独立的 Group,所以使用 Topic 作为 Group - return getTopic(); - } - - @Override - public void onMessage(IotDeviceMessage message) { - log.debug("[onMessage][接收到下行消息, messageId: {}, method: {}, deviceId: {}]", - message.getId(), message.getMethod(), message.getDeviceId()); - try { - // 1. 校验 - String method = message.getMethod(); - if (method == null) { - log.warn("[onMessage][消息方法为空, messageId: {}, deviceId: {}]", - message.getId(), message.getDeviceId()); - return; - } - - // 2. 处理下行消息 - downstreamHandler.handle(message); - } catch (Exception e) { - log.error("[onMessage][处理下行消息失败, messageId: {}, method: {}, deviceId: {}]", - message.getId(), message.getMethod(), message.getDeviceId(), e); - } - } - +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp; + +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.TcpDeviceConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router.IotTcpDownstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * IoT 网关 TCP 下游订阅者:接收下行给设备的消息 + *

+ * 参考 EMQX 设计理念: + * 1. 高性能消息路由 + * 2. 容错机制 + * 3. 状态监控 + * 4. 资源管理 + * + * @author 芋道源码 + */ +@RequiredArgsConstructor +@Slf4j +public class IotTcpDownstreamSubscriber implements IotMessageSubscriber { + + private final IotTcpUpstreamProtocol protocolHandler; + + private final TcpDeviceConnectionManager connectionManager; + + private final IotDeviceMessageService messageService; + + private final IotMessageBus messageBus; + + private volatile IotTcpDownstreamHandler downstreamHandler; + + private final AtomicBoolean initialized = new AtomicBoolean(false); + + private final AtomicLong processedMessages = new AtomicLong(0); + + private final AtomicLong failedMessages = new AtomicLong(0); + + @PostConstruct + public void init() { + if (!initialized.compareAndSet(false, true)) { + log.warn("[init][TCP 下游消息订阅者已初始化,跳过重复初始化]"); + return; + } + + try { + // 初始化下游处理器 + downstreamHandler = new IotTcpDownstreamHandler(connectionManager, messageService); + + // 注册到消息总线 + messageBus.register(this); + + log.info("[init][TCP 下游消息订阅者初始化完成] Topic: {}, Group: {}", + getTopic(), getGroup()); + } catch (Exception e) { + initialized.set(false); + log.error("[init][TCP 下游消息订阅者初始化失败]", e); + throw new RuntimeException("TCP 下游消息订阅者初始化失败", e); + } + } + + @PreDestroy + public void destroy() { + if (!initialized.get()) { + return; + } + + try { + log.info("[destroy][TCP 下游消息订阅者已关闭] 处理消息数: {}, 失败消息数: {}", + processedMessages.get(), failedMessages.get()); + } catch (Exception e) { + log.error("[destroy][TCP 下游消息订阅者关闭失败]", e); + } finally { + initialized.set(false); + } + } + + @Override + public String getTopic() { + return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(protocolHandler.getServerId()); + } + + @Override + public String getGroup() { + return "tcp-downstream-" + protocolHandler.getServerId(); + } + + @Override + public void onMessage(IotDeviceMessage message) { + if (!initialized.get()) { + log.warn("[onMessage][订阅者未初始化,跳过消息处理]"); + return; + } + + long startTime = System.currentTimeMillis(); + + try { + processedMessages.incrementAndGet(); + + if (log.isDebugEnabled()) { + log.debug("[onMessage][收到下行消息] 设备ID: {}, 方法: {}, 消息ID: {}", + message.getDeviceId(), message.getMethod(), message.getId()); + } + + // 参数校验 + if (message.getDeviceId() == null) { + log.warn("[onMessage][下行消息设备ID为空,跳过处理] 消息: {}", message); + return; + } + + // 检查连接状态 + if (connectionManager.getClientByDeviceId(message.getDeviceId()) == null) { + log.warn("[onMessage][设备({})离线,跳过下行消息] 方法: {}", + message.getDeviceId(), message.getMethod()); + return; + } + + // 处理下行消息 + downstreamHandler.handle(message); + + // 性能监控 + long processTime = System.currentTimeMillis() - startTime; + if (processTime > 1000) { // 超过1秒的慢消息 + log.warn("[onMessage][慢消息处理] 设备ID: {}, 方法: {}, 耗时: {}ms", + message.getDeviceId(), message.getMethod(), processTime); + } + + } catch (Exception e) { + failedMessages.incrementAndGet(); + log.error("[onMessage][处理下行消息失败] 设备ID: {}, 方法: {}, 消息: {}", + message.getDeviceId(), message.getMethod(), message, e); + } + } + + /** + * 获取订阅者统计信息 + */ + public String getSubscriberStatistics() { + return String.format("TCP下游订阅者 - 已处理: %d, 失败: %d, 成功率: %.2f%%", + processedMessages.get(), + failedMessages.get(), + processedMessages.get() > 0 + ? (double) (processedMessages.get() - failedMessages.get()) / processedMessages.get() * 100 + : 0.0); + } + + /** + * 检查订阅者健康状态 + */ + public boolean isHealthy() { + return initialized.get() && downstreamHandler != null; + } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpUpstreamProtocol.java index 8e4481a23f..c42fe19300 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpUpstreamProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpUpstreamProtocol.java @@ -1,71 +1,178 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.tcp; - -import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; -import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; -import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; -import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router.IotTcpConnectionHandler; -import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; -import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; -import io.vertx.core.Vertx; -import io.vertx.core.net.NetServer; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -/** - * IoT 网关 TCP 协议:接收设备上行消息 - * - * @author 芋道源码 - */ -@Slf4j -@RequiredArgsConstructor -public class IotTcpUpstreamProtocol { - - private final Vertx vertx; - - private final IotGatewayProperties gatewayProperties; - - private final IotTcpConnectionManager connectionManager; - - private final IotDeviceMessageService messageService; - - private final IotDeviceService deviceService; - - private final IotDeviceCommonApi deviceApi; - - @Getter - private String serverId; - - private NetServer netServer; - - @PostConstruct - public void start() { - // 1. 初始化参数 - IotGatewayProperties.TcpProperties tcpProperties = gatewayProperties.getProtocol().getTcp(); - this.serverId = IotDeviceMessageUtils.generateServerId(tcpProperties.getServerPort()); - - // 2. 创建 TCP 服务器 - netServer = vertx.createNetServer(); - netServer.connectHandler(socket -> { - new IotTcpConnectionHandler(socket, connectionManager, - messageService, deviceService, deviceApi, serverId).start(); - }); - - // 3. 启动 TCP 服务器 - netServer.listen(tcpProperties.getServerPort()) - .onSuccess(server -> log.info("[start][IoT 网关 TCP 服务启动成功,端口:{}]", server.actualPort())) - .onFailure(e -> log.error("[start][IoT 网关 TCP 服务启动失败]", e)); - } - - @PreDestroy - public void stop() { - if (netServer != null) { - netServer.close() - .onSuccess(v -> log.info("[stop][IoT 网关 TCP 服务已停止]")) - .onFailure(e -> log.error("[stop][IoT 网关 TCP 服务停止失败]", e)); - } - } - +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp; + +import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.TcpDeviceConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router.IotTcpUpstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.vertx.core.Vertx; +import io.vertx.core.net.NetServer; +import io.vertx.core.net.NetServerOptions; +import io.vertx.core.net.PemKeyCertOptions; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * IoT 网关 TCP 协议:接收设备上行消息 + *

+ * 负责接收设备上行消息,支持: + * 1. 设备注册 + * 2. 心跳保活 + * 3. 属性上报 + * 4. 事件上报 + * 5. 设备连接管理 + * + * @author 芋道源码 + */ +@Slf4j +public class IotTcpUpstreamProtocol { + + private final IotGatewayProperties.TcpProperties tcpProperties; + + private final TcpDeviceConnectionManager connectionManager; + + private final IotDeviceService deviceService; + + private final IotDeviceMessageService messageService; + + private final IotDeviceCommonApi deviceApi; + + private final Vertx vertx; + + @Getter + private final String serverId; + + private NetServer netServer; + + public IotTcpUpstreamProtocol(IotGatewayProperties.TcpProperties tcpProperties, + TcpDeviceConnectionManager connectionManager, + IotDeviceService deviceService, + IotDeviceMessageService messageService, + IotDeviceCommonApi deviceApi, + Vertx vertx) { + this.tcpProperties = tcpProperties; + this.connectionManager = connectionManager; + this.deviceService = deviceService; + this.messageService = messageService; + this.deviceApi = deviceApi; + this.vertx = vertx; + this.serverId = IotDeviceMessageUtils.generateServerId(tcpProperties.getPort()); + } + + @PostConstruct + public void start() { + // 1. 启动 TCP 服务器 + try { + startTcpServer(); + log.info("[start][IoT 网关 TCP 协议处理器启动完成,服务器ID: {}]", serverId); + } catch (Exception e) { + log.error("[start][IoT 网关 TCP 协议处理器启动失败]", e); + // 抛出异常,中断 Spring 容器启动 + throw new RuntimeException("IoT 网关 TCP 协议处理器启动失败", e); + } + } + + @PreDestroy + public void stop() { + if (netServer != null) { + stopTcpServer(); + log.info("[stop][IoT 网关 TCP 协议处理器已停止]"); + } + } + + /** + * 启动 TCP 服务器 + */ + private void startTcpServer() { + // 1. 创建服务器选项 + NetServerOptions options = new NetServerOptions() + .setPort(tcpProperties.getPort()) + .setTcpKeepAlive(true) + .setTcpNoDelay(true) + .setReuseAddress(true); + + // 2. 配置 SSL(如果启用) + if (Boolean.TRUE.equals(tcpProperties.getSslEnabled())) { + PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions() + .setKeyPath(tcpProperties.getSslKeyPath()) + .setCertPath(tcpProperties.getSslCertPath()); + options.setSsl(true).setKeyCertOptions(pemKeyCertOptions); + } + + // 3. 创建 TCP 服务器 + netServer = vertx.createNetServer(options); + + // 4. 设置连接处理器 + netServer.connectHandler(socket -> { + log.info("[startTcpServer][新设备连接: {}]", socket.remoteAddress()); + IotTcpUpstreamHandler handler = new IotTcpUpstreamHandler( + tcpProperties, connectionManager, deviceService, messageService, deviceApi, serverId); + handler.handle(socket); + }); + + // 5. 同步启动服务器,等待结果 + CountDownLatch latch = new CountDownLatch(1); + AtomicReference failure = new AtomicReference<>(); + netServer.listen(result -> { + if (result.succeeded()) { + log.info("[startTcpServer][TCP 服务器启动成功] 端口: {}, 服务器ID: {}", + result.result().actualPort(), serverId); + } else { + log.error("[startTcpServer][TCP 服务器启动失败]", result.cause()); + failure.set(result.cause()); + } + latch.countDown(); + }); + + // 6. 等待启动结果,设置超时 + try { + if (!latch.await(10, TimeUnit.SECONDS)) { + throw new RuntimeException("TCP 服务器启动超时"); + } + if (failure.get() != null) { + throw new RuntimeException("TCP 服务器启动失败", failure.get()); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("TCP 服务器启动被中断", e); + } + } + + /** + * 停止 TCP 服务器 + */ + private void stopTcpServer() { + if (netServer == null) { + return; + } + log.info("[stopTcpServer][准备关闭 TCP 服务器]"); + CountDownLatch latch = new CountDownLatch(1); + // 异步关闭,并使用 Latch 等待结果 + netServer.close(result -> { + if (result.succeeded()) { + log.info("[stopTcpServer][IoT 网关 TCP 协议处理器已停止]"); + } else { + log.warn("[stopTcpServer][TCP 服务器关闭失败]", result.cause()); + } + latch.countDown(); + }); + + try { + // 等待关闭完成,设置超时 + if (!latch.await(10, TimeUnit.SECONDS)) { + log.warn("[stopTcpServer][关闭 TCP 服务器超时]"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("[stopTcpServer][等待 TCP 服务器关闭被中断]", e); + } + } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/client/TcpDeviceClient.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/client/TcpDeviceClient.java new file mode 100644 index 0000000000..eb353a457a --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/client/TcpDeviceClient.java @@ -0,0 +1,218 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.client; + +import io.vertx.core.buffer.Buffer; +import io.vertx.core.net.NetSocket; +import io.vertx.core.parsetools.RecordParser; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * TCP 设备客户端 + *

+ * 封装设备连接的基本信息和操作。 + * 该类中的状态变更(如 authenticated, closed)使用 AtomicBoolean 保证原子性。 + * 对 socket 的操作应在 Vert.x Event Loop 线程中执行,以避免并发问题。 + * + * @author 芋道源码 + */ +@Slf4j +public class TcpDeviceClient { + + @Getter + private final String clientId; + + @Getter + @Setter + private String deviceAddr; // 从 final 移除,因为在注册后才设置 + + @Getter + @Setter + private String productKey; + + @Getter + @Setter + private String deviceName; + + @Getter + @Setter + private Long deviceId; + + @Getter + private NetSocket socket; + + @Getter + @Setter + private RecordParser parser; + + @Getter + private final long keepAliveTimeoutMs; // 改为 final,通过构造函数注入 + + private volatile long lastKeepAliveTime; + + private final AtomicBoolean authenticated = new AtomicBoolean(false); + private final AtomicBoolean closed = new AtomicBoolean(false); + + /** + * 构造函数 + * + * @param clientId 客户端ID,全局唯一 + * @param keepAliveTimeoutMs 心跳超时时间(毫秒),从配置中读取 + */ + public TcpDeviceClient(String clientId, long keepAliveTimeoutMs) { + this.clientId = clientId; + this.keepAliveTimeoutMs = keepAliveTimeoutMs; + this.lastKeepAliveTime = System.currentTimeMillis(); + } + + /** + * 绑定网络套接字,并设置相关处理器。 + * 此方法应在 Vert.x Event Loop 线程中调用。 + * + * @param socket 网络套接字 + */ + public void setSocket(NetSocket socket) { + // 无需 synchronized,Vert.x 保证了同一个 socket 的事件在同一个 Event Loop 中处理 + if (this.socket != null && this.socket != socket) { + log.warn("[setSocket][客户端({})] 正在用新的 socket 替换旧的,旧 socket 将被关闭。", clientId); + this.socket.close(); + } + + this.socket = socket; + + if (socket != null) { + // 1. 设置关闭处理器 + socket.closeHandler(v -> { + log.info("[setSocket][设备客户端({})的连接已由远端关闭]", clientId); + shutdown(); // 统一调用 shutdown 进行资源清理 + }); + + // 2. 设置异常处理器 + socket.exceptionHandler(e -> { + log.error("[setSocket][设备客户端({})连接出现异常]", clientId, e); + shutdown(); // 出现异常时也关闭连接 + }); + + // 3. 设置数据处理器 + socket.handler(buffer -> { + // 任何数据往来都表示连接是活跃的 + keepAlive(); + + if (parser != null) { + parser.handle(buffer); + } else { + log.warn("[setSocket][设备客户端({})] 未设置解析器(parser),原始数据被忽略: {}", clientId, buffer.toString()); + } + }); + } + } + + /** + * 更新心跳时间,表示设备仍然活跃。 + */ + public void keepAlive() { + this.lastKeepAliveTime = System.currentTimeMillis(); + } + + /** + * 检查连接是否在线。 + * 判断标准:未被主动关闭、socket 存在、且在心跳超时时间内。 + * + * @return 是否在线 + */ + public boolean isOnline() { + if (closed.get() || socket == null) { + return false; + } + long idleTime = System.currentTimeMillis() - lastKeepAliveTime; + return idleTime < keepAliveTimeoutMs; + } + + public boolean isAuthenticated() { + return authenticated.get(); + } + + public void setAuthenticated(boolean authenticated) { + this.authenticated.set(authenticated); + } + + /** + * 向设备发送消息。 + * + * @param buffer 消息内容 + */ + public void sendMessage(Buffer buffer) { + if (closed.get() || socket == null) { + log.warn("[sendMessage][设备客户端({})连接已关闭,无法发送消息]", clientId); + return; + } + + // Vert.x 的 write 是异步的,不会阻塞 + socket.write(buffer, result -> { + if (result.succeeded()) { + log.debug("[sendMessage][设备客户端({})发送消息成功]", clientId); + // 发送成功也更新心跳,表示连接活跃 + keepAlive(); + } else { + log.error("[sendMessage][设备客户端({})发送消息失败]", clientId, result.cause()); + // 发送失败可能意味着连接已断开,主动关闭 + shutdown(); + } + }); + } + + /** + * 关闭客户端连接并清理资源。 + * 这是一个幂等操作,可以被多次安全调用。 + */ + public void shutdown() { + // 使用原子操作保证只执行一次关闭逻辑 + if (closed.getAndSet(true)) { + return; + } + + log.info("[shutdown][正在关闭设备客户端连接: {}]", clientId); + + // 先将 socket 引用置空,再关闭,避免并发问题 + NetSocket socketToClose = this.socket; + this.socket = null; + + if (socketToClose != null) { + try { + // close 是异步的,但我们在这里不关心其结果,因为我们已经将客户端标记为关闭 + socketToClose.close(); + } catch (Exception e) { + log.warn("[shutdown][关闭TCP连接时出现异常,可能已被关闭]", e); + } + } + + // 重置认证状态 + authenticated.set(false); + } + + public String getConnectionInfo() { + NetSocket currentSocket = this.socket; + if (currentSocket != null && currentSocket.remoteAddress() != null) { + return currentSocket.remoteAddress().toString(); + } + return "disconnected"; + } + + public long getLastKeepAliveTime() { + return lastKeepAliveTime; + } + + @Override + public String toString() { + return "TcpDeviceClient{" + + "clientId='" + clientId + '\'' + + ", deviceAddr='" + deviceAddr + '\'' + + ", deviceId=" + deviceId + + ", authenticated=" + authenticated.get() + + ", online=" + isOnline() + + ", connection=" + getConnectionInfo() + + '}'; + } +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/TcpDeviceConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/TcpDeviceConnectionManager.java new file mode 100644 index 0000000000..ce7fe4aa5c --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/TcpDeviceConnectionManager.java @@ -0,0 +1,503 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager; + +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.client.TcpDeviceClient; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.net.NetSocket; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * TCP 设备连接管理器 + *

+ * 参考 EMQX 设计理念: + * 1. 高性能连接管理 + * 2. 连接池和资源管理 + * 3. 流量控制 + * 4. 监控统计 + * 5. 自动清理和容错 + * + * @author 芋道源码 + */ +@Component +@Slf4j +public class TcpDeviceConnectionManager { + + // ==================== 连接存储 ==================== + + /** + * 设备客户端映射 + * Key: 设备地址, Value: 设备客户端 + */ + private final ConcurrentMap clientMap = new ConcurrentHashMap<>(); + + /** + * 设备ID到设备地址的映射 + * Key: 设备ID, Value: 设备地址 + */ + private final ConcurrentMap deviceIdToAddrMap = new ConcurrentHashMap<>(); + + /** + * 套接字到客户端的映射,用于快速查找 + * Key: NetSocket, Value: 设备地址 + */ + private final ConcurrentMap socketToAddrMap = new ConcurrentHashMap<>(); + + // ==================== 读写锁 ==================== + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); + private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); + + // ==================== 定时任务 ==================== + + /** + * 定时任务执行器 + */ + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3, r -> { + Thread t = new Thread(r, "tcp-connection-manager"); + t.setDaemon(true); + return t; + }); + + // ==================== 统计信息 ==================== + + private final AtomicLong totalConnections = new AtomicLong(0); + private final AtomicLong totalDisconnections = new AtomicLong(0); + private final AtomicLong totalMessages = new AtomicLong(0); + private final AtomicLong totalFailedMessages = new AtomicLong(0); + private final AtomicLong totalBytes = new AtomicLong(0); + + // ==================== 配置参数 ==================== + + private static final int MAX_CONNECTIONS = 10000; + private static final int HEARTBEAT_CHECK_INTERVAL = 30; // 秒 + private static final int CONNECTION_CLEANUP_INTERVAL = 60; // 秒 + private static final int STATS_LOG_INTERVAL = 300; // 秒 + + /** + * 构造函数,启动定时任务 + */ + public TcpDeviceConnectionManager() { + startScheduledTasks(); + } + + /** + * 启动定时任务 + */ + private void startScheduledTasks() { + // 心跳检查任务 + scheduler.scheduleAtFixedRate(this::checkHeartbeat, + HEARTBEAT_CHECK_INTERVAL, HEARTBEAT_CHECK_INTERVAL, TimeUnit.SECONDS); + + // 连接清理任务 + scheduler.scheduleAtFixedRate(this::cleanupConnections, + CONNECTION_CLEANUP_INTERVAL, CONNECTION_CLEANUP_INTERVAL, TimeUnit.SECONDS); + + // 统计日志任务 + scheduler.scheduleAtFixedRate(this::logStatistics, + STATS_LOG_INTERVAL, STATS_LOG_INTERVAL, TimeUnit.SECONDS); + } + + /** + * 添加设备客户端 + */ + public boolean addClient(String deviceAddr, TcpDeviceClient client) { + if (clientMap.size() >= MAX_CONNECTIONS) { + log.warn("[addClient][连接数已达上限({}),拒绝新连接: {}]", MAX_CONNECTIONS, deviceAddr); + return false; + } + + writeLock.lock(); + try { + log.info("[addClient][添加设备客户端: {}]", deviceAddr); + + // 关闭之前的连接(如果存在) + TcpDeviceClient existingClient = clientMap.get(deviceAddr); + if (existingClient != null) { + log.warn("[addClient][设备({})已存在连接,关闭旧连接]", deviceAddr); + removeClientInternal(deviceAddr, existingClient); + } + + // 添加新连接 + clientMap.put(deviceAddr, client); + + // 添加套接字映射 + if (client.getSocket() != null) { + socketToAddrMap.put(client.getSocket(), deviceAddr); + } + + // 如果客户端已设置设备ID,更新映射 + if (client.getDeviceId() != null) { + deviceIdToAddrMap.put(client.getDeviceId(), deviceAddr); + } + + totalConnections.incrementAndGet(); + return true; + + } finally { + writeLock.unlock(); + } + } + + /** + * 移除设备客户端 + */ + public void removeClient(String deviceAddr) { + writeLock.lock(); + try { + TcpDeviceClient client = clientMap.get(deviceAddr); + if (client != null) { + removeClientInternal(deviceAddr, client); + } + } finally { + writeLock.unlock(); + } + } + + /** + * 内部移除客户端方法(无锁) + */ + private void removeClientInternal(String deviceAddr, TcpDeviceClient client) { + log.info("[removeClient][移除设备客户端: {}]", deviceAddr); + + // 从映射中移除 + clientMap.remove(deviceAddr); + + // 移除套接字映射 + if (client.getSocket() != null) { + socketToAddrMap.remove(client.getSocket()); + } + + // 移除设备ID映射 + if (client.getDeviceId() != null) { + deviceIdToAddrMap.remove(client.getDeviceId()); + } + + // 关闭连接 + client.shutdown(); + + totalDisconnections.incrementAndGet(); + } + + /** + * 通过设备地址获取客户端 + */ + public TcpDeviceClient getClient(String deviceAddr) { + readLock.lock(); + try { + return clientMap.get(deviceAddr); + } finally { + readLock.unlock(); + } + } + + /** + * 通过设备ID获取客户端 + */ + public TcpDeviceClient getClientByDeviceId(Long deviceId) { + readLock.lock(); + try { + String deviceAddr = deviceIdToAddrMap.get(deviceId); + return deviceAddr != null ? clientMap.get(deviceAddr) : null; + } finally { + readLock.unlock(); + } + } + + /** + * 通过网络连接获取客户端 + */ + public TcpDeviceClient getClientBySocket(NetSocket socket) { + readLock.lock(); + try { + String deviceAddr = socketToAddrMap.get(socket); + return deviceAddr != null ? clientMap.get(deviceAddr) : null; + } finally { + readLock.unlock(); + } + } + + /** + * 检查设备是否在线 + */ + public boolean isDeviceOnline(Long deviceId) { + TcpDeviceClient client = getClientByDeviceId(deviceId); + return client != null && client.isOnline(); + } + + /** + * 设置设备ID映射 + */ + public void setDeviceIdMapping(String deviceAddr, Long deviceId) { + writeLock.lock(); + try { + TcpDeviceClient client = clientMap.get(deviceAddr); + if (client != null) { + client.setDeviceId(deviceId); + deviceIdToAddrMap.put(deviceId, deviceAddr); + log.debug("[setDeviceIdMapping][设置设备ID映射: {} -> {}]", deviceAddr, deviceId); + } + } finally { + writeLock.unlock(); + } + } + + /** + * 发送消息给设备 + */ + public boolean sendMessage(String deviceAddr, Buffer buffer) { + TcpDeviceClient client = getClient(deviceAddr); + if (client != null && client.isOnline()) { + try { + client.sendMessage(buffer); + totalMessages.incrementAndGet(); + totalBytes.addAndGet(buffer.length()); + return true; + } catch (Exception e) { + totalFailedMessages.incrementAndGet(); + log.error("[sendMessage][发送消息失败] 设备地址: {}", deviceAddr, e); + return false; + } + } + log.warn("[sendMessage][设备({})不在线,无法发送消息]", deviceAddr); + return false; + } + + /** + * 通过设备ID发送消息 + */ + public boolean sendMessageByDeviceId(Long deviceId, Buffer buffer) { + TcpDeviceClient client = getClientByDeviceId(deviceId); + if (client != null && client.isOnline()) { + try { + client.sendMessage(buffer); + totalMessages.incrementAndGet(); + totalBytes.addAndGet(buffer.length()); + return true; + } catch (Exception e) { + totalFailedMessages.incrementAndGet(); + log.error("[sendMessageByDeviceId][发送消息失败] 设备ID: {}", deviceId, e); + return false; + } + } + log.warn("[sendMessageByDeviceId][设备ID({})不在线,无法发送消息]", deviceId); + return false; + } + + /** + * 广播消息给所有在线设备 + */ + public int broadcastMessage(Buffer buffer) { + int successCount = 0; + readLock.lock(); + try { + for (TcpDeviceClient client : clientMap.values()) { + if (client.isOnline()) { + try { + client.sendMessage(buffer); + successCount++; + } catch (Exception e) { + log.error("[broadcastMessage][广播消息失败] 设备: {}", client.getDeviceAddr(), e); + } + } + } + } finally { + readLock.unlock(); + } + + totalMessages.addAndGet(successCount); + totalBytes.addAndGet((long) successCount * buffer.length()); + return successCount; + } + + /** + * 获取在线设备数量 + */ + public int getOnlineCount() { + readLock.lock(); + try { + return (int) clientMap.values().stream() + .filter(TcpDeviceClient::isOnline) + .count(); + } finally { + readLock.unlock(); + } + } + + /** + * 获取总连接数 + */ + public int getTotalCount() { + return clientMap.size(); + } + + /** + * 获取认证设备数量 + */ + public int getAuthenticatedCount() { + readLock.lock(); + try { + return (int) clientMap.values().stream() + .filter(TcpDeviceClient::isAuthenticated) + .count(); + } finally { + readLock.unlock(); + } + } + + /** + * 心跳检查任务 + */ + private void checkHeartbeat() { + try { + long currentTime = System.currentTimeMillis(); + int offlineCount = 0; + + readLock.lock(); + try { + for (TcpDeviceClient client : clientMap.values()) { + if (!client.isOnline()) { + offlineCount++; + } + } + } finally { + readLock.unlock(); + } + + if (offlineCount > 0) { + log.info("[checkHeartbeat][发现{}个离线设备,将在清理任务中处理]", offlineCount); + } + } catch (Exception e) { + log.error("[checkHeartbeat][心跳检查任务异常]", e); + } + } + + /** + * 连接清理任务 + */ + private void cleanupConnections() { + try { + int beforeSize = clientMap.size(); + + writeLock.lock(); + try { + clientMap.entrySet().removeIf(entry -> { + TcpDeviceClient client = entry.getValue(); + if (!client.isOnline()) { + log.debug("[cleanupConnections][清理离线连接: {}]", entry.getKey()); + + // 清理相关映射 + if (client.getSocket() != null) { + socketToAddrMap.remove(client.getSocket()); + } + if (client.getDeviceId() != null) { + deviceIdToAddrMap.remove(client.getDeviceId()); + } + + client.shutdown(); + totalDisconnections.incrementAndGet(); + return true; + } + return false; + }); + } finally { + writeLock.unlock(); + } + + int afterSize = clientMap.size(); + if (beforeSize != afterSize) { + log.info("[cleanupConnections][清理完成] 连接数: {} -> {}, 清理数: {}", + beforeSize, afterSize, beforeSize - afterSize); + } + } catch (Exception e) { + log.error("[cleanupConnections][连接清理任务异常]", e); + } + } + + /** + * 统计日志任务 + */ + private void logStatistics() { + try { + long totalConn = totalConnections.get(); + long totalDisconn = totalDisconnections.get(); + long totalMsg = totalMessages.get(); + long totalFailedMsg = totalFailedMessages.get(); + long totalBytesValue = totalBytes.get(); + + log.info("[logStatistics][连接统计] 总连接: {}, 总断开: {}, 当前在线: {}, 认证设备: {}, " + + "总消息: {}, 失败消息: {}, 总字节: {}", + totalConn, totalDisconn, getOnlineCount(), getAuthenticatedCount(), + totalMsg, totalFailedMsg, totalBytesValue); + } catch (Exception e) { + log.error("[logStatistics][统计日志任务异常]", e); + } + } + + /** + * 关闭连接管理器 + */ + public void shutdown() { + log.info("[shutdown][关闭TCP连接管理器]"); + + // 关闭定时任务 + scheduler.shutdown(); + try { + if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } + } catch (InterruptedException e) { + scheduler.shutdownNow(); + Thread.currentThread().interrupt(); + } + + // 关闭所有连接 + writeLock.lock(); + try { + clientMap.values().forEach(TcpDeviceClient::shutdown); + clientMap.clear(); + deviceIdToAddrMap.clear(); + socketToAddrMap.clear(); + } finally { + writeLock.unlock(); + } + } + + /** + * 获取连接状态信息 + */ + public String getConnectionStatus() { + return String.format("总连接数: %d, 在线设备: %d, 认证设备: %d, 成功率: %.2f%%", + getTotalCount(), getOnlineCount(), getAuthenticatedCount(), + totalMessages.get() > 0 + ? (double) (totalMessages.get() - totalFailedMessages.get()) / totalMessages.get() * 100 + : 0.0); + } + + /** + * 获取详细统计信息 + */ + public String getDetailedStatistics() { + return String.format( + "TCP连接管理器统计:\n" + + "- 当前连接数: %d\n" + + "- 在线设备数: %d\n" + + "- 认证设备数: %d\n" + + "- 历史总连接: %d\n" + + "- 历史总断开: %d\n" + + "- 总消息数: %d\n" + + "- 失败消息数: %d\n" + + "- 总字节数: %d\n" + + "- 消息成功率: %.2f%%", + getTotalCount(), getOnlineCount(), getAuthenticatedCount(), + totalConnections.get(), totalDisconnections.get(), + totalMessages.get(), totalFailedMessages.get(), totalBytes.get(), + totalMessages.get() > 0 + ? (double) (totalMessages.get() - totalFailedMessages.get()) / totalMessages.get() * 100 + : 0.0); + } +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/package-info.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/package-info.java deleted file mode 100644 index e3d9750b80..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/package-info.java +++ /dev/null @@ -1 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.tcp; \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataDecoder.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataDecoder.java new file mode 100644 index 0000000000..8e7baa37d8 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataDecoder.java @@ -0,0 +1,97 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.protocol; + +import io.vertx.core.buffer.Buffer; +import lombok.extern.slf4j.Slf4j; + +/** + * TCP 数据解码器 + *

+ * 负责将字节流解码为 TcpDataPackage 对象 + *

+ * 数据包格式: + * 包头(4字节长度) | 设备地址长度(2字节) | 设备地址(不定长) | 功能码(2字节) | 消息序号(2字节) | 包体(不定长) + * + * @author 芋道源码 + */ +@Slf4j +public class TcpDataDecoder { + + /** + * 解码数据包 + * + * @param buffer 数据缓冲区 + * @return 解码后的数据包 + * @throws IllegalArgumentException 如果数据包格式不正确 + */ + public static TcpDataPackage decode(Buffer buffer) { + if (buffer == null || buffer.length() < 8) { + throw new IllegalArgumentException("数据包长度不足"); + } + + try { + int index = 0; + + // 1. 获取设备地址长度(2字节) + short addrLength = buffer.getShort(index); + index += 2; + + // 2. 校验数据包长度 + int expectedLength = 2 + addrLength + 2 + 2; // 地址长度 + 地址 + 功能码 + 消息序号 + if (buffer.length() < expectedLength) { + throw new IllegalArgumentException("数据包长度不足,期望至少 " + expectedLength + " 字节"); + } + + // 3. 获取设备地址 + String addr = buffer.getBuffer(index, index + addrLength).toString(); + index += addrLength; + + // 4. 获取功能码(2字节) + short code = buffer.getShort(index); + index += 2; + + // 5. 获取消息序号(2字节) + short mid = buffer.getShort(index); + index += 2; + + // 6. 获取包体数据 + String payload = ""; + if (index < buffer.length()) { + payload = buffer.getString(index, buffer.length()); + } + + // 7. 构建数据包对象 + TcpDataPackage dataPackage = TcpDataPackage.builder() + .addrLength((int) addrLength) + .addr(addr) + .code(code) + .mid(mid) + .payload(payload) + .build(); + + log.debug("[decode][解码成功] 设备地址: {}, 功能码: {}, 消息序号: {}, 包体长度: {}", + addr, dataPackage.getCodeDescription(), mid, payload.length()); + + return dataPackage; + + } catch (Exception e) { + log.error("[decode][解码失败] 数据: {}", buffer.toString(), e); + throw new IllegalArgumentException("数据包解码失败: " + e.getMessage(), e); + } + } + + /** + * 校验数据包格式 + * + * @param buffer 数据缓冲区 + * @return 校验结果 + */ + public static boolean validate(Buffer buffer) { + try { + decode(buffer); + return true; + } catch (Exception e) { + log.warn("[validate][数据包格式校验失败] 数据: {}, 错误: {}", buffer.toString(), e.getMessage()); + return false; + } + } +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataEncoder.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataEncoder.java new file mode 100644 index 0000000000..fb0a68c182 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataEncoder.java @@ -0,0 +1,172 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.protocol; + +import io.vertx.core.buffer.Buffer; +import lombok.extern.slf4j.Slf4j; + +/** + * TCP 数据编码器 + *

+ * 负责将 TcpDataPackage 对象编码为字节流 + *

+ * 数据包格式: + * 包头(4字节长度) | 设备地址长度(2字节) | 设备地址(不定长) | 功能码(2字节) | 消息序号(2字节) | 包体(不定长) + * + * @author 芋道源码 + */ +@Slf4j +public class TcpDataEncoder { + + /** + * 编码数据包 + * + * @param dataPackage 数据包对象 + * @return 编码后的字节流 + * @throws IllegalArgumentException 如果数据包对象不正确 + */ + public static Buffer encode(TcpDataPackage dataPackage) { + if (dataPackage == null) { + throw new IllegalArgumentException("数据包对象不能为空"); + } + + if (dataPackage.getAddr() == null || dataPackage.getAddr().isEmpty()) { + throw new IllegalArgumentException("设备地址不能为空"); + } + + if (dataPackage.getPayload() == null) { + dataPackage.setPayload(""); + } + + try { + Buffer buffer = Buffer.buffer(); + + // 1. 计算包体长度(除了包头4字节) + int payloadLength = dataPackage.getPayload().getBytes().length; + int totalLength = 2 + dataPackage.getAddr().length() + 2 + 2 + payloadLength; + + // 2. 写入包头:总长度(4字节) + buffer.appendInt(totalLength); + + // 3. 写入设备地址长度(2字节) + buffer.appendShort((short) dataPackage.getAddr().length()); + + // 4. 写入设备地址(不定长) + buffer.appendBytes(dataPackage.getAddr().getBytes()); + + // 5. 写入功能码(2字节) + buffer.appendShort(dataPackage.getCode()); + + // 6. 写入消息序号(2字节) + buffer.appendShort(dataPackage.getMid()); + + // 7. 写入包体数据(不定长) + buffer.appendBytes(dataPackage.getPayload().getBytes()); + + log.debug("[encode][编码成功] 设备地址: {}, 功能码: {}, 消息序号: {}, 总长度: {}", + dataPackage.getAddr(), dataPackage.getCodeDescription(), + dataPackage.getMid(), buffer.length()); + + return buffer; + + } catch (Exception e) { + log.error("[encode][编码失败] 数据包: {}", dataPackage, e); + throw new IllegalArgumentException("数据包编码失败: " + e.getMessage(), e); + } + } + + /** + * 创建注册回复数据包 + * + * @param addr 设备地址 + * @param mid 消息序号 + * @param success 是否成功 + * @return 编码后的数据包 + */ + public static Buffer createRegisterReply(String addr, short mid, boolean success) { + String payload = success ? "0" : "1"; // 0表示成功,1表示失败 + + TcpDataPackage dataPackage = TcpDataPackage.builder() + .addr(addr) + .code(TcpDataPackage.CODE_REGISTER_REPLY) + .mid(mid) + .payload(payload) + .build(); + + return encode(dataPackage); + } + + /** + * 创建数据下发数据包 + * + * @param addr 设备地址 + * @param mid 消息序号 + * @param data 下发数据 + * @return 编码后的数据包 + */ + public static Buffer createDataDownPackage(String addr, short mid, String data) { + TcpDataPackage dataPackage = TcpDataPackage.builder() + .addr(addr) + .code(TcpDataPackage.CODE_DATA_DOWN) + .mid(mid) + .payload(data) + .build(); + + return encode(dataPackage); + } + + /** + * 创建服务调用数据包 + * + * @param addr 设备地址 + * @param mid 消息序号 + * @param serviceData 服务数据 + * @return 编码后的数据包 + */ + public static Buffer createServiceInvokePackage(String addr, short mid, String serviceData) { + TcpDataPackage dataPackage = TcpDataPackage.builder() + .addr(addr) + .code(TcpDataPackage.CODE_SERVICE_INVOKE) + .mid(mid) + .payload(serviceData) + .build(); + + return encode(dataPackage); + } + + /** + * 创建属性设置数据包 + * + * @param addr 设备地址 + * @param mid 消息序号 + * @param propertyData 属性数据 + * @return 编码后的数据包 + */ + public static Buffer createPropertySetPackage(String addr, short mid, String propertyData) { + TcpDataPackage dataPackage = TcpDataPackage.builder() + .addr(addr) + .code(TcpDataPackage.CODE_PROPERTY_SET) + .mid(mid) + .payload(propertyData) + .build(); + + return encode(dataPackage); + } + + /** + * 创建属性获取数据包 + * + * @param addr 设备地址 + * @param mid 消息序号 + * @param propertyNames 属性名称列表 + * @return 编码后的数据包 + */ + public static Buffer createPropertyGetPackage(String addr, short mid, String propertyNames) { + TcpDataPackage dataPackage = TcpDataPackage.builder() + .addr(addr) + .code(TcpDataPackage.CODE_PROPERTY_GET) + .mid(mid) + .payload(propertyNames) + .build(); + + return encode(dataPackage); + } +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataPackage.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataPackage.java new file mode 100644 index 0000000000..3b6f7df286 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataPackage.java @@ -0,0 +1,153 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.protocol; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * TCP 数据包协议定义 + *

+ * 数据包格式: + * 包头(4字节长度) | 设备地址长度(2字节) | 设备地址(不定长) | 功能码(2字节) | 消息序号(2字节) | 包体(不定长) + * + * @author 芋道源码 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class TcpDataPackage { + + // ==================== 功能码定义 ==================== + + /** + * 设备注册 + */ + public static final short CODE_REGISTER = 10; + /** + * 注册回复 + */ + public static final short CODE_REGISTER_REPLY = 11; + /** + * 心跳 + */ + public static final short CODE_HEARTBEAT = 20; + /** + * 数据上报 + */ + public static final short CODE_DATA_UP = 30; + /** + * 事件上报 + */ + public static final short CODE_EVENT_UP = 40; + /** + * 数据下发 + */ + public static final short CODE_DATA_DOWN = 50; + /** + * 服务调用 + */ + public static final short CODE_SERVICE_INVOKE = 60; + /** + * 属性设置 + */ + public static final short CODE_PROPERTY_SET = 70; + /** + * 属性获取 + */ + public static final short CODE_PROPERTY_GET = 80; + + // ==================== 数据包字段 ==================== + + /** + * 设备地址长度 + */ + private Integer addrLength; + + /** + * 设备地址 + */ + private String addr; + + /** + * 功能码 + */ + private short code; + + /** + * 消息序号 + */ + private short mid; + + /** + * 包体数据 + */ + private String payload; + + // ==================== 辅助方法 ==================== + + /** + * 是否为注册消息 + */ + public boolean isRegisterMessage() { + return code == CODE_REGISTER; + } + + /** + * 是否为心跳消息 + */ + public boolean isHeartbeatMessage() { + return code == CODE_HEARTBEAT; + } + + /** + * 是否为数据上报消息 + */ + public boolean isDataUpMessage() { + return code == CODE_DATA_UP; + } + + /** + * 是否为事件上报消息 + */ + public boolean isEventUpMessage() { + return code == CODE_EVENT_UP; + } + + /** + * 是否为下行消息 + */ + public boolean isDownstreamMessage() { + return code == CODE_DATA_DOWN || code == CODE_SERVICE_INVOKE || + code == CODE_PROPERTY_SET || code == CODE_PROPERTY_GET; + } + + /** + * 获取功能码描述 + */ + public String getCodeDescription() { + switch (code) { + case CODE_REGISTER: + return "设备注册"; + case CODE_REGISTER_REPLY: + return "注册回复"; + case CODE_HEARTBEAT: + return "心跳"; + case CODE_DATA_UP: + return "数据上报"; + case CODE_EVENT_UP: + return "事件上报"; + case CODE_DATA_DOWN: + return "数据下发"; + case CODE_SERVICE_INVOKE: + return "服务调用"; + case CODE_PROPERTY_SET: + return "属性设置"; + case CODE_PROPERTY_GET: + return "属性获取"; + default: + return "未知功能码"; + } + } +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataReader.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataReader.java new file mode 100644 index 0000000000..f796389907 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataReader.java @@ -0,0 +1,159 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.protocol; + +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.parsetools.RecordParser; +import lombok.extern.slf4j.Slf4j; + +import java.util.function.Consumer; + +/** + * TCP 数据读取器 + *

+ * 负责从 TCP 流中读取完整的数据包 + *

+ * 数据包格式: + * 包头(4字节长度) | 设备地址长度(2字节) | 设备地址(不定长) | 功能码(2字节) | 消息序号(2字节) | 包体(不定长) + * + * @author 芋道源码 + */ +@Slf4j +public class TcpDataReader { + + /** + * 创建数据包解析器 + * + * @param receiveHandler 接收处理器 + * @return RecordParser 解析器 + */ + public static RecordParser createParser(Consumer receiveHandler) { + // 首先读取4字节的长度信息 + RecordParser parser = RecordParser.newFixed(4); + + // 设置处理器 + parser.setOutput(new Handler() { + // 当前数据包的长度,-1表示还没有读取到长度信息 + private int dataLength = -1; + + @Override + public void handle(Buffer buffer) { + try { + // 如果还没有读取到长度信息 + if (dataLength == -1) { + // 从包头中读取数据长度 + dataLength = buffer.getInt(0); + + // 校验数据长度 + if (dataLength <= 0 || dataLength > 1024 * 1024) { // 最大1MB + log.error("[handle][无效的数据包长度: {}]", dataLength); + reset(); + return; + } + + // 切换到读取数据模式 + parser.fixedSizeMode(dataLength); + + log.debug("[handle][读取到数据包长度: {}]", dataLength); + } else { + // 读取到完整的数据包 + log.debug("[handle][读取到完整数据包,长度: {}]", buffer.length()); + + // 处理数据包 + try { + receiveHandler.accept(buffer); + } catch (Exception e) { + log.error("[handle][处理数据包失败]", e); + } + + // 重置状态,准备读取下一个数据包 + reset(); + } + } catch (Exception e) { + log.error("[handle][数据包处理异常]", e); + reset(); + } + } + + /** + * 重置解析器状态 + */ + private void reset() { + dataLength = -1; + parser.fixedSizeMode(4); + } + }); + + return parser; + } + + /** + * 创建带异常处理的数据包解析器 + * + * @param receiveHandler 接收处理器 + * @param exceptionHandler 异常处理器 + * @return RecordParser 解析器 + */ + public static RecordParser createParserWithExceptionHandler( + Consumer receiveHandler, + Consumer exceptionHandler) { + + RecordParser parser = RecordParser.newFixed(4); + + parser.setOutput(new Handler() { + private int dataLength = -1; + + @Override + public void handle(Buffer buffer) { + try { + if (dataLength == -1) { + dataLength = buffer.getInt(0); + + if (dataLength <= 0 || dataLength > 1024 * 1024) { + throw new IllegalArgumentException("无效的数据包长度: " + dataLength); + } + + parser.fixedSizeMode(dataLength); + log.debug("[handle][读取到数据包长度: {}]", dataLength); + } else { + log.debug("[handle][读取到完整数据包,长度: {}]", buffer.length()); + + try { + receiveHandler.accept(buffer); + } catch (Exception e) { + exceptionHandler.accept(e); + } + + reset(); + } + } catch (Exception e) { + exceptionHandler.accept(e); + reset(); + } + } + + private void reset() { + dataLength = -1; + parser.fixedSizeMode(4); + } + }); + + return parser; + } + + /** + * 创建简单的数据包解析器(用于测试) + * + * @param receiveHandler 接收处理器 + * @return RecordParser 解析器 + */ + public static RecordParser createSimpleParser(Consumer receiveHandler) { + return createParser(buffer -> { + try { + TcpDataPackage dataPackage = TcpDataDecoder.decode(buffer); + receiveHandler.accept(dataPackage); + } catch (Exception e) { + log.error("[createSimpleParser][解码数据包失败]", e); + } + }); + } +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpConnectionHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpConnectionHandler.java deleted file mode 100644 index ff64f453da..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpConnectionHandler.java +++ /dev/null @@ -1,148 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router; - -import cn.hutool.core.util.BooleanUtil; -import cn.iocoder.yudao.framework.common.pojo.CommonResult; -import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; -import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConnectionManager; -import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; -import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; -import io.vertx.core.Handler; -import io.vertx.core.buffer.Buffer; -import io.vertx.core.net.NetSocket; -import io.vertx.core.parsetools.RecordParser; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -/** - * IoT TCP 连接处理器 - *

- * 核心负责: - * 1. 【认证】创建连接后,设备需要发送认证消息,认证通过后,才能进行后续的通信 - * 2. 【消息处理】接收设备发送的消息,解码后,发送到消息队列 - * 3. 【断开】设备断开连接后,清理资源 - * - * @author 芋道源码 - */ -@RequiredArgsConstructor -@Slf4j -public class IotTcpConnectionHandler implements Handler { - - private final NetSocket socket; - /** - * 是否已认证 - */ - private boolean authenticated = false; - /** - * 设备信息 - */ - private IotDeviceRespDTO device; - - private final IotTcpConnectionManager connectionManager; - - private final IotDeviceMessageService messageService; - - private final IotDeviceService deviceService; - - private final IotDeviceCommonApi deviceApi; - - private final String serverId; - - public void start() { - // 1. 设置解析器 - final RecordParser parser = RecordParser.newDelimited("\n", this); - socket.handler(parser); - - // 2. 设置处理器 - socket.closeHandler(v -> handleConnectionClose()); - socket.exceptionHandler(this::handleException); - } - - @Override - public void handle(Buffer buffer) { - log.info("[handle][接收到数据: {}]", buffer); - try { - // TODO @haohao:可以调研下,做个对比表格哈; - // 1. 处理认证 - if (!authenticated) { - handleAuthentication(buffer); - return; - } - // 2. 处理消息 - handleMessage(buffer); - } catch (Exception e) { - log.error("[handle][处理异常]", e); - socket.close(); - } - } - - private void handleAuthentication(Buffer buffer) { - // 1. 解析认证信息 - // TODO @芋艿:这里的认证协议,需要和设备端约定。默认为 productKey,deviceName,password - // TODO @haohao:这里,要不也 json 解析?类似 http 是 { - // "clientId": "4aymZgOTOOCrDKRT.small", - // "username": "small&4aymZgOTOOCrDKRT", - // "password": "509e2b08f7598eb139d276388c600435913ba4c94cd0d50aebc5c0d1855bcb75" - //} - String[] parts = buffer.toString().split(","); - if (parts.length != 3) { - log.error("[handleAuthentication][认证信息({})格式不正确]", buffer); - socket.close(); - return; - } - String productKey = parts[0]; - String deviceName = parts[1]; - String password = parts[2]; - - // 2. 执行认证 - CommonResult authResult = deviceApi.authDevice(new IotDeviceAuthReqDTO() - .setClientId(socket.remoteAddress().toString()).setUsername(productKey + "/" + deviceName) - .setPassword(password)); - if (authResult.isError() || !BooleanUtil.isTrue(authResult.getData())) { - log.error("[handleAuthentication][认证失败,productKey({}) deviceName({}) password({})]", productKey, deviceName, - password); - socket.close(); - return; - } - - // 3. 认证成功 - this.authenticated = true; - this.device = deviceService.getDeviceFromCache(productKey, deviceName); - connectionManager.addConnection(String.valueOf(device.getId()), socket); - - // 4. 发送上线消息 - IotDeviceMessage message = IotDeviceMessage.buildStateUpdateOnline(); - messageService.sendDeviceMessage(message, productKey, deviceName, serverId); - log.info("[handleAuthentication][认证成功]"); - } - - private void handleMessage(Buffer buffer) { - // 1. 解码消息 - IotDeviceMessage message = messageService.decodeDeviceMessage(buffer.getBytes(), - device.getProductKey(), device.getDeviceName()); - if (message == null) { - log.warn("[handleMessage][解码消息失败]"); - return; - } - // 2. 发送消息到队列 - messageService.sendDeviceMessage(message, device.getProductKey(), device.getDeviceName(), serverId); - } - - private void handleConnectionClose() { - // 1. 移除连接 - connectionManager.removeConnection(socket); - // 2. 发送离线消息 - if (device != null) { - IotDeviceMessage message = IotDeviceMessage.buildStateOffline(); - messageService.sendDeviceMessage(message, device.getProductKey(), device.getDeviceName(), serverId); - } - } - - private void handleException(Throwable e) { - log.error("[handleException][连接({}) 发生异常]", socket.remoteAddress(), e); - socket.close(); - } - -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpDownstreamHandler.java index a4dce318b7..7c499fb974 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpDownstreamHandler.java @@ -1,51 +1,364 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router; - -import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConnectionManager; -import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; -import io.vertx.core.buffer.Buffer; -import io.vertx.core.net.NetSocket; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -/** - * IoT 网关 TCP 下行消息处理器 - *

- * 从消息总线接收到下行消息,然后发布到 TCP 连接,从而被设备所接收 - * - * @author 芋道源码 - */ -@Slf4j -@Component -@RequiredArgsConstructor -public class IotTcpDownstreamHandler { - - private final IotTcpConnectionManager connectionManager; - - private final IotDeviceMessageService messageService; - - /** - * 处理下行消息 - * - * @param message 设备消息 - */ - public void handle(IotDeviceMessage message) { - // 1. 获取设备对应的连接 - NetSocket socket = connectionManager.getConnection(String.valueOf(message.getDeviceId())); - if (socket == null) { - log.error("[handle][设备({})的连接不存在]", message.getDeviceId()); - return; - } - - // 2. 编码消息 - byte[] bytes = messageService.encodeDeviceMessage(message, null, null); - - // 3. 发送消息 - socket.write(Buffer.buffer(bytes)); - // TODO @芋艿:这里的换行符,需要和设备端约定 - // TODO @haohao:tcp 要不定长?很少 \n 哈。然后有个 magic number;可以参考 dubbo rpc; - socket.write("\n"); - } - +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router; + +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.client.TcpDeviceClient; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.TcpDeviceConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.protocol.TcpDataDecoder; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.protocol.TcpDataEncoder; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.protocol.TcpDataPackage; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import com.alibaba.fastjson.JSON; +import io.vertx.core.buffer.Buffer; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT 网关 TCP 下行消息处理器 + *

+ * 负责处理从业务系统发送到设备的下行消息,包括: + * 1. 属性设置 + * 2. 服务调用 + * 3. 属性获取 + * 4. 配置下发 + * 5. OTA 升级 + * + * @author 芋道源码 + */ +@Slf4j +public class IotTcpDownstreamHandler { + + private final TcpDeviceConnectionManager connectionManager; + + private final IotDeviceMessageService messageService; + + public IotTcpDownstreamHandler(TcpDeviceConnectionManager connectionManager, + IotDeviceMessageService messageService) { + this.connectionManager = connectionManager; + this.messageService = messageService; + } + + /** + * 处理下行消息 + * + * @param message 设备消息 + */ + public void handle(IotDeviceMessage message) { + try { + log.info("[handle][处理下行消息] 设备ID: {}, 方法: {}, 消息ID: {}", + message.getDeviceId(), message.getMethod(), message.getId()); + + // 1. 获取设备连接 + TcpDeviceClient client = connectionManager.getClientByDeviceId(message.getDeviceId()); + if (client == null || !client.isOnline()) { + log.error("[handle][设备({})不在线,无法发送下行消息]", message.getDeviceId()); + return; + } + + // 2. 根据消息方法处理不同类型的下行消息 + switch (message.getMethod()) { + case "thing.property.set": + handlePropertySet(client, message); + break; + case "thing.property.get": + handlePropertyGet(client, message); + break; + case "thing.service.invoke": + handleServiceInvoke(client, message); + break; + case "thing.config.push": + handleConfigPush(client, message); + break; + case "thing.ota.upgrade": + handleOtaUpgrade(client, message); + break; + default: + log.warn("[handle][未知的下行消息方法: {}]", message.getMethod()); + break; + } + + } catch (Exception e) { + log.error("[handle][处理下行消息失败]", e); + } + } + + /** + * 处理属性设置 + * + * @param client 设备客户端 + * @param message 设备消息 + */ + private void handlePropertySet(TcpDeviceClient client, IotDeviceMessage message) { + try { + log.info("[handlePropertySet][属性设置] 设备地址: {}, 属性: {}", + client.getDeviceAddr(), message.getParams()); + + // 使用编解码器发送消息,降级处理使用原始编码 + sendMessageWithCodec(client, message, "handlePropertySet", () -> { + String payload = JSON.toJSONString(message.getParams()); + short mid = generateMessageId(); + + Buffer buffer = TcpDataEncoder.createPropertySetPackage( + client.getDeviceAddr(), mid, payload); + client.sendMessage(buffer); + + log.debug("[handlePropertySet][属性设置消息已发送(降级)] 设备地址: {}, 消息序号: {}", + client.getDeviceAddr(), mid); + }); + + } catch (Exception e) { + log.error("[handlePropertySet][属性设置失败]", e); + } + } + + /** + * 处理属性获取 + * + * @param client 设备客户端 + * @param message 设备消息 + */ + private void handlePropertyGet(TcpDeviceClient client, IotDeviceMessage message) { + try { + log.info("[handlePropertyGet][属性获取] 设备地址: {}, 属性列表: {}", + client.getDeviceAddr(), message.getParams()); + + // 使用编解码器发送消息,降级处理使用原始编码 + sendMessageWithCodec(client, message, "handlePropertyGet", () -> { + String payload = JSON.toJSONString(message.getParams()); + short mid = generateMessageId(); + + Buffer buffer = TcpDataEncoder.createPropertyGetPackage( + client.getDeviceAddr(), mid, payload); + client.sendMessage(buffer); + + log.debug("[handlePropertyGet][属性获取消息已发送(降级)] 设备地址: {}, 消息序号: {}", + client.getDeviceAddr(), mid); + }); + + } catch (Exception e) { + log.error("[handlePropertyGet][属性获取失败]", e); + } + } + + /** + * 处理服务调用 + * + * @param client 设备客户端 + * @param message 设备消息 + */ + private void handleServiceInvoke(TcpDeviceClient client, IotDeviceMessage message) { + try { + log.info("[handleServiceInvoke][服务调用] 设备地址: {}, 服务参数: {}", + client.getDeviceAddr(), message.getParams()); + + // 1. 构建服务调用数据包 + String payload = JSON.toJSONString(message.getParams()); + short mid = generateMessageId(); + + Buffer buffer = TcpDataEncoder.createServiceInvokePackage( + client.getDeviceAddr(), mid, payload); + + // 2. 发送消息 + client.sendMessage(buffer); + + log.debug("[handleServiceInvoke][服务调用消息已发送] 设备地址: {}, 消息序号: {}", + client.getDeviceAddr(), mid); + + } catch (Exception e) { + log.error("[handleServiceInvoke][服务调用失败]", e); + } + } + + /** + * 处理配置推送 + * + * @param client 设备客户端 + * @param message 设备消息 + */ + private void handleConfigPush(TcpDeviceClient client, IotDeviceMessage message) { + try { + log.info("[handleConfigPush][配置推送] 设备地址: {}, 配置: {}", + client.getDeviceAddr(), message.getParams()); + + // 1. 构建配置推送数据包 + String payload = JSON.toJSONString(message.getParams()); + short mid = generateMessageId(); + + Buffer buffer = TcpDataEncoder.createDataDownPackage( + client.getDeviceAddr(), mid, payload); + + // 2. 发送消息 + client.sendMessage(buffer); + + log.debug("[handleConfigPush][配置推送消息已发送] 设备地址: {}, 消息序号: {}", + client.getDeviceAddr(), mid); + + } catch (Exception e) { + log.error("[handleConfigPush][配置推送失败]", e); + } + } + + /** + * 处理 OTA 升级 + * + * @param client 设备客户端 + * @param message 设备消息 + */ + private void handleOtaUpgrade(TcpDeviceClient client, IotDeviceMessage message) { + try { + log.info("[handleOtaUpgrade][OTA升级] 设备地址: {}, 升级信息: {}", + client.getDeviceAddr(), message.getParams()); + + // 1. 构建 OTA 升级数据包 + String payload = JSON.toJSONString(message.getParams()); + short mid = generateMessageId(); + + Buffer buffer = TcpDataEncoder.createDataDownPackage( + client.getDeviceAddr(), mid, payload); + + // 2. 发送消息 + client.sendMessage(buffer); + + log.debug("[handleOtaUpgrade][OTA升级消息已发送] 设备地址: {}, 消息序号: {}", + client.getDeviceAddr(), mid); + + } catch (Exception e) { + log.error("[handleOtaUpgrade][OTA升级失败]", e); + } + } + + /** + * 处理自定义下行消息 + * + * @param client 设备客户端 + * @param message 设备消息 + * @param code 功能码 + */ + private void handleCustomMessage(TcpDeviceClient client, IotDeviceMessage message, short code) { + try { + log.info("[handleCustomMessage][自定义消息] 设备地址: {}, 功能码: {}, 数据: {}", + client.getDeviceAddr(), code, message.getParams()); + + // 1. 构建自定义数据包 + String payload = JSON.toJSONString(message.getParams()); + short mid = generateMessageId(); + + TcpDataPackage dataPackage = TcpDataPackage.builder() + .addr(client.getDeviceAddr()) + .code(code) + .mid(mid) + .payload(payload) + .build(); + + Buffer buffer = TcpDataEncoder.encode(dataPackage); + + // 2. 发送消息 + client.sendMessage(buffer); + + log.debug("[handleCustomMessage][自定义消息已发送] 设备地址: {}, 功能码: {}, 消息序号: {}", + client.getDeviceAddr(), code, mid); + + } catch (Exception e) { + log.error("[handleCustomMessage][自定义消息发送失败]", e); + } + } + + /** + * 批量发送下行消息 + * + * @param deviceIds 设备ID列表 + * @param message 设备消息 + */ + public void broadcastMessage(Long[] deviceIds, IotDeviceMessage message) { + try { + log.info("[broadcastMessage][批量发送消息] 设备数量: {}, 方法: {}", + deviceIds.length, message.getMethod()); + + for (Long deviceId : deviceIds) { + // 创建副本消息(避免消息ID冲突) + IotDeviceMessage copyMessage = IotDeviceMessage.of( + message.getRequestId(), + message.getMethod(), + message.getParams(), + message.getData(), + message.getCode(), + message.getMsg()); + copyMessage.setDeviceId(deviceId); + + // 处理单个设备消息 + handle(copyMessage); + } + + } catch (Exception e) { + log.error("[broadcastMessage][批量发送消息失败]", e); + } + } + + /** + * 检查设备是否支持指定方法 + * + * @param client 设备客户端 + * @param method 消息方法 + * @return 是否支持 + */ + private boolean isMethodSupported(TcpDeviceClient client, String method) { + // TODO: 可以根据设备类型或产品信息判断是否支持特定方法 + return IotDeviceMessageMethodEnum.of(method) != null; + } + + /** + * 生成消息序号 + * + * @return 消息序号 + */ + private short generateMessageId() { + return (short) (System.currentTimeMillis() % Short.MAX_VALUE); + } + + /** + * 使用编解码器发送消息 + * + * @param client 设备客户端 + * @param message 设备消息 + * @param methodName 方法名称 + * @param fallbackAction 降级处理逻辑 + */ + private void sendMessageWithCodec(TcpDeviceClient client, IotDeviceMessage message, + String methodName, Runnable fallbackAction) { + try { + // 1. 使用编解码器编码消息 + byte[] messageBytes = messageService.encodeDeviceMessage( + message, client.getProductKey(), client.getDeviceName()); + + // 2. 解析编码后的数据包并设置设备地址和消息序号 + Buffer buffer = Buffer.buffer(messageBytes); + TcpDataPackage dataPackage = TcpDataDecoder.decode(buffer); + dataPackage.setAddr(client.getDeviceAddr()); + dataPackage.setMid(generateMessageId()); + + // 3. 重新编码并发送 + Buffer finalBuffer = TcpDataEncoder.encode(dataPackage); + client.sendMessage(finalBuffer); + + log.debug("[{}][消息已发送] 设备地址: {}, 消息序号: {}", + methodName, client.getDeviceAddr(), dataPackage.getMid()); + + } catch (Exception e) { + log.warn("[{}][使用编解码器编码失败,降级使用原始编码] 错误: {}", + methodName, e.getMessage()); + + // 执行降级处理 + if (fallbackAction != null) { + fallbackAction.run(); + } + } + } + + /** + * 获取连接统计信息 + * + * @return 连接统计信息 + */ + public String getHandlerStatistics() { + return String.format("TCP下游处理器 - %s", connectionManager.getConnectionStatus()); + } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java new file mode 100644 index 0000000000..0067e72064 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java @@ -0,0 +1,393 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router; + +import cn.hutool.core.util.IdUtil; +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; +import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.client.TcpDeviceClient; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.TcpDeviceConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.protocol.TcpDataDecoder; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.protocol.TcpDataEncoder; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.protocol.TcpDataPackage; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.protocol.TcpDataReader; +import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.net.NetSocket; +import io.vertx.core.parsetools.RecordParser; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT 网关 TCP 上行消息处理器 + *

+ * 核心负责: + * 1. 【设备注册】设备连接后发送注册消息,注册成功后可以进行通信 + * 2. 【心跳处理】定期接收设备心跳消息,维持连接状态 + * 3. 【数据上报】接收设备数据上报和事件上报 + * 4. 【连接管理】管理连接的建立、维护和清理 + * + * @author 芋道源码 + */ +@RequiredArgsConstructor +@Slf4j +public class IotTcpUpstreamHandler implements Handler { + + private final IotGatewayProperties.TcpProperties tcpConfig; + + private final TcpDeviceConnectionManager connectionManager; + + private final IotDeviceService deviceService; + + private final IotDeviceMessageService messageService; + + private final IotDeviceCommonApi deviceApi; + + private final String serverId; + + @Override + public void handle(NetSocket socket) { + log.info("[handle][收到设备连接: {}]", socket.remoteAddress()); + + // 创建客户端ID和设备客户端 + String clientId = IdUtil.simpleUUID() + "_" + socket.remoteAddress(); + TcpDeviceClient client = new TcpDeviceClient(clientId, tcpConfig.getKeepAliveTimeoutMs()); + + try { + // 设置连接异常和关闭处理 + socket.exceptionHandler(ex -> { + log.error("[handle][连接({})异常]", socket.remoteAddress(), ex); + handleConnectionClose(client); + }); + + socket.closeHandler(v -> { + log.info("[handle][连接({})关闭]", socket.remoteAddress()); + handleConnectionClose(client); + }); + + // 设置网络连接 + client.setSocket(socket); + + // 创建数据解析器 + RecordParser parser = TcpDataReader.createParser(buffer -> { + try { + handleDataPackage(client, buffer); + } catch (Exception e) { + log.error("[handle][处理数据包异常]", e); + } + }); + + // 设置解析器 + client.setParser(parser); + + log.info("[handle][设备连接处理器初始化完成: {}]", clientId); + + } catch (Exception e) { + log.error("[handle][初始化连接处理器失败]", e); + client.shutdown(); + } + } + + /** + * 处理数据包 + * + * @param client 设备客户端 + * @param buffer 数据缓冲区 + */ + private void handleDataPackage(TcpDeviceClient client, io.vertx.core.buffer.Buffer buffer) { + try { + // 解码数据包 + TcpDataPackage dataPackage = TcpDataDecoder.decode(buffer); + + log.info("[handleDataPackage][接收数据包] 设备地址: {}, 功能码: {}, 消息序号: {}", + dataPackage.getAddr(), dataPackage.getCodeDescription(), dataPackage.getMid()); + + // 根据功能码处理不同类型的消息 + switch (dataPackage.getCode()) { + case TcpDataPackage.CODE_REGISTER: + handleDeviceRegister(client, dataPackage); + break; + case TcpDataPackage.CODE_HEARTBEAT: + handleHeartbeat(client, dataPackage); + break; + case TcpDataPackage.CODE_DATA_UP: + handleDataUp(client, dataPackage); + break; + case TcpDataPackage.CODE_EVENT_UP: + handleEventUp(client, dataPackage); + break; + default: + log.warn("[handleDataPackage][未知功能码: {}]", dataPackage.getCode()); + break; + } + + } catch (Exception e) { + log.error("[handleDataPackage][处理数据包失败]", e); + } + } + + /** + * 处理设备注册 + * + * @param client 设备客户端 + * @param dataPackage 数据包 + */ + private void handleDeviceRegister(TcpDeviceClient client, TcpDataPackage dataPackage) { + try { + String deviceAddr = dataPackage.getAddr(); + String productKey = dataPackage.getPayload(); + + log.info("[handleDeviceRegister][设备注册] 设备地址: {}, 产品密钥: {}", deviceAddr, productKey); + + // 获取设备信息 + IotDeviceRespDTO device = deviceService.getDeviceFromCache(productKey, deviceAddr); + if (device == null) { + log.error("[handleDeviceRegister][设备不存在: {} - {}]", productKey, deviceAddr); + sendRegisterReply(client, dataPackage, false); + return; + } + + // 更新客户端信息 + client.setProductKey(productKey); + client.setDeviceName(deviceAddr); + client.setDeviceId(device.getId()); + client.setAuthenticated(true); + + // 添加到连接管理器 + connectionManager.addClient(deviceAddr, client); + connectionManager.setDeviceIdMapping(deviceAddr, device.getId()); + + // 发送设备上线消息 + IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline(); + messageService.sendDeviceMessage(onlineMessage, productKey, deviceAddr, serverId); + + // 发送注册成功回复 + sendRegisterReply(client, dataPackage, true); + + log.info("[handleDeviceRegister][设备注册成功] 设备地址: {}, 设备ID: {}", deviceAddr, device.getId()); + + } catch (Exception e) { + log.error("[handleDeviceRegister][设备注册失败]", e); + sendRegisterReply(client, dataPackage, false); + } + } + + /** + * 处理心跳 + * + * @param client 设备客户端 + * @param dataPackage 数据包 + */ + private void handleHeartbeat(TcpDeviceClient client, TcpDataPackage dataPackage) { + try { + String deviceAddr = dataPackage.getAddr(); + + log.debug("[handleHeartbeat][收到心跳] 设备地址: {}", deviceAddr); + + // 更新心跳时间 + client.keepAlive(); + + // 发送心跳回复(可选) + // sendHeartbeatReply(client, dataPackage); + + } catch (Exception e) { + log.error("[handleHeartbeat][处理心跳失败]", e); + } + } + + /** + * 处理数据上报 + * + * @param client 设备客户端 + * @param dataPackage 数据包 + */ + private void handleDataUp(TcpDeviceClient client, TcpDataPackage dataPackage) { + try { + String deviceAddr = dataPackage.getAddr(); + String payload = dataPackage.getPayload(); + + log.info("[handleDataUp][数据上报] 设备地址: {}, 数据: {}", deviceAddr, payload); + + // 检查设备是否已认证 + if (!client.isAuthenticated()) { + log.warn("[handleDataUp][设备未认证,忽略数据上报: {}]", deviceAddr); + return; + } + + // 使用 IotDeviceMessageService 解码消息 + try { + // 1. 将 TCP 数据包重新编码为字节数组 + Buffer buffer = TcpDataEncoder.encode(dataPackage); + byte[] messageBytes = buffer.getBytes(); + + // 2. 使用 messageService 解码消息 + IotDeviceMessage message = messageService.decodeDeviceMessage( + messageBytes, client.getProductKey(), client.getDeviceName()); + + // 3. 发送解码后的消息 + messageService.sendDeviceMessage(message, client.getProductKey(), client.getDeviceName(), serverId); + + } catch (Exception e) { + log.warn("[handleDataUp][使用编解码器解码失败,降级使用原始解析] 错误: {}", e.getMessage()); + + // 降级处理:使用原始方式解析数据 + JSONObject dataJson = JSONUtil.parseObj(payload); + IotDeviceMessage message = IotDeviceMessage.requestOf("thing.property.post", dataJson); + messageService.sendDeviceMessage(message, client.getProductKey(), client.getDeviceName(), serverId); + } + + // 发送数据上报回复 + sendDataUpReply(client, dataPackage); + + } catch (Exception e) { + log.error("[handleDataUp][处理数据上报失败]", e); + } + } + + /** + * 处理事件上报 + * + * @param client 设备客户端 + * @param dataPackage 数据包 + */ + private void handleEventUp(TcpDeviceClient client, TcpDataPackage dataPackage) { + try { + String deviceAddr = dataPackage.getAddr(); + String payload = dataPackage.getPayload(); + + log.info("[handleEventUp][事件上报] 设备地址: {}, 数据: {}", deviceAddr, payload); + + // 检查设备是否已认证 + if (!client.isAuthenticated()) { + log.warn("[handleEventUp][设备未认证,忽略事件上报: {}]", deviceAddr); + return; + } + + // 使用 IotDeviceMessageService 解码消息 + try { + // 1. 将 TCP 数据包重新编码为字节数组 + Buffer buffer = TcpDataEncoder.encode(dataPackage); + byte[] messageBytes = buffer.getBytes(); + + // 2. 使用 messageService 解码消息 + IotDeviceMessage message = messageService.decodeDeviceMessage( + messageBytes, client.getProductKey(), client.getDeviceName()); + + // 3. 发送解码后的消息 + messageService.sendDeviceMessage(message, client.getProductKey(), client.getDeviceName(), serverId); + + } catch (Exception e) { + log.warn("[handleEventUp][使用编解码器解码失败,降级使用原始解析] 错误: {}", e.getMessage()); + + // 降级处理:使用原始方式解析数据 + JSONObject eventJson = JSONUtil.parseObj(payload); + IotDeviceMessage message = IotDeviceMessage.requestOf("thing.event.post", eventJson); + messageService.sendDeviceMessage(message, client.getProductKey(), client.getDeviceName(), serverId); + } + + // 发送事件上报回复 + sendEventUpReply(client, dataPackage); + + } catch (Exception e) { + log.error("[handleEventUp][处理事件上报失败]", e); + } + } + + /** + * 发送注册回复 + * + * @param client 设备客户端 + * @param dataPackage 原始数据包 + * @param success 是否成功 + */ + private void sendRegisterReply(TcpDeviceClient client, TcpDataPackage dataPackage, boolean success) { + try { + io.vertx.core.buffer.Buffer replyBuffer = TcpDataEncoder.createRegisterReply( + dataPackage.getAddr(), dataPackage.getMid(), success); + client.sendMessage(replyBuffer); + + log.debug("[sendRegisterReply][发送注册回复] 设备地址: {}, 结果: {}", + dataPackage.getAddr(), success ? "成功" : "失败"); + } catch (Exception e) { + log.error("[sendRegisterReply][发送注册回复失败]", e); + } + } + + /** + * 发送数据上报回复 + * + * @param client 设备客户端 + * @param dataPackage 原始数据包 + */ + private void sendDataUpReply(TcpDeviceClient client, TcpDataPackage dataPackage) { + try { + TcpDataPackage replyPackage = TcpDataPackage.builder() + .addr(dataPackage.getAddr()) + .code(TcpDataPackage.CODE_DATA_UP) + .mid(dataPackage.getMid()) + .payload("0") // 0表示成功 + .build(); + + io.vertx.core.buffer.Buffer replyBuffer = TcpDataEncoder.encode(replyPackage); + client.sendMessage(replyBuffer); + + } catch (Exception e) { + log.error("[sendDataUpReply][发送数据上报回复失败]", e); + } + } + + /** + * 发送事件上报回复 + * + * @param client 设备客户端 + * @param dataPackage 原始数据包 + */ + private void sendEventUpReply(TcpDeviceClient client, TcpDataPackage dataPackage) { + try { + TcpDataPackage replyPackage = TcpDataPackage.builder() + .addr(dataPackage.getAddr()) + .code(TcpDataPackage.CODE_EVENT_UP) + .mid(dataPackage.getMid()) + .payload("0") // 0表示成功 + .build(); + + io.vertx.core.buffer.Buffer replyBuffer = TcpDataEncoder.encode(replyPackage); + client.sendMessage(replyBuffer); + + } catch (Exception e) { + log.error("[sendEventUpReply][发送事件上报回复失败]", e); + } + } + + /** + * 处理连接关闭 + * + * @param client 设备客户端 + */ + private void handleConnectionClose(TcpDeviceClient client) { + try { + String deviceAddr = client.getDeviceAddr(); + + // 发送设备离线消息 + if (client.isAuthenticated()) { + IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline(); + messageService.sendDeviceMessage(offlineMessage, + client.getProductKey(), client.getDeviceName(), serverId); + } + + // 从连接管理器移除 + if (deviceAddr != null) { + connectionManager.removeClient(deviceAddr); + } + + log.info("[handleConnectionClose][处理连接关闭完成] 设备地址: {}", deviceAddr); + + } catch (Exception e) { + log.error("[handleConnectionClose][处理连接关闭失败]", e); + } + } +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/device/message/IotDeviceMessageServiceImpl.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/device/message/IotDeviceMessageServiceImpl.java index 1680ca31da..6f1f731d29 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/device/message/IotDeviceMessageServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/device/message/IotDeviceMessageServiceImpl.java @@ -6,7 +6,7 @@ import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; -import cn.iocoder.yudao.module.iot.gateway.codec.alink.IotAlinkDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; @@ -31,7 +31,7 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService { /** * 编解码器 */ - private final Map codes; + private final Map codes; @Resource private IotDeviceService deviceService; @@ -39,8 +39,8 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService { @Resource private IotDeviceMessageProducer deviceMessageProducer; - public IotDeviceMessageServiceImpl(List codes) { - this.codes = CollectionUtils.convertMap(codes, IotAlinkDeviceMessageCodec::type); + public IotDeviceMessageServiceImpl(List codes) { + this.codes = CollectionUtils.convertMap(codes, IotDeviceMessageCodec::type); } @Override @@ -52,7 +52,7 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService { throw exception(DEVICE_NOT_EXISTS, productKey, deviceName); } // 1.2 获取编解码器 - IotAlinkDeviceMessageCodec codec = codes.get(device.getCodecType()); + IotDeviceMessageCodec codec = codes.get(device.getCodecType()); if (codec == null) { throw new IllegalArgumentException(StrUtil.format("编解码器({}) 不存在", device.getCodecType())); } @@ -70,7 +70,7 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService { throw exception(DEVICE_NOT_EXISTS, productKey, deviceName); } // 1.2 获取编解码器 - IotAlinkDeviceMessageCodec codec = codes.get(device.getCodecType()); + IotDeviceMessageCodec codec = codes.get(device.getCodecType()); if (codec == null) { throw new IllegalArgumentException(StrUtil.format("编解码器({}) 不存在", device.getCodecType())); } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml index f50edd0eeb..26376b6669 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml @@ -79,8 +79,13 @@ yudao: # 针对引入的 TCP 组件的配置 # ==================================== tcp: - enabled: true - server-port: 8093 + enabled: false + port: 8091 + keep-alive-timeout-ms: 30000 + max-connections: 1000 + ssl-enabled: false + ssl-cert-path: "classpath:certs/client.jks" + ssl-key-path: "classpath:certs/client.jks" --- #################### 日志相关配置 ####################