From 6a117c9d550ac8bc9448d0eac23b8512dbccf515 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Sat, 19 Jul 2025 10:18:29 +0800 Subject: [PATCH] =?UTF-8?q?review=EF=BC=9A=E3=80=90IoT=20=E7=89=A9?= =?UTF-8?q?=E8=81=94=E7=BD=91=E3=80=91TCP=20=E7=BD=91=E7=BB=9C=E6=8E=A5?= =?UTF-8?q?=E5=85=A5=E7=9A=84=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../codec/tcp/IotTcpDeviceMessageCodec.java | 29 ++++++------ .../tcp/IotTcpDownstreamSubscriber.java | 13 +++--- .../protocol/tcp/IotTcpUpstreamProtocol.java | 1 + .../protocol/tcp/client/TcpDeviceClient.java | 46 ++++++++++--------- .../manager/TcpDeviceConnectionManager.java | 23 ++++++---- .../protocol/tcp/protocol/TcpDataDecoder.java | 21 +++++---- .../protocol/tcp/protocol/TcpDataEncoder.java | 33 ++++--------- .../protocol/tcp/protocol/TcpDataPackage.java | 9 +++- .../protocol/tcp/protocol/TcpDataReader.java | 13 ++++-- .../tcp/router/IotTcpDownstreamHandler.java | 11 ++--- .../tcp/router/IotTcpUpstreamHandler.java | 36 ++++++--------- 11 files changed, 114 insertions(+), 121 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpDeviceMessageCodec.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpDeviceMessageCodec.java index 0bcef2e0cb..6a558b5141 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpDeviceMessageCodec.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpDeviceMessageCodec.java @@ -54,6 +54,7 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec { static { // 初始化方法映射 + // TODO @haohao:有没可能去掉这个 code 到 method 的映射哈? initializeMethodMappings(); } @@ -75,6 +76,7 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec { * 负载字段名 */ public static class PayloadField { + public static final String TIMESTAMP = "timestamp"; public static final String MESSAGE_ID = "msgId"; public static final String DEVICE_ID = "deviceId"; @@ -82,12 +84,14 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec { public static final String DATA = "data"; public static final String CODE = "code"; public static final String MESSAGE = "message"; + } /** * 消息方法映射 */ public static class MessageMethod { + public static final String PROPERTY_POST = "thing.property.post"; public static final String PROPERTY_SET = "thing.property.set"; public static final String PROPERTY_GET = "thing.property.get"; @@ -97,6 +101,7 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec { public static final String OTA_UPGRADE = "thing.ota.upgrade"; public static final String STATE_ONLINE = "thing.state.online"; public static final String STATE_OFFLINE = "thing.state.offline"; + } // ==================== 初始化方法 ==================== @@ -139,9 +144,9 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec { // 3. 构建 TCP 数据包 TcpDataPackage dataPackage = TcpDataPackage.builder() - .addr("") // 地址在发送时由调用方设置 + .addr("") .code(code) - .mid((short) 0) // 消息序号在发送时由调用方设置 + .mid((short) 0) .payload(payload) .build(); @@ -154,9 +159,7 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec { log.debug("[encode][TCP 消息编码成功] 方法: {}, 数据长度: {}", message.getMethod(), result.length); } - return result; - } catch (Exception e) { log.error("[encode][TCP 消息编码失败] 消息: {}", message, e); throw new TcpCodecException("TCP 消息编码失败", e); @@ -175,13 +178,10 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec { // 1. 解码 TCP 数据包 Buffer buffer = Buffer.buffer(bytes); TcpDataPackage dataPackage = TcpDataDecoder.decode(buffer); - // 2. 获取消息方法 String method = getMethodByCodeSafely(dataPackage.getCode()); - // 3. 解析负载数据 Object params = parsePayloadOptimized(dataPackage.getPayload()); - // 4. 构建 IoT 设备消息 IotDeviceMessage message = IotDeviceMessage.requestOf(method, params); @@ -190,9 +190,7 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec { log.debug("[decode][TCP 消息解码成功] 方法: {}, 功能码: {}", method, dataPackage.getCode()); } - return message; - } catch (Exception e) { log.error("[decode][TCP 消息解码失败] 数据长度: {}, 数据内容: {}", bytes.length, truncateData(bytes, 100), e); @@ -226,8 +224,8 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec { if (Objects.isNull(bytes) || bytes.length == 0) { throw new IllegalArgumentException("待解码数据不能为空"); } - if (bytes.length > 1024 * 1024) { // 1MB 限制 - throw new IllegalArgumentException("数据包过大,超过1MB限制"); + if (bytes.length > 1024 * 1024) { + throw new IllegalArgumentException("数据包过大,超过 1MB 限制"); } } @@ -236,9 +234,10 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec { */ private short getCodeByMethodSafely(String method) { Short code = METHOD_TO_CODE_MAP.get(method); + // 默认为数据上报 if (code == null) { log.warn("[getCodeByMethodSafely][未知的消息方法: {},使用默认功能码]", method); - return TcpDataPackage.CODE_DATA_UP; // 默认为数据上报 + return TcpDataPackage.CODE_DATA_UP; } return code; } @@ -260,6 +259,7 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec { */ private String buildPayloadOptimized(IotDeviceMessage message) { // 使用缓存键 + // TODO @haohao:是不是不用缓存哈? String cacheKey = message.getMethod() + "_" + message.getRequestId(); JSONObject cachedPayload = jsonCache.get(cacheKey); @@ -271,7 +271,6 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec { // 创建新的负载 JSONObject payload = new JSONObject(); - // 添加基础字段 addToPayloadIfNotNull(payload, PayloadField.MESSAGE_ID, message.getRequestId()); addToPayloadIfNotNull(payload, PayloadField.DEVICE_ID, message.getDeviceId()); @@ -279,7 +278,6 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec { addToPayloadIfNotNull(payload, PayloadField.DATA, message.getData()); addToPayloadIfNotNull(payload, PayloadField.CODE, message.getCode()); addToPayloadIfNotEmpty(payload, PayloadField.MESSAGE, message.getMsg()); - // 添加时间戳 payload.set(PayloadField.TIMESTAMP, System.currentTimeMillis()); @@ -317,7 +315,6 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec { } return jsonObject.containsKey(PayloadField.PARAMS) ? jsonObject.get(PayloadField.PARAMS) : jsonObject; - } catch (JSONException e) { log.warn("[parsePayloadOptimized][负载解析为JSON失败,返回原始字符串] 负载: {}", payload); return payload; @@ -379,6 +376,7 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec { * TCP 编解码异常 */ public static class TcpCodecException extends RuntimeException { + public TcpCodecException(String message) { super(message); } @@ -386,5 +384,6 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec { public TcpCodecException(String message, Throwable cause) { super(message, cause); } + } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpDownstreamSubscriber.java index d5c916295c..3f47e14080 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpDownstreamSubscriber.java @@ -108,16 +108,14 @@ public class IotTcpDownstreamSubscriber implements IotMessageSubscriber 1000) { // 超过1秒的慢消息 + // TODO @haohao:1000 搞成静态变量; + if (processTime > 1000) { // 超过 1 秒的慢消息 log.warn("[onMessage][慢消息处理] 设备ID: {}, 方法: {}, 耗时: {}ms", message.getDeviceId(), message.getMethod(), processTime); } - } catch (Exception e) { failedMessages.incrementAndGet(); log.error("[onMessage][处理下行消息失败] 设备ID: {}, 方法: {}, 消息: {}", @@ -142,6 +140,8 @@ public class IotTcpDownstreamSubscriber implements IotMessageSubscriber - * 封装设备连接的基本信息和操作。 * 该类中的状态变更(如 authenticated, closed)使用 AtomicBoolean 保证原子性。 * 对 socket 的操作应在 Vert.x Event Loop 线程中执行,以避免并发问题。 * @@ -48,7 +47,7 @@ public class TcpDeviceClient { private RecordParser parser; @Getter - private final long keepAliveTimeoutMs; // 改为 final,通过构造函数注入 + private final long keepAliveTimeoutMs; private volatile long lastKeepAliveTime; @@ -58,7 +57,7 @@ public class TcpDeviceClient { /** * 构造函数 * - * @param clientId 客户端ID,全局唯一 + * @param clientId 客户端 ID,全局唯一 * @param keepAliveTimeoutMs 心跳超时时间(毫秒),从配置中读取 */ public TcpDeviceClient(String clientId, long keepAliveTimeoutMs) { @@ -69,19 +68,19 @@ public class TcpDeviceClient { /** * 绑定网络套接字,并设置相关处理器。 - * 此方法应在 Vert.x Event Loop 线程中调用。 + * 此方法应在 Vert.x Event Loop 线程中调用 * * @param socket 网络套接字 */ public void setSocket(NetSocket socket) { // 无需 synchronized,Vert.x 保证了同一个 socket 的事件在同一个 Event Loop 中处理 if (this.socket != null && this.socket != socket) { - log.warn("[setSocket][客户端({})] 正在用新的 socket 替换旧的,旧 socket 将被关闭。", clientId); + log.warn("[setSocket][客户端({}) 正在用新的 socket 替换旧的,旧 socket 将被关闭]", clientId); this.socket.close(); } - this.socket = socket; + // 注册处理器 if (socket != null) { // 1. 设置关闭处理器 socket.closeHandler(v -> { @@ -103,22 +102,22 @@ public class TcpDeviceClient { if (parser != null) { parser.handle(buffer); } else { - log.warn("[setSocket][设备客户端({})] 未设置解析器(parser),原始数据被忽略: {}", clientId, buffer.toString()); + log.warn("[setSocket][设备客户端({}) 未设置解析器(parser),原始数据被忽略: {}]", clientId, buffer.toString()); } }); } } /** - * 更新心跳时间,表示设备仍然活跃。 + * 更新心跳时间,表示设备仍然活跃 */ public void keepAlive() { this.lastKeepAliveTime = System.currentTimeMillis(); } /** - * 检查连接是否在线。 - * 判断标准:未被主动关闭、socket 存在、且在心跳超时时间内。 + * 检查连接是否在线 + * 判断标准:未被主动关闭、socket 存在、且在心跳超时时间内 * * @return 是否在线 */ @@ -130,6 +129,8 @@ public class TcpDeviceClient { return idleTime < keepAliveTimeoutMs; } + // TODO @haohao:1)是不是简化下:productKey 和 deviceName 非空,就认为是已认证;2)如果是的话,productKey 和 deviceName 搞成一个设置方法?setAuthenticated(productKey、deviceName) + public boolean isAuthenticated() { return authenticated.get(); } @@ -139,7 +140,7 @@ public class TcpDeviceClient { } /** - * 向设备发送消息。 + * 向设备发送消息 * * @param buffer 消息内容 */ @@ -151,18 +152,22 @@ public class TcpDeviceClient { // Vert.x 的 write 是异步的,不会阻塞 socket.write(buffer, result -> { - if (result.succeeded()) { - log.debug("[sendMessage][设备客户端({})发送消息成功]", clientId); - // 发送成功也更新心跳,表示连接活跃 - keepAlive(); - } else { + // 发送失败可能意味着连接已断开,主动关闭 + if (!result.succeeded()) { log.error("[sendMessage][设备客户端({})发送消息失败]", clientId, result.cause()); - // 发送失败可能意味着连接已断开,主动关闭 shutdown(); + return; } + + // 发送成功也更新心跳,表示连接活跃 + if (log.isDebugEnabled()) { + log.debug("[sendMessage][设备客户端({})发送消息成功]", clientId); + } + keepAlive(); }); } + // TODO @haohao:是不是叫 close 好点?或者问问大模型 /** * 关闭客户端连接并清理资源。 * 这是一个幂等操作,可以被多次安全调用。 @@ -200,10 +205,6 @@ public class TcpDeviceClient { return "disconnected"; } - public long getLastKeepAliveTime() { - return lastKeepAliveTime; - } - @Override public String toString() { return "TcpDeviceClient{" + @@ -215,4 +216,5 @@ public class TcpDeviceClient { ", connection=" + getConnectionInfo() + '}'; } + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/TcpDeviceConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/TcpDeviceConnectionManager.java index ce7fe4aa5c..b2b6b3c31e 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/TcpDeviceConnectionManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/TcpDeviceConnectionManager.java @@ -16,8 +16,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; * 参考 EMQX 设计理念: * 1. 高性能连接管理 * 2. 连接池和资源管理 - * 3. 流量控制 - * 4. 监控统计 + * 3. 流量控制 TODO @haohao:这个要不先去掉 + * 4. 监控统计 TODO @haohao:这个要不先去掉 * 5. 自动清理和容错 * * @author 芋道源码 @@ -106,6 +106,7 @@ public class TcpDeviceConnectionManager { * 添加设备客户端 */ public boolean addClient(String deviceAddr, TcpDeviceClient client) { + // TODO @haohao:这个要不去掉;目前看着没做 result 的处理; if (clientMap.size() >= MAX_CONNECTIONS) { log.warn("[addClient][连接数已达上限({}),拒绝新连接: {}]", MAX_CONNECTIONS, deviceAddr); return false; @@ -130,14 +131,13 @@ public class TcpDeviceConnectionManager { socketToAddrMap.put(client.getSocket(), deviceAddr); } - // 如果客户端已设置设备ID,更新映射 + // 如果客户端已设置设备 ID,更新映射 if (client.getDeviceId() != null) { deviceIdToAddrMap.put(client.getDeviceId(), deviceAddr); } totalConnections.incrementAndGet(); return true; - } finally { writeLock.unlock(); } @@ -196,7 +196,7 @@ public class TcpDeviceConnectionManager { } /** - * 通过设备ID获取客户端 + * 通过设备 ID 获取客户端 */ public TcpDeviceClient getClientByDeviceId(Long deviceId) { readLock.lock(); @@ -208,6 +208,8 @@ public class TcpDeviceConnectionManager { } } + // TODO @haohao:getClientBySocket、isDeviceOnline、sendMessage、sendMessageByDeviceId、broadcastMessage 用不到的方法,要不先暂时不提供?保持简洁、更容易理解哈。 + /** * 通过网络连接获取客户端 */ @@ -230,7 +232,7 @@ public class TcpDeviceConnectionManager { } /** - * 设置设备ID映射 + * 设置设备 ID 映射 */ public void setDeviceIdMapping(String deviceAddr, Long deviceId) { writeLock.lock(); @@ -349,12 +351,12 @@ public class TcpDeviceConnectionManager { } } + // TODO @haohao:心跳超时,需要 close 么? /** * 心跳检查任务 */ private void checkHeartbeat() { try { - long currentTime = System.currentTimeMillis(); int offlineCount = 0; readLock.lock(); @@ -369,7 +371,7 @@ public class TcpDeviceConnectionManager { } if (offlineCount > 0) { - log.info("[checkHeartbeat][发现{}个离线设备,将在清理任务中处理]", offlineCount); + log.info("[checkHeartbeat][发现 {} 个离线设备,将在清理任务中处理]", offlineCount); } } catch (Exception e) { log.error("[checkHeartbeat][心跳检查任务异常]", e); @@ -424,14 +426,14 @@ public class TcpDeviceConnectionManager { private void logStatistics() { try { long totalConn = totalConnections.get(); - long totalDisconn = totalDisconnections.get(); + long totalDisconnections = this.totalDisconnections.get(); long totalMsg = totalMessages.get(); long totalFailedMsg = totalFailedMessages.get(); long totalBytesValue = totalBytes.get(); log.info("[logStatistics][连接统计] 总连接: {}, 总断开: {}, 当前在线: {}, 认证设备: {}, " + "总消息: {}, 失败消息: {}, 总字节: {}", - totalConn, totalDisconn, getOnlineCount(), getAuthenticatedCount(), + totalConn, totalDisconnections, getOnlineCount(), getAuthenticatedCount(), totalMsg, totalFailedMsg, totalBytesValue); } catch (Exception e) { log.error("[logStatistics][统计日志任务异常]", e); @@ -500,4 +502,5 @@ public class TcpDeviceConnectionManager { ? (double) (totalMessages.get() - totalFailedMessages.get()) / totalMessages.get() * 100 : 0.0); } + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataDecoder.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataDecoder.java index 8e7baa37d8..ed4b2ebaa0 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataDecoder.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataDecoder.java @@ -3,13 +3,14 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.protocol; import io.vertx.core.buffer.Buffer; import lombok.extern.slf4j.Slf4j; +// TODO @haohao:“设备地址长度”是不是不需要。 /** * TCP 数据解码器 *

* 负责将字节流解码为 TcpDataPackage 对象 *

* 数据包格式: - * 包头(4字节长度) | 设备地址长度(2字节) | 设备地址(不定长) | 功能码(2字节) | 消息序号(2字节) | 包体(不定长) + * 包头(4 字节长度) | 设备地址长度(2 字节) | 设备地址(不定长) | 功能码(2 字节) | 消息序号(2 字节) | 包体(不定长) * * @author 芋道源码 */ @@ -31,35 +32,35 @@ public class TcpDataDecoder { try { int index = 0; - // 1. 获取设备地址长度(2字节) + // 1.1 获取设备地址长度(2字节) short addrLength = buffer.getShort(index); index += 2; - // 2. 校验数据包长度 + // 1.2 校验数据包长度 int expectedLength = 2 + addrLength + 2 + 2; // 地址长度 + 地址 + 功能码 + 消息序号 if (buffer.length() < expectedLength) { throw new IllegalArgumentException("数据包长度不足,期望至少 " + expectedLength + " 字节"); } - // 3. 获取设备地址 + // 1.3 获取设备地址 String addr = buffer.getBuffer(index, index + addrLength).toString(); index += addrLength; - // 4. 获取功能码(2字节) + // 1.4 获取功能码(2字节) short code = buffer.getShort(index); index += 2; - // 5. 获取消息序号(2字节) + // 1.5 获取消息序号(2字节) short mid = buffer.getShort(index); index += 2; - // 6. 获取包体数据 + // 1.6 获取包体数据 String payload = ""; if (index < buffer.length()) { payload = buffer.getString(index, buffer.length()); } - // 7. 构建数据包对象 + // 2. 构建数据包对象 TcpDataPackage dataPackage = TcpDataPackage.builder() .addrLength((int) addrLength) .addr(addr) @@ -70,15 +71,14 @@ public class TcpDataDecoder { log.debug("[decode][解码成功] 设备地址: {}, 功能码: {}, 消息序号: {}, 包体长度: {}", addr, dataPackage.getCodeDescription(), mid, payload.length()); - return dataPackage; - } catch (Exception e) { log.error("[decode][解码失败] 数据: {}", buffer.toString(), e); throw new IllegalArgumentException("数据包解码失败: " + e.getMessage(), e); } } + // TODO @haohao:这个要不去掉,暂时没用到; /** * 校验数据包格式 * @@ -94,4 +94,5 @@ public class TcpDataDecoder { return false; } } + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataEncoder.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataEncoder.java index fb0a68c182..62f7bc4848 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataEncoder.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataEncoder.java @@ -27,11 +27,9 @@ public class TcpDataEncoder { if (dataPackage == null) { throw new IllegalArgumentException("数据包对象不能为空"); } - if (dataPackage.getAddr() == null || dataPackage.getAddr().isEmpty()) { throw new IllegalArgumentException("设备地址不能为空"); } - if (dataPackage.getPayload() == null) { dataPackage.setPayload(""); } @@ -39,34 +37,27 @@ public class TcpDataEncoder { try { Buffer buffer = Buffer.buffer(); - // 1. 计算包体长度(除了包头4字节) + // 1. 计算包体长度(除了包头 4 字节) int payloadLength = dataPackage.getPayload().getBytes().length; int totalLength = 2 + dataPackage.getAddr().length() + 2 + 2 + payloadLength; - // 2. 写入包头:总长度(4字节) + // 2.1 写入包头:总长度(4 字节) buffer.appendInt(totalLength); - - // 3. 写入设备地址长度(2字节) + // 2.2 写入设备地址长度(2 字节) buffer.appendShort((short) dataPackage.getAddr().length()); - - // 4. 写入设备地址(不定长) + // 2.3 写入设备地址(不定长) buffer.appendBytes(dataPackage.getAddr().getBytes()); - - // 5. 写入功能码(2字节) + // 2.4 写入功能码(2 字节) buffer.appendShort(dataPackage.getCode()); - - // 6. 写入消息序号(2字节) + // 2.5 写入消息序号(2 字节) buffer.appendShort(dataPackage.getMid()); - - // 7. 写入包体数据(不定长) + // 2.6 写入包体数据(不定长) buffer.appendBytes(dataPackage.getPayload().getBytes()); log.debug("[encode][编码成功] 设备地址: {}, 功能码: {}, 消息序号: {}, 总长度: {}", dataPackage.getAddr(), dataPackage.getCodeDescription(), dataPackage.getMid(), buffer.length()); - return buffer; - } catch (Exception e) { log.error("[encode][编码失败] 数据包: {}", dataPackage, e); throw new IllegalArgumentException("数据包编码失败: " + e.getMessage(), e); @@ -82,15 +73,14 @@ public class TcpDataEncoder { * @return 编码后的数据包 */ public static Buffer createRegisterReply(String addr, short mid, boolean success) { - String payload = success ? "0" : "1"; // 0表示成功,1表示失败 - + // TODO @haohao:payload 默认成功、失败,最好讴有个枚举 + String payload = success ? "0" : "1"; // 0 表示成功,1 表示失败 TcpDataPackage dataPackage = TcpDataPackage.builder() .addr(addr) .code(TcpDataPackage.CODE_REGISTER_REPLY) .mid(mid) .payload(payload) .build(); - return encode(dataPackage); } @@ -109,7 +99,6 @@ public class TcpDataEncoder { .mid(mid) .payload(data) .build(); - return encode(dataPackage); } @@ -128,7 +117,6 @@ public class TcpDataEncoder { .mid(mid) .payload(serviceData) .build(); - return encode(dataPackage); } @@ -147,7 +135,6 @@ public class TcpDataEncoder { .mid(mid) .payload(propertyData) .build(); - return encode(dataPackage); } @@ -166,7 +153,7 @@ public class TcpDataEncoder { .mid(mid) .payload(propertyNames) .build(); - return encode(dataPackage); } + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataPackage.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataPackage.java index 3b6f7df286..c0a7e7185d 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataPackage.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataPackage.java @@ -9,7 +9,7 @@ import lombok.NoArgsConstructor; * TCP 数据包协议定义 *

* 数据包格式: - * 包头(4字节长度) | 设备地址长度(2字节) | 设备地址(不定长) | 功能码(2字节) | 消息序号(2字节) | 包体(不定长) + * 包头(4 字节长度) | 设备地址长度(2 字节) | 设备地址(不定长) | 功能码(2 字节) | 消息序号(2 字节) | 包体(不定长) * * @author 芋道源码 */ @@ -29,10 +29,12 @@ public class TcpDataPackage { * 注册回复 */ public static final short CODE_REGISTER_REPLY = 11; + // TODO @haohao:【重要】一般心跳,服务端会回复一条;回复要搞独立的 code 码,还是继续用原来的,因为 requestId 可以映射; /** * 心跳 */ public static final short CODE_HEARTBEAT = 20; + // TODO @haohao:【重要】下面的,是不是融合成消息上行(client -> server),消息下行(server -> client);然后把 method 放到 body 里? /** * 数据上报 */ @@ -60,6 +62,8 @@ public class TcpDataPackage { // ==================== 数据包字段 ==================== + // TODO @haohao:设备 addrLength、addr 是不是非必要呀? + /** * 设备地址长度 */ @@ -87,6 +91,8 @@ public class TcpDataPackage { // ==================== 辅助方法 ==================== + // TODO @haohao:用不到的方法,可以清理掉哈; + /** * 是否为注册消息 */ @@ -123,6 +129,7 @@ public class TcpDataPackage { code == CODE_PROPERTY_SET || code == CODE_PROPERTY_GET; } + // TODO @haohao:这个是不是去掉呀?多了一些维护成本; /** * 获取功能码描述 */ diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataReader.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataReader.java index f796389907..f366418d7e 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataReader.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/protocol/TcpDataReader.java @@ -13,7 +13,7 @@ import java.util.function.Consumer; * 负责从 TCP 流中读取完整的数据包 *

* 数据包格式: - * 包头(4字节长度) | 设备地址长度(2字节) | 设备地址(不定长) | 功能码(2字节) | 消息序号(2字节) | 包体(不定长) + * 包头(4 字节长度) | 设备地址长度(2 字节) | 设备地址(不定长) | 功能码(2 字节) | 消息序号(2 字节) | 包体(不定长) * * @author 芋道源码 */ @@ -27,12 +27,12 @@ public class TcpDataReader { * @return RecordParser 解析器 */ public static RecordParser createParser(Consumer receiveHandler) { - // 首先读取4字节的长度信息 + // 首先读取 4 字节的长度信息 RecordParser parser = RecordParser.newFixed(4); // 设置处理器 parser.setOutput(new Handler() { - // 当前数据包的长度,-1表示还没有读取到长度信息 + // 当前数据包的长度,-1 表示还没有读取到长度信息 private int dataLength = -1; @Override @@ -43,8 +43,9 @@ public class TcpDataReader { // 从包头中读取数据长度 dataLength = buffer.getInt(0); - // 校验数据长度 - if (dataLength <= 0 || dataLength > 1024 * 1024) { // 最大1MB + // 校验数据长度(最大 1 MB) + // TODO @haohao:1m 蛮多地方在写死,最好配置管理下。或者有个全局的枚举; + if (dataLength <= 0 || dataLength > 1024 * 1024) { log.error("[handle][无效的数据包长度: {}]", dataLength); reset(); return; @@ -86,6 +87,8 @@ public class TcpDataReader { return parser; } + // TODO @haohao:用不到的方法,可以清理掉哈; + /** * 创建带异常处理的数据包解析器 * diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpDownstreamHandler.java index 7c499fb974..1fcb6a2bb5 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpDownstreamHandler.java @@ -55,6 +55,7 @@ public class IotTcpDownstreamHandler { } // 2. 根据消息方法处理不同类型的下行消息 + // TODO @芋艿、@haohao:看看有没什么办法,减少这样的编码。拓展新消息类型,成本高; switch (message.getMethod()) { case "thing.property.set": handlePropertySet(client, message); @@ -75,8 +76,8 @@ public class IotTcpDownstreamHandler { log.warn("[handle][未知的下行消息方法: {}]", message.getMethod()); break; } - } catch (Exception e) { + // TODO @haohao:最好消息的内容,打印下; log.error("[handle][处理下行消息失败]", e); } } @@ -104,7 +105,6 @@ public class IotTcpDownstreamHandler { log.debug("[handlePropertySet][属性设置消息已发送(降级)] 设备地址: {}, 消息序号: {}", client.getDeviceAddr(), mid); }); - } catch (Exception e) { log.error("[handlePropertySet][属性设置失败]", e); } @@ -133,7 +133,6 @@ public class IotTcpDownstreamHandler { log.debug("[handlePropertyGet][属性获取消息已发送(降级)] 设备地址: {}, 消息序号: {}", client.getDeviceAddr(), mid); }); - } catch (Exception e) { log.error("[handlePropertyGet][属性获取失败]", e); } @@ -162,7 +161,6 @@ public class IotTcpDownstreamHandler { log.debug("[handleServiceInvoke][服务调用消息已发送] 设备地址: {}, 消息序号: {}", client.getDeviceAddr(), mid); - } catch (Exception e) { log.error("[handleServiceInvoke][服务调用失败]", e); } @@ -191,7 +189,6 @@ public class IotTcpDownstreamHandler { log.debug("[handleConfigPush][配置推送消息已发送] 设备地址: {}, 消息序号: {}", client.getDeviceAddr(), mid); - } catch (Exception e) { log.error("[handleConfigPush][配置推送失败]", e); } @@ -262,6 +259,7 @@ public class IotTcpDownstreamHandler { } } + // TODO @haohao:用不到的,要不暂时不提供; /** * 批量发送下行消息 * @@ -287,7 +285,6 @@ public class IotTcpDownstreamHandler { // 处理单个设备消息 handle(copyMessage); } - } catch (Exception e) { log.error("[broadcastMessage][批量发送消息失败]", e); } @@ -341,7 +338,6 @@ public class IotTcpDownstreamHandler { log.debug("[{}][消息已发送] 设备地址: {}, 消息序号: {}", methodName, client.getDeviceAddr(), dataPackage.getMid()); - } catch (Exception e) { log.warn("[{}][使用编解码器编码失败,降级使用原始编码] 错误: {}", methodName, e.getMessage()); @@ -353,6 +349,7 @@ public class IotTcpDownstreamHandler { } } + // TODO @haohao:看看这个要不要删除掉 /** * 获取连接统计信息 * diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java index 0067e72064..672de2ad2c 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java @@ -39,6 +39,7 @@ public class IotTcpUpstreamHandler implements Handler { private final IotGatewayProperties.TcpProperties tcpConfig; + // TODO @haohao:可以把 TcpDeviceConnectionManager 能力放大一点:1)handle 里的 client 初始化,可以拿到 TcpDeviceConnectionManager 里;2)handleDeviceRegister 也是; private final TcpDeviceConnectionManager connectionManager; private final IotDeviceService deviceService; @@ -53,26 +54,25 @@ public class IotTcpUpstreamHandler implements Handler { public void handle(NetSocket socket) { log.info("[handle][收到设备连接: {}]", socket.remoteAddress()); - // 创建客户端ID和设备客户端 + // 创建客户端 ID 和设备客户端 + // TODO @haohao:clientid 给 TcpDeviceClient 生成会简洁一点;减少 upsteramhanlder 的非核心逻辑; String clientId = IdUtil.simpleUUID() + "_" + socket.remoteAddress(); TcpDeviceClient client = new TcpDeviceClient(clientId, tcpConfig.getKeepAliveTimeoutMs()); try { // 设置连接异常和关闭处理 socket.exceptionHandler(ex -> { + // TODO @haohao:这里的日志,可能把 clientid 都打上?因为 address 会重复么? log.error("[handle][连接({})异常]", socket.remoteAddress(), ex); handleConnectionClose(client); }); - socket.closeHandler(v -> { log.info("[handle][连接({})关闭]", socket.remoteAddress()); handleConnectionClose(client); }); - - // 设置网络连接 client.setSocket(socket); - // 创建数据解析器 + // 设置解析器 RecordParser parser = TcpDataReader.createParser(buffer -> { try { handleDataPackage(client, buffer); @@ -80,13 +80,12 @@ public class IotTcpUpstreamHandler implements Handler { log.error("[handle][处理数据包异常]", e); } }); - - // 设置解析器 client.setParser(parser); + // TODO @haohao:socket.remoteAddress()) 打印进去 log.info("[handle][设备连接处理器初始化完成: {}]", clientId); - } catch (Exception e) { + // TODO @haohao:socket.remoteAddress()) 打印进去 log.error("[handle][初始化连接处理器失败]", e); client.shutdown(); } @@ -102,12 +101,12 @@ public class IotTcpUpstreamHandler implements Handler { try { // 解码数据包 TcpDataPackage dataPackage = TcpDataDecoder.decode(buffer); - log.info("[handleDataPackage][接收数据包] 设备地址: {}, 功能码: {}, 消息序号: {}", dataPackage.getAddr(), dataPackage.getCodeDescription(), dataPackage.getMid()); // 根据功能码处理不同类型的消息 switch (dataPackage.getCode()) { + // TODO @haohao:【重要】code 要不要改成 opCode。这样和 data 里的 code 好区分; case TcpDataPackage.CODE_REGISTER: handleDeviceRegister(client, dataPackage); break; @@ -124,8 +123,8 @@ public class IotTcpUpstreamHandler implements Handler { log.warn("[handleDataPackage][未知功能码: {}]", dataPackage.getCode()); break; } - } catch (Exception e) { + // TODO @haohao:最好有 client 标识; log.error("[handleDataPackage][处理数据包失败]", e); } } @@ -140,7 +139,6 @@ public class IotTcpUpstreamHandler implements Handler { try { String deviceAddr = dataPackage.getAddr(); String productKey = dataPackage.getPayload(); - log.info("[handleDeviceRegister][设备注册] 设备地址: {}, 产品密钥: {}", deviceAddr, productKey); // 获取设备信息 @@ -152,6 +150,7 @@ public class IotTcpUpstreamHandler implements Handler { } // 更新客户端信息 + // TODO @haohao:一个 set 方法,统一处理掉会好点哈; client.setProductKey(productKey); client.setDeviceName(deviceAddr); client.setDeviceId(device.getId()); @@ -169,7 +168,6 @@ public class IotTcpUpstreamHandler implements Handler { sendRegisterReply(client, dataPackage, true); log.info("[handleDeviceRegister][设备注册成功] 设备地址: {}, 设备ID: {}", deviceAddr, device.getId()); - } catch (Exception e) { log.error("[handleDeviceRegister][设备注册失败]", e); sendRegisterReply(client, dataPackage, false); @@ -185,7 +183,6 @@ public class IotTcpUpstreamHandler implements Handler { private void handleHeartbeat(TcpDeviceClient client, TcpDataPackage dataPackage) { try { String deviceAddr = dataPackage.getAddr(); - log.debug("[handleHeartbeat][收到心跳] 设备地址: {}", deviceAddr); // 更新心跳时间 @@ -230,7 +227,6 @@ public class IotTcpUpstreamHandler implements Handler { // 3. 发送解码后的消息 messageService.sendDeviceMessage(message, client.getProductKey(), client.getDeviceName(), serverId); - } catch (Exception e) { log.warn("[handleDataUp][使用编解码器解码失败,降级使用原始解析] 错误: {}", e.getMessage()); @@ -242,7 +238,6 @@ public class IotTcpUpstreamHandler implements Handler { // 发送数据上报回复 sendDataUpReply(client, dataPackage); - } catch (Exception e) { log.error("[handleDataUp][处理数据上报失败]", e); } @@ -279,11 +274,11 @@ public class IotTcpUpstreamHandler implements Handler { // 3. 发送解码后的消息 messageService.sendDeviceMessage(message, client.getProductKey(), client.getDeviceName(), serverId); - } catch (Exception e) { log.warn("[handleEventUp][使用编解码器解码失败,降级使用原始解析] 错误: {}", e.getMessage()); // 降级处理:使用原始方式解析数据 + // TODO @芋艿:降级处理逻辑; JSONObject eventJson = JSONUtil.parseObj(payload); IotDeviceMessage message = IotDeviceMessage.requestOf("thing.event.post", eventJson); messageService.sendDeviceMessage(message, client.getProductKey(), client.getDeviceName(), serverId); @@ -291,7 +286,6 @@ public class IotTcpUpstreamHandler implements Handler { // 发送事件上报回复 sendEventUpReply(client, dataPackage); - } catch (Exception e) { log.error("[handleEventUp][处理事件上报失败]", e); } @@ -329,13 +323,13 @@ public class IotTcpUpstreamHandler implements Handler { .addr(dataPackage.getAddr()) .code(TcpDataPackage.CODE_DATA_UP) .mid(dataPackage.getMid()) - .payload("0") // 0表示成功 + .payload("0") // 0 表示成功 TODO @haohao:最好枚举到 TcpDataPackage 里? .build(); io.vertx.core.buffer.Buffer replyBuffer = TcpDataEncoder.encode(replyPackage); client.sendMessage(replyBuffer); - } catch (Exception e) { + // TODO @haohao:可以有个 client id log.error("[sendDataUpReply][发送数据上报回复失败]", e); } } @@ -352,12 +346,11 @@ public class IotTcpUpstreamHandler implements Handler { .addr(dataPackage.getAddr()) .code(TcpDataPackage.CODE_EVENT_UP) .mid(dataPackage.getMid()) - .payload("0") // 0表示成功 + .payload("0") // 0 表示成功 .build(); io.vertx.core.buffer.Buffer replyBuffer = TcpDataEncoder.encode(replyPackage); client.sendMessage(replyBuffer); - } catch (Exception e) { log.error("[sendEventUpReply][发送事件上报回复失败]", e); } @@ -385,7 +378,6 @@ public class IotTcpUpstreamHandler implements Handler { } log.info("[handleConnectionClose][处理连接关闭完成] 设备地址: {}", deviceAddr); - } catch (Exception e) { log.error("[handleConnectionClose][处理连接关闭失败]", e); }