review:【iot 物联网】tcp 协议的接入

This commit is contained in:
YunaiV
2025-07-28 21:28:40 +08:00
parent c9b9fc1f31
commit 6b79cab09a
9 changed files with 94 additions and 57 deletions

View File

@@ -236,8 +236,8 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
@Override
public Long getDeviceMessageCount(LocalDateTime createTime) {
return deviceMessageMapper
.selectCountByCreateTime(createTime != null ? LocalDateTimeUtil.toEpochMilli(createTime) : null);
return deviceMessageMapper.selectCountByCreateTime(
createTime != null ? LocalDateTimeUtil.toEpochMilli(createTime) : null);
}
@Override

View File

@@ -18,6 +18,8 @@ import org.springframework.stereotype.Component;
@Component
public class IotAlinkDeviceMessageCodec implements IotDeviceMessageCodec {
private static final String TYPE = "Alink";
@Data
@NoArgsConstructor
@AllArgsConstructor
@@ -62,6 +64,11 @@ public class IotAlinkDeviceMessageCodec implements IotDeviceMessageCodec {
}
@Override
public String type() {
return TYPE;
}
@Override
public byte[] encode(IotDeviceMessage message) {
AlinkMessage alinkMessage = new AlinkMessage(message.getRequestId(), AlinkMessage.VERSION_1,
@@ -79,9 +86,4 @@ public class IotAlinkDeviceMessageCodec implements IotDeviceMessageCodec {
alinkMessage.getData(), alinkMessage.getCode(), alinkMessage.getMsg());
}
@Override
public String type() {
return "Alink";
}
}

View File

@@ -11,17 +11,29 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.stereotype.Component;
// TODO @haohao【重要】是不是二进制更彻底哈
// 包头(4 字节)
// 消息 ID stringnvarcharlength + string
// version可选不要干脆
// method stringnvarchar为什么不要 opcode因为 IotTcpJsonDeviceMessageCodec 里面,实际已经没 opcode 了
// reply bit0 请求1 响应
// 请求时:
// paramsnvarcharjson 处理
// 响应时:
// code
// msg nvarchar
// datanvarcharjson 处理
/**
* TCP 二进制格式 {@link IotDeviceMessage} 编解码器
*
* 使用自定义二进制协议格式:
* 包头(4 字节) | 功能码(2 字节) | 消息序号(2 字节) | 包体数据(变长)
* 使用自定义二进制协议格式:包头(4 字节) | 功能码(2 字节) | 消息序号(2 字节) | 包体数据(变长)
*
* @author 芋道源码
*/
@Component
public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
// TODO @haohao是不是叫 TCP_Binary 好点哈?
public static final String TYPE = "TCP_BINARY";
@Data
@@ -34,11 +46,13 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
*/
private Short code;
// TODO @haohao这个和 AlinkMessage 里面,是一个东西哇?
/**
* 消息序号
*/
private Short mid;
// TODO @haohao这个字段是不是没用到呀感觉应该也不在消息列哈
/**
* 设备 ID
*/
@@ -59,6 +73,7 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
*/
private Object data;
// TODO @haohao这个可以改成 code 哇?更好理解一点;
/**
* 响应错误码
*/
@@ -69,6 +84,8 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
*/
private String msg;
// TODO @haohaoTcpBinaryMessage 和 TcpJsonMessage 保持一致哈?
}
@Override
@@ -83,13 +100,14 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
try {
// 1. 确定功能码
short code = MessageMethod.STATE_ONLINE.equals(message.getMethod()) ? TcpDataPackage.CODE_HEARTBEAT
: TcpDataPackage.CODE_MESSAGE_UP;
short code = MessageMethod.STATE_ONLINE.equals(message.getMethod())
? TcpDataPackage.CODE_HEARTBEAT : TcpDataPackage.CODE_MESSAGE_UP;
// 2. 构建负载数据
String payload = buildPayload(message);
// 3. 构建 TCP 数据包
// TODO @haohao这个和 AlinkMessage.id 是不是一致的哈?
short mid = (short) (System.currentTimeMillis() % Short.MAX_VALUE);
TcpDataPackage dataPackage = new TcpDataPackage(code, mid, payload);
@@ -101,7 +119,6 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
}
@Override
@SuppressWarnings("DataFlowIssue")
public IotDeviceMessage decode(byte[] bytes) {
Assert.notNull(bytes, "待解码数据不能为空");
Assert.isTrue(bytes.length > 0, "待解码数据不能为空");
@@ -188,21 +205,20 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
Assert.notNull(dataPackage, "数据包对象不能为空");
Assert.notNull(dataPackage.getPayload(), "负载不能为空");
Buffer buffer = Buffer.buffer();
// 1. 计算包体长度(除了包头 4 字节)
int payloadLength = dataPackage.getPayload().getBytes().length;
int totalLength = 2 + 2 + payloadLength;
// 2. 写入包头总长度4 字节)
// 2. 写入数据
Buffer buffer = Buffer.buffer();
// 2.1 写入包头总长度4 字节)
buffer.appendInt(totalLength);
// 3. 写入功能码2 字节)
// 2.2 写入功能码2 字节)
buffer.appendShort(dataPackage.getCode());
// 4. 写入消息序号2 字节)
// 2.3 写入消息序号2 字节)
buffer.appendShort(dataPackage.getMid());
// 5. 写入包体数据(不定长)
// 2.4 写入包体数据(不定长)
buffer.appendBytes(dataPackage.getPayload().getBytes());
return buffer;
}
@@ -216,18 +232,14 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
Assert.isTrue(buffer.length() >= 8, "数据包长度不足");
int index = 0;
// 1. 跳过包头4 字节)
index += 4;
// 2. 获取功能码2 字节)
short code = buffer.getShort(index);
index += 2;
// 3. 获取消息序号2 字节)
short mid = buffer.getShort(index);
index += 2;
// 4. 获取包体数据
String payload = "";
if (index < buffer.length()) {
@@ -239,14 +251,17 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
// ==================== 内部类 ====================
// TODO @haohao会不会存在 reply 的时候,有 data、msg、code 参数哈。
/**
* 负载信息类
*/
@Data
@AllArgsConstructor
private static class PayloadInfo {
private String requestId;
private Object params;
}
/**
@@ -255,6 +270,7 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
@Data
@AllArgsConstructor
private static class TcpDataPackage {
// 功能码定义
public static final short CODE_REGISTER = 10;
public static final short CODE_REGISTER_REPLY = 11;
@@ -263,9 +279,11 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
public static final short CODE_MESSAGE_UP = 30;
public static final short CODE_MESSAGE_DOWN = 40;
// TODO @haohao要不改成 opCode
private short code;
private short mid;
private String payload;
}
// ==================== 常量定义 ====================
@@ -274,12 +292,15 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
* 消息方法常量
*/
public static class MessageMethod {
public static final String PROPERTY_POST = "thing.property.post"; // 数据上报
public static final String STATE_ONLINE = "thing.state.online"; // 心跳
}
// ==================== 自定义异常 ====================
// TODO @haohao全局异常搞个。看着可以服用哈。
/**
* TCP 编解码异常
*/
@@ -288,4 +309,5 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
super(message, cause);
}
}
}

View File

@@ -12,15 +12,14 @@ import org.springframework.stereotype.Component;
/**
* TCP JSON 格式 {@link IotDeviceMessage} 编解码器
*
* 采用纯 JSON 格式传输
*
* JSON 消息格式:
* 采用纯 JSON 格式传输,格式如下:
* {
* "id": "消息 ID",
* "method": "消息方法",
* "deviceId": "设备 ID",
* "params": {...},
* "timestamp": 时间戳
* "id": "消息 ID",
* "method": "消息方法",
* "deviceId": "设备 ID",
* "params": {...},
* "timestamp": 时间戳
* // TODO @haohao貌似少了 code、msg、timestamp
* }
*
* @author 芋道源码
@@ -45,6 +44,7 @@ public class IotTcpJsonDeviceMessageCodec implements IotDeviceMessageCodec {
*/
private String method;
// TODO @haohao这个字段是不是没用到呀感觉应该也不在消息列哈
/**
* 设备 ID
*/
@@ -84,14 +84,9 @@ public class IotTcpJsonDeviceMessageCodec implements IotDeviceMessageCodec {
@Override
public byte[] encode(IotDeviceMessage message) {
TcpJsonMessage tcpJsonMessage = new TcpJsonMessage(
message.getRequestId(),
message.getMethod(),
TcpJsonMessage tcpJsonMessage = new TcpJsonMessage(message.getRequestId(), message.getMethod(),
message.getDeviceId(),
message.getParams(),
message.getData(),
message.getCode(),
message.getMsg(),
message.getParams(), message.getData(), message.getCode(), message.getMsg(),
System.currentTimeMillis());
return JsonUtils.toJsonByte(tcpJsonMessage);
}
@@ -102,13 +97,9 @@ public class IotTcpJsonDeviceMessageCodec implements IotDeviceMessageCodec {
TcpJsonMessage tcpJsonMessage = JsonUtils.parseObject(bytes, TcpJsonMessage.class);
Assert.notNull(tcpJsonMessage, "消息不能为空");
Assert.notBlank(tcpJsonMessage.getMethod(), "消息方法不能为空");
IotDeviceMessage iotDeviceMessage = IotDeviceMessage.of(
tcpJsonMessage.getId(),
tcpJsonMessage.getMethod(),
tcpJsonMessage.getParams(),
tcpJsonMessage.getData(),
tcpJsonMessage.getCode(),
tcpJsonMessage.getMsg());
// TODO @haohao这个我已经改了哈。一些属性可以放在一行好理解一点~
IotDeviceMessage iotDeviceMessage = IotDeviceMessage.of(tcpJsonMessage.getId(), tcpJsonMessage.getMethod(),
tcpJsonMessage.getParams(), tcpJsonMessage.getData(), tcpJsonMessage.getCode(), tcpJsonMessage.getMsg());
iotDeviceMessage.setDeviceId(tcpJsonMessage.getDeviceId());
return iotDeviceMessage;
}

View File

@@ -27,10 +27,12 @@ public class IotTcpDownstreamSubscriber implements IotMessageSubscriber<IotDevic
private final IotTcpUpstreamProtocol protocol;
// todo @haohao不用的变量可以去掉哈
private final IotDeviceService deviceService;
private final IotTcpSessionManager sessionManager;
// TODO @haohaolombok 简化
public IotTcpDownstreamSubscriber(IotTcpUpstreamProtocol protocol,
IotDeviceMessageService messageService,
IotDeviceService deviceService,

View File

@@ -24,6 +24,7 @@ public class IotTcpAuthManager {
*/
private final Map<NetSocket, AuthInfo> authStatusMap = new ConcurrentHashMap<>();
// TODO @haohao得考虑一个设备连接多次
/**
* 设备 ID -> NetSocket 的映射(用于快速查找)
*/
@@ -37,6 +38,7 @@ public class IotTcpAuthManager {
*/
public void registerAuth(NetSocket socket, AuthInfo authInfo) {
// 如果设备已有其他连接,先清理旧连接
// TODO @haohao是不是允许同时连接就像 mqtt 应该也允许重复连接哈?
NetSocket oldSocket = deviceSocketMap.get(authInfo.getDeviceId());
if (oldSocket != null && oldSocket != socket) {
log.info("[registerAuth][设备已有其他连接,清理旧连接] 设备 ID: {}, 旧连接: {}",
@@ -66,6 +68,7 @@ public class IotTcpAuthManager {
}
}
// TODO @haohao建议暂时没用的方法可以删除掉整体聚焦
/**
* 注销设备认证信息
*
@@ -158,6 +161,7 @@ public class IotTcpAuthManager {
int count = authStatusMap.size();
authStatusMap.clear();
deviceSocketMap.clear();
// TODO @haohao第一个括号是方法第二个括号是明细日志其它日志也可以检查下哈。
log.info("[clearAll][清理所有认证信息] 清理数量: {}", count);
}
@@ -166,6 +170,7 @@ public class IotTcpAuthManager {
*/
@Data
public static class AuthInfo {
/**
* 设备编号
*/
@@ -181,6 +186,7 @@ public class IotTcpAuthManager {
*/
private String deviceName;
// TODO @haohao令牌不要存储万一有安全问题哈
/**
* 认证令牌
*/
@@ -190,5 +196,7 @@ public class IotTcpAuthManager {
* 客户端 ID
*/
private String clientId;
}
}

View File

@@ -7,6 +7,7 @@ import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
// TODO @haohaoIotTcpSessionManager、IotTcpAuthManager 是不是融合哈?
/**
* IoT 网关 TCP 会话管理器
* <p>

View File

@@ -21,6 +21,7 @@ public class IotTcpDownstreamHandler {
private final IotTcpSessionManager sessionManager;
// TODO @haohao这个可以使用 lombok 简化构造方法
public IotTcpDownstreamHandler(IotDeviceMessageService messageService,
IotDeviceService deviceService, IotTcpSessionManager sessionManager) {
this.messageService = messageService;
@@ -38,6 +39,7 @@ public class IotTcpDownstreamHandler {
log.info("[handle][处理下行消息] 设备 ID: {}, 方法: {}, 消息 ID: {}",
message.getDeviceId(), message.getMethod(), message.getId());
// TODO @haohao 1. 和 2. 可以合成 1.1 1.2 并且中间可以不空行;
// 1. 获取设备信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(message.getDeviceId());
if (device == null) {
@@ -62,11 +64,12 @@ public class IotTcpDownstreamHandler {
} else {
log.error("[handle][下行消息发送失败] 设备 ID: {}, 方法: {}, 消息 ID: {}",
message.getDeviceId(), message.getMethod(), message.getId());
}
} // TODO @haohao下面这个空行可以考虑去掉的哈。
} catch (Exception e) {
log.error("[handle][处理下行消息失败] 设备 ID: {}, 方法: {}, 消息内容: {}",
message.getDeviceId(), message.getMethod(), message.getParams(), e);
}
}
}

View File

@@ -8,6 +8,7 @@ import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
@@ -28,8 +29,10 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class IotTcpUpstreamHandler implements Handler<NetSocket> {
// TODO @haohao这两个变量可以复用 IotTcpBinaryDeviceMessageCodec 的 TYPE
private static final String CODEC_TYPE_JSON = "TCP_JSON";
private static final String CODEC_TYPE_BINARY = "TCP_BINARY";
private static final String AUTH_METHOD = "auth";
private final IotDeviceMessageService deviceMessageService;
@@ -89,6 +92,7 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
return;
}
// TODO @haohao2. 和 3. 可以合并成 2.1 2.2 ,都是异常的情况。然后 3. 可以 return 直接;
// 2. 获取设备信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(messageInfo.message.getDeviceId());
if (device == null) {
@@ -114,12 +118,13 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
private void handleAuthRequest(String clientId, IotDeviceMessage message, NetSocket socket, String codecType) {
try {
// 1. 验证认证请求
// TODO @haohaoObjUtil.notEquals。减少取反
if (!AUTH_METHOD.equals(message.getMethod())) {
sendError(socket, message.getRequestId(), "请先进行认证", codecType);
return;
}
// 2. 解析认证参数
// 2. 解析认证参数 // TODO @haohao1. 和 2. 可以合并成 1.1 1.2 都是参数校验
AuthParams authParams = parseAuthParams(message.getParams());
if (authParams == null) {
sendError(socket, message.getRequestId(), "认证参数不完整", codecType);
@@ -127,6 +132,7 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
}
// 3. 执行认证流程
// TODO @haohao成功失败、都大哥日志会不会更好哈
if (performAuthentication(authParams, socket, message.getRequestId(), codecType)) {
log.info("[handleAuthRequest][认证成功] 客户端 ID: {}, username: {}", clientId, authParams.username);
}
@@ -157,6 +163,7 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
/**
* 解码消息
*/
// TODO @haohao是不是还是直接管理后台配置协议然后直接使用就好啦。暂时不考虑动态解析哈。保持一致降低理解成本哈。
private MessageInfo decodeMessage(Buffer buffer) {
try {
String rawData = buffer.toString();
@@ -172,6 +179,7 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
/**
* 执行认证
*/
// TODO @haohao下面的 1. 2. 可以合并下,本质也是校验哈。
private boolean performAuthentication(AuthParams authParams, NetSocket socket, String requestId, String codecType) {
// 1. 执行认证
if (!authenticateDevice(authParams)) {
@@ -202,7 +210,6 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
deviceMessageService.sendDeviceMessage(onlineMessage, deviceInfo.getProductKey(), deviceInfo.getDeviceName(),
serverId);
sendSuccess(socket, requestId, "认证成功", codecType);
return true;
}
@@ -252,8 +259,9 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
* 判断是否为 JSON 格式
*/
private boolean isJsonFormat(String data) {
if (StrUtil.isBlank(data))
if (StrUtil.isBlank(data)) {
return false;
}
String trimmed = data.trim();
return (trimmed.startsWith("{") && trimmed.endsWith("}")) || (trimmed.startsWith("[") && trimmed.endsWith("]"));
}
@@ -262,15 +270,14 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
* 解析认证参数
*/
private AuthParams parseAuthParams(Object params) {
if (params == null)
if (params == null) {
return null;
}
JSONObject paramsJson = params instanceof JSONObject ? (JSONObject) params
: JSONUtil.parseObj(params.toString());
String clientId = paramsJson.getStr("clientId");
String username = paramsJson.getStr("username");
String password = paramsJson.getStr("password");
return StrUtil.hasBlank(clientId, username, password) ? null : new AuthParams(clientId, username, password);
}
@@ -278,11 +285,8 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
* 认证设备
*/
private boolean authenticateDevice(AuthParams authParams) {
CommonResult<Boolean> result = deviceApi
.authDevice(new cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO()
.setClientId(authParams.clientId)
.setUsername(authParams.username)
.setPassword(authParams.password));
CommonResult<Boolean> result = deviceApi.authDevice(new IotDeviceAuthReqDTO()
.setClientId(authParams.clientId).setUsername(authParams.username).setPassword(authParams.password));
return result.isSuccess() && result.getData();
}
@@ -291,6 +295,7 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
*/
private void registerAuthInfo(NetSocket socket, IotDeviceRespDTO device, IotDeviceAuthUtils.DeviceInfo deviceInfo,
String token, String clientId) {
// TODO @haohao可以链式调用
IotTcpAuthManager.AuthInfo auth = new IotTcpAuthManager.AuthInfo();
auth.setDeviceId(device.getId());
auth.setProductKey(deviceInfo.getProductKey());
@@ -316,6 +321,8 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
sendResponse(socket, true, message, requestId, codecType);
}
// TODO @haohao使用 lombok方便 jdk8 兼容
/**
* 认证参数
*/
@@ -327,4 +334,5 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
*/
private record MessageInfo(IotDeviceMessage message, String codecType) {
}
}