diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpBinaryDeviceMessageCodec.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpBinaryDeviceMessageCodec.java
index b140e37e58..9ecaa8af6f 100644
--- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpBinaryDeviceMessageCodec.java
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpBinaryDeviceMessageCodec.java
@@ -8,86 +8,72 @@ import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import io.vertx.core.buffer.Buffer;
import lombok.AllArgsConstructor;
import lombok.Data;
-import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
-// TODO @haohao:【重要】是不是二进制更彻底哈?
-// 包头(4 字节)
-// 消息 ID string;nvarchar(length + string)
-// version(可选,不要干脆)
-// method string;nvarchar;为什么不要 opcode?因为 IotTcpJsonDeviceMessageCodec 里面,实际已经没 opcode 了
-// reply bit;0 请求,1 响应
-// 请求时:
-// params;nvarchar;json 处理
-// 响应时:
-// code
-// msg nvarchar
-// data;nvarchar;json 处理
+import java.nio.charset.StandardCharsets;
+
/**
* TCP 二进制格式 {@link IotDeviceMessage} 编解码器
+ *
+ * 二进制协议格式(所有数值使用大端序):
*
- * 使用自定义二进制协议格式:包头(4 字节) | 功能码(2 字节) | 消息序号(2 字节) | 包体数据(变长)
+ *
+ * +--------+--------+--------+--------+--------+--------+--------+--------+
+ * | 魔术字 | 版本号 | 消息类型| 消息标志| 消息长度(4字节) |
+ * +--------+--------+--------+--------+--------+--------+--------+--------+
+ * | 消息 ID 长度(2字节) | 消息 ID (变长字符串) |
+ * +--------+--------+--------+--------+--------+--------+--------+--------+
+ * | 方法名长度(2字节) | 方法名(变长字符串) |
+ * +--------+--------+--------+--------+--------+--------+--------+--------+
+ * | 消息体数据(变长) |
+ * +--------+--------+--------+--------+--------+--------+--------+--------+
+ *
+ *
+ * 消息体格式:
+ * - 请求消息:params 数据(JSON)
+ * - 响应消息:code (4字节) + msg 长度(2字节) + msg 字符串 + data 数据(JSON)
+ *
+ * 注意:deviceId 不包含在协议中,由服务器根据连接上下文自动设置
*
* @author 芋道源码
*/
+@Slf4j
@Component
public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
- // TODO @haohao:是不是叫 TCP_Binary 好点哈?
public static final String TYPE = "TCP_BINARY";
- @Data
- @NoArgsConstructor
- @AllArgsConstructor
- private static class TcpBinaryMessage {
+ // ==================== 协议常量 ====================
- /**
- * 功能码
- */
- private Short code;
+ /**
+ * 协议魔术字,用于协议识别
+ */
+ private static final byte MAGIC_NUMBER = (byte) 0x7E;
- // TODO @haohao:这个和 AlinkMessage 里面,是一个东西哇?
- /**
- * 消息序号
- */
- private Short mid;
-
- // TODO @haohao:这个字段,是不是没用到呀?感觉应该也不在消息列哈?
- /**
- * 设备 ID
- */
- private Long deviceId;
-
- /**
- * 请求方法
- */
- private String method;
-
- /**
- * 请求参数
- */
- private Object params;
-
- /**
- * 响应结果
- */
- private Object data;
-
- // TODO @haohao:这个可以改成 code 哇?更好理解一点;
- /**
- * 响应错误码
- */
- private Integer responseCode;
-
- /**
- * 响应提示
- */
- private String msg;
-
- // TODO @haohao:TcpBinaryMessage 和 TcpJsonMessage 保持一致哈?
+ /**
+ * 协议版本号
+ */
+ private static final byte PROTOCOL_VERSION = (byte) 0x01;
+ /**
+ * 消息类型常量
+ */
+ public static class MessageType {
+ public static final byte REQUEST = 0x01; // 请求消息
+ public static final byte RESPONSE = 0x02; // 响应消息
}
+ /**
+ * 协议头部固定长度(魔术字 + 版本号 + 消息类型 + 消息标志 + 消息长度)
+ */
+ private static final int HEADER_FIXED_LENGTH = 8;
+
+ /**
+ * 最小消息长度(头部 + 消息ID长度 + 方法名长度)
+ */
+ private static final int MIN_MESSAGE_LENGTH = HEADER_FIXED_LENGTH + 4;
+
@Override
public String type() {
return TYPE;
@@ -99,215 +85,270 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
Assert.notBlank(message.getMethod(), "消息方法不能为空");
try {
- // 1. 确定功能码
- short code = MessageMethod.STATE_ONLINE.equals(message.getMethod())
- ? TcpDataPackage.CODE_HEARTBEAT : TcpDataPackage.CODE_MESSAGE_UP;
+ // 1. 确定消息类型
+ byte messageType = determineMessageType(message);
- // 2. 构建负载数据
- String payload = buildPayload(message);
+ // 2. 构建消息体
+ byte[] bodyData = buildMessageBody(message, messageType);
- // 3. 构建 TCP 数据包
- // TODO @haohao:这个和 AlinkMessage.id 是不是一致的哈?
- short mid = (short) (System.currentTimeMillis() % Short.MAX_VALUE);
- TcpDataPackage dataPackage = new TcpDataPackage(code, mid, payload);
+ // 3. 构建完整消息(不包含deviceId,由连接上下文管理)
+ return buildCompleteMessage(message, messageType, bodyData);
- // 4. 编码为字节流
- return encodeTcpDataPackage(dataPackage).getBytes();
} catch (Exception e) {
- throw new TcpCodecException("TCP 消息编码失败", e);
+ log.error("[encode][TCP 二进制消息编码失败,消息: {}]", message, e);
+ throw new RuntimeException("TCP 二进制消息编码失败: " + e.getMessage(), e);
}
}
@Override
public IotDeviceMessage decode(byte[] bytes) {
Assert.notNull(bytes, "待解码数据不能为空");
- Assert.isTrue(bytes.length > 0, "待解码数据不能为空");
+ Assert.isTrue(bytes.length >= MIN_MESSAGE_LENGTH, "数据包长度不足");
try {
- // 1. 解码 TCP 数据包
- TcpDataPackage dataPackage = decodeTcpDataPackage(Buffer.buffer(bytes));
+ Buffer buffer = Buffer.buffer(bytes);
- // 2. 根据功能码确定方法
- String method = (dataPackage.getCode() == TcpDataPackage.CODE_HEARTBEAT) ? MessageMethod.STATE_ONLINE
- : MessageMethod.PROPERTY_POST;
+ // 1. 解析协议头部
+ ProtocolHeader header = parseProtocolHeader(buffer);
- // 3. 解析负载数据
- PayloadInfo payloadInfo = parsePayloadInfo(dataPackage.getPayload());
+ // 2. 解析消息内容(不包含deviceId,由上层连接管理器设置)
+ return parseMessageContent(buffer, header);
- // 4. 构建 IoT 设备消息
- return IotDeviceMessage.of(
- payloadInfo.getRequestId(),
- method,
- payloadInfo.getParams(),
- null,
- null,
- null);
} catch (Exception e) {
- throw new TcpCodecException("TCP 消息解码失败", e);
+ log.error("[decode][TCP 二进制消息解码失败,数据长度: {}]", bytes.length, e);
+ throw new RuntimeException("TCP 二进制消息解码失败: " + e.getMessage(), e);
}
}
- // ==================== 内部辅助方法 ====================
+ // ==================== 编码相关方法 ====================
/**
- * 构建负载数据
- *
- * @param message 设备消息
- * @return 负载字符串
+ * 确定消息类型
+ * 优化后的判断逻辑:有响应字段就是响应消息,否则就是请求消息
*/
- private String buildPayload(IotDeviceMessage message) {
- TcpBinaryMessage tcpBinaryMessage = new TcpBinaryMessage(
- null, // code 在数据包中单独处理
- null, // mid 在数据包中单独处理
- message.getDeviceId(),
- message.getMethod(),
- message.getParams(),
- message.getData(),
- message.getCode(),
- message.getMsg());
- return JsonUtils.toJsonString(tcpBinaryMessage);
+ private byte determineMessageType(IotDeviceMessage message) {
+ // 判断是否为响应消息:有响应码或响应消息时为响应
+ if (message.getCode() != null || StrUtil.isNotBlank(message.getMsg())) {
+ return MessageType.RESPONSE;
+ }
+ // 默认为请求消息
+ return MessageType.REQUEST;
}
/**
- * 解析负载信息
- *
- * @param payload 负载字符串
- * @return 负载信息
+ * 构建消息体
*/
- private PayloadInfo parsePayloadInfo(String payload) {
- if (StrUtil.isBlank(payload)) {
- return new PayloadInfo(null, null);
- }
+ private byte[] buildMessageBody(IotDeviceMessage message, byte messageType) {
+ Buffer bodyBuffer = Buffer.buffer();
- try {
- TcpBinaryMessage tcpBinaryMessage = JsonUtils.parseObject(payload, TcpBinaryMessage.class);
- if (tcpBinaryMessage != null) {
- return new PayloadInfo(
- StrUtil.isNotEmpty(tcpBinaryMessage.getMethod())
- ? tcpBinaryMessage.getMethod() + "_" + System.currentTimeMillis()
- : null,
- tcpBinaryMessage.getParams());
+ if (messageType == MessageType.RESPONSE) {
+ // 响应消息:code + msg长度 + msg + data
+ bodyBuffer.appendInt(message.getCode() != null ? message.getCode() : 0);
+
+ String msg = message.getMsg() != null ? message.getMsg() : "";
+ byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
+ bodyBuffer.appendShort((short) msgBytes.length);
+ bodyBuffer.appendBytes(msgBytes);
+
+ if (message.getData() != null) {
+ bodyBuffer.appendBytes(JsonUtils.toJsonByte(message.getData()));
+ }
+ } else {
+ // 请求消息:包含 params 或 data
+ Object payload = message.getParams() != null ? message.getParams() : message.getData();
+ if (payload != null) {
+ bodyBuffer.appendBytes(JsonUtils.toJsonByte(payload));
}
- } catch (Exception e) {
- // 如果解析失败,返回默认值
- return new PayloadInfo("unknown_" + System.currentTimeMillis(), null);
}
- return null;
+
+ return bodyBuffer.getBytes();
}
/**
- * 编码 TCP 数据包
- *
- * @param dataPackage 数据包对象
- * @return 编码后的字节流
+ * 构建完整消息
*/
- private Buffer encodeTcpDataPackage(TcpDataPackage dataPackage) {
- Assert.notNull(dataPackage, "数据包对象不能为空");
- Assert.notNull(dataPackage.getPayload(), "负载不能为空");
-
- // 1. 计算包体长度(除了包头 4 字节)
- int payloadLength = dataPackage.getPayload().getBytes().length;
- int totalLength = 2 + 2 + payloadLength;
-
- // 2. 写入数据
+ private byte[] buildCompleteMessage(IotDeviceMessage message, byte messageType, byte[] bodyData) {
Buffer buffer = Buffer.buffer();
- // 2.1 写入包头:总长度(4 字节)
- buffer.appendInt(totalLength);
- // 2.2 写入功能码(2 字节)
- buffer.appendShort(dataPackage.getCode());
- // 2.3 写入消息序号(2 字节)
- buffer.appendShort(dataPackage.getMid());
- // 2.4 写入包体数据(不定长)
- buffer.appendBytes(dataPackage.getPayload().getBytes());
- return buffer;
+
+ // 1. 写入协议头部
+ buffer.appendByte(MAGIC_NUMBER);
+ buffer.appendByte(PROTOCOL_VERSION);
+ buffer.appendByte(messageType);
+ buffer.appendByte((byte) 0x00); // 消息标志,预留字段
+
+ // 2. 预留消息长度位置
+ int lengthPosition = buffer.length();
+ buffer.appendInt(0);
+
+ // 3. 写入消息ID
+ String messageId = StrUtil.isNotBlank(message.getRequestId()) ? message.getRequestId()
+ : generateMessageId(message.getMethod());
+ byte[] messageIdBytes = messageId.getBytes(StandardCharsets.UTF_8);
+ buffer.appendShort((short) messageIdBytes.length);
+ buffer.appendBytes(messageIdBytes);
+
+ // 4. 写入方法名
+ byte[] methodBytes = message.getMethod().getBytes(StandardCharsets.UTF_8);
+ buffer.appendShort((short) methodBytes.length);
+ buffer.appendBytes(methodBytes);
+
+ // 5. 写入消息体
+ buffer.appendBytes(bodyData);
+
+ // 6. 更新消息长度
+ buffer.setInt(lengthPosition, buffer.length());
+
+ return buffer.getBytes();
}
/**
- * 解码 TCP 数据包
- *
- * @param buffer 数据缓冲区
- * @return 解码后的数据包
+ * 生成消息 ID
*/
- private TcpDataPackage decodeTcpDataPackage(Buffer buffer) {
- Assert.isTrue(buffer.length() >= 8, "数据包长度不足");
+ private String generateMessageId(String method) {
+ return method + "_" + System.currentTimeMillis() + "_" + (int) (Math.random() * 1000);
+ }
+ // ==================== 解码相关方法 ====================
+
+ /**
+ * 解析协议头部
+ */
+ private ProtocolHeader parseProtocolHeader(Buffer buffer) {
int index = 0;
- // 1. 跳过包头(4 字节)
+
+ // 1. 验证魔术字
+ byte magic = buffer.getByte(index++);
+ Assert.isTrue(magic == MAGIC_NUMBER, "无效的协议魔术字: " + magic);
+
+ // 2. 验证版本号
+ byte version = buffer.getByte(index++);
+ Assert.isTrue(version == PROTOCOL_VERSION, "不支持的协议版本: " + version);
+
+ // 3. 读取消息类型
+ byte messageType = buffer.getByte(index++);
+ Assert.isTrue(isValidMessageType(messageType), "无效的消息类型: " + messageType);
+
+ // 4. 读取消息标志(暂时跳过)
+ byte messageFlags = buffer.getByte(index++);
+
+ // 5. 读取消息长度
+ int messageLength = buffer.getInt(index);
index += 4;
- // 2. 获取功能码(2 字节)
- short code = buffer.getShort(index);
+
+ Assert.isTrue(messageLength == buffer.length(), "消息长度不匹配,期望: " + messageLength + ", 实际: " + buffer.length());
+
+ return new ProtocolHeader(magic, version, messageType, messageFlags, messageLength, index);
+ }
+
+ /**
+ * 解析消息内容
+ */
+ private IotDeviceMessage parseMessageContent(Buffer buffer, ProtocolHeader header) {
+ int index = header.getNextIndex();
+
+ // 1. 读取消息ID
+ short messageIdLength = buffer.getShort(index);
index += 2;
- // 3. 获取消息序号(2 字节)
- short mid = buffer.getShort(index);
+ String messageId = buffer.getString(index, index + messageIdLength, StandardCharsets.UTF_8.name());
+ index += messageIdLength;
+
+ // 2. 读取方法名
+ short methodLength = buffer.getShort(index);
index += 2;
- // 4. 获取包体数据
- String payload = "";
- if (index < buffer.length()) {
- payload = buffer.getString(index, buffer.length());
+ String method = buffer.getString(index, index + methodLength, StandardCharsets.UTF_8.name());
+ index += methodLength;
+
+ // 3. 解析消息体
+ return parseMessageBody(buffer, index, header.getMessageType(), messageId, method);
+ }
+
+ /**
+ * 解析消息体
+ */
+ private IotDeviceMessage parseMessageBody(Buffer buffer, int startIndex, byte messageType,
+ String messageId, String method) {
+ if (startIndex >= buffer.length()) {
+ // 空消息体
+ return IotDeviceMessage.of(messageId, method, null, null, null, null);
}
- return new TcpDataPackage(code, mid, payload);
+ if (messageType == MessageType.RESPONSE) {
+ // 响应消息:解析 code + msg + data
+ return parseResponseMessage(buffer, startIndex, messageId, method);
+ } else {
+ // 请求消息:解析 payload(可能是 params 或 data)
+ Object payload = parseJsonData(buffer, startIndex, buffer.length());
+ return IotDeviceMessage.of(messageId, method, payload, null, null, null);
+ }
+ }
+
+ /**
+ * 解析响应消息
+ */
+ private IotDeviceMessage parseResponseMessage(Buffer buffer, int startIndex, String messageId, String method) {
+ int index = startIndex;
+
+ // 1. 读取响应码
+ Integer code = buffer.getInt(index);
+ index += 4;
+
+ // 2. 读取响应消息
+ short msgLength = buffer.getShort(index);
+ index += 2;
+ String msg = msgLength > 0 ? buffer.getString(index, index + msgLength, StandardCharsets.UTF_8.name()) : null;
+ index += msgLength;
+
+ // 3. 读取响应数据
+ Object data = null;
+ if (index < buffer.length()) {
+ data = parseJsonData(buffer, index, buffer.length());
+ }
+
+ return IotDeviceMessage.of(messageId, method, null, data, code, msg);
+ }
+
+ /**
+ * 解析JSON数据
+ */
+ private Object parseJsonData(Buffer buffer, int startIndex, int endIndex) {
+ if (startIndex >= endIndex) {
+ return null;
+ }
+
+ try {
+ String jsonStr = buffer.getString(startIndex, endIndex, StandardCharsets.UTF_8.name());
+ if (StrUtil.isBlank(jsonStr)) {
+ return null;
+ }
+ return JsonUtils.parseObject(jsonStr, Object.class);
+ } catch (Exception e) {
+ log.warn("[parseJsonData][JSON 解析失败,返回原始字符串]", e);
+ return buffer.getString(startIndex, endIndex, StandardCharsets.UTF_8.name());
+ }
+ }
+
+ // ==================== 辅助方法 ====================
+
+ /**
+ * 验证消息类型是否有效
+ */
+ private boolean isValidMessageType(byte messageType) {
+ return messageType == MessageType.REQUEST || messageType == MessageType.RESPONSE;
}
// ==================== 内部类 ====================
- // TODO @haohao:会不会存在 reply 的时候,有 data、msg、code 参数哈。
/**
- * 负载信息类
+ * 协议头部信息
*/
@Data
@AllArgsConstructor
- private static class PayloadInfo {
-
- private String requestId;
- private Object params;
-
+ private static class ProtocolHeader {
+ private byte magic;
+ private byte version;
+ private byte messageType;
+ private byte messageFlags;
+ private int messageLength;
+ private int nextIndex; // 指向消息内容开始位置
}
-
- /**
- * TCP 数据包内部类
- */
- @Data
- @AllArgsConstructor
- private static class TcpDataPackage {
-
- // 功能码定义
- public static final short CODE_REGISTER = 10;
- public static final short CODE_REGISTER_REPLY = 11;
- public static final short CODE_HEARTBEAT = 20;
- public static final short CODE_HEARTBEAT_REPLY = 21;
- public static final short CODE_MESSAGE_UP = 30;
- public static final short CODE_MESSAGE_DOWN = 40;
-
- // TODO @haohao:要不改成 opCode
- private short code;
- private short mid;
- private String payload;
-
- }
-
- // ==================== 常量定义 ====================
-
- /**
- * 消息方法常量
- */
- public static class MessageMethod {
-
- public static final String PROPERTY_POST = "thing.property.post"; // 数据上报
- public static final String STATE_ONLINE = "thing.state.online"; // 心跳
-
- }
-
- // ==================== 自定义异常 ====================
-
- // TODO @haohao:全局异常搞个。看着可以服用哈。
- /**
- * TCP 编解码异常
- */
- public static class TcpCodecException extends RuntimeException {
- public TcpCodecException(String message, Throwable cause) {
- super(message, cause);
- }
- }
-
}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpJsonDeviceMessageCodec.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpJsonDeviceMessageCodec.java
index 1bbda950b7..e4ff2f50bc 100644
--- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpJsonDeviceMessageCodec.java
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpJsonDeviceMessageCodec.java
@@ -14,12 +14,13 @@ import org.springframework.stereotype.Component;
*
* 采用纯 JSON 格式传输,格式如下:
* {
- * "id": "消息 ID",
- * "method": "消息方法",
- * "deviceId": "设备 ID",
- * "params": {...},
- * "timestamp": 时间戳
- * // TODO @haohao:貌似少了 code、msg、timestamp
+ * "id": "消息 ID",
+ * "method": "消息方法",
+ * "params": {...}, // 请求参数
+ * "data": {...}, // 响应结果
+ * "code": 200, // 响应错误码
+ * "msg": "success", // 响应提示
+ * "timestamp": 时间戳
* }
*
* @author 芋道源码
@@ -44,12 +45,6 @@ public class IotTcpJsonDeviceMessageCodec implements IotDeviceMessageCodec {
*/
private String method;
- // TODO @haohao:这个字段,是不是没用到呀?感觉应该也不在消息列哈?
- /**
- * 设备 ID
- */
- private Long deviceId;
-
/**
* 请求参数
*/
@@ -84,9 +79,13 @@ public class IotTcpJsonDeviceMessageCodec implements IotDeviceMessageCodec {
@Override
public byte[] encode(IotDeviceMessage message) {
- TcpJsonMessage tcpJsonMessage = new TcpJsonMessage(message.getRequestId(), message.getMethod(),
- message.getDeviceId(),
- message.getParams(), message.getData(), message.getCode(), message.getMsg(),
+ TcpJsonMessage tcpJsonMessage = new TcpJsonMessage(
+ message.getRequestId(),
+ message.getMethod(),
+ message.getParams(),
+ message.getData(),
+ message.getCode(),
+ message.getMsg(),
System.currentTimeMillis());
return JsonUtils.toJsonByte(tcpJsonMessage);
}
@@ -97,11 +96,13 @@ public class IotTcpJsonDeviceMessageCodec implements IotDeviceMessageCodec {
TcpJsonMessage tcpJsonMessage = JsonUtils.parseObject(bytes, TcpJsonMessage.class);
Assert.notNull(tcpJsonMessage, "消息不能为空");
Assert.notBlank(tcpJsonMessage.getMethod(), "消息方法不能为空");
- // TODO @haohao:这个我已经改了哈。一些属性,可以放在一行,好理解一点~
- IotDeviceMessage iotDeviceMessage = IotDeviceMessage.of(tcpJsonMessage.getId(), tcpJsonMessage.getMethod(),
- tcpJsonMessage.getParams(), tcpJsonMessage.getData(), tcpJsonMessage.getCode(), tcpJsonMessage.getMsg());
- iotDeviceMessage.setDeviceId(tcpJsonMessage.getDeviceId());
- return iotDeviceMessage;
+ return IotDeviceMessage.of(
+ tcpJsonMessage.getId(),
+ tcpJsonMessage.getMethod(),
+ tcpJsonMessage.getParams(),
+ tcpJsonMessage.getData(),
+ tcpJsonMessage.getCode(),
+ tcpJsonMessage.getMsg());
}
}
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java
index 72fc0eef50..51af9bd3ce 100644
--- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java
@@ -8,7 +8,7 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpDownstreamSubscr
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol;
-import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpSessionManager;
+import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
@@ -92,19 +92,19 @@ public class IotGatewayConfiguration {
public IotTcpUpstreamProtocol iotTcpUpstreamProtocol(IotGatewayProperties gatewayProperties,
IotDeviceService deviceService,
IotDeviceMessageService messageService,
- IotTcpSessionManager sessionManager,
+ IotTcpConnectionManager connectionManager,
Vertx tcpVertx) {
return new IotTcpUpstreamProtocol(gatewayProperties.getProtocol().getTcp(),
- deviceService, messageService, sessionManager, tcpVertx);
+ deviceService, messageService, connectionManager, tcpVertx);
}
@Bean
public IotTcpDownstreamSubscriber iotTcpDownstreamSubscriber(IotTcpUpstreamProtocol protocolHandler,
IotDeviceMessageService messageService,
IotDeviceService deviceService,
- IotTcpSessionManager sessionManager,
+ IotTcpConnectionManager connectionManager,
IotMessageBus messageBus) {
- return new IotTcpDownstreamSubscriber(protocolHandler, messageService, deviceService, sessionManager,
+ return new IotTcpDownstreamSubscriber(protocolHandler, messageService, deviceService, connectionManager,
messageBus);
}
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpDownstreamSubscriber.java
index 6130caa851..e4d46b3af6 100644
--- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpDownstreamSubscriber.java
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpDownstreamSubscriber.java
@@ -4,13 +4,13 @@ import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
-import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpSessionManager;
+import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router.IotTcpDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import jakarta.annotation.PostConstruct;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
/**
* IoT 网关 TCP 下游订阅者:接收下行给设备的消息
@@ -18,37 +18,28 @@ import org.springframework.stereotype.Component;
* @author 芋道源码
*/
@Slf4j
-@Component
+@RequiredArgsConstructor
public class IotTcpDownstreamSubscriber implements IotMessageSubscriber {
- private final IotTcpDownstreamHandler downstreamHandler;
-
- private final IotMessageBus messageBus;
-
private final IotTcpUpstreamProtocol protocol;
- // todo @haohao:不用的变量,可以去掉哈
+ private final IotDeviceMessageService messageService;
+
private final IotDeviceService deviceService;
- private final IotTcpSessionManager sessionManager;
+ private final IotTcpConnectionManager connectionManager;
- // TODO @haohao:lombok 简化
- public IotTcpDownstreamSubscriber(IotTcpUpstreamProtocol protocol,
- IotDeviceMessageService messageService,
- IotDeviceService deviceService,
- IotTcpSessionManager sessionManager,
- IotMessageBus messageBus) {
- this.protocol = protocol;
- this.messageBus = messageBus;
- this.deviceService = deviceService;
- this.sessionManager = sessionManager;
- this.downstreamHandler = new IotTcpDownstreamHandler(messageService, deviceService, sessionManager);
- }
+ private final IotMessageBus messageBus;
+
+ private IotTcpDownstreamHandler downstreamHandler;
@PostConstruct
public void init() {
+ // 初始化下游处理器
+ this.downstreamHandler = new IotTcpDownstreamHandler(messageService, deviceService, connectionManager);
+
messageBus.register(this);
- log.info("[init][TCP 下游订阅者初始化完成] 服务器 ID: {}, Topic: {}",
+ log.info("[init][TCP 下游订阅者初始化完成,服务器 ID: {},Topic: {}]",
protocol.getServerId(), getTopic());
}
@@ -68,8 +59,9 @@ public class IotTcpDownstreamSubscriber implements IotMessageSubscriber {
+ tcpServer = vertx.createNetServer(options);
+ tcpServer.connectHandler(socket -> {
IotTcpUpstreamHandler handler = new IotTcpUpstreamHandler(this, messageService, deviceService,
- sessionManager);
+ connectionManager);
handler.handle(socket);
});
// 启动服务器
try {
- netServer.listen().result();
+ tcpServer.listen().result();
log.info("[start][IoT 网关 TCP 协议启动成功,端口:{}]", tcpProperties.getPort());
} catch (Exception e) {
log.error("[start][IoT 网关 TCP 协议启动失败]", e);
@@ -87,9 +87,9 @@ public class IotTcpUpstreamProtocol {
@PreDestroy
public void stop() {
- if (netServer != null) {
+ if (tcpServer != null) {
try {
- netServer.close().result();
+ tcpServer.close().result();
log.info("[stop][IoT 网关 TCP 协议已停止]");
} catch (Exception e) {
log.error("[stop][IoT 网关 TCP 协议停止失败]", e);
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpAuthManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpAuthManager.java
deleted file mode 100644
index 8a67e587a8..0000000000
--- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpAuthManager.java
+++ /dev/null
@@ -1,202 +0,0 @@
-package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager;
-
-import io.vertx.core.net.NetSocket;
-import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * IoT 网关 TCP 认证信息管理器
- *
- * 维护 TCP 连接的认证状态,支持认证信息的存储、查询和清理
- *
- * @author 芋道源码
- */
-@Slf4j
-@Component
-public class IotTcpAuthManager {
-
- /**
- * 连接认证状态映射:NetSocket -> 认证信息
- */
- private final Map authStatusMap = new ConcurrentHashMap<>();
-
- // TODO @haohao:得考虑,一个设备连接多次?
- /**
- * 设备 ID -> NetSocket 的映射(用于快速查找)
- */
- private final Map deviceSocketMap = new ConcurrentHashMap<>();
-
- /**
- * 注册认证信息
- *
- * @param socket TCP 连接
- * @param authInfo 认证信息
- */
- public void registerAuth(NetSocket socket, AuthInfo authInfo) {
- // 如果设备已有其他连接,先清理旧连接
- // TODO @haohao:是不是允许同时连接?就像 mqtt 应该也允许重复连接哈?
- NetSocket oldSocket = deviceSocketMap.get(authInfo.getDeviceId());
- if (oldSocket != null && oldSocket != socket) {
- log.info("[registerAuth][设备已有其他连接,清理旧连接] 设备 ID: {}, 旧连接: {}",
- authInfo.getDeviceId(), oldSocket.remoteAddress());
- authStatusMap.remove(oldSocket);
- }
-
- // 注册新认证信息
- authStatusMap.put(socket, authInfo);
- deviceSocketMap.put(authInfo.getDeviceId(), socket);
-
- log.info("[registerAuth][注册认证信息] 设备 ID: {}, 连接: {}, productKey: {}, deviceName: {}",
- authInfo.getDeviceId(), socket.remoteAddress(), authInfo.getProductKey(), authInfo.getDeviceName());
- }
-
- /**
- * 注销认证信息
- *
- * @param socket TCP 连接
- */
- public void unregisterAuth(NetSocket socket) {
- AuthInfo authInfo = authStatusMap.remove(socket);
- if (authInfo != null) {
- deviceSocketMap.remove(authInfo.getDeviceId());
- log.info("[unregisterAuth][注销认证信息] 设备 ID: {}, 连接: {}",
- authInfo.getDeviceId(), socket.remoteAddress());
- }
- }
-
- // TODO @haohao:建议暂时没用的方法,可以删除掉;整体聚焦!
- /**
- * 注销设备认证信息
- *
- * @param deviceId 设备 ID
- */
- public void unregisterAuth(Long deviceId) {
- NetSocket socket = deviceSocketMap.remove(deviceId);
- if (socket != null) {
- AuthInfo authInfo = authStatusMap.remove(socket);
- if (authInfo != null) {
- log.info("[unregisterAuth][注销设备认证信息] 设备 ID: {}, 连接: {}",
- deviceId, socket.remoteAddress());
- }
- }
- }
-
- /**
- * 获取认证信息
- *
- * @param socket TCP 连接
- * @return 认证信息,如果未认证则返回 null
- */
- public AuthInfo getAuthInfo(NetSocket socket) {
- return authStatusMap.get(socket);
- }
-
- /**
- * 获取设备的认证信息
- *
- * @param deviceId 设备 ID
- * @return 认证信息,如果设备未认证则返回 null
- */
- public AuthInfo getAuthInfo(Long deviceId) {
- NetSocket socket = deviceSocketMap.get(deviceId);
- return socket != null ? authStatusMap.get(socket) : null;
- }
-
- /**
- * 检查连接是否已认证
- *
- * @param socket TCP 连接
- * @return 是否已认证
- */
- public boolean isAuthenticated(NetSocket socket) {
- return authStatusMap.containsKey(socket);
- }
-
- /**
- * 检查设备是否已认证
- *
- * @param deviceId 设备 ID
- * @return 是否已认证
- */
- public boolean isAuthenticated(Long deviceId) {
- return deviceSocketMap.containsKey(deviceId);
- }
-
- /**
- * 获取设备的 TCP 连接
- *
- * @param deviceId 设备 ID
- * @return TCP 连接,如果设备未认证则返回 null
- */
- public NetSocket getDeviceSocket(Long deviceId) {
- return deviceSocketMap.get(deviceId);
- }
-
- /**
- * 获取当前已认证设备数量
- *
- * @return 已认证设备数量
- */
- public int getAuthenticatedDeviceCount() {
- return deviceSocketMap.size();
- }
-
- /**
- * 获取所有已认证设备 ID
- *
- * @return 已认证设备 ID 集合
- */
- public java.util.Set getAuthenticatedDeviceIds() {
- return deviceSocketMap.keySet();
- }
-
- /**
- * 清理所有认证信息
- */
- public void clearAll() {
- int count = authStatusMap.size();
- authStatusMap.clear();
- deviceSocketMap.clear();
- // TODO @haohao:第一个括号是方法,第二个括号是明细日志;其它日志,也可以检查下哈。
- log.info("[clearAll][清理所有认证信息] 清理数量: {}", count);
- }
-
- /**
- * 认证信息
- */
- @Data
- public static class AuthInfo {
-
- /**
- * 设备编号
- */
- private Long deviceId;
-
- /**
- * 产品标识
- */
- private String productKey;
-
- /**
- * 设备名称
- */
- private String deviceName;
-
- // TODO @haohao:令牌不要存储,万一有安全问题哈;
- /**
- * 认证令牌
- */
- private String token;
-
- /**
- * 客户端 ID
- */
- private String clientId;
-
- }
-
-}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpConnectionManager.java
new file mode 100644
index 0000000000..3ab7470005
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpConnectionManager.java
@@ -0,0 +1,185 @@
+package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager;
+
+import io.vertx.core.net.NetSocket;
+import lombok.Data;
+import lombok.experimental.Accessors;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * IoT 网关 TCP 连接管理器
+ *
+ * 统一管理 TCP 连接的认证状态、设备会话和消息发送功能:
+ * 1. 管理 TCP 连接的认证状态
+ * 2. 管理设备会话和在线状态
+ * 3. 管理消息发送到设备
+ *
+ * @author 芋道源码
+ */
+@Slf4j
+@Component
+public class IotTcpConnectionManager {
+
+ /**
+ * 连接信息映射:NetSocket -> 连接信息
+ */
+ private final Map connectionMap = new ConcurrentHashMap<>();
+
+ /**
+ * 设备 ID -> NetSocket 的映射(用于快速查找)
+ */
+ private final Map deviceSocketMap = new ConcurrentHashMap<>();
+
+ /**
+ * NetSocket -> 设备 ID 的映射(用于连接断开时清理)
+ */
+ private final Map socketDeviceMap = new ConcurrentHashMap<>();
+
+ /**
+ * 注册设备连接(包含认证信息)
+ *
+ * @param socket TCP 连接
+ * @param deviceId 设备 ID
+ * @param authInfo 认证信息
+ */
+ public void registerConnection(NetSocket socket, Long deviceId, AuthInfo authInfo) {
+ // 如果设备已有其他连接,先清理旧连接
+ NetSocket oldSocket = deviceSocketMap.get(deviceId);
+ if (oldSocket != null && oldSocket != socket) {
+ log.info("[registerConnection][设备已有其他连接,断开旧连接,设备 ID: {},旧连接: {}]",
+ deviceId, oldSocket.remoteAddress());
+ oldSocket.close();
+ // 清理所有相关映射
+ connectionMap.remove(oldSocket);
+ socketDeviceMap.remove(oldSocket);
+ }
+
+ // 注册新连接 - 更新所有映射关系
+ ConnectionInfo connectionInfo = new ConnectionInfo()
+ .setDeviceId(deviceId)
+ .setAuthInfo(authInfo)
+ .setAuthenticated(true);
+
+ connectionMap.put(socket, connectionInfo);
+ deviceSocketMap.put(deviceId, socket);
+ socketDeviceMap.put(socket, deviceId);
+
+ log.info("[registerConnection][注册设备连接,设备 ID: {},连接: {},product key: {},device name: {}]",
+ deviceId, socket.remoteAddress(), authInfo.getProductKey(), authInfo.getDeviceName());
+ }
+
+ /**
+ * 注销设备连接
+ *
+ * @param socket TCP 连接
+ */
+ public void unregisterConnection(NetSocket socket) {
+ ConnectionInfo connectionInfo = connectionMap.remove(socket);
+ Long deviceId = socketDeviceMap.remove(socket);
+
+ if (connectionInfo != null && deviceId != null) {
+ deviceSocketMap.remove(deviceId);
+ log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]",
+ deviceId, socket.remoteAddress());
+ }
+ }
+
+ /**
+ * 注销设备连接(通过设备 ID)
+ *
+ * @param deviceId 设备 ID
+ */
+ public void unregisterConnection(Long deviceId) {
+ NetSocket socket = deviceSocketMap.remove(deviceId);
+ if (socket != null) {
+ connectionMap.remove(socket);
+ socketDeviceMap.remove(socket);
+ log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]", deviceId, socket.remoteAddress());
+ }
+ }
+
+ /**
+ * 检查连接是否已认证
+ */
+ public boolean isAuthenticated(NetSocket socket) {
+ ConnectionInfo info = connectionMap.get(socket);
+ return info != null && info.isAuthenticated();
+ }
+
+ /**
+ * 检查连接是否未认证
+ */
+ public boolean isNotAuthenticated(NetSocket socket) {
+ return !isAuthenticated(socket);
+ }
+
+ /**
+ * 获取连接的认证信息
+ */
+ public AuthInfo getAuthInfo(NetSocket socket) {
+ ConnectionInfo info = connectionMap.get(socket);
+ return info != null ? info.getAuthInfo() : null;
+ }
+
+ /**
+ * 检查设备是否在线
+ */
+ public boolean isDeviceOnline(Long deviceId) {
+ return deviceSocketMap.containsKey(deviceId);
+ }
+
+ /**
+ * 检查设备是否离线
+ */
+ public boolean isDeviceOffline(Long deviceId) {
+ return !isDeviceOnline(deviceId);
+ }
+
+ /**
+ * 发送消息到设备
+ */
+ public boolean sendToDevice(Long deviceId, byte[] data) {
+ NetSocket socket = deviceSocketMap.get(deviceId);
+ if (socket == null) {
+ log.warn("[sendToDevice][设备未连接,设备 ID: {}]", deviceId);
+ return false;
+ }
+
+ try {
+ socket.write(io.vertx.core.buffer.Buffer.buffer(data));
+ log.debug("[sendToDevice][发送消息成功,设备 ID: {},数据长度: {} 字节]", deviceId, data.length);
+ return true;
+ } catch (Exception e) {
+ log.error("[sendToDevice][发送消息失败,设备 ID: {}]", deviceId, e);
+ // 发送失败时清理连接
+ unregisterConnection(socket);
+ return false;
+ }
+ }
+
+ /**
+ * 连接信息
+ */
+ @Data
+ @Accessors(chain = true)
+ public static class ConnectionInfo {
+ private Long deviceId;
+ private AuthInfo authInfo;
+ private boolean authenticated;
+ }
+
+ /**
+ * 认证信息
+ */
+ @Data
+ @Accessors(chain = true)
+ public static class AuthInfo {
+ private Long deviceId;
+ private String productKey;
+ private String deviceName;
+ private String clientId;
+ }
+}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpSessionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpSessionManager.java
deleted file mode 100644
index 00685e5cf6..0000000000
--- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpSessionManager.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager;
-
-import io.vertx.core.net.NetSocket;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-// TODO @haohao:IotTcpSessionManager、IotTcpAuthManager 是不是融合哈?
-/**
- * IoT 网关 TCP 会话管理器
- *
- * 维护设备 ID 和 TCP 连接的映射关系,支持下行消息发送
- *
- * @author 芋道源码
- */
-@Slf4j
-@Component
-public class IotTcpSessionManager {
-
- /**
- * 设备 ID -> TCP 连接的映射
- */
- private final Map deviceSocketMap = new ConcurrentHashMap<>();
-
- /**
- * TCP 连接 -> 设备 ID 的映射(用于连接断开时清理)
- */
- private final Map socketDeviceMap = new ConcurrentHashMap<>();
-
- /**
- * 注册设备会话
- *
- * @param deviceId 设备 ID
- * @param socket TCP 连接
- */
- public void registerSession(Long deviceId, NetSocket socket) {
- // 如果设备已有连接,先断开旧连接
- NetSocket oldSocket = deviceSocketMap.get(deviceId);
- if (oldSocket != null && oldSocket != socket) {
- log.info("[registerSession][设备已有连接,断开旧连接] 设备 ID: {}, 旧连接: {}", deviceId, oldSocket.remoteAddress());
- oldSocket.close();
- socketDeviceMap.remove(oldSocket);
- }
-
- // 注册新连接
- deviceSocketMap.put(deviceId, socket);
- socketDeviceMap.put(socket, deviceId);
-
- log.info("[registerSession][注册设备会话] 设备 ID: {}, 连接: {}", deviceId, socket.remoteAddress());
- }
-
- /**
- * 注销设备会话
- *
- * @param deviceId 设备 ID
- */
- public void unregisterSession(Long deviceId) {
- NetSocket socket = deviceSocketMap.remove(deviceId);
- if (socket != null) {
- socketDeviceMap.remove(socket);
- log.info("[unregisterSession][注销设备会话] 设备 ID: {}, 连接: {}", deviceId, socket.remoteAddress());
- }
- }
-
- /**
- * 注销 TCP 连接会话
- *
- * @param socket TCP 连接
- */
- public void unregisterSession(NetSocket socket) {
- Long deviceId = socketDeviceMap.remove(socket);
- if (deviceId != null) {
- deviceSocketMap.remove(deviceId);
- log.info("[unregisterSession][注销连接会话] 设备 ID: {}, 连接: {}", deviceId, socket.remoteAddress());
- }
- }
-
- /**
- * 获取设备的 TCP 连接
- *
- * @param deviceId 设备 ID
- * @return TCP 连接,如果设备未连接则返回 null
- */
- public NetSocket getDeviceSocket(Long deviceId) {
- return deviceSocketMap.get(deviceId);
- }
-
- /**
- * 检查设备是否在线
- *
- * @param deviceId 设备 ID
- * @return 是否在线
- */
- public boolean isDeviceOnline(Long deviceId) {
- NetSocket socket = deviceSocketMap.get(deviceId);
- return socket != null;
- }
-
- /**
- * 发送消息到设备
- *
- * @param deviceId 设备 ID
- * @param data 消息数据
- * @return 是否发送成功
- */
- public boolean sendToDevice(Long deviceId, byte[] data) {
- NetSocket socket = deviceSocketMap.get(deviceId);
- if (socket == null) {
- log.warn("[sendToDevice][设备未连接] 设备 ID: {}", deviceId);
- return false;
- }
-
- try {
- socket.write(io.vertx.core.buffer.Buffer.buffer(data));
- log.debug("[sendToDevice][发送消息成功] 设备 ID: {}, 数据长度: {} 字节", deviceId, data.length);
- return true;
- } catch (Exception e) {
- log.error("[sendToDevice][发送消息失败] 设备 ID: {}", deviceId, e);
- // 发送失败时清理连接
- unregisterSession(deviceId);
- return false;
- }
- }
-
- /**
- * 获取当前在线设备数量
- *
- * @return 在线设备数量
- */
- public int getOnlineDeviceCount() {
- return deviceSocketMap.size();
- }
-
- /**
- * 获取所有在线设备 ID
- *
- * @return 在线设备 ID 集合
- */
- public java.util.Set getOnlineDeviceIds() {
- return deviceSocketMap.keySet();
- }
-}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpDownstreamHandler.java
index 05970ede13..fd352f3b44 100644
--- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpDownstreamHandler.java
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpDownstreamHandler.java
@@ -2,9 +2,10 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
-import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpSessionManager;
+import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
@@ -13,62 +14,50 @@ import lombok.extern.slf4j.Slf4j;
* @author 芋道源码
*/
@Slf4j
+@RequiredArgsConstructor
public class IotTcpDownstreamHandler {
- private final IotDeviceMessageService messageService;
+ private final IotDeviceMessageService deviceMessageService;
private final IotDeviceService deviceService;
- private final IotTcpSessionManager sessionManager;
-
- // TODO @haohao:这个可以使用 lombok 简化构造方法
- public IotTcpDownstreamHandler(IotDeviceMessageService messageService,
- IotDeviceService deviceService, IotTcpSessionManager sessionManager) {
- this.messageService = messageService;
- this.deviceService = deviceService;
- this.sessionManager = sessionManager;
- }
+ private final IotTcpConnectionManager connectionManager;
/**
* 处理下行消息
- *
- * @param message 设备消息
*/
public void handle(IotDeviceMessage message) {
try {
- log.info("[handle][处理下行消息] 设备 ID: {}, 方法: {}, 消息 ID: {}",
+ log.info("[handle][处理下行消息,设备 ID: {},方法: {},消息 ID: {}]",
message.getDeviceId(), message.getMethod(), message.getId());
- // TODO @haohao 1. 和 2. 可以合成 1.1 1.2 并且中间可以不空行;
- // 1. 获取设备信息
- IotDeviceRespDTO device = deviceService.getDeviceFromCache(message.getDeviceId());
- if (device == null) {
- log.error("[handle][设备不存在] 设备 ID: {}", message.getDeviceId());
+ // 1.1 获取设备信息
+ IotDeviceRespDTO deviceInfo = deviceService.getDeviceFromCache(message.getDeviceId());
+ if (deviceInfo == null) {
+ log.error("[handle][设备不存在,设备 ID: {}]", message.getDeviceId());
+ return;
+ }
+ // 1.2 检查设备是否在线
+ if (connectionManager.isDeviceOffline(message.getDeviceId())) {
+ log.warn("[handle][设备不在线,设备 ID: {}]", message.getDeviceId());
return;
}
- // 2. 检查设备是否在线
- if (!sessionManager.isDeviceOnline(message.getDeviceId())) {
- log.warn("[handle][设备不在线] 设备 ID: {}", message.getDeviceId());
- return;
- }
+ // 2. 根据产品 Key 和设备名称编码消息并发送到设备
+ byte[] bytes = deviceMessageService.encodeDeviceMessage(message, deviceInfo.getProductKey(),
+ deviceInfo.getDeviceName());
+ boolean success = connectionManager.sendToDevice(message.getDeviceId(), bytes);
- // 3. 编码消息
- byte[] bytes = messageService.encodeDeviceMessage(message, device.getCodecType());
-
- // 4. 发送消息到设备
- boolean success = sessionManager.sendToDevice(message.getDeviceId(), bytes);
if (success) {
- log.info("[handle][下行消息发送成功] 设备 ID: {}, 方法: {}, 消息 ID: {}, 数据长度: {} 字节",
+ log.info("[handle][下行消息发送成功,设备 ID: {},方法: {},消息 ID: {},数据长度: {} 字节]",
message.getDeviceId(), message.getMethod(), message.getId(), bytes.length);
} else {
- log.error("[handle][下行消息发送失败] 设备 ID: {}, 方法: {}, 消息 ID: {}",
+ log.error("[handle][下行消息发送失败,设备 ID: {},方法: {},消息 ID: {}]",
message.getDeviceId(), message.getMethod(), message.getId());
- } // TODO @haohao:下面这个空行,可以考虑去掉的哈。
-
+ }
} catch (Exception e) {
- log.error("[handle][处理下行消息失败] 设备 ID: {}, 方法: {}, 消息内容: {}",
- message.getDeviceId(), message.getMethod(), message.getParams(), e);
+ log.error("[handle][处理下行消息失败,设备 ID: {},方法: {},消息内容: {}]",
+ message.getDeviceId(), message.getMethod(), message, e);
}
}
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java
index 6acc235569..29cda53228 100644
--- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java
@@ -12,50 +12,48 @@ import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
+import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec;
+import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol;
-import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpAuthManager;
-import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpSessionManager;
-import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService;
+import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
+import lombok.AllArgsConstructor;
+import lombok.Data;
import lombok.extern.slf4j.Slf4j;
+import java.nio.charset.StandardCharsets;
+
/**
* TCP 上行消息处理器
+ *
+ * @author 芋道源码
*/
@Slf4j
public class IotTcpUpstreamHandler implements Handler {
- // TODO @haohao:这两个变量,可以复用 IotTcpBinaryDeviceMessageCodec 的 TYPE
- private static final String CODEC_TYPE_JSON = "TCP_JSON";
- private static final String CODEC_TYPE_BINARY = "TCP_BINARY";
-
+ private static final String CODEC_TYPE_JSON = IotTcpJsonDeviceMessageCodec.TYPE;
+ private static final String CODEC_TYPE_BINARY = IotTcpBinaryDeviceMessageCodec.TYPE;
private static final String AUTH_METHOD = "auth";
private final IotDeviceMessageService deviceMessageService;
private final IotDeviceService deviceService;
- private final IotTcpSessionManager sessionManager;
-
- private final IotTcpAuthManager authManager;
-
- private final IotDeviceTokenService deviceTokenService;
+ private final IotTcpConnectionManager connectionManager;
private final IotDeviceCommonApi deviceApi;
private final String serverId;
public IotTcpUpstreamHandler(IotTcpUpstreamProtocol protocol, IotDeviceMessageService deviceMessageService,
- IotDeviceService deviceService, IotTcpSessionManager sessionManager) {
+ IotDeviceService deviceService, IotTcpConnectionManager connectionManager) {
this.deviceMessageService = deviceMessageService;
this.deviceService = deviceService;
- this.sessionManager = sessionManager;
- this.authManager = SpringUtil.getBean(IotTcpAuthManager.class);
- this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class);
+ this.connectionManager = connectionManager;
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
this.serverId = protocol.getServerId();
}
@@ -63,207 +61,313 @@ public class IotTcpUpstreamHandler implements Handler {
@Override
public void handle(NetSocket socket) {
String clientId = IdUtil.simpleUUID();
- log.info("[handle][收到设备连接] 客户端 ID: {}, 地址: {}", clientId, socket.remoteAddress());
+ log.debug("[handle][设备连接,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
// 设置异常和关闭处理器
socket.exceptionHandler(ex -> {
- log.error("[handle][连接异常] 客户端 ID: {}, 地址: {}", clientId, socket.remoteAddress(), ex);
- cleanupSession(socket);
+ log.warn("[handle][连接异常,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
+ cleanupConnection(socket);
});
socket.closeHandler(v -> {
- log.info("[handle][连接关闭] 客户端 ID: {}, 地址: {}", clientId, socket.remoteAddress());
- cleanupSession(socket);
+ log.debug("[handle][连接关闭,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
+ cleanupConnection(socket);
});
- socket.handler(buffer -> handleDataPackage(clientId, buffer, socket));
+ socket.handler(buffer -> processMessage(clientId, buffer, socket));
}
- private void handleDataPackage(String clientId, Buffer buffer, NetSocket socket) {
+ /**
+ * 处理消息
+ */
+ private void processMessage(String clientId, Buffer buffer, NetSocket socket) {
try {
+ // 1. 数据包基础检查
if (buffer.length() == 0) {
- log.warn("[handleDataPackage][数据包为空] 客户端 ID: {}", clientId);
return;
}
- // 1. 解码消息
+ // 2. 解码消息
MessageInfo messageInfo = decodeMessage(buffer);
if (messageInfo == null) {
return;
}
- // TODO @haohao:2. 和 3. 可以合并成 2.1 2.2 ,都是异常的情况。然后 3. 可以 return 直接;
- // 2. 获取设备信息
- IotDeviceRespDTO device = deviceService.getDeviceFromCache(messageInfo.message.getDeviceId());
- if (device == null) {
- sendError(socket, messageInfo.message.getRequestId(), "设备不存在", messageInfo.codecType);
- return;
+ // 3. 根据消息类型路由处理
+ if (isAuthRequest(messageInfo.message)) {
+ // 认证请求:无需检查认证状态
+ handleAuthenticationRequest(clientId, messageInfo, socket);
+ } else {
+ // 业务消息:需要检查认证状态
+ handleBusinessRequest(clientId, messageInfo, socket);
}
- // 3. 处理消息
- if (!authManager.isAuthenticated(socket)) {
- handleAuthRequest(clientId, messageInfo.message, socket, messageInfo.codecType);
- } else {
- IotTcpAuthManager.AuthInfo authInfo = authManager.getAuthInfo(socket);
- handleBusinessMessage(clientId, messageInfo.message, authInfo);
- }
} catch (Exception e) {
- log.error("[handleDataPackage][处理数据包失败] 客户端 ID: {}, 错误: {}", clientId, e.getMessage(), e);
+ log.error("[processMessage][处理消息失败,客户端 ID: {}]", clientId, e);
}
}
/**
* 处理认证请求
*/
- private void handleAuthRequest(String clientId, IotDeviceMessage message, NetSocket socket, String codecType) {
+ private void handleAuthenticationRequest(String clientId, MessageInfo messageInfo, NetSocket socket) {
try {
- // 1. 验证认证请求
- // TODO @haohao:ObjUtil.notEquals。减少取反
- if (!AUTH_METHOD.equals(message.getMethod())) {
- sendError(socket, message.getRequestId(), "请先进行认证", codecType);
- return;
- }
+ IotDeviceMessage message = messageInfo.message;
- // 2. 解析认证参数 // TODO @haohao:1. 和 2. 可以合并成 1.1 1.2 都是参数校验
+ // 1. 解析认证参数
AuthParams authParams = parseAuthParams(message.getParams());
if (authParams == null) {
- sendError(socket, message.getRequestId(), "认证参数不完整", codecType);
+ sendError(socket, message.getRequestId(), "认证参数不完整", messageInfo.codecType);
return;
}
- // 3. 执行认证流程
- // TODO @haohao:成功失败、都大哥日志,会不会更好哈?
- if (performAuthentication(authParams, socket, message.getRequestId(), codecType)) {
- log.info("[handleAuthRequest][认证成功] 客户端 ID: {}, username: {}", clientId, authParams.username);
+ // 2. 执行认证
+ if (!authenticateDevice(authParams)) {
+ log.warn("[handleAuthenticationRequest][认证失败,客户端 ID: {},username: {}]",
+ clientId, authParams.username);
+ sendError(socket, message.getRequestId(), "认证失败", messageInfo.codecType);
+ return;
}
+
+ // 3. 解析设备信息
+ IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.username);
+ if (deviceInfo == null) {
+ sendError(socket, message.getRequestId(), "解析设备信息失败", messageInfo.codecType);
+ return;
+ }
+
+ // 4. 获取设备信息
+ IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(),
+ deviceInfo.getDeviceName());
+ if (device == null) {
+ sendError(socket, message.getRequestId(), "设备不存在", messageInfo.codecType);
+ return;
+ }
+
+ // 5. 注册连接并发送成功响应
+ registerConnection(socket, device, deviceInfo, authParams.clientId);
+ sendOnlineMessage(deviceInfo);
+ sendSuccess(socket, message.getRequestId(), "认证成功", messageInfo.codecType);
+
+ log.info("[handleAuthenticationRequest][认证成功,设备 ID: {},设备名: {}]",
+ device.getId(), deviceInfo.getDeviceName());
+
} catch (Exception e) {
- log.error("[handleAuthRequest][认证处理异常] 客户端 ID: {}", clientId, e);
- sendError(socket, message.getRequestId(), "认证处理异常: " + e.getMessage(), codecType);
+ log.error("[handleAuthenticationRequest][认证处理异常,客户端 ID: {}]", clientId, e);
+ sendError(socket, messageInfo.message.getRequestId(), "认证处理异常", messageInfo.codecType);
+ }
+ }
+
+ /**
+ * 处理业务请求
+ */
+ private void handleBusinessRequest(String clientId, MessageInfo messageInfo, NetSocket socket) {
+ try {
+ // 1. 检查认证状态
+ if (connectionManager.isNotAuthenticated(socket)) {
+ log.warn("[handleBusinessRequest][设备未认证,客户端 ID: {}]", clientId);
+ sendError(socket, messageInfo.message.getRequestId(), "请先进行认证", messageInfo.codecType);
+ return;
+ }
+
+ // 2. 获取认证信息并处理业务消息
+ IotTcpConnectionManager.AuthInfo authInfo = connectionManager.getAuthInfo(socket);
+ processBusinessMessage(clientId, messageInfo.message, authInfo);
+
+ } catch (Exception e) {
+ log.error("[handleBusinessRequest][业务请求处理异常,客户端 ID: {}]", clientId, e);
}
}
/**
* 处理业务消息
*/
- private void handleBusinessMessage(String clientId, IotDeviceMessage message,
- IotTcpAuthManager.AuthInfo authInfo) {
+ private void processBusinessMessage(String clientId, IotDeviceMessage message,
+ IotTcpConnectionManager.AuthInfo authInfo) {
try {
message.setDeviceId(authInfo.getDeviceId());
message.setServerId(serverId);
- deviceMessageService.sendDeviceMessage(message, authInfo.getProductKey(), authInfo.getDeviceName(),
- serverId);
- log.info("[handleBusinessMessage][业务消息处理完成] 客户端 ID: {}, 消息 ID: {}, 设备 ID: {}, 方法: {}",
- clientId, message.getId(), message.getDeviceId(), message.getMethod());
+ // 发送到消息总线
+ deviceMessageService.sendDeviceMessage(message, authInfo.getProductKey(),
+ authInfo.getDeviceName(), serverId);
+
} catch (Exception e) {
- log.error("[handleBusinessMessage][处理业务消息失败] 客户端 ID: {}, 错误: {}", clientId, e.getMessage(), e);
+ log.error("[processBusinessMessage][业务消息处理失败,客户端 ID: {},消息 ID: {}]",
+ clientId, message.getId(), e);
}
}
/**
* 解码消息
*/
- // TODO @haohao:是不是还是直接管理后台配置协议,然后直接使用就好啦。暂时不考虑动态解析哈。保持一致,降低理解成本哈。
private MessageInfo decodeMessage(Buffer buffer) {
+ if (buffer == null || buffer.length() == 0) {
+ return null;
+ }
+
+ // 1. 快速检测消息格式类型
+ String codecType = detectMessageFormat(buffer);
+
try {
- String rawData = buffer.toString();
- String codecType = isJsonFormat(rawData) ? CODEC_TYPE_JSON : CODEC_TYPE_BINARY;
+ // 2. 使用检测到的格式进行解码
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(buffer.getBytes(), codecType);
- return message != null ? new MessageInfo(message, codecType) : null;
+
+ if (message == null) {
+ return null;
+ }
+
+ return new MessageInfo(message, codecType);
+
} catch (Exception e) {
- log.debug("[decodeMessage][消息解码失败] 错误: {}", e.getMessage());
+ log.warn("[decodeMessage][消息解码失败,格式: {},数据长度: {},错误: {}]",
+ codecType, buffer.length(), e.getMessage());
return null;
}
}
/**
- * 执行认证
+ * 检测消息格式类型
+ * 优化性能:避免不必要的字符串转换
*/
- // TODO @haohao:下面的 1. 2. 可以合并下,本质也是校验哈。
- private boolean performAuthentication(AuthParams authParams, NetSocket socket, String requestId, String codecType) {
- // 1. 执行认证
- if (!authenticateDevice(authParams)) {
- sendError(socket, requestId, "认证失败", codecType);
- return false;
+ private String detectMessageFormat(Buffer buffer) {
+ if (buffer.length() == 0) {
+ return CODEC_TYPE_JSON; // 默认使用 JSON
}
- // 2. 获取设备信息
- IotDeviceAuthUtils.DeviceInfo deviceInfo = deviceTokenService.parseUsername(authParams.username);
- if (deviceInfo == null) {
- sendError(socket, requestId, "解析设备信息失败", codecType);
- return false;
+ // 1. 优先检测二进制格式(检查魔术字节 0x7E)
+ if (isBinaryFormat(buffer)) {
+ return CODEC_TYPE_BINARY;
}
- IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(),
- deviceInfo.getDeviceName());
- if (device == null) {
- sendError(socket, requestId, "设备不存在", codecType);
- return false;
+ // 2. 检测 JSON 格式(检查前几个有效字符)
+ if (isJsonFormat(buffer)) {
+ return CODEC_TYPE_JSON;
}
- // 3. 注册认证信息
- String token = deviceTokenService.createToken(deviceInfo.getProductKey(), deviceInfo.getDeviceName());
- registerAuthInfo(socket, device, deviceInfo, token, authParams.clientId);
-
- // 4. 发送上线消息和成功响应
- IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
- deviceMessageService.sendDeviceMessage(onlineMessage, deviceInfo.getProductKey(), deviceInfo.getDeviceName(),
- serverId);
- sendSuccess(socket, requestId, "认证成功", codecType);
- return true;
+ // 3. 默认尝试 JSON 格式
+ return CODEC_TYPE_JSON;
}
/**
- * 发送响应
+ * 检测二进制格式
+ * 通过检查魔术字节快速识别,避免完整字符串转换
+ */
+ private boolean isBinaryFormat(Buffer buffer) {
+ // 二进制协议最小长度检查
+ if (buffer.length() < 8) {
+ return false;
+ }
+
+ try {
+ // 检查魔术字节 0x7E(二进制协议的第一个字节)
+ byte firstByte = buffer.getByte(0);
+ return firstByte == (byte) 0x7E;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ /**
+ * 检测 JSON 格式
+ * 只检查前几个有效字符,避免完整字符串转换
+ */
+ private boolean isJsonFormat(Buffer buffer) {
+ try {
+ // 检查前 64 个字节或整个缓冲区(取较小值)
+ int checkLength = Math.min(buffer.length(), 64);
+ String prefix = buffer.getString(0, checkLength, StandardCharsets.UTF_8.name());
+
+ if (StrUtil.isBlank(prefix)) {
+ return false;
+ }
+
+ String trimmed = prefix.trim();
+ // JSON 格式必须以 { 或 [ 开头
+ return trimmed.startsWith("{") || trimmed.startsWith("[");
+
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ /**
+ * 注册连接信息
+ */
+ private void registerConnection(NetSocket socket, IotDeviceRespDTO device,
+ IotDeviceAuthUtils.DeviceInfo deviceInfo, String clientId) {
+ // 创建认证信息
+ IotTcpConnectionManager.AuthInfo authInfo = new IotTcpConnectionManager.AuthInfo()
+ .setDeviceId(device.getId())
+ .setProductKey(deviceInfo.getProductKey())
+ .setDeviceName(deviceInfo.getDeviceName())
+ .setClientId(clientId);
+
+ // 注册连接
+ connectionManager.registerConnection(socket, device.getId(), authInfo);
+ }
+
+ /**
+ * 发送设备上线消息
+ */
+ private void sendOnlineMessage(IotDeviceAuthUtils.DeviceInfo deviceInfo) {
+ try {
+ IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
+ deviceMessageService.sendDeviceMessage(onlineMessage, deviceInfo.getProductKey(),
+ deviceInfo.getDeviceName(), serverId);
+ } catch (Exception e) {
+ log.error("[sendOnlineMessage][发送上线消息失败,设备: {}]", deviceInfo.getDeviceName(), e);
+ }
+ }
+
+ /**
+ * 清理连接
+ */
+ private void cleanupConnection(NetSocket socket) {
+ try {
+ // 发送离线消息(如果已认证)
+ IotTcpConnectionManager.AuthInfo authInfo = connectionManager.getAuthInfo(socket);
+ if (authInfo != null) {
+ IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
+ deviceMessageService.sendDeviceMessage(offlineMessage, authInfo.getProductKey(),
+ authInfo.getDeviceName(), serverId);
+ }
+
+ // 注销连接
+ connectionManager.unregisterConnection(socket);
+ } catch (Exception e) {
+ log.error("[cleanupConnection][清理连接失败]", e);
+ }
+ }
+
+ /**
+ * 发送响应消息
*/
private void sendResponse(NetSocket socket, boolean success, String message, String requestId, String codecType) {
try {
- Object responseData = buildResponseData(success, message);
+ Object responseData = MapUtil.builder()
+ .put("success", success)
+ .put("message", message)
+ .build();
+
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, responseData,
success ? 0 : 401, message);
+
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType);
socket.write(Buffer.buffer(encodedData));
- log.debug("[sendResponse][发送响应] success: {}, message: {}, requestId: {}", success, message, requestId);
+
} catch (Exception e) {
- log.error("[sendResponse][发送响应失败] requestId: {}", requestId, e);
+ log.error("[sendResponse][发送响应失败,requestId: {}]", requestId, e);
}
}
- /**
- * 构建响应数据(不返回 token)
- */
- private Object buildResponseData(boolean success, String message) {
- return MapUtil.builder()
- .put("success", success)
- .put("message", message)
- .build();
- }
+ // ==================== 辅助方法 ====================
/**
- * 清理会话
+ * 判断是否为认证请求
*/
- private void cleanupSession(NetSocket socket) {
- // 如果已认证,发送离线消息
- IotTcpAuthManager.AuthInfo authInfo = authManager.getAuthInfo(socket);
- if (authInfo != null) {
- // 发送离线消息
- IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
- deviceMessageService.sendDeviceMessage(offlineMessage, authInfo.getProductKey(), authInfo.getDeviceName(),
- serverId);
- }
- sessionManager.unregisterSession(socket);
- authManager.unregisterAuth(socket);
- }
-
- /**
- * 判断是否为 JSON 格式
- */
- private boolean isJsonFormat(String data) {
- if (StrUtil.isBlank(data)) {
- return false;
- }
- String trimmed = data.trim();
- return (trimmed.startsWith("{") && trimmed.endsWith("}")) || (trimmed.startsWith("[") && trimmed.endsWith("]"));
+ private boolean isAuthRequest(IotDeviceMessage message) {
+ return AUTH_METHOD.equals(message.getMethod());
}
/**
@@ -273,38 +377,37 @@ public class IotTcpUpstreamHandler implements Handler {
if (params == null) {
return null;
}
- JSONObject paramsJson = params instanceof JSONObject ? (JSONObject) params
- : JSONUtil.parseObj(params.toString());
- String clientId = paramsJson.getStr("clientId");
- String username = paramsJson.getStr("username");
- String password = paramsJson.getStr("password");
- return StrUtil.hasBlank(clientId, username, password) ? null : new AuthParams(clientId, username, password);
+
+ try {
+ JSONObject paramsJson = params instanceof JSONObject ? (JSONObject) params
+ : JSONUtil.parseObj(params.toString());
+
+ String clientId = paramsJson.getStr("clientId");
+ String username = paramsJson.getStr("username");
+ String password = paramsJson.getStr("password");
+
+ return StrUtil.hasBlank(clientId, username, password) ? null
+ : new AuthParams(clientId, username, password);
+ } catch (Exception e) {
+ log.warn("[parseAuthParams][解析认证参数失败]", e);
+ return null;
+ }
}
/**
* 认证设备
*/
private boolean authenticateDevice(AuthParams authParams) {
- CommonResult result = deviceApi.authDevice(new IotDeviceAuthReqDTO()
- .setClientId(authParams.clientId).setUsername(authParams.username).setPassword(authParams.password));
- return result.isSuccess() && result.getData();
- }
-
- /**
- * 注册认证信息
- */
- private void registerAuthInfo(NetSocket socket, IotDeviceRespDTO device, IotDeviceAuthUtils.DeviceInfo deviceInfo,
- String token, String clientId) {
- // TODO @haohao:可以链式调用;
- IotTcpAuthManager.AuthInfo auth = new IotTcpAuthManager.AuthInfo();
- auth.setDeviceId(device.getId());
- auth.setProductKey(deviceInfo.getProductKey());
- auth.setDeviceName(deviceInfo.getDeviceName());
- auth.setToken(token);
- auth.setClientId(clientId);
-
- authManager.registerAuth(socket, auth);
- sessionManager.registerSession(device.getId(), socket);
+ try {
+ CommonResult result = deviceApi.authDevice(new IotDeviceAuthReqDTO()
+ .setClientId(authParams.clientId)
+ .setUsername(authParams.username)
+ .setPassword(authParams.password));
+ return result.isSuccess() && Boolean.TRUE.equals(result.getData());
+ } catch (Exception e) {
+ log.error("[authenticateDevice][设备认证异常,username: {}]", authParams.username, e);
+ return false;
+ }
}
/**
@@ -315,24 +418,32 @@ public class IotTcpUpstreamHandler implements Handler {
}
/**
- * 发送成功响应(不返回 token)
+ * 发送成功响应
*/
private void sendSuccess(NetSocket socket, String requestId, String message, String codecType) {
sendResponse(socket, true, message, requestId, codecType);
}
- // TODO @haohao:使用 lombok,方便 jdk8 兼容
+ // ==================== 内部类 ====================
/**
* 认证参数
*/
- private record AuthParams(String clientId, String username, String password) {
+ @Data
+ @AllArgsConstructor
+ private static class AuthParams {
+ private final String clientId;
+ private final String username;
+ private final String password;
}
/**
* 消息信息
*/
- private record MessageInfo(IotDeviceMessage message, String codecType) {
+ @Data
+ @AllArgsConstructor
+ private static class MessageInfo {
+ private final IotDeviceMessage message;
+ private final String codecType;
}
-
}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml
index 26376b6669..b306f0588c 100644
--- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml
@@ -4,6 +4,15 @@ spring:
profiles:
active: local # 默认激活本地开发环境
+ # Redis 配置
+ data:
+ redis:
+ host: 127.0.0.1 # Redis 服务器地址
+ port: 6379 # Redis 服务器端口
+ database: 0 # Redis 数据库索引
+ # password: # Redis 密码,如果有的话
+ timeout: 30000ms # 连接超时时间
+
--- #################### 消息队列相关 ####################
# rocketmq 配置项,对应 RocketMQProperties 配置类
@@ -45,7 +54,7 @@ yudao:
# 针对引入的 EMQX 组件的配置
# ====================================
emqx:
- enabled: false
+ enabled: true
http-port: 8090 # MQTT HTTP 服务端口
mqtt-host: 127.0.0.1 # MQTT Broker 地址
mqtt-port: 1883 # MQTT Broker 端口
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/TcpBinaryDataPacketExamplesTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/TcpBinaryDataPacketExamplesTest.java
deleted file mode 100644
index 2e6fb41acc..0000000000
--- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/TcpBinaryDataPacketExamplesTest.java
+++ /dev/null
@@ -1,241 +0,0 @@
-package cn.iocoder.yudao.module.iot.gateway.codec.tcp;
-
-import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
-import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.jupiter.api.Assertions.*;
-
-/**
- * TCP 二进制格式数据包单元测试
- *
- * 测试二进制协议创建和解析 TCP 上报数据包和心跳包
- *
- * 二进制协议格式:
- * 包头(4 字节) | 功能码(2 字节) | 消息序号(2 字节) | 包体数据(变长)
- *
- * @author 芋道源码
- */
-@Slf4j
-class TcpBinaryDataPacketExamplesTest {
-
- private IotTcpBinaryDeviceMessageCodec codec;
-
- @BeforeEach
- void setUp() {
- codec = new IotTcpBinaryDeviceMessageCodec();
- }
-
- @Test
- void testDataReport() {
- log.info("=== 二进制格式数据上报包测试 ===");
-
- // 创建传感器数据
- Map sensorData = new HashMap<>();
- sensorData.put("temperature", 25.5);
- sensorData.put("humidity", 60.2);
- sensorData.put("pressure", 1013.25);
- sensorData.put("battery", 85);
-
- // 创建设备消息
- IotDeviceMessage message = IotDeviceMessage.requestOf("thing.property.post", sensorData);
- message.setDeviceId(123456L);
-
- // 编码
- byte[] packet = codec.encode(message);
- log.info("编码后数据包长度: {} 字节", packet.length);
- log.info("编码后数据包(HEX): {}", bytesToHex(packet));
-
- // 解码验证
- IotDeviceMessage decoded = codec.decode(packet);
- log.info("解码后消息 ID: {}", decoded.getId());
- log.info("解码后请求 ID: {}", decoded.getRequestId());
- log.info("解码后方法: {}", decoded.getMethod());
- log.info("解码后设备 ID: {}", decoded.getDeviceId());
- log.info("解码后服务 ID: {}", decoded.getServerId());
- log.info("解码后参数: {}", decoded.getParams());
-
- // 断言验证
- assertNotNull(decoded.getId());
- assertEquals("thing.property.post", decoded.getMethod());
- assertNotNull(decoded.getParams());
- assertTrue(decoded.getParams() instanceof Map);
- }
-
- @Test
- void testHeartbeat() {
- log.info("=== 二进制格式心跳包测试 ===");
-
- // 创建心跳消息
- IotDeviceMessage heartbeat = IotDeviceMessage.requestOf("thing.state.online", null);
- heartbeat.setDeviceId(123456L);
-
- // 编码
- byte[] packet = codec.encode(heartbeat);
- log.info("心跳包长度: {} 字节", packet.length);
- log.info("心跳包(HEX): {}", bytesToHex(packet));
-
- // 解码验证
- IotDeviceMessage decoded = codec.decode(packet);
- log.info("解码后消息 ID: {}", decoded.getId());
- log.info("解码后请求 ID: {}", decoded.getRequestId());
- log.info("解码后方法: {}", decoded.getMethod());
- log.info("解码后设备 ID: {}", decoded.getDeviceId());
- log.info("解码后服务 ID: {}", decoded.getServerId());
- log.info("解码后参数: {}", decoded.getParams());
-
- // 断言验证
- assertNotNull(decoded.getId());
- assertEquals("thing.state.online", decoded.getMethod());
- }
-
- @Test
- void testComplexDataReport() {
- log.info("=== 二进制格式复杂数据上报测试 ===");
-
- // 创建复杂设备数据
- Map deviceData = new HashMap<>();
-
- // 环境数据
- Map environment = new HashMap<>();
- environment.put("temperature", 23.8);
- environment.put("humidity", 55.0);
- environment.put("co2", 420);
- deviceData.put("environment", environment);
-
- // GPS 数据
- Map location = new HashMap<>();
- location.put("latitude", 39.9042);
- location.put("longitude", 116.4074);
- location.put("altitude", 43.5);
- deviceData.put("location", location);
-
- // 设备状态
- Map status = new HashMap<>();
- status.put("battery", 78);
- status.put("signal", -65);
- status.put("online", true);
- deviceData.put("status", status);
-
- // 创建设备消息
- IotDeviceMessage message = IotDeviceMessage.requestOf("thing.property.post", deviceData);
- message.setDeviceId(789012L);
-
- // 编码
- byte[] packet = codec.encode(message);
- log.info("复杂数据包长度: {} 字节", packet.length);
- log.info("复杂数据包(HEX): {}", bytesToHex(packet));
-
- // 解码验证
- IotDeviceMessage decoded = codec.decode(packet);
- log.info("解码后消息 ID: {}", decoded.getId());
- log.info("解码后请求 ID: {}", decoded.getRequestId());
- log.info("解码后方法: {}", decoded.getMethod());
- log.info("解码后设备 ID: {}", decoded.getDeviceId());
- log.info("解码后服务 ID: {}", decoded.getServerId());
- log.info("解码后参数: {}", decoded.getParams());
-
- // 断言验证
- assertNotNull(decoded.getId());
- assertEquals("thing.property.post", decoded.getMethod());
- assertNotNull(decoded.getParams());
- }
-
- @Test
- void testPacketStructureAnalysis() {
- log.info("=== 数据包结构分析测试 ===");
-
- // 创建测试数据
- Map sensorData = new HashMap<>();
- sensorData.put("temperature", 25.5);
- sensorData.put("humidity", 60.2);
-
- IotDeviceMessage message = IotDeviceMessage.requestOf("thing.property.post", sensorData);
- message.setDeviceId(123456L);
-
- // 编码
- byte[] packet = codec.encode(message);
-
- // 分析数据包结构
- analyzePacketStructure(packet);
-
- // 断言验证
- assertTrue(packet.length >= 8, "数据包长度应该至少为 8 字节");
- }
-
- // ==================== 内部辅助方法 ====================
-
- /**
- * 字节数组转十六进制字符串
- *
- * @param bytes 字节数组
- * @return 十六进制字符串
- */
- private static String bytesToHex(byte[] bytes) {
- StringBuilder result = new StringBuilder();
- for (byte b : bytes) {
- result.append(String.format("%02X ", b));
- }
- return result.toString().trim();
- }
-
- /**
- * 演示数据包结构分析
- *
- * @param packet 数据包
- */
- private static void analyzePacketStructure(byte[] packet) {
- if (packet.length < 8) {
- log.error("数据包长度不足");
- return;
- }
-
- int index = 0;
-
- // 解析包头(4 字节) - 后续数据长度
- int totalLength = ((packet[index] & 0xFF) << 24) |
- ((packet[index + 1] & 0xFF) << 16) |
- ((packet[index + 2] & 0xFF) << 8) |
- (packet[index + 3] & 0xFF);
- index += 4;
- log.info("包头 - 后续数据长度: {} 字节", totalLength);
-
- // 解析功能码(2 字节)
- int functionCode = ((packet[index] & 0xFF) << 8) | (packet[index + 1] & 0xFF);
- index += 2;
- log.info("功能码: {} ({})", functionCode, getFunctionCodeName(functionCode));
-
- // 解析消息序号(2 字节)
- int messageId = ((packet[index] & 0xFF) << 8) | (packet[index + 1] & 0xFF);
- index += 2;
- log.info("消息序号: {}", messageId);
-
- // 解析包体数据
- if (index < packet.length) {
- String payload = new String(packet, index, packet.length - index);
- log.info("包体数据: {}", payload);
- }
- }
-
- /**
- * 获取功能码名称
- *
- * @param code 功能码
- * @return 功能码名称
- */
- private static String getFunctionCodeName(int code) {
- return switch (code) {
- case 10 -> "设备注册";
- case 11 -> "注册回复";
- case 20 -> "心跳请求";
- case 21 -> "心跳回复";
- case 30 -> "消息上行";
- case 40 -> "消息下行";
- default -> "未知功能码";
- };
- }
-}
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/TcpJsonDataPacketExamplesTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/TcpJsonDataPacketExamplesTest.java
deleted file mode 100644
index 24258e0de2..0000000000
--- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/TcpJsonDataPacketExamplesTest.java
+++ /dev/null
@@ -1,185 +0,0 @@
-package cn.iocoder.yudao.module.iot.gateway.codec.tcp;
-
-import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
-import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.jupiter.api.Assertions.*;
-
-/**
- * TCP JSON 格式数据包单元测试
- *
- * 测试 JSON 格式的 TCP 消息编解码功能
- *
- * @author 芋道源码
- */
-@Slf4j
-class TcpJsonDataPacketExamplesTest {
-
- private IotTcpJsonDeviceMessageCodec codec;
-
- @BeforeEach
- void setUp() {
- codec = new IotTcpJsonDeviceMessageCodec();
- }
-
- @Test
- void testDataReport() {
- log.info("=== JSON 格式数据上报测试 ===");
-
- // 创建传感器数据
- Map sensorData = new HashMap<>();
- sensorData.put("temperature", 25.5);
- sensorData.put("humidity", 60.2);
- sensorData.put("pressure", 1013.25);
- sensorData.put("battery", 85);
-
- // 创建设备消息
- IotDeviceMessage message = IotDeviceMessage.requestOf("thing.property.post", sensorData);
- message.setDeviceId(123456L);
-
- // 编码
- byte[] packet = codec.encode(message);
- String jsonString = new String(packet, StandardCharsets.UTF_8);
- log.info("编码后 JSON: {}", jsonString);
- log.info("数据包长度: {} 字节", packet.length);
-
- // 解码验证
- IotDeviceMessage decoded = codec.decode(packet);
- log.info("解码后消息 ID: {}", decoded.getId());
- log.info("解码后方法: {}", decoded.getMethod());
- log.info("解码后设备 ID: {}", decoded.getDeviceId());
- log.info("解码后服务 ID: {}", decoded.getServerId());
- log.info("解码后参数: {}", decoded.getParams());
-
- // 断言验证
- assertNotNull(decoded.getId());
- assertEquals("thing.property.post", decoded.getMethod());
- assertEquals(123456L, decoded.getDeviceId());
- assertNotNull(decoded.getParams());
- assertTrue(decoded.getParams() instanceof Map);
- }
-
- @Test
- void testHeartbeat() {
- log.info("=== JSON 格式心跳测试 ===");
-
- // 创建心跳消息
- IotDeviceMessage heartbeat = IotDeviceMessage.requestOf("thing.state.online", null);
- heartbeat.setDeviceId(123456L);
-
- // 编码
- byte[] packet = codec.encode(heartbeat);
- String jsonString = new String(packet, StandardCharsets.UTF_8);
- log.info("编码后 JSON: {}", jsonString);
- log.info("心跳包长度: {} 字节", packet.length);
-
- // 解码验证
- IotDeviceMessage decoded = codec.decode(packet);
- log.info("解码后消息 ID: {}", decoded.getId());
- log.info("解码后方法: {}", decoded.getMethod());
- log.info("解码后设备 ID: {}", decoded.getDeviceId());
- log.info("解码后服务 ID: {}", decoded.getServerId());
-
- // 断言验证
- assertNotNull(decoded.getId());
- assertEquals("thing.state.online", decoded.getMethod());
- assertEquals(123456L, decoded.getDeviceId());
- }
-
- @Test
- void testEventReport() {
- log.info("=== JSON 格式事件上报测试 ===");
-
- // 创建事件数据
- Map eventData = new HashMap<>();
- eventData.put("eventType", "alarm");
- eventData.put("level", "warning");
- eventData.put("description", "温度过高");
- eventData.put("value", 45.8);
-
- // 创建事件消息
- IotDeviceMessage event = IotDeviceMessage.requestOf("thing.event.post", eventData);
- event.setDeviceId(123456L);
-
- // 编码
- byte[] packet = codec.encode(event);
- String jsonString = new String(packet, StandardCharsets.UTF_8);
- log.info("编码后 JSON: {}", jsonString);
- log.info("事件包长度: {} 字节", packet.length);
-
- // 解码验证
- IotDeviceMessage decoded = codec.decode(packet);
- log.info("解码后消息 ID: {}", decoded.getId());
- log.info("解码后方法: {}", decoded.getMethod());
- log.info("解码后设备 ID: {}", decoded.getDeviceId());
- log.info("解码后参数: {}", decoded.getParams());
-
- // 断言验证
- assertNotNull(decoded.getId());
- assertEquals("thing.event.post", decoded.getMethod());
- assertEquals(123456L, decoded.getDeviceId());
- assertNotNull(decoded.getParams());
- }
-
- @Test
- void testComplexDataReport() {
- log.info("=== JSON 格式复杂数据上报测试 ===");
-
- // 创建复杂设备数据(类似 EMQX 格式)
- Map deviceData = new HashMap<>();
-
- // 环境数据
- Map environment = new HashMap<>();
- environment.put("temperature", 23.8);
- environment.put("humidity", 55.0);
- environment.put("co2", 420);
- environment.put("pm25", 35);
- deviceData.put("environment", environment);
-
- // GPS 数据
- Map location = new HashMap<>();
- location.put("latitude", 39.9042);
- location.put("longitude", 116.4074);
- location.put("altitude", 43.5);
- location.put("speed", 0.0);
- deviceData.put("location", location);
-
- // 设备状态
- Map status = new HashMap<>();
- status.put("battery", 78);
- status.put("signal", -65);
- status.put("online", true);
- status.put("version", "1.2.3");
- deviceData.put("status", status);
-
- // 创建设备消息
- IotDeviceMessage message = IotDeviceMessage.requestOf("thing.property.post", deviceData);
- message.setDeviceId(789012L);
-
- // 编码
- byte[] packet = codec.encode(message);
- String jsonString = new String(packet, StandardCharsets.UTF_8);
- log.info("编码后 JSON: {}", jsonString);
- log.info("复杂数据包长度: {} 字节", packet.length);
-
- // 解码验证
- IotDeviceMessage decoded = codec.decode(packet);
- log.info("解码后消息 ID: {}", decoded.getId());
- log.info("解码后方法: {}", decoded.getMethod());
- log.info("解码后设备 ID: {}", decoded.getDeviceId());
- log.info("解码后参数: {}", decoded.getParams());
-
- // 断言验证
- assertNotNull(decoded.getId());
- assertEquals("thing.property.post", decoded.getMethod());
- assertEquals(789012L, decoded.getDeviceId());
- assertNotNull(decoded.getParams());
- }
-
-}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/resources/tcp-binary-packet-examples.md b/yudao-module-iot/yudao-module-iot-gateway/src/test/resources/tcp-binary-packet-examples.md
index 4c2807276e..d85d347f70 100644
--- a/yudao-module-iot/yudao-module-iot-gateway/src/test/resources/tcp-binary-packet-examples.md
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/resources/tcp-binary-packet-examples.md
@@ -1,370 +1,198 @@
-# TCP 二进制协议数据包格式说明和示例
+# TCP 二进制协议数据包格式说明
-## 1. 二进制协议概述
+## 1. 协议概述
-TCP 二进制协议是一种高效的自定义协议格式,适用于对带宽和性能要求较高的场景。该协议采用紧凑的二进制格式,减少数据传输量,提高传输效率。
+TCP 二进制协议是一种高效的自定义协议格式,采用紧凑的二进制格式传输数据,适用于对带宽和性能要求较高的 IoT 场景。
-## 2. 数据包格式
+### 1.1 协议特点
+
+- **高效传输**:完全二进制格式,减少数据传输量
+- **版本控制**:内置协议版本号,支持协议升级
+- **类型安全**:明确的消息类型标识
+- **扩展性**:预留标志位,支持未来功能扩展
+- **兼容性**:与现有 `IotDeviceMessage` 接口完全兼容
+
+## 2. 协议格式
### 2.1 整体结构
-根据代码实现,TCP 二进制协议的数据包格式为:
-
```
-+----------+----------+----------+----------+
-| 包头 | 功能码 | 消息序号 | 包体数据 |
-| 4字节 | 2字节 | 2字节 | 变长 |
-+----------+----------+----------+----------+
++--------+--------+--------+--------+--------+--------+--------+--------+
+| 魔术字 | 版本号 | 消息类型| 消息标志| 消息长度(4字节) |
++--------+--------+--------+--------+--------+--------+--------+--------+
+| 消息 ID 长度(2字节) | 消息 ID (变长字符串) |
++--------+--------+--------+--------+--------+--------+--------+--------+
+| 方法名长度(2字节) | 方法名(变长字符串) |
++--------+--------+--------+--------+--------+--------+--------+--------+
+| 消息体数据(变长) |
++--------+--------+--------+--------+--------+--------+--------+--------+
```
-**注意**:与原始设计相比,实际实现中移除了设备地址字段,简化了协议结构。
+### 2.2 字段详细说明
-### 2.2 字段说明
+| 字段 | 长度 | 类型 | 说明 |
+|------|------|------|------|
+| 魔术字 | 1字节 | byte | `0x7E` - 协议识别标识,用于数据同步 |
+| 版本号 | 1字节 | byte | `0x01` - 协议版本号,支持版本控制 |
+| 消息类型 | 1字节 | byte | `0x01`=请求, `0x02`=响应 |
+| 消息标志 | 1字节 | byte | 预留字段,用于未来扩展 |
+| 消息长度 | 4字节 | int | 整个消息的总长度(包含头部) |
+| 消息 ID 长度 | 2字节 | short | 消息 ID 字符串的字节长度 |
+| 消息 ID | 变长 | string | 消息唯一标识符(UTF-8编码) |
+| 方法名长度 | 2字节 | short | 方法名字符串的字节长度 |
+| 方法名 | 变长 | string | 消息方法名(UTF-8编码) |
+| 消息体 | 变长 | binary | 根据消息类型的不同数据格式 |
-| 字段 | 长度 | 类型 | 说明 |
-|------|-----|--------|-----------------|
-| 包头 | 4字节 | int | 后续数据的总长度(不包含包头) |
-| 功能码 | 2字节 | short | 消息类型标识 |
-| 消息序号 | 2字节 | short | 消息唯一标识 |
-| 包体数据 | 变长 | string | JSON 格式的消息内容 |
+**⚠️ 重要说明**:deviceId 不包含在协议中,由服务器根据连接上下文自动设置
-### 2.3 功能码定义
-
-根据代码实现,支持的功能码:
-
-| 功能码 | 名称 | 说明 |
-|-----|------|--------------|
-| 10 | 设备注册 | 设备首次连接时的注册请求 |
-| 11 | 注册回复 | 服务器对注册请求的回复 |
-| 20 | 心跳请求 | 设备发送的心跳包 |
-| 21 | 心跳回复 | 服务器对心跳的回复 |
-| 30 | 消息上行 | 设备向服务器发送的数据 |
-| 40 | 消息下行 | 服务器向设备发送的指令 |
-
-**常量定义:**
+### 2.3 协议常量定义
```java
-public static final short CODE_REGISTER = 10;
-public static final short CODE_REGISTER_REPLY = 11;
-public static final short CODE_HEARTBEAT = 20;
-public static final short CODE_HEARTBEAT_REPLY = 21;
-public static final short CODE_MESSAGE_UP = 30;
-public static final short CODE_MESSAGE_DOWN = 40;
-```
+// 协议标识
+private static final byte MAGIC_NUMBER = (byte) 0x7E;
+private static final byte PROTOCOL_VERSION = (byte) 0x01;
-## 3. 包体数据格式
-
-### 3.1 JSON 负载结构
-
-包体数据采用 JSON 格式,包含以下字段:
-
-```json
-{
- "method": "消息方法",
- "params": {
- // 消息参数
- },
- "timestamp": 时间戳,
- "requestId": "请求ID",
- "msgId": "消息ID"
+// 消息类型
+public static class MessageType {
+ public static final byte REQUEST = 0x01; // 请求消息
+ public static final byte RESPONSE = 0x02; // 响应消息
}
+
+// 协议长度
+private static final int HEADER_FIXED_LENGTH = 8; // 固定头部长度
+private static final int MIN_MESSAGE_LENGTH = 12; // 最小消息长度
```
-### 3.2 字段说明
+## 3. 消息类型和格式
-| 字段名 | 类型 | 必填 | 说明 |
-|-----------|--------|----|------------------------------|
-| method | String | 是 | 消息方法,如 `thing.property.post` |
-| params | Object | 否 | 消息参数 |
-| timestamp | Long | 是 | 时间戳(毫秒) |
-| requestId | String | 否 | 请求唯一标识 |
-| msgId | String | 否 | 消息唯一标识 |
+### 3.1 请求消息 (REQUEST - 0x01)
-**常量定义:**
+请求消息用于设备向服务器发送数据或请求。
-```java
-public static final String METHOD = "method";
-public static final String PARAMS = "params";
-public static final String TIMESTAMP = "timestamp";
-public static final String REQUEST_ID = "requestId";
-public static final String MESSAGE_ID = "msgId";
+#### 3.1.1 消息体格式
+```
+消息体 = params 数据(JSON格式)
```
-## 4. 消息类型
+#### 3.1.2 示例:设备认证请求
-### 4.1 数据上报 (thing.property.post)
+**消息内容:**
+- 消息 ID: `auth_1704067200000_123`
+- 方法名: `auth`
+- 参数: `{"clientId":"device_001","username":"productKey_deviceName","password":"device_password"}`
-设备向服务器上报属性数据。
-
-**功能码:** 30 (CODE_MESSAGE_UP)
-
-**包体数据示例:**
-
-```json
-{
- "method": "thing.property.post",
- "params": {
- "temperature": 25.5,
- "humidity": 60.2,
- "pressure": 1013.25
- },
- "timestamp": 1642781234567,
- "requestId": "req_001"
-}
+**二进制数据包结构:**
+```
+7E // 魔术字 (0x7E)
+01 // 版本号 (0x01)
+01 // 消息类型 (REQUEST)
+00 // 消息标志 (预留)
+00 00 00 8A // 消息长度 (138字节)
+00 19 // 消息 ID 长度 (25字节)
+61 75 74 68 5F 31 37 30 34 30 // 消息 ID: "auth_1704067200000_123"
+36 37 32 30 30 30 30 30 5F 31
+32 33
+00 04 // 方法名长度 (4字节)
+61 75 74 68 // 方法名: "auth"
+7B 22 63 6C 69 65 6E 74 49 64 // JSON参数数据
+22 3A 22 64 65 76 69 63 65 5F // {"clientId":"device_001",
+30 30 31 22 2C 22 75 73 65 72 // "username":"productKey_deviceName",
+6E 61 6D 65 22 3A 22 70 72 6F // "password":"device_password"}
+64 75 63 74 4B 65 79 5F 64 65
+76 69 63 65 4E 61 6D 65 22 2C
+22 70 61 73 73 77 6F 72 64 22
+3A 22 64 65 76 69 63 65 5F 70
+61 73 73 77 6F 72 64 22 7D
```
-### 4.2 心跳 (thing.state.online)
+#### 3.1.3 示例:属性数据上报
-设备向服务器发送心跳保活。
+**消息内容:**
+- 消息 ID: `property_1704067200000_456`
+- 方法名: `thing.property.post`
+- 参数: `{"temperature":25.5,"humidity":60.2,"pressure":1013.25}`
-**功能码:** 20 (CODE_HEARTBEAT)
+### 3.2 响应消息 (RESPONSE - 0x02)
-**包体数据示例:**
+响应消息用于服务器向设备回复请求结果。
-```json
-{
- "method": "thing.state.online",
- "params": {},
- "timestamp": 1642781234567,
- "requestId": "req_002"
-}
+#### 3.2.1 消息体格式
+```
+消息体 = 响应码(4字节) + 响应消息长度(2字节) + 响应消息(UTF-8) + 响应数据(JSON)
```
-### 4.3 消息方法常量
+#### 3.2.2 字段说明
-```java
-public static final String PROPERTY_POST = "thing.property.post"; // 数据上报
-public static final String STATE_ONLINE = "thing.state.online"; // 心跳
+| 字段 | 长度 | 类型 | 说明 |
+|------|------|------|------|
+| 响应码 | 4字节 | int | HTTP状态码风格,0=成功,其他=错误 |
+| 响应消息长度 | 2字节 | short | 响应消息字符串的字节长度 |
+| 响应消息 | 变长 | string | 响应提示信息(UTF-8编码) |
+| 响应数据 | 变长 | binary | JSON格式的响应数据(可选) |
+
+#### 3.2.3 示例:认证成功响应
+
+**消息内容:**
+- 消息 ID: `auth_response_1704067200000_123`
+- 方法名: `auth`
+- 响应码: `0`
+- 响应消息: `认证成功`
+- 响应数据: `{"success":true,"message":"认证成功"}`
+
+**二进制数据包结构:**
+```
+7E // 魔术字 (0x7E)
+01 // 版本号 (0x01)
+02 // 消息类型 (RESPONSE)
+00 // 消息标志 (预留)
+00 00 00 A5 // 消息长度 (165字节)
+00 22 // 消息 ID 长度 (34字节)
+61 75 74 68 5F 72 65 73 70 6F // 消息 ID: "auth_response_1704067200000_123"
+6E 73 65 5F 31 37 30 34 30 36
+37 32 30 30 30 30 30 5F 31 32
+33
+00 04 // 方法名长度 (4字节)
+61 75 74 68 // 方法名: "auth"
+00 00 00 00 // 响应码 (0 = 成功)
+00 0C // 响应消息长度 (12字节)
+E8 AE A4 E8 AF 81 E6 88 90 E5 // 响应消息: "认证成功" (UTF-8)
+8A 9F
+7B 22 73 75 63 63 65 73 73 22 // JSON响应数据
+3A 74 72 75 65 2C 22 6D 65 73 // {"success":true,"message":"认证成功"}
+73 61 67 65 22 3A 22 E8 AE A4
+E8 AF 81 E6 88 90 E5 8A 9F 22
+7D
```
-## 5. 数据包示例
-
-### 5.1 温度传感器数据上报
-
-**包体数据:**
-```json
-{
- "method": "thing.property.post",
- "params": {
- "temperature": 25.5,
- "humidity": 60.2,
- "pressure": 1013.25
- },
- "timestamp": 1642781234567,
- "requestId": "req_001"
-}
-```
-
-**数据包结构:**
-```
-包头: 0x00000045 (69字节)
-功能码: 0x001E (30 - 消息上行)
-消息序号: 0x1234 (4660)
-包体: JSON字符串
-```
-
-**完整十六进制数据包:**
-```
-00 00 00 45 00 1E 12 34
-7B 22 6D 65 74 68 6F 64 22 3A 22 74 68 69 6E 67
-2E 70 72 6F 70 65 72 74 79 2E 70 6F 73 74 22 2C
-22 70 61 72 61 6D 73 22 3A 7B 22 74 65 6D 70 65
-72 61 74 75 72 65 22 3A 32 35 2E 35 2C 22 68 75
-6D 69 64 69 74 79 22 3A 36 30 2E 32 2C 22 70 72
-65 73 73 75 72 65 22 3A 31 30 31 33 2E 32 35 7D
-2C 22 74 69 6D 65 73 74 61 6D 70 22 3A 31 36 34
-32 37 38 31 32 33 34 35 36 37 2C 22 72 65 71 75
-65 73 74 49 64 22 3A 22 72 65 71 5F 30 30 31 22 7D
-```
-
-### 5.2 心跳包示例
-
-**包体数据:**
-```json
-{
- "method": "thing.state.online",
- "params": {},
- "timestamp": 1642781234567,
- "requestId": "req_002"
-}
-```
-
-**数据包结构:**
-```
-包头: 0x00000028 (40字节)
-功能码: 0x0014 (20 - 心跳请求)
-消息序号: 0x5678 (22136)
-包体: JSON字符串
-```
-
-**完整十六进制数据包:**
-```
-00 00 00 28 00 14 56 78
-7B 22 6D 65 74 68 6F 64 22 3A 22 74 68 69 6E 67
-2E 73 74 61 74 65 2E 6F 6E 6C 69 6E 65 22 2C 22
-70 61 72 61 6D 73 22 3A 7B 7D 2C 22 74 69 6D 65
-73 74 61 6D 70 22 3A 31 36 34 32 37 38 31 32 33
-34 35 36 37 2C 22 72 65 71 75 65 73 74 49 64 22
-3A 22 72 65 71 5F 30 30 32 22 7D
-```
-
-## 6. 编解码器实现
-
-### 6.1 编码器类型
+## 4. 编解码器标识
```java
public static final String TYPE = "TCP_BINARY";
```
-### 6.2 编码过程
+## 5. 协议优势
-1. **参数验证**:检查消息和方法是否为空
-2. **确定功能码**:
- - 心跳消息:使用 `CODE_HEARTBEAT` (20)
- - 其他消息:使用 `CODE_MESSAGE_UP` (30)
-3. **构建负载**:使用 `buildSimplePayload()` 构建 JSON 负载
-4. **生成消息序号**:基于当前时间戳生成
-5. **构建数据包**:创建 `TcpDataPackage` 对象
-6. **编码为字节流**:使用 `encodeTcpDataPackage()` 编码
+- **数据紧凑**:二进制格式,相比 JSON 减少 30-50% 的数据量
+- **解析高效**:直接二进制操作,减少字符串转换开销
+- **类型安全**:明确的消息类型和字段定义
+- **扩展性强**:预留标志位支持未来功能扩展
+- **版本控制**:内置版本号支持协议升级
-### 6.3 解码过程
+## 6. 与 JSON 协议对比
-1. **参数验证**:检查字节数组是否为空
-2. **解码数据包**:使用 `decodeTcpDataPackage()` 解码
-3. **确定消息方法**:
- - 功能码 20:`thing.state.online` (心跳)
- - 功能码 30:`thing.property.post` (数据上报)
-4. **解析负载信息**:使用 `parsePayloadInfo()` 解析 JSON 负载
-5. **构建设备消息**:创建 `IotDeviceMessage` 对象
-6. **设置服务 ID**:使用 `generateServerId()` 生成
-
-### 6.4 服务 ID 生成
-
-```java
-private String generateServerId(TcpDataPackage dataPackage) {
- return String.format("tcp_binary_%d_%d", dataPackage.getCode(), dataPackage.getMid());
-}
-```
-
-## 7. 数据包解析步骤
-
-### 7.1 解析流程
-
-1. **读取包头(4字节)**
- - 获取后续数据的总长度
- - 验证数据包完整性
-
-2. **读取功能码(2字节)**
- - 确定消息类型
-
-3. **读取消息序号(2字节)**
- - 获取消息唯一标识
-
-4. **读取包体数据(变长)**
- - 解析 JSON 格式的消息内容
-
-### 7.2 Java 解析示例
-
-```java
-public TcpDataPackage parsePacket(byte[] packet) {
- int index = 0;
-
- // 1. 解析包头
- int totalLength = ByteBuffer.wrap(packet, index, 4).getInt();
- index += 4;
-
- // 2. 解析功能码
- short functionCode = ByteBuffer.wrap(packet, index, 2).getShort();
- index += 2;
-
- // 3. 解析消息序号
- short messageId = ByteBuffer.wrap(packet, index, 2).getShort();
- index += 2;
-
- // 4. 解析包体数据
- String payload = new String(packet, index, packet.length - index);
-
- return new TcpDataPackage(functionCode, messageId, payload);
-}
-```
-
-## 8. 使用示例
-
-### 8.1 基本使用
-
-```java
-// 创建编解码器
-IotTcpBinaryDeviceMessageCodec codec = new IotTcpBinaryDeviceMessageCodec();
-
-// 创建数据上报消息
-Map sensorData = Map.of(
- "temperature", 25.5,
- "humidity", 60.2
-);
-
-// 编码
-IotDeviceMessage message = IotDeviceMessage.requestOf("thing.property.post", sensorData);
-byte[] data = codec.encode(message);
-
-// 解码
-IotDeviceMessage decoded = codec.decode(data);
-```
-
-### 8.2 错误处理
-
-```java
-try{
-byte[] data = codec.encode(message);
-// 处理编码成功
-}catch(
-IllegalArgumentException e){
- // 处理参数错误
- log.
-
-error("编码参数错误: {}",e.getMessage());
- }catch(
-TcpCodecException e){
- // 处理编码失败
- log.
-
-error("编码失败: {}",e.getMessage());
- }
-```
-
-## 9. 注意事项
-
-1. **字节序**:所有多字节数据使用大端序(Big-Endian)
-2. **字符编码**:字符串数据使用 UTF-8 编码
-3. **JSON 格式**:包体数据必须是有效的 JSON 格式
-4. **长度限制**:单个数据包建议不超过 1MB
-5. **错误处理**:解析失败时会抛出 `TcpCodecException`
-6. **功能码映射**:目前只支持心跳和数据上报两种消息类型
-
-## 10. 协议特点
-
-### 10.1 优势
-
-- **高效传输**:二进制格式,数据量小
-- **性能优化**:减少解析开销
-- **带宽节省**:相比 JSON 格式节省带宽
-- **实时性好**:适合高频数据传输
-
-### 10.2 适用场景
+| 特性 | 二进制协议 | JSON协议 |
+|------|------------|----------|
+| 数据大小 | 小(节省30-50%) | 大 |
+| 解析性能 | 高 | 中等 |
+| 网络开销 | 低 | 高 |
+| 可读性 | 差 | 优秀 |
+| 调试难度 | 高 | 低 |
+| 扩展性 | 良好(有预留位) | 优秀 |
+**推荐场景**:
- ✅ **高频数据传输**:传感器数据实时上报
- ✅ **带宽受限环境**:移动网络、卫星通信
-- ✅ **性能要求高**:需要低延迟的场景
-- ✅ **设备资源有限**:嵌入式设备、IoT 设备
-
-### 10.3 与 JSON 协议对比
-
-| 特性 | 二进制协议 | JSON 协议 |
-|-------|-------|---------|
-| 数据大小 | 小 | 稍大 |
-| 解析性能 | 高 | 中等 |
-| 可读性 | 差 | 优秀 |
-| 调试难度 | 高 | 低 |
-| 扩展性 | 差 | 优秀 |
-| 实现复杂度 | 高 | 低 |
-
-这样就完成了 TCP 二进制协议的完整说明,与实际代码实现完全一致。
+- ✅ **性能要求高**:需要低延迟、高吞吐的场景
+- ✅ **设备资源有限**:嵌入式设备、低功耗设备
+- ❌ **开发调试阶段**:调试困难,建议使用 JSON 协议
+- ❌ **快速原型开发**:开发效率低
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/resources/tcp-json-packet-examples.md b/yudao-module-iot/yudao-module-iot-gateway/src/test/resources/tcp-json-packet-examples.md
index 34251e7166..09ef50cfe5 100644
--- a/yudao-module-iot/yudao-module-iot-gateway/src/test/resources/tcp-json-packet-examples.md
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/resources/tcp-json-packet-examples.md
@@ -2,13 +2,13 @@
## 1. 协议概述
-TCP JSON 格式协议采用纯 JSON 格式进行数据传输,参考了 EMQX 和 HTTP 模块的数据格式设计,具有以下优势:
+TCP JSON 格式协议采用纯 JSON 格式进行数据传输,具有以下特点:
- **标准化**:使用标准 JSON 格式,易于解析和处理
- **可读性**:人类可读,便于调试和维护
- **扩展性**:可以轻松添加新字段,向后兼容
-- **统一性**:与 HTTP 模块保持一致的数据格式
-- **简化性**:相比二进制协议,实现更简单,调试更容易
+- **跨平台**:JSON 格式支持所有主流编程语言
+- **安全优化**:移除冗余的 deviceId 字段,提高安全性
## 2. 消息格式
@@ -18,294 +18,174 @@ TCP JSON 格式协议采用纯 JSON 格式进行数据传输,参考了 EMQX
{
"id": "消息唯一标识",
"method": "消息方法",
- "deviceId": 设备ID,
"params": {
- // 消息参数
+ // 请求参数
+ },
+ "data": {
+ // 响应数据
},
- "timestamp": 时间戳,
"code": 响应码,
- "message": "响应消息"
+ "msg": "响应消息",
+ "timestamp": 时间戳
}
```
-### 2.2 字段说明
+**⚠️ 重要说明**:
+- **不包含 deviceId 字段**:由服务器通过 TCP 连接上下文自动确定设备 ID
+- **避免伪造攻击**:防止设备伪造其他设备的 ID 发送消息
-| 字段名 | 类型 | 必填 | 说明 |
-|-----------|---------|----|-------------------------------------|
-| id | String | 是 | 消息唯一标识,如果为空会自动生成 UUID |
-| method | String | 是 | 消息方法,如 `auth`、`thing.property.post` |
-| deviceId | Long | 否 | 设备 ID |
-| params | Object | 否 | 消息参数,具体内容根据 method 而定 |
-| timestamp | Long | 是 | 时间戳(毫秒),自动生成 |
-| code | Integer | 否 | 响应码(下行消息使用) |
-| message | String | 否 | 响应消息(下行消息使用) |
+### 2.2 字段详细说明
-## 3. 消息类型
+| 字段名 | 类型 | 必填 | 用途 | 说明 |
+|--------|------|------|------|------|
+| id | String | 是 | 所有消息 | 消息唯一标识 |
+| method | String | 是 | 所有消息 | 消息方法,如 `auth`、`thing.property.post` |
+| params | Object | 否 | 请求消息 | 请求参数,具体内容根据method而定 |
+| data | Object | 否 | 响应消息 | 响应数据,服务器返回的结果数据 |
+| code | Integer | 否 | 响应消息 | 响应码,0=成功,其他=错误 |
+| msg | String | 否 | 响应消息 | 响应提示信息 |
+| timestamp | Long | 是 | 所有消息 | 时间戳(毫秒),编码时自动生成 |
+
+### 2.3 消息分类
+
+#### 2.3.1 请求消息(上行)
+- **特征**:包含 `params` 字段,不包含 `code`、`msg` 字段
+- **方向**:设备 → 服务器
+- **用途**:设备认证、数据上报、状态更新等
+
+#### 2.3.2 响应消息(下行)
+- **特征**:包含 `code`、`msg` 字段,可能包含 `data` 字段
+- **方向**:服务器 → 设备
+- **用途**:认证结果、指令响应、错误提示等
+
+## 3. 消息示例
### 3.1 设备认证 (auth)
-设备连接后首先需要进行认证,认证成功后才能进行其他操作。
-
-#### 3.1.1 认证请求格式
-
-**示例:**
+#### 认证请求格式
+**消息方向**:设备 → 服务器
```json
{
- "id": "auth_8ac6a1db91e64aa9996143fdbac2cbfe",
+ "id": "auth_1704067200000_123",
"method": "auth",
"params": {
"clientId": "device_001",
"username": "productKey_deviceName",
"password": "设备密码"
},
- "timestamp": 1753111026437
+ "timestamp": 1704067200000
}
```
-**字段说明:**
+**认证参数说明:**
+
| 字段名 | 类型 | 必填 | 说明 |
|--------|------|------|------|
-| clientId | String | 是 | 客户端唯一标识 |
+| clientId | String | 是 | 客户端唯一标识,用于连接管理 |
| username | String | 是 | 设备用户名,格式为 `productKey_deviceName` |
-| password | String | 是 | 设备密码 |
+| password | String | 是 | 设备密码,在设备管理平台配置 |
-#### 3.1.2 认证响应格式
+#### 认证响应格式
+**消息方向**:服务器 → 设备
**认证成功响应:**
-
```json
{
- "id": "auth_response_8ac6a1db91e64aa9996143fdbac2cbfe",
- "requestId": "auth_8ac6a1db91e64aa9996143fdbac2cbfe",
+ "id": "response_auth_1704067200000_123",
"method": "auth",
"data": {
"success": true,
"message": "认证成功"
},
"code": 0,
- "msg": "认证成功"
+ "msg": "认证成功",
+ "timestamp": 1704067200001
}
```
**认证失败响应:**
-
```json
{
- "id": "auth_response_8ac6a1db91e64aa9996143fdbac2cbfe",
- "requestId": "auth_8ac6a1db91e64aa9996143fdbac2cbfe",
+ "id": "response_auth_1704067200000_123",
"method": "auth",
"data": {
"success": false,
"message": "认证失败:用户名或密码错误"
},
"code": 401,
- "msg": "认证失败:用户名或密码错误"
+ "msg": "认证失败",
+ "timestamp": 1704067200001
}
```
-#### 3.1.3 认证流程
+### 3.2 属性数据上报 (thing.property.post)
-1. **设备连接** → 建立 TCP 连接
-2. **发送认证请求** → 发送包含认证信息的 JSON 消息
-3. **服务器验证** → 验证 clientId、username、password
-4. **生成 Token** → 认证成功后生成 JWT Token(内部使用)
-5. **设备上线** → 发送设备上线消息到消息总线
-6. **返回响应** → 返回认证结果
-7. **会话注册** → 注册设备会话,允许后续业务操作
+**消息方向**:设备 → 服务器
-#### 3.1.4 认证错误码
-
-| 错误码 | 说明 | 处理建议 |
-|-----|-------|--------------|
-| 401 | 认证失败 | 检查用户名、密码是否正确 |
-| 400 | 参数错误 | 检查认证参数是否完整 |
-| 404 | 设备不存在 | 检查设备是否已注册 |
-| 500 | 服务器错误 | 联系管理员 |
-
-### 3.2 数据上报 (thing.property.post)
-
-设备向服务器上报属性数据。
-
-**示例:**
+**示例:温度传感器数据上报**
```json
{
- "id": "8ac6a1db91e64aa9996143fdbac2cbfe",
+ "id": "property_1704067200000_456",
"method": "thing.property.post",
- "deviceId": 8,
"params": {
"temperature": 25.5,
"humidity": 60.2,
"pressure": 1013.25,
- "battery": 85
+ "battery": 85,
+ "signal_strength": -65
},
- "timestamp": 1753111026437
+ "timestamp": 1704067200000
}
```
-### 3.3 心跳 (thing.state.update)
+### 3.3 设备状态更新 (thing.state.update)
-设备向服务器发送心跳保活。
+**消息方向**:设备 → 服务器
-**示例:**
+**示例:心跳请求**
```json
{
- "id": "7db8c4e6408b40f8b2549ddd94f6bb02",
+ "id": "heartbeat_1704067200000_321",
"method": "thing.state.update",
- "deviceId": 8,
"params": {
- "state": "1"
+ "state": "online",
+ "uptime": 86400,
+ "memory_usage": 65.2,
+ "cpu_usage": 12.8
},
- "timestamp": 1753111026467
+ "timestamp": 1704067200000
}
```
-### 3.4 消息方法常量
+## 4. 编解码器标识
-支持的消息方法:
+```java
+public static final String TYPE = "TCP_JSON";
+```
-- `auth` - 设备认证
-- `thing.property.post` - 数据上报
-- `thing.state.update` - 心跳
+## 5. 协议优势
-## 4. 协议特点
-
-### 4.1 优势
-
-- **简单易用**:纯 JSON 格式,无需复杂的二进制解析
-- **调试友好**:可以直接查看消息内容
+- **开发效率高**:JSON 格式,开发和调试简单
+- **跨语言支持**:所有主流语言都支持 JSON
+- **可读性优秀**:可以直接查看消息内容
- **扩展性强**:可以轻松添加新字段
-- **标准化**:与 EMQX 等主流平台格式兼容
-- **错误处理**:提供详细的错误信息和异常处理
-- **安全性**:支持设备认证机制
+- **安全性高**:移除 deviceId 字段,防止伪造攻击
-### 4.2 与二进制协议对比
+## 6. 与二进制协议对比
-| 特性 | 二进制协议 | JSON 协议 |
-|-------|-------|----------|
-| 可读性 | 差 | 优秀 |
-| 调试难度 | 高 | 低 |
-| 扩展性 | 差 | 优秀 |
-| 解析复杂度 | 高 | 低 |
-| 数据大小 | 小 | 稍大 |
-| 标准化程度 | 低 | 高 |
-| 实现复杂度 | 高 | 低 |
-| 安全性 | 一般 | 优秀(支持认证) |
+| 特性 | JSON协议 | 二进制协议 |
+|------|----------|------------|
+| 开发难度 | 低 | 高 |
+| 调试难度 | 低 | 高 |
+| 可读性 | 优秀 | 差 |
+| 数据大小 | 中等 | 小(节省30-50%) |
+| 解析性能 | 中等 | 高 |
+| 学习成本 | 低 | 高 |
-### 4.3 适用场景
-
-- ✅ **开发调试**:JSON 格式便于查看和调试
-- ✅ **快速集成**:标准 JSON 格式,集成简单
-- ✅ **协议扩展**:可以轻松添加新字段
-- ✅ **多语言支持**:JSON 格式支持所有主流语言
-- ✅ **云平台对接**:与主流 IoT 云平台格式兼容
-- ✅ **安全要求**:支持设备认证和访问控制
-
-## 5. 最佳实践
-
-### 5.1 认证最佳实践
-
-1. **连接即认证**:设备连接后立即进行认证
-2. **重连机制**:连接断开后重新认证
-3. **错误重试**:认证失败时适当重试
-4. **安全传输**:使用 TLS 加密传输敏感信息
-
-### 5.2 消息设计
-
-1. **保持简洁**:避免过深的嵌套结构
-2. **字段命名**:使用驼峰命名法,保持一致性
-3. **数据类型**:使用合适的数据类型,避免字符串表示数字
-4. **时间戳**:统一使用毫秒级时间戳
-
-### 5.3 错误处理
-
-1. **参数验证**:确保必要字段存在且有效
-2. **异常捕获**:正确处理编码解码异常
-3. **日志记录**:记录详细的调试信息
-4. **认证失败**:认证失败时及时关闭连接
-
-### 5.4 性能优化
-
-1. **批量上报**:可以在 params 中包含多个数据点
-2. **连接复用**:保持 TCP 连接,避免频繁建立连接
-3. **消息缓存**:客户端可以缓存消息,批量发送
-4. **心跳间隔**:合理设置心跳间隔,避免过于频繁
-
-## 6. 配置说明
-
-### 6.1 启用 JSON 协议
-
-在配置文件中设置:
-
-```yaml
-yudao:
- iot:
- gateway:
- protocol:
- tcp:
- enabled: true
- port: 8091
- default-protocol: "JSON" # 使用 JSON 协议
-```
-
-### 6.2 认证配置
-
-```yaml
-yudao:
- iot:
- gateway:
- token:
- secret: "your-secret-key" # JWT 密钥
- expiration: "24h" # Token 过期时间
-```
-
-## 7. 调试和监控
-
-### 7.1 日志级别
-
-```yaml
-logging:
- level:
- cn.iocoder.yudao.module.iot.gateway.protocol.tcp: DEBUG
-```
-
-### 7.2 调试信息
-
-编解码器会输出详细的调试日志:
-
-- 认证过程:显示认证请求和响应
-- 编码成功:显示方法、长度、内容
-- 解码过程:显示原始数据、解析结果
-- 错误信息:详细的异常堆栈
-
-### 7.3 监控指标
-
-- 认证成功率
-- 消息处理数量
-- 编解码成功率
-- 处理延迟
-- 错误率
-- 在线设备数量
-
-## 8. 安全考虑
-
-### 8.1 认证安全
-
-1. **密码强度**:使用强密码策略
-2. **Token 过期**:设置合理的 Token 过期时间
-3. **连接限制**:限制单个设备的并发连接数
-4. **IP 白名单**:可选的 IP 访问控制
-
-### 8.2 传输安全
-
-1. **TLS 加密**:使用 TLS 1.2+ 加密传输
-2. **证书验证**:验证服务器证书
-3. **密钥管理**:安全存储和管理密钥
-
-### 8.3 数据安全
-
-1. **敏感信息**:不在日志中记录密码等敏感信息
-2. **数据验证**:验证所有输入数据
-3. **访问控制**:基于 Token 的访问控制
-
-这样就完成了 TCP JSON 格式协议的完整说明,包括认证流程的详细说明,与实际代码实现完全一致。
+**推荐场景**:
+- ✅ **开发调试阶段**:调试友好,开发效率高
+- ✅ **快速原型开发**:实现简单,快速迭代
+- ✅ **多语言集成**:广泛的语言支持
+- ❌ **高频数据传输**:建议使用二进制协议
+- ❌ **带宽受限环境**:建议使用二进制协议
\ No newline at end of file