fix:【IoT 物联网】code review tcp 实现

This commit is contained in:
YunaiV
2025-07-23 19:17:33 +08:00
parent bd8052f56b
commit f70f578ac5
7 changed files with 133 additions and 101 deletions

View File

@@ -10,11 +10,12 @@ import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
// TODO @haohao设备地址(变长) 是不是非必要哈?因为认证后,不需要每次都带呀。
/**
* TCP 二进制格式 {@link IotDeviceMessage} 编解码器
*
* 使用自定义二进制协议格式:
* 包头(4字节) | 地址长度(2字节) | 设备地址(变长) | 功能码(2字节) | 消息序号(2字节) | 包体数据(变长)
* 包头(4 字节) | 地址长度(2 字节) | 设备地址(变长) | 功能码(2 字节) | 消息序号(2 字节) | 包体数据(变长)
*
* @author 芋道源码
*/
@@ -27,6 +28,7 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
*/
public static final String TYPE = "TCP_BINARY";
// TODO @haohao这个注释不太对。
// ==================== 常量定义 ====================
@Override
@@ -67,10 +69,11 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
TcpDataPackage dataPackage = decodeTcpDataPackage(Buffer.buffer(bytes));
// 2. 根据功能码确定方法
// TODO @haohao会不会有事件上报哈。
String method = (dataPackage.getCode() == TcpDataPackage.CODE_HEARTBEAT) ?
MessageMethod.STATE_ONLINE : MessageMethod.PROPERTY_POST;
// 3. 解析负载数据和请求ID
// 3. 解析负载数据和请求 ID
PayloadInfo payloadInfo = parsePayloadInfo(dataPackage.getPayload());
// 4. 构建 IoT 设备消息(设置完整的必要参数)
@@ -78,13 +81,16 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
payloadInfo.getRequestId(), method, payloadInfo.getParams());
// 5. 设置设备相关信息
// TODO @haohaoserverId 不是这里解析的哈。
Long deviceId = parseDeviceId(dataPackage.getAddr());
message.setDeviceId(deviceId);
// 6. 设置TCP协议相关信息
// 6. 设置 TCP 协议相关信息
// TODO @haohaoserverId 不是这里解析的哈。
message.setServerId(generateServerId(dataPackage));
// 7. 设置租户IDTODO: 后续可以从设备信息中获取)
// 7. 设置租户 IDTODO: 后续可以从设备信息中获取)
// TODO @haohao租户 id 不是这里解析的哈。
// message.setTenantId(getTenantIdByDeviceId(deviceId));
if (log.isDebugEnabled()) {
@@ -104,6 +110,7 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
return TYPE;
}
// TODO @haohao这种简单解析中间不用空格哈。
/**
* 构建完整负载
*/
@@ -130,12 +137,10 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
return payload.toString();
}
// ==================== 编解码方法 ====================
/**
* 解析负载信息包含requestIdparams
* 解析负载信息(包含 requestIdparams
*/
private PayloadInfo parsePayloadInfo(String payload) {
if (StrUtil.isEmpty(payload)) {
@@ -143,6 +148,7 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
}
try {
// TODO @haohao使用 jsonUtils
JSONObject jsonObject = JSONUtil.parseObj(payload);
String requestId = jsonObject.getStr(PayloadField.REQUEST_ID);
if (StrUtil.isEmpty(requestId)) {
@@ -185,7 +191,7 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
* @return 服务ID
*/
private String generateServerId(TcpDataPackage dataPackage) {
// 使用协议类型 + 设备地址 + 消息序号生成唯一的服务ID
// 使用协议类型 + 设备地址 + 消息序号生成唯一的服务 ID
return String.format("tcp_%s_%d", dataPackage.getAddr(), dataPackage.getMid());
}
@@ -300,23 +306,28 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
* 消息方法常量
*/
public static class MessageMethod {
public static final String PROPERTY_POST = "thing.property.post"; // 数据上报
public static final String STATE_ONLINE = "thing.state.online"; // 心跳
}
/**
* 负载字段名
*/
private static class PayloadField {
public static final String METHOD = "method";
public static final String PARAMS = "params";
public static final String TIMESTAMP = "timestamp";
public static final String REQUEST_ID = "requestId";
public static final String MESSAGE_ID = "msgId";
}
// ==================== TCP 数据包编解码方法 ====================
// TODO @haohaolombok 简化
/**
* 负载信息类
*/
@@ -361,11 +372,13 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
// ==================== 自定义异常 ====================
// TODO @haohao可以搞个全局的
/**
* TCP 编解码异常
*/
public static class TcpCodecException extends RuntimeException {
// TODO @haohao非必要构造方法可以去掉哈。
public TcpCodecException(String message) {
super(message);
}

View File

@@ -8,11 +8,11 @@ import org.springframework.stereotype.Component;
/**
* TCP编解码器管理器简化版
*
*
* 核心功能:
* - 自动协议检测(二进制 vs JSON
* - 统一编解码接口
* - 默认使用JSON协议
* - 默认使用 JSON 协议
*
* @author 芋道源码
*/
@@ -22,6 +22,8 @@ public class IotTcpCodecManager implements IotDeviceMessageCodec {
public static final String TYPE = "TCP";
// TODO @haohao@Resource
@Autowired
private IotTcpBinaryDeviceMessageCodec binaryCodec;
@@ -40,21 +42,22 @@ public class IotTcpCodecManager implements IotDeviceMessageCodec {
@Override
public byte[] encode(IotDeviceMessage message) {
// 默认使用JSON协议编码
// 默认使用 JSON 协议编码
return jsonCodec.encode(message);
}
// TODO @haohao要不还是不自动检测用户手动配置哈。简化一些。。。
@Override
public IotDeviceMessage decode(byte[] bytes) {
// 自动检测协议类型并解码
if (isJsonFormat(bytes)) {
if (log.isDebugEnabled()) {
log.debug("[decode][检测到JSON协议] 数据长度: {}字节", bytes.length);
log.debug("[decode][检测到 JSON 协议数据长度: {} 字节]", bytes.length);
}
return jsonCodec.decode(bytes);
} else {
if (log.isDebugEnabled()) {
log.debug("[decode][检测到二进制协议] 数据长度: {}字节", bytes.length);
log.debug("[decode][检测到二进制协议数据长度: {} 字节]", bytes.length);
}
return binaryCodec.decode(bytes);
}
@@ -63,7 +66,7 @@ public class IotTcpCodecManager implements IotDeviceMessageCodec {
// ==================== 便捷方法 ====================
/**
* 使用JSON协议编码
* 使用 JSON 协议编码
*/
public byte[] encodeJson(IotDeviceMessage message) {
return jsonCodec.encode(message);
@@ -95,42 +98,46 @@ public class IotTcpCodecManager implements IotDeviceMessageCodec {
/**
* 检测是否为JSON格式
*
*
* 检测规则:
* 1. 数据以 '{' 开头
* 2. 包含 "method" 或 "id" 字段
*/
private boolean isJsonFormat(byte[] bytes) {
// TODO @haohaoArrayUtil.isEmpty(bytes) 可以简化下
if (bytes == null || bytes.length == 0) {
return useJsonByDefault;
}
try {
// 检测JSON格式以 '{' 开头
// 检测 JSON 格式:以 '{' 开头
if (bytes[0] == '{') {
// 进一步验证是否为有效JSON
// TODO @haohao不一定按照顺序写这个可能要看下。
// 进一步验证是否为有效 JSON
String jsonStr = new String(bytes, 0, Math.min(bytes.length, 100));
return jsonStr.contains("\"method\"") || jsonStr.contains("\"id\"");
}
// 检测二进制格式:长度 >= 8 且符合二进制协议结构
if (bytes.length >= 8) {
// 读取包头(前4字节表示后续数据长度)
// 读取包头(前 4 字节表示后续数据长度)
int expectedLength = ((bytes[0] & 0xFF) << 24) |
((bytes[1] & 0xFF) << 16) |
((bytes[2] & 0xFF) << 8) |
(bytes[3] & 0xFF);
// 验证长度是否合理
// TODO @haohaoexpectedLength > 0 多余的貌似;
if (expectedLength == bytes.length - 4 && expectedLength > 0 && expectedLength < 1024 * 1024) {
return false; // 二进制格式
}
}
} catch (Exception e) {
log.warn("[isJsonFormat][协议检测异常] 使用默认协议: {}", getDefaultProtocol(), e);
log.warn("[isJsonFormat][协议检测异常使用默认协议: {}]", getDefaultProtocol(), e);
}
// 默认使用当前设置的协议类型
return useJsonByDefault;
}
}

View File

@@ -12,16 +12,16 @@ import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
* TCP JSON格式 {@link IotDeviceMessage} 编解码器
*
* 采用纯JSON格式传输参考EMQXHTTP模块的数据格式
*
* TCP JSON 格式 {@link IotDeviceMessage} 编解码器
*
* 采用纯 JSON 格式传输,参考 EMQXHTTP 模块的数据格式
*
* JSON消息格式
* {
* "id": "消息ID",
* "id": "消息 ID",
* "method": "消息方法",
* "deviceId": "设备ID",
* "productKey": "产品Key",
* "deviceId": "设备 ID",
* "productKey": "产品 Key",
* "deviceName": "设备名称",
* "params": {...},
* "timestamp": 时间戳
@@ -35,6 +35,7 @@ public class IotTcpJsonDeviceMessageCodec implements IotDeviceMessageCodec {
public static final String TYPE = "TCP_JSON";
// TODO @haohao变量不太对
// ==================== 常量定义 ====================
@Override
@@ -77,14 +78,15 @@ public class IotTcpJsonDeviceMessageCodec implements IotDeviceMessageCodec {
}
try {
// 转换为JSON字符串
// 转换为 JSON 字符串
String jsonString = new String(bytes, StandardCharsets.UTF_8);
if (log.isDebugEnabled()) {
log.debug("[decode][开始解码] JSON长度: {}字节, 内容: {}", bytes.length, jsonString);
}
// 解析JSON消息
// 解析 JSON 消息
// TODO @haohaoJsonUtils
JSONObject jsonMessage = JSONUtil.parseObj(jsonString);
// 构建IoT设备消息
@@ -129,7 +131,7 @@ public class IotTcpJsonDeviceMessageCodec implements IotDeviceMessageCodec {
}
/**
* 构建JSON消息
* 构建 JSON 消息
*/
private JSONObject buildJsonMessage(IotDeviceMessage message) {
JSONObject jsonMessage = new JSONObject();
@@ -189,7 +191,7 @@ public class IotTcpJsonDeviceMessageCodec implements IotDeviceMessageCodec {
message.setMsg(msg);
}
// 设置服务ID基于JSON格式
// 设置服务 ID基于 JSON 格式)
message.setServerId(generateServerId(jsonMessage));
return message;
@@ -216,22 +218,26 @@ public class IotTcpJsonDeviceMessageCodec implements IotDeviceMessageCodec {
StrUtil.isNotEmpty(id) ? id.substring(0, Math.min(8, id.length())) : "noId");
}
// TODO @haohao注释格式不对
/**
* 消息方法常量
*/
public static class MessageMethod {
public static final String PROPERTY_POST = "thing.property.post"; // 数据上报
public static final String STATE_ONLINE = "thing.state.online"; // 心跳
public static final String EVENT_POST = "thing.event.post"; // 事件上报
public static final String PROPERTY_SET = "thing.property.set"; // 属性设置
public static final String PROPERTY_GET = "thing.property.get"; // 属性获取
public static final String SERVICE_INVOKE = "thing.service.invoke"; // 服务调用
}
/**
* JSON字段名参考EMQX和HTTP模块格式
*/
private static class JsonField {
public static final String ID = "id";
public static final String METHOD = "method";
public static final String DEVICE_ID = "deviceId";
@@ -241,5 +247,6 @@ public class IotTcpJsonDeviceMessageCodec implements IotDeviceMessageCodec {
public static final String TIMESTAMP = "timestamp";
public static final String CODE = "code";
public static final String MESSAGE = "message";
}
}

View File

@@ -30,6 +30,7 @@ public class IotTcpUpstreamProtocol {
private final IotDeviceMessageService messageService;
// TODO @haohao不用的变量可以删除
private final IotDeviceCommonApi deviceApi;
private final IotTcpCodecManager codecManager;
@@ -58,6 +59,7 @@ public class IotTcpUpstreamProtocol {
@PostConstruct
public void start() {
// TODO @haohao类似下面 62 到 75 是处理 options 的,因为中间写了注释,其实可以不用空行;然后 77 到 91 可以中间空喊去掉,更紧凑一点;
// 创建服务器选项
NetServerOptions options = new NetServerOptions()
.setPort(tcpProperties.getPort())

View File

@@ -1,7 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import lombok.extern.slf4j.Slf4j;
@@ -24,11 +23,12 @@ public class IotTcpDownstreamHandler {
private final IotDeviceMessageService messageService;
private final IotTcpDeviceMessageCodec codec;
// TODO @haohao代码没提交全有报错。
// private final IotTcpDeviceMessageCodec codec;
public IotTcpDownstreamHandler(IotDeviceMessageService messageService) {
this.messageService = messageService;
this.codec = new IotTcpDeviceMessageCodec();
// this.codec = new IotTcpDeviceMessageCodec();
}
/**
@@ -42,7 +42,8 @@ public class IotTcpDownstreamHandler {
message.getDeviceId(), message.getMethod(), message.getId());
// 编码消息用于日志记录和验证
byte[] encodedMessage = codec.encode(message);
byte[] encodedMessage = null;
// codec.encode(message);
log.debug("[handle][消息编码成功] 设备ID: {}, 编码后长度: {} 字节",
message.getDeviceId(), encodedMessage.length);