diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpBinaryDeviceMessageCodec.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpBinaryDeviceMessageCodec.java index 8279ca2471..0bf0e63e93 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpBinaryDeviceMessageCodec.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpBinaryDeviceMessageCodec.java @@ -4,10 +4,9 @@ import cn.hutool.core.lang.Assert; import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; import io.vertx.core.buffer.Buffer; -import lombok.AllArgsConstructor; -import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -19,9 +18,9 @@ import java.nio.charset.StandardCharsets; * 二进制协议格式(所有数值使用大端序): * *
- * +--------+--------+--------+--------+--------+--------+--------+--------+
- * | 魔术字 | 版本号 | 消息类型| 消息标志| 消息长度(4 字节) |
- * +--------+--------+--------+--------+--------+--------+--------+--------+
+ * +--------+--------+--------+---------------------------+--------+--------+
+ * | 魔术字 | 版本号 | 消息类型| 消息长度(4 字节) |
+ * +--------+--------+--------+---------------------------+--------+--------+
* | 消息 ID 长度(2 字节) | 消息 ID (变长字符串) |
* +--------+--------+--------+--------+--------+--------+--------+--------+
* | 方法名长度(2 字节) | 方法名(变长字符串) |
@@ -44,8 +43,6 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
public static final String TYPE = "TCP_BINARY";
- // ==================== 协议常量 ====================
-
/**
* 协议魔术字,用于协议识别
*/
@@ -56,27 +53,20 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
*/
private static final byte PROTOCOL_VERSION = (byte) 0x01;
- // TODO @haohao:这个要不直接静态枚举,不用 MessageType
/**
- * 消息类型常量
+ * 请求消息类型
*/
- public static class MessageType {
-
- /**
- * 请求消息
- */
- public static final byte REQUEST = 0x01;
- /**
- * 响应消息
- */
- public static final byte RESPONSE = 0x02;
-
- }
+ private static final byte REQUEST = (byte) 0x01;
/**
- * 协议头部固定长度(魔术字 + 版本号 + 消息类型 + 消息标志 + 消息长度)
+ * 响应消息类型
*/
- private static final int HEADER_FIXED_LENGTH = 8;
+ private static final byte RESPONSE = (byte) 0x02;
+
+ /**
+ * 协议头部固定长度(魔术字 + 版本号 + 消息类型 + 消息长度)
+ */
+ private static final int HEADER_FIXED_LENGTH = 7;
/**
* 最小消息长度(头部 + 消息ID长度 + 方法名长度)
@@ -97,7 +87,7 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
byte messageType = determineMessageType(message);
// 2. 构建消息体
byte[] bodyData = buildMessageBody(message, messageType);
- // 3. 构建完整消息(不包含deviceId,由连接上下文管理)
+ // 3. 构建完整消息
return buildCompleteMessage(message, messageType, bodyData);
} catch (Exception e) {
log.error("[encode][TCP 二进制消息编码失败,消息: {}]", message, e);
@@ -111,30 +101,59 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
Assert.isTrue(bytes.length >= MIN_MESSAGE_LENGTH, "数据包长度不足");
try {
Buffer buffer = Buffer.buffer(bytes);
- // 1. 解析协议头部
- ProtocolHeader header = parseProtocolHeader(buffer);
- // 2. 解析消息内容(不包含deviceId,由上层连接管理器设置)
- return parseMessageContent(buffer, header);
+ // 解析协议头部和消息内容
+ int index = 0;
+ // 1. 验证魔术字
+ byte magic = buffer.getByte(index++);
+ Assert.isTrue(magic == MAGIC_NUMBER, "无效的协议魔术字: " + magic);
+
+ // 2. 验证版本号
+ byte version = buffer.getByte(index++);
+ Assert.isTrue(version == PROTOCOL_VERSION, "不支持的协议版本: " + version);
+
+ // 3. 读取消息类型
+ byte messageType = buffer.getByte(index++);
+ // 直接验证消息类型,无需抽取方法
+ Assert.isTrue(messageType == REQUEST || messageType == RESPONSE,
+ "无效的消息类型: " + messageType);
+
+ // 4. 读取消息长度
+ int messageLength = buffer.getInt(index);
+ index += 4;
+ Assert.isTrue(messageLength == buffer.length(),
+ "消息长度不匹配,期望: " + messageLength + ", 实际: " + buffer.length());
+
+ // 5. 读取消息 ID
+ short messageIdLength = buffer.getShort(index);
+ index += 2;
+ String messageId = buffer.getString(index, index + messageIdLength, StandardCharsets.UTF_8.name());
+ index += messageIdLength;
+
+ // 6. 读取方法名
+ short methodLength = buffer.getShort(index);
+ index += 2;
+ String method = buffer.getString(index, index + methodLength, StandardCharsets.UTF_8.name());
+ index += methodLength;
+
+ // 7. 解析消息体
+ return parseMessageBody(buffer, index, messageType, messageId, method);
} catch (Exception e) {
log.error("[decode][TCP 二进制消息解码失败,数据长度: {}]", bytes.length, e);
throw new RuntimeException("TCP 二进制消息解码失败: " + e.getMessage(), e);
}
}
- // ==================== 编码相关方法 ====================
-
/**
* 确定消息类型
* 优化后的判断逻辑:有响应字段就是响应消息,否则就是请求消息
*/
private byte determineMessageType(IotDeviceMessage message) {
// 判断是否为响应消息:有响应码或响应消息时为响应
- // TODO @haohao:感觉只判断 code 更稳妥点?msg 有可能空。。。
- if (message.getCode() != null || StrUtil.isNotBlank(message.getMsg())) {
- return MessageType.RESPONSE;
+ if (message.getCode() != null) {
+ return RESPONSE;
}
// 默认为请求消息
- return MessageType.REQUEST;
+ return REQUEST;
}
/**
@@ -142,12 +161,12 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
*/
private byte[] buildMessageBody(IotDeviceMessage message, byte messageType) {
Buffer bodyBuffer = Buffer.buffer();
- if (messageType == MessageType.RESPONSE) {
+ if (messageType == RESPONSE) {
// code
bodyBuffer.appendInt(message.getCode() != null ? message.getCode() : 0);
// msg
String msg = message.getMsg() != null ? message.getMsg() : "";
- byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
+ byte[] msgBytes = StrUtil.utf8Bytes(msg);
bodyBuffer.appendShort((short) msgBytes.length);
bodyBuffer.appendBytes(msgBytes);
// data
@@ -155,11 +174,9 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
bodyBuffer.appendBytes(JsonUtils.toJsonByte(message.getData()));
}
} else {
- // params
- // TODO @haohao:请求是不是只处理 message.getParams() 哈?
- Object payload = message.getParams() != null ? message.getParams() : message.getData();
- if (payload != null) {
- bodyBuffer.appendBytes(JsonUtils.toJsonByte(payload));
+ // 请求消息只处理 params 参数
+ if (message.getParams() != null) {
+ bodyBuffer.appendBytes(JsonUtils.toJsonByte(message.getParams()));
}
}
return bodyBuffer.getBytes();
@@ -174,20 +191,17 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
buffer.appendByte(MAGIC_NUMBER);
buffer.appendByte(PROTOCOL_VERSION);
buffer.appendByte(messageType);
- buffer.appendByte((byte) 0x00); // 消息标志,预留字段 TODO @haohao:这个标识的作用是啥呀?
- // 2. 预留消息长度位置(在 6. 更新消息长度)
+ // 2. 预留消息长度位置(在 5. 更新消息长度)
int lengthPosition = buffer.length();
buffer.appendInt(0);
// 3. 写入消息 ID
String messageId = StrUtil.isNotBlank(message.getRequestId()) ? message.getRequestId()
- // TODO @haohao:复用 IotDeviceMessageUtils 的 generateMessageId 哇?
- : generateMessageId(message.getMethod());
- // TODO @haohao:StrUtil.utf8Bytes()
- byte[] messageIdBytes = messageId.getBytes(StandardCharsets.UTF_8);
+ : IotDeviceMessageUtils.generateMessageId();
+ byte[] messageIdBytes = StrUtil.utf8Bytes(messageId);
buffer.appendShort((short) messageIdBytes.length);
buffer.appendBytes(messageIdBytes);
// 4. 写入方法名
- byte[] methodBytes = message.getMethod().getBytes(StandardCharsets.UTF_8);
+ byte[] methodBytes = StrUtil.utf8Bytes(message.getMethod());
buffer.appendShort((short) methodBytes.length);
buffer.appendBytes(methodBytes);
// 5. 写入消息体
@@ -197,66 +211,6 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
return buffer.getBytes();
}
- /**
- * 生成消息 ID
- */
- private String generateMessageId(String method) {
- return method + "_" + System.currentTimeMillis() + "_" + (int) (Math.random() * 1000);
- }
-
- // ==================== 解码相关方法 ====================
-
- // TODO @haohao:是不是把 parseProtocolHeader、parseMessageContent 合并?
- /**
- * 解析协议头部
- */
- private ProtocolHeader parseProtocolHeader(Buffer buffer) {
- int index = 0;
- // 1. 验证魔术字
- byte magic = buffer.getByte(index++);
- Assert.isTrue(magic == MAGIC_NUMBER, "无效的协议魔术字: " + magic);
- // 2. 验证版本号
- byte version = buffer.getByte(index++);
- Assert.isTrue(version == PROTOCOL_VERSION, "不支持的协议版本: " + version);
-
- // 3. 读取消息类型
- byte messageType = buffer.getByte(index++);
- Assert.isTrue(isValidMessageType(messageType), "无效的消息类型: " + messageType);
- // 4. 读取消息标志(暂时跳过)
- byte messageFlags = buffer.getByte(index++);
-
- // 5. 读取消息长度
- int messageLength = buffer.getInt(index);
- index += 4;
-
- Assert.isTrue(messageLength == buffer.length(),
- "消息长度不匹配,期望: " + messageLength + ", 实际: " + buffer.length());
-
- return new ProtocolHeader(magic, version, messageType, messageFlags, messageLength, index);
- }
-
- /**
- * 解析消息内容
- */
- private IotDeviceMessage parseMessageContent(Buffer buffer, ProtocolHeader header) {
- int index = header.getNextIndex();
-
- // 1. 读取消息 ID
- short messageIdLength = buffer.getShort(index);
- index += 2;
- String messageId = buffer.getString(index, index + messageIdLength, StandardCharsets.UTF_8.name());
- index += messageIdLength;
-
- // 2. 读取方法名
- short methodLength = buffer.getShort(index);
- index += 2;
- String method = buffer.getString(index, index + methodLength, StandardCharsets.UTF_8.name());
- index += methodLength;
-
- // 3. 解析消息体
- return parseMessageBody(buffer, index, header.getMessageType(), messageId, method);
- }
-
/**
* 解析消息体
*/
@@ -267,11 +221,11 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
return IotDeviceMessage.of(messageId, method, null, null, null, null);
}
- if (messageType == MessageType.RESPONSE) {
+ if (messageType == RESPONSE) {
// 响应消息:解析 code + msg + data
return parseResponseMessage(buffer, startIndex, messageId, method);
} else {
- // 请求消息:解析 payload(可能是 params 或 data)
+ // 请求消息:解析 payload
Object payload = parseJsonData(buffer, startIndex, buffer.length());
return IotDeviceMessage.of(messageId, method, payload, null, null, null);
}
@@ -303,7 +257,7 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
}
/**
- * 解析JSON数据
+ * 解析 JSON 数据
*/
private Object parseJsonData(Buffer buffer, int startIndex, int endIndex) {
if (startIndex >= endIndex) {
@@ -318,34 +272,14 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
}
}
- // ==================== 辅助方法 ====================
-
- // TODO @haohao:这个貌似只用一次,可以考虑不抽小方法哈;
/**
- * 验证消息类型是否有效
+ * 快速检测是否为二进制格式
+ *
+ * @param data 数据
+ * @return 是否为二进制格式
*/
- private boolean isValidMessageType(byte messageType) {
- return messageType == MessageType.REQUEST || messageType == MessageType.RESPONSE;
+ public static boolean isBinaryFormatQuick(byte[] data) {
+ return data != null && data.length >= 1 && data[0] == MAGIC_NUMBER;
}
- // ==================== 内部类 ====================
-
- /**
- * 协议头部信息
- */
- @Data
- @AllArgsConstructor
- private static class ProtocolHeader {
-
- private byte magic;
- private byte version;
- private byte messageType;
- private byte messageFlags;
- private int messageLength;
- /**
- * 指向消息内容开始位置
- */
- private int nextIndex;
-
- }
}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpConnectionManager.java
index 520861e51e..8f5b638b53 100644
--- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpConnectionManager.java
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpConnectionManager.java
@@ -28,46 +28,33 @@ public class IotTcpConnectionManager {
private final Map connectionMap = new ConcurrentHashMap<>();
/**
- * 设备 ID -> NetSocket 的映射(用于快速查找)
+ * 设备 ID -> NetSocket 的映射
*/
private final Map deviceSocketMap = new ConcurrentHashMap<>();
- /**
- * NetSocket -> 设备 ID 的映射(用于连接断开时清理)
- */
- private final Map socketDeviceMap = new ConcurrentHashMap<>();
-
/**
* 注册设备连接(包含认证信息)
*
- * @param socket TCP 连接
- * @param deviceId 设备 ID
- * @param authInfo 认证信息
+ * @param socket TCP 连接
+ * @param deviceId 设备 ID
+ * @param connectionInfo 连接信息
*/
- public void registerConnection(NetSocket socket, Long deviceId, AuthInfo authInfo) {
+ public void registerConnection(NetSocket socket, Long deviceId, ConnectionInfo connectionInfo) {
// 如果设备已有其他连接,先清理旧连接
NetSocket oldSocket = deviceSocketMap.get(deviceId);
if (oldSocket != null && oldSocket != socket) {
log.info("[registerConnection][设备已有其他连接,断开旧连接,设备 ID: {},旧连接: {}]",
deviceId, oldSocket.remoteAddress());
oldSocket.close();
- // 清理所有相关映射
+ // 清理旧连接的映射
connectionMap.remove(oldSocket);
- socketDeviceMap.remove(oldSocket);
}
- // 注册新连接 - 更新所有映射关系
- ConnectionInfo connectionInfo = new ConnectionInfo()
- .setDeviceId(deviceId)
- .setAuthInfo(authInfo)
- .setAuthenticated(true);
connectionMap.put(socket, connectionInfo);
deviceSocketMap.put(deviceId, socket);
- // TODO @haohao:socketDeviceMap 和 connectionMap 会重复哇?connectionMap.get(socket).getDeviceId
- socketDeviceMap.put(socket, deviceId);
log.info("[registerConnection][注册设备连接,设备 ID: {},连接: {},product key: {},device name: {}]",
- deviceId, socket.remoteAddress(), authInfo.getProductKey(), authInfo.getDeviceName());
+ deviceId, socket.remoteAddress(), connectionInfo.getProductKey(), connectionInfo.getDeviceName());
}
/**
@@ -77,29 +64,14 @@ public class IotTcpConnectionManager {
*/
public void unregisterConnection(NetSocket socket) {
ConnectionInfo connectionInfo = connectionMap.remove(socket);
- Long deviceId = socketDeviceMap.remove(socket);
- if (connectionInfo != null && deviceId != null) {
+ if (connectionInfo != null) {
+ Long deviceId = connectionInfo.getDeviceId();
deviceSocketMap.remove(deviceId);
log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]",
deviceId, socket.remoteAddress());
}
}
- // TODO @haohao:用不到,要不暂时清理哈。
- /**
- * 注销设备连接(通过设备 ID)
- *
- * @param deviceId 设备 ID
- */
- public void unregisterConnection(Long deviceId) {
- NetSocket socket = deviceSocketMap.remove(deviceId);
- if (socket != null) {
- connectionMap.remove(socket);
- socketDeviceMap.remove(socket);
- log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]", deviceId, socket.remoteAddress());
- }
- }
-
/**
* 检查连接是否已认证
*/
@@ -116,11 +88,10 @@ public class IotTcpConnectionManager {
}
/**
- * 获取连接的认证信息
+ * 获取连接信息
*/
- public AuthInfo getAuthInfo(NetSocket socket) {
- ConnectionInfo info = connectionMap.get(socket);
- return info != null ? info.getAuthInfo() : null;
+ public ConnectionInfo getConnectionInfo(NetSocket socket) {
+ return connectionMap.get(socket);
}
/**
@@ -159,30 +130,34 @@ public class IotTcpConnectionManager {
}
}
- // TODO @haohao:ConnectionInfo 和 AuthInfo 是不是可以融合哈?
-
/**
- * 连接信息
+ * 连接信息(包含认证信息)
*/
@Data
public static class ConnectionInfo {
-
- private Long deviceId;
- private AuthInfo authInfo;
- private boolean authenticated;
-
- }
-
- /**
- * 认证信息
- */
- @Data
- public static class AuthInfo {
-
+ /**
+ * 设备 ID
+ */
private Long deviceId;
+ /**
+ * 产品 Key
+ */
private String productKey;
+ /**
+ * 设备名称
+ */
private String deviceName;
+ /**
+ * 客户端 ID
+ */
private String clientId;
-
+ /**
+ * 消息编解码类型(认证后确定)
+ */
+ private String codecType;
+ /**
+ * 是否已认证
+ */
+ private boolean authenticated;
}
}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java
index 627daad680..d290d99468 100644
--- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java
@@ -1,12 +1,12 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router;
import cn.hutool.core.map.MapUtil;
+import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
-import cn.hutool.json.JSONObject;
-import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
+import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
@@ -21,12 +21,8 @@ import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessa
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
-import lombok.AllArgsConstructor;
-import lombok.Data;
import lombok.extern.slf4j.Slf4j;
-import java.nio.charset.StandardCharsets;
-
/**
* TCP 上行消息处理器
*
@@ -77,31 +73,55 @@ public class IotTcpUpstreamHandler implements Handler {
});
// 设置消息处理器
- socket.handler(buffer -> processMessage(clientId, buffer, socket));
+ socket.handler(buffer -> {
+ try {
+ processMessage(clientId, buffer, socket);
+ } catch (Exception e) {
+ log.error("[handle][消息解码失败,断开连接,客户端 ID: {},地址: {},错误: {}]",
+ clientId, socket.remoteAddress(), e.getMessage());
+ cleanupConnection(socket);
+ socket.close();
+ }
+ });
}
/**
* 处理消息
+ *
+ * @param clientId 客户端 ID
+ * @param buffer 消息
+ * @param socket 网络连接
+ * @throws Exception 消息解码失败时抛出异常
*/
- private void processMessage(String clientId, Buffer buffer, NetSocket socket) {
- try {
- // 1.1 数据包基础检查
- if (buffer.length() == 0) {
- return;
- }
- // 1.2 解码消息
- MessageInfo messageInfo = decodeMessage(buffer);
- if (messageInfo == null) {
- return;
- }
+ private void processMessage(String clientId, Buffer buffer, NetSocket socket) throws Exception {
+ // 1. 基础检查
+ if (buffer == null || buffer.length() == 0) {
+ return;
+ }
- // 2. 根据消息类型路由处理
- if (isAuthRequest(messageInfo.message)) {
+ // 2. 获取消息格式类型
+ String codecType = getMessageCodecType(buffer, socket);
+
+ // 3. 解码消息
+ IotDeviceMessage message;
+ try {
+ message = deviceMessageService.decodeDeviceMessage(buffer.getBytes(), codecType);
+ if (message == null) {
+ throw new Exception("解码后消息为空");
+ }
+ } catch (Exception e) {
+ // 消息格式错误时抛出异常,由上层处理连接断开
+ throw new Exception("消息解码失败: " + e.getMessage(), e);
+ }
+
+ // 4. 根据消息类型路由处理
+ try {
+ if (AUTH_METHOD.equals(message.getMethod())) {
// 认证请求
- handleAuthenticationRequest(clientId, messageInfo, socket);
+ handleAuthenticationRequest(clientId, message, codecType, socket);
} else {
// 业务消息
- handleBusinessRequest(clientId, messageInfo, socket);
+ handleBusinessRequest(clientId, message, codecType, socket);
}
} catch (Exception e) {
log.error("[processMessage][处理消息失败,客户端 ID: {}]", clientId, e);
@@ -110,226 +130,158 @@ public class IotTcpUpstreamHandler implements Handler {
/**
* 处理认证请求
+ *
+ * @param clientId 客户端 ID
+ * @param message 消息信息
+ * @param codecType 消息编解码类型
+ * @param socket 网络连接
*/
- private void handleAuthenticationRequest(String clientId, MessageInfo messageInfo, NetSocket socket) {
+ private void handleAuthenticationRequest(String clientId, IotDeviceMessage message, String codecType,
+ NetSocket socket) {
try {
// 1.1 解析认证参数
- IotDeviceMessage message = messageInfo.message;
- AuthParams authParams = parseAuthParams(message.getParams());
+ IotDeviceAuthReqDTO authParams = JsonUtils.parseObject(message.getParams().toString(),
+ IotDeviceAuthReqDTO.class);
if (authParams == null) {
- sendError(socket, message.getRequestId(), "认证参数不完整", messageInfo.codecType);
+ sendErrorResponse(socket, message.getRequestId(), "认证参数不完整", codecType);
return;
}
// 1.2 执行认证
- if (!authenticateDevice(authParams)) {
+ if (!validateDeviceAuth(authParams)) {
log.warn("[handleAuthenticationRequest][认证失败,客户端 ID: {},username: {}]",
- clientId, authParams.username);
- sendError(socket, message.getRequestId(), "认证失败", messageInfo.codecType);
+ clientId, authParams.getUsername());
+ sendErrorResponse(socket, message.getRequestId(), "认证失败", codecType);
return;
}
// 2.1 解析设备信息
- IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.username);
+ IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.getUsername());
if (deviceInfo == null) {
- sendError(socket, message.getRequestId(), "解析设备信息失败", messageInfo.codecType);
+ sendErrorResponse(socket, message.getRequestId(), "解析设备信息失败", codecType);
return;
}
// 2.2 获取设备信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(),
deviceInfo.getDeviceName());
if (device == null) {
- sendError(socket, message.getRequestId(), "设备不存在", messageInfo.codecType);
+ sendErrorResponse(socket, message.getRequestId(), "设备不存在", codecType);
return;
}
- // 3. 注册连接并发送成功响应
- registerConnection(socket, device, deviceInfo, authParams.clientId);
- sendOnlineMessage(deviceInfo);
- sendSuccess(socket, message.getRequestId(), "认证成功", messageInfo.codecType);
+ // 3.1 注册连接
+ registerConnection(socket, device, clientId, codecType);
+ // 3.2 发送上线消息
+ sendOnlineMessage(device);
+ // 3.3 发送成功响应
+ sendSuccessResponse(socket, message.getRequestId(), "认证成功", codecType);
log.info("[handleAuthenticationRequest][认证成功,设备 ID: {},设备名: {}]",
- device.getId(), deviceInfo.getDeviceName());
+ device.getId(), device.getDeviceName());
} catch (Exception e) {
log.error("[handleAuthenticationRequest][认证处理异常,客户端 ID: {}]", clientId, e);
- sendError(socket, messageInfo.message.getRequestId(), "认证处理异常", messageInfo.codecType);
+ sendErrorResponse(socket, message.getRequestId(), "认证处理异常", codecType);
}
}
/**
* 处理业务请求
+ *
+ * @param clientId 客户端 ID
+ * @param message 消息信息
+ * @param codecType 消息编解码类型
+ * @param socket 网络连接
*/
- private void handleBusinessRequest(String clientId, MessageInfo messageInfo, NetSocket socket) {
+ private void handleBusinessRequest(String clientId, IotDeviceMessage message, String codecType, NetSocket socket) {
try {
// 1. 检查认证状态
if (connectionManager.isNotAuthenticated(socket)) {
log.warn("[handleBusinessRequest][设备未认证,客户端 ID: {}]", clientId);
- sendError(socket, messageInfo.message.getRequestId(), "请先进行认证", messageInfo.codecType);
+ sendErrorResponse(socket, message.getRequestId(), "请先进行认证", codecType);
return;
}
// 2. 获取认证信息并处理业务消息
- IotTcpConnectionManager.AuthInfo authInfo = connectionManager.getAuthInfo(socket);
- processBusinessMessage(clientId, messageInfo.message, authInfo);
+ IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
+
+ // 3. 发送消息到消息总线
+ deviceMessageService.sendDeviceMessage(message, connectionInfo.getProductKey(),
+ connectionInfo.getDeviceName(), serverId);
} catch (Exception e) {
log.error("[handleBusinessRequest][业务请求处理异常,客户端 ID: {}]", clientId, e);
}
}
- // TODO @haohao:processBusinessMessage 这个小方法,直接融合到 handleBusinessRequest 里?读起来更聚集点
/**
- * 处理业务消息
- */
- private void processBusinessMessage(String clientId, IotDeviceMessage message,
- IotTcpConnectionManager.AuthInfo authInfo) {
- try {
- message.setDeviceId(authInfo.getDeviceId());
- message.setServerId(serverId);
- // 发送到消息总线
- deviceMessageService.sendDeviceMessage(message, authInfo.getProductKey(),
- authInfo.getDeviceName(), serverId);
- } catch (Exception e) {
- log.error("[processBusinessMessage][业务消息处理失败,客户端 ID: {},消息 ID: {}]",
- clientId, message.getId(), e);
- }
- }
-
- /**
- * 解码消息
+ * 获取消息编解码类型
*
* @param buffer 消息
+ * @param socket 网络连接
+ * @return 消息编解码类型
*/
- private MessageInfo decodeMessage(Buffer buffer) {
- if (buffer == null || buffer.length() == 0) {
- return null;
- }
- // 1. 快速检测消息格式类型
- // TODO @haohao:是不是进一步优化?socket 建立认证后,那条消息已经定义了所有消息的格式哈?
- String codecType = detectMessageFormat(buffer);
- try {
- // 2. 使用检测到的格式进行解码
- IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(buffer.getBytes(), codecType);
- if (message == null) {
- return null;
- }
- return new MessageInfo(message, codecType);
- } catch (Exception e) {
- log.warn("[decodeMessage][消息解码失败,格式: {},数据长度: {},错误: {}]",
- codecType, buffer.length(), e.getMessage());
- // TODO @haohao:一般消息格式不对,应该抛出异常,断开连接居多?
- return null;
- }
- }
-
- /**
- * 检测消息格式类型
- * 优化性能:避免不必要的字符串转换
- */
- private String detectMessageFormat(Buffer buffer) {
- // TODO @haohao:是不是 IotTcpBinaryDeviceMessageCodec 提供一个 isBinaryFormat 方法哈?
- // 默认使用 JSON
- if (buffer.length() == 0) {
- return CODEC_TYPE_JSON;
+ private String getMessageCodecType(Buffer buffer, NetSocket socket) {
+ // 1. 如果已认证,优先使用缓存的编解码类型
+ IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
+ if (connectionInfo != null && connectionInfo.isAuthenticated() &&
+ StrUtil.isNotBlank(connectionInfo.getCodecType())) {
+ return connectionInfo.getCodecType();
}
- // 1. 优先检测二进制格式(检查魔术字节 0x7E)
- if (isBinaryFormat(buffer)) {
- return CODEC_TYPE_BINARY;
- }
-
- // 2. 检测 JSON 格式(检查前几个有效字符)
- // TODO @haohao:这个检测去掉?直接 return CODEC_TYPE_JSON 更简洁一点。
- if (isJsonFormat(buffer)) {
- return CODEC_TYPE_JSON;
- }
-
- // 3. 默认尝试 JSON 格式
- return CODEC_TYPE_JSON;
- }
-
- /**
- * 检测二进制格式
- * 通过检查魔术字节快速识别,避免完整字符串转换
- */
- private boolean isBinaryFormat(Buffer buffer) {
- // 二进制协议最小长度检查
- if (buffer.length() < 8) {
- return false;
- }
-
- try {
- // 检查魔术字节 0x7E(二进制协议的第一个字节)
- byte firstByte = buffer.getByte(0);
- return firstByte == (byte) 0x7E;
- } catch (Exception e) {
- return false;
- }
- }
-
- /**
- * 检测 JSON 格式
- * 只检查前几个有效字符,避免完整字符串转换
- */
- private boolean isJsonFormat(Buffer buffer) {
- try {
- // 检查前 64 个字节或整个缓冲区(取较小值)
- int checkLength = Math.min(buffer.length(), 64);
- String prefix = buffer.getString(0, checkLength, StandardCharsets.UTF_8.name());
-
- if (StrUtil.isBlank(prefix)) {
- return false;
- }
-
- String trimmed = prefix.trim();
- // JSON 格式必须以 { 或 [ 开头
- return trimmed.startsWith("{") || trimmed.startsWith("[");
-
- } catch (Exception e) {
- return false;
- }
+ // 2. 未认证时检测消息格式类型
+ return IotTcpBinaryDeviceMessageCodec.isBinaryFormatQuick(buffer.getBytes()) ? CODEC_TYPE_BINARY
+ : CODEC_TYPE_JSON;
}
/**
* 注册连接信息
+ *
+ * @param socket 网络连接
+ * @param device 设备
+ * @param clientId 客户端 ID
+ * @param codecType 消息编解码类型
*/
private void registerConnection(NetSocket socket, IotDeviceRespDTO device,
- IotDeviceAuthUtils.DeviceInfo deviceInfo, String clientId) {
- // TODO @haohao:AuthInfo 的创建,放在 connectionManager 里构建貌似会更收敛一点?
- // 创建认证信息
- IotTcpConnectionManager.AuthInfo authInfo = new IotTcpConnectionManager.AuthInfo()
+ String clientId, String codecType) {
+ IotTcpConnectionManager.ConnectionInfo connectionInfo = new IotTcpConnectionManager.ConnectionInfo()
.setDeviceId(device.getId())
- .setProductKey(deviceInfo.getProductKey())
- .setDeviceName(deviceInfo.getDeviceName())
- .setClientId(clientId);
+ .setProductKey(device.getProductKey())
+ .setDeviceName(device.getDeviceName())
+ .setClientId(clientId)
+ .setCodecType(codecType)
+ .setAuthenticated(true);
// 注册连接
- connectionManager.registerConnection(socket, device.getId(), authInfo);
+ connectionManager.registerConnection(socket, device.getId(), connectionInfo);
}
/**
* 发送设备上线消息
+ *
+ * @param device 设备信息
*/
- private void sendOnlineMessage(IotDeviceAuthUtils.DeviceInfo deviceInfo) {
+ private void sendOnlineMessage(IotDeviceRespDTO device) {
try {
IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
- deviceMessageService.sendDeviceMessage(onlineMessage, deviceInfo.getProductKey(),
- deviceInfo.getDeviceName(), serverId);
+ deviceMessageService.sendDeviceMessage(onlineMessage, device.getProductKey(),
+ device.getDeviceName(), serverId);
} catch (Exception e) {
- log.error("[sendOnlineMessage][发送上线消息失败,设备: {}]", deviceInfo.getDeviceName(), e);
+ log.error("[sendOnlineMessage][发送上线消息失败,设备: {}]", device.getDeviceName(), e);
}
}
/**
* 清理连接
+ *
+ * @param socket 网络连接
*/
private void cleanupConnection(NetSocket socket) {
try {
- // 发送离线消息(如果已认证)
- IotTcpConnectionManager.AuthInfo authInfo = connectionManager.getAuthInfo(socket);
- if (authInfo != null) {
+ // 1. 发送离线消息(如果已认证)
+ IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
+ if (connectionInfo != null) {
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
- deviceMessageService.sendDeviceMessage(offlineMessage, authInfo.getProductKey(),
- authInfo.getDeviceName(), serverId);
+ deviceMessageService.sendDeviceMessage(offlineMessage, connectionInfo.getProductKey(),
+ connectionInfo.getDeviceName(), serverId);
}
- // 注销连接
+ // 2. 注销连接
connectionManager.unregisterConnection(socket);
} catch (Exception e) {
log.error("[cleanupConnection][清理连接失败]", e);
@@ -338,6 +290,12 @@ public class IotTcpUpstreamHandler implements Handler {
/**
* 发送响应消息
+ *
+ * @param socket 网络连接
+ * @param success 是否成功
+ * @param message 消息
+ * @param requestId 请求 ID
+ * @param codecType 消息编解码类型
*/
private void sendResponse(NetSocket socket, boolean success, String message, String requestId, String codecType) {
try {
@@ -346,8 +304,9 @@ public class IotTcpUpstreamHandler implements Handler {
.put("message", message)
.build();
+ int code = success ? 0 : 401;
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, responseData,
- success ? 0 : 401, message);
+ code, message);
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType);
socket.write(Buffer.buffer(encodedData));
@@ -357,94 +316,47 @@ public class IotTcpUpstreamHandler implements Handler {
}
}
- // ==================== 辅助方法 ====================
-
/**
- * 判断是否为认证请求
+ * 验证设备认证信息
+ *
+ * @param authParams 认证参数
+ * @return 是否认证成功
*/
- private boolean isAuthRequest(IotDeviceMessage message) {
- return AUTH_METHOD.equals(message.getMethod());
- }
-
- /**
- * 解析认证参数
- */
- private AuthParams parseAuthParams(Object params) {
- if (params == null) {
- return null;
- }
- try {
- JSONObject paramsJson = params instanceof JSONObject ? (JSONObject) params
- : JSONUtil.parseObj(params.toString());
- String clientId = paramsJson.getStr("clientId");
- String username = paramsJson.getStr("username");
- String password = paramsJson.getStr("password");
- return StrUtil.hasBlank(clientId, username, password) ? null
- : new AuthParams(clientId, username, password);
- } catch (Exception e) {
- log.warn("[parseAuthParams][解析认证参数失败]", e);
- return null;
- }
- }
-
- /**
- * 认证设备
- */
- private boolean authenticateDevice(AuthParams authParams) {
+ private boolean validateDeviceAuth(IotDeviceAuthReqDTO authParams) {
try {
CommonResult result = deviceApi.authDevice(new IotDeviceAuthReqDTO()
- .setClientId(authParams.clientId)
- .setUsername(authParams.username)
- .setPassword(authParams.password));
- return result.isSuccess() && Boolean.TRUE.equals(result.getData());
+ .setClientId(authParams.getClientId()).setUsername(authParams.getUsername())
+ .setPassword(authParams.getPassword()));
+ result.checkError();
+ return BooleanUtil.isTrue(result.getData());
} catch (Exception e) {
- log.error("[authenticateDevice][设备认证异常,username: {}]", authParams.username, e);
+ log.error("[validateDeviceAuth][设备认证异常,username: {}]", authParams.getUsername(), e);
return false;
}
}
- // TODO @haohao:改成 sendErrorResponse sendSuccessResponse 更清晰点?
-
/**
* 发送错误响应
+ *
+ * @param socket 网络连接
+ * @param requestId 请求 ID
+ * @param errorMessage 错误消息
+ * @param codecType 消息编解码类型
*/
- private void sendError(NetSocket socket, String requestId, String errorMessage, String codecType) {
+ private void sendErrorResponse(NetSocket socket, String requestId, String errorMessage, String codecType) {
sendResponse(socket, false, errorMessage, requestId, codecType);
}
/**
* 发送成功响应
+ *
+ * @param socket 网络连接
+ * @param requestId 请求 ID
+ * @param message 消息
+ * @param codecType 消息编解码类型
*/
- private void sendSuccess(NetSocket socket, String requestId, String message, String codecType) {
+ private void sendSuccessResponse(NetSocket socket, String requestId, String message, String codecType) {
sendResponse(socket, true, message, requestId, codecType);
}
- // ==================== 内部类 ====================
-
- // TODO @haohao:IotDeviceAuthReqDTO 复用这个?
- /**
- * 认证参数
- */
- @Data
- @AllArgsConstructor
- private static class AuthParams {
-
- private final String clientId;
- private final String username;
- private final String password;
-
- }
-
- /**
- * 消息信息
- */
- @Data
- @AllArgsConstructor
- private static class MessageInfo {
-
- private final IotDeviceMessage message;
-
- private final String codecType;
-
- }
}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/resources/tcp-binary-packet-examples.md b/yudao-module-iot/yudao-module-iot-gateway/src/test/resources/tcp-binary-packet-examples.md
index d85d347f70..d6b2b3fdb5 100644
--- a/yudao-module-iot/yudao-module-iot-gateway/src/test/resources/tcp-binary-packet-examples.md
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/resources/tcp-binary-packet-examples.md
@@ -9,7 +9,7 @@ TCP 二进制协议是一种高效的自定义协议格式,采用紧凑的二
- **高效传输**:完全二进制格式,减少数据传输量
- **版本控制**:内置协议版本号,支持协议升级
- **类型安全**:明确的消息类型标识
-- **扩展性**:预留标志位,支持未来功能扩展
+- **简洁设计**:去除冗余字段,协议更加精简
- **兼容性**:与现有 `IotDeviceMessage` 接口完全兼容
## 2. 协议格式
@@ -17,9 +17,9 @@ TCP 二进制协议是一种高效的自定义协议格式,采用紧凑的二
### 2.1 整体结构
```
-+--------+--------+--------+--------+--------+--------+--------+--------+
-| 魔术字 | 版本号 | 消息类型| 消息标志| 消息长度(4字节) |
-+--------+--------+--------+--------+--------+--------+--------+--------+
++--------+--------+--------+---------------------------+--------+--------+
+| 魔术字 | 版本号 | 消息类型| 消息长度(4字节) |
++--------+--------+--------+---------------------------+--------+--------+
| 消息 ID 长度(2字节) | 消息 ID (变长字符串) |
+--------+--------+--------+--------+--------+--------+--------+--------+
| 方法名长度(2字节) | 方法名(变长字符串) |
@@ -35,7 +35,6 @@ TCP 二进制协议是一种高效的自定义协议格式,采用紧凑的二
| 魔术字 | 1字节 | byte | `0x7E` - 协议识别标识,用于数据同步 |
| 版本号 | 1字节 | byte | `0x01` - 协议版本号,支持版本控制 |
| 消息类型 | 1字节 | byte | `0x01`=请求, `0x02`=响应 |
-| 消息标志 | 1字节 | byte | 预留字段,用于未来扩展 |
| 消息长度 | 4字节 | int | 整个消息的总长度(包含头部) |
| 消息 ID 长度 | 2字节 | short | 消息 ID 字符串的字节长度 |
| 消息 ID | 变长 | string | 消息唯一标识符(UTF-8编码) |
@@ -53,14 +52,12 @@ private static final byte MAGIC_NUMBER = (byte) 0x7E;
private static final byte PROTOCOL_VERSION = (byte) 0x01;
// 消息类型
-public static class MessageType {
- public static final byte REQUEST = 0x01; // 请求消息
- public static final byte RESPONSE = 0x02; // 响应消息
-}
+private static final byte REQUEST = (byte) 0x01; // 请求消息
+private static final byte RESPONSE = (byte) 0x02; // 响应消息
// 协议长度
-private static final int HEADER_FIXED_LENGTH = 8; // 固定头部长度
-private static final int MIN_MESSAGE_LENGTH = 12; // 最小消息长度
+private static final int HEADER_FIXED_LENGTH = 7; // 固定头部长度
+private static final int MIN_MESSAGE_LENGTH = 11; // 最小消息长度
```
## 3. 消息类型和格式
@@ -86,8 +83,7 @@ private static final int MIN_MESSAGE_LENGTH = 12; // 最小消息长度
7E // 魔术字 (0x7E)
01 // 版本号 (0x01)
01 // 消息类型 (REQUEST)
-00 // 消息标志 (预留)
-00 00 00 8A // 消息长度 (138字节)
+00 00 00 89 // 消息长度 (137字节)
00 19 // 消息 ID 长度 (25字节)
61 75 74 68 5F 31 37 30 34 30 // 消息 ID: "auth_1704067200000_123"
36 37 32 30 30 30 30 30 5F 31
@@ -144,8 +140,7 @@ private static final int MIN_MESSAGE_LENGTH = 12; // 最小消息长度
7E // 魔术字 (0x7E)
01 // 版本号 (0x01)
02 // 消息类型 (RESPONSE)
-00 // 消息标志 (预留)
-00 00 00 A5 // 消息长度 (165字节)
+00 00 00 A4 // 消息长度 (164字节)
00 22 // 消息 ID 长度 (34字节)
61 75 74 68 5F 72 65 73 70 6F // 消息 ID: "auth_response_1704067200000_123"
6E 73 65 5F 31 37 30 34 30 36
@@ -175,19 +170,19 @@ public static final String TYPE = "TCP_BINARY";
- **数据紧凑**:二进制格式,相比 JSON 减少 30-50% 的数据量
- **解析高效**:直接二进制操作,减少字符串转换开销
- **类型安全**:明确的消息类型和字段定义
-- **扩展性强**:预留标志位支持未来功能扩展
+- **设计简洁**:去除冗余字段,协议更加精简高效
- **版本控制**:内置版本号支持协议升级
## 6. 与 JSON 协议对比
-| 特性 | 二进制协议 | JSON协议 |
-|------|------------|----------|
-| 数据大小 | 小(节省30-50%) | 大 |
-| 解析性能 | 高 | 中等 |
-| 网络开销 | 低 | 高 |
-| 可读性 | 差 | 优秀 |
-| 调试难度 | 高 | 低 |
-| 扩展性 | 良好(有预留位) | 优秀 |
+| 特性 | 二进制协议 | JSON协议 |
+|------|-------------|--------|
+| 数据大小 | 小(节省30-50%) | 大 |
+| 解析性能 | 高 | 中等 |
+| 网络开销 | 低 | 高 |
+| 可读性 | 差 | 优秀 |
+| 调试难度 | 高 | 低 |
+| 扩展性 | 良好 | 优秀 |
**推荐场景**:
- ✅ **高频数据传输**:传感器数据实时上报