feat:【IoT 物联网】重构 TCP 协议处理,新增 TCP 会话和认证管理

This commit is contained in:
haohao
2025-07-26 22:15:37 +08:00
parent f70f578ac5
commit c9b9fc1f31
19 changed files with 1765 additions and 1399 deletions

View File

@@ -1,108 +1,74 @@
package cn.iocoder.yudao.module.iot.gateway.codec.tcp;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import io.vertx.core.buffer.Buffer;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import lombok.NoArgsConstructor;
import org.springframework.stereotype.Component;
// TODO @haohao设备地址(变长) 是不是非必要哈?因为认证后,不需要每次都带呀。
/**
* TCP 二进制格式 {@link IotDeviceMessage} 编解码器
*
* 使用自定义二进制协议格式:
* 包头(4 字节) | 地址长度(2 字节) | 设备地址(变长) | 功能码(2 字节) | 消息序号(2 字节) | 包体数据(变长)
* 包头(4 字节) | 功能码(2 字节) | 消息序号(2 字节) | 包体数据(变长)
*
* @author 芋道源码
*/
@Component
@Slf4j
public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
/**
* 编解码器类型
*/
public static final String TYPE = "TCP_BINARY";
// TODO @haohao这个注释不太对。
// ==================== 常量定义 ====================
@Data
@NoArgsConstructor
@AllArgsConstructor
private static class TcpBinaryMessage {
@Override
public byte[] encode(IotDeviceMessage message) {
if (message == null || StrUtil.isEmpty(message.getMethod())) {
throw new IllegalArgumentException("消息或方法不能为空");
}
/**
* 功能码
*/
private Short code;
try {
// 1. 确定功能码(只支持数据上报和心跳)
short code = MessageMethod.STATE_ONLINE.equals(message.getMethod()) ?
TcpDataPackage.CODE_HEARTBEAT : TcpDataPackage.CODE_MESSAGE_UP;
/**
* 消息序号
*/
private Short mid;
// 2. 构建简化负载
String payload = buildSimplePayload(message);
/**
* 设备 ID
*/
private Long deviceId;
// 3. 构建 TCP 数据包
String deviceAddr = message.getDeviceId() != null ? String.valueOf(message.getDeviceId()) : "default";
short mid = (short) (System.currentTimeMillis() % Short.MAX_VALUE);
TcpDataPackage dataPackage = new TcpDataPackage(deviceAddr, code, mid, payload);
/**
* 请求方法
*/
private String method;
// 4. 编码为字节流
return encodeTcpDataPackage(dataPackage).getBytes();
} catch (Exception e) {
log.error("[encode][编码失败] 方法: {}", message.getMethod(), e);
throw new TcpCodecException("TCP 消息编码失败", e);
}
}
/**
* 请求参数
*/
private Object params;
@Override
public IotDeviceMessage decode(byte[] bytes) {
if (bytes == null || bytes.length == 0) {
throw new IllegalArgumentException("待解码数据不能为空");
}
/**
* 响应结果
*/
private Object data;
try {
// 1. 解码 TCP 数据包
TcpDataPackage dataPackage = decodeTcpDataPackage(Buffer.buffer(bytes));
/**
* 响应错误码
*/
private Integer responseCode;
// 2. 根据功能码确定方法
// TODO @haohao会不会有事件上报哈。
String method = (dataPackage.getCode() == TcpDataPackage.CODE_HEARTBEAT) ?
MessageMethod.STATE_ONLINE : MessageMethod.PROPERTY_POST;
/**
* 响应提示
*/
private String msg;
// 3. 解析负载数据和请求 ID
PayloadInfo payloadInfo = parsePayloadInfo(dataPackage.getPayload());
// 4. 构建 IoT 设备消息(设置完整的必要参数)
IotDeviceMessage message = IotDeviceMessage.requestOf(
payloadInfo.getRequestId(), method, payloadInfo.getParams());
// 5. 设置设备相关信息
// TODO @haohaoserverId 不是这里解析的哈。
Long deviceId = parseDeviceId(dataPackage.getAddr());
message.setDeviceId(deviceId);
// 6. 设置 TCP 协议相关信息
// TODO @haohaoserverId 不是这里解析的哈。
message.setServerId(generateServerId(dataPackage));
// 7. 设置租户 IDTODO: 后续可以从设备信息中获取)
// TODO @haohao租户 id 不是这里解析的哈。
// message.setTenantId(getTenantIdByDeviceId(deviceId));
if (log.isDebugEnabled()) {
log.debug("[decode][解码成功] 设备ID: {}, 方法: {}, 请求ID: {}, 消息ID: {}",
deviceId, method, message.getRequestId(), message.getId());
}
return message;
} catch (Exception e) {
log.error("[decode][解码失败] 数据长度: {}", bytes.length, e);
throw new TcpCodecException("TCP 消息解码失败", e);
}
}
@Override
@@ -110,142 +76,134 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
return TYPE;
}
// TODO @haohao这种简单解析中间不用空格哈。
/**
* 构建完整负载
*/
private String buildSimplePayload(IotDeviceMessage message) {
JSONObject payload = new JSONObject();
@Override
public byte[] encode(IotDeviceMessage message) {
Assert.notNull(message, "消息不能为空");
Assert.notBlank(message.getMethod(), "消息方法不能为空");
// 核心字段
payload.set(PayloadField.METHOD, message.getMethod());
if (message.getParams() != null) {
payload.set(PayloadField.PARAMS, message.getParams());
try {
// 1. 确定功能码
short code = MessageMethod.STATE_ONLINE.equals(message.getMethod()) ? TcpDataPackage.CODE_HEARTBEAT
: TcpDataPackage.CODE_MESSAGE_UP;
// 2. 构建负载数据
String payload = buildPayload(message);
// 3. 构建 TCP 数据包
short mid = (short) (System.currentTimeMillis() % Short.MAX_VALUE);
TcpDataPackage dataPackage = new TcpDataPackage(code, mid, payload);
// 4. 编码为字节流
return encodeTcpDataPackage(dataPackage).getBytes();
} catch (Exception e) {
throw new TcpCodecException("TCP 消息编码失败", e);
}
// 标识字段
if (StrUtil.isNotEmpty(message.getRequestId())) {
payload.set(PayloadField.REQUEST_ID, message.getRequestId());
}
if (StrUtil.isNotEmpty(message.getId())) {
payload.set(PayloadField.MESSAGE_ID, message.getId());
}
// 时间戳
payload.set(PayloadField.TIMESTAMP, System.currentTimeMillis());
return payload.toString();
}
// ==================== 编解码方法 ====================
@Override
@SuppressWarnings("DataFlowIssue")
public IotDeviceMessage decode(byte[] bytes) {
Assert.notNull(bytes, "待解码数据不能为空");
Assert.isTrue(bytes.length > 0, "待解码数据不能为空");
try {
// 1. 解码 TCP 数据包
TcpDataPackage dataPackage = decodeTcpDataPackage(Buffer.buffer(bytes));
// 2. 根据功能码确定方法
String method = (dataPackage.getCode() == TcpDataPackage.CODE_HEARTBEAT) ? MessageMethod.STATE_ONLINE
: MessageMethod.PROPERTY_POST;
// 3. 解析负载数据
PayloadInfo payloadInfo = parsePayloadInfo(dataPackage.getPayload());
// 4. 构建 IoT 设备消息
return IotDeviceMessage.of(
payloadInfo.getRequestId(),
method,
payloadInfo.getParams(),
null,
null,
null);
} catch (Exception e) {
throw new TcpCodecException("TCP 消息解码失败", e);
}
}
// ==================== 内部辅助方法 ====================
/**
* 解析负载信息(包含 requestId 和 params
* 构建负载数据
*
* @param message 设备消息
* @return 负载字符串
*/
private String buildPayload(IotDeviceMessage message) {
TcpBinaryMessage tcpBinaryMessage = new TcpBinaryMessage(
null, // code 在数据包中单独处理
null, // mid 在数据包中单独处理
message.getDeviceId(),
message.getMethod(),
message.getParams(),
message.getData(),
message.getCode(),
message.getMsg());
return JsonUtils.toJsonString(tcpBinaryMessage);
}
/**
* 解析负载信息
*
* @param payload 负载字符串
* @return 负载信息
*/
private PayloadInfo parsePayloadInfo(String payload) {
if (StrUtil.isEmpty(payload)) {
if (StrUtil.isBlank(payload)) {
return new PayloadInfo(null, null);
}
try {
// TODO @haohao使用 jsonUtils
JSONObject jsonObject = JSONUtil.parseObj(payload);
String requestId = jsonObject.getStr(PayloadField.REQUEST_ID);
if (StrUtil.isEmpty(requestId)) {
requestId = jsonObject.getStr(PayloadField.MESSAGE_ID);
TcpBinaryMessage tcpBinaryMessage = JsonUtils.parseObject(payload, TcpBinaryMessage.class);
if (tcpBinaryMessage != null) {
return new PayloadInfo(
StrUtil.isNotEmpty(tcpBinaryMessage.getMethod())
? tcpBinaryMessage.getMethod() + "_" + System.currentTimeMillis()
: null,
tcpBinaryMessage.getParams());
}
Object params = jsonObject.get(PayloadField.PARAMS);
return new PayloadInfo(requestId, params);
} catch (Exception e) {
log.warn("[parsePayloadInfo][解析失败,返回原始字符串] 负载: {}", payload);
return new PayloadInfo(null, payload);
// 如果解析失败,返回默认值
return new PayloadInfo("unknown_" + System.currentTimeMillis(), null);
}
return null;
}
/**
* 从设备地址解析设备ID
*
* @param deviceAddr 设备地址字符串
* @return 设备ID
*/
private Long parseDeviceId(String deviceAddr) {
if (StrUtil.isEmpty(deviceAddr)) {
log.warn("[parseDeviceId][设备地址为空返回默认ID]");
return 0L;
}
try {
// 尝试直接解析为Long
return Long.parseLong(deviceAddr);
} catch (NumberFormatException e) {
// 如果不是纯数字,可以使用哈希值或其他策略
log.warn("[parseDeviceId][设备地址不是数字格式: {},使用哈希值]", deviceAddr);
return (long) deviceAddr.hashCode();
}
}
/**
* 生成服务ID
*
* @param dataPackage TCP数据包
* @return 服务ID
*/
private String generateServerId(TcpDataPackage dataPackage) {
// 使用协议类型 + 设备地址 + 消息序号生成唯一的服务 ID
return String.format("tcp_%s_%d", dataPackage.getAddr(), dataPackage.getMid());
}
// ==================== 内部辅助方法 ====================
/**
* 编码 TCP 数据包
*
* @param dataPackage 数据包对象
* @return 编码后的字节流
* @throws IllegalArgumentException 如果数据包对象不正确
*/
private Buffer encodeTcpDataPackage(TcpDataPackage dataPackage) {
if (dataPackage == null) {
throw new IllegalArgumentException("数据包对象不能为空");
}
Assert.notNull(dataPackage, "数据包对象不能为空");
Assert.notNull(dataPackage.getPayload(), "负载不能为空");
// 验证数据包
if (dataPackage.getAddr() == null || dataPackage.getAddr().isEmpty()) {
throw new IllegalArgumentException("设备地址不能为空");
}
if (dataPackage.getPayload() == null) {
throw new IllegalArgumentException("负载不能为空");
}
Buffer buffer = Buffer.buffer();
try {
Buffer buffer = Buffer.buffer();
// 1. 计算包体长度(除了包头 4 字节)
int payloadLength = dataPackage.getPayload().getBytes().length;
int totalLength = 2 + 2 + payloadLength;
// 1. 计算包体长度(除了包头 4 字节)
int payloadLength = dataPackage.getPayload().getBytes().length;
int totalLength = 2 + dataPackage.getAddr().length() + 2 + 2 + payloadLength;
// 2. 写入包头:总长度(4 字节)
buffer.appendInt(totalLength);
// 3. 写入功能码2 字节)
buffer.appendShort(dataPackage.getCode());
// 4. 写入消息序号2 字节)
buffer.appendShort(dataPackage.getMid());
// 5. 写入包体数据(不定长)
buffer.appendBytes(dataPackage.getPayload().getBytes());
// 2.1 写入包头总长度4 字节)
buffer.appendInt(totalLength);
// 2.2 写入设备地址长度2 字节)
buffer.appendShort((short) dataPackage.getAddr().length());
// 2.3 写入设备地址(不定长)
buffer.appendBytes(dataPackage.getAddr().getBytes());
// 2.4 写入功能码2 字节)
buffer.appendShort(dataPackage.getCode());
// 2.5 写入消息序号2 字节)
buffer.appendShort(dataPackage.getMid());
// 2.6 写入包体数据(不定长)
buffer.appendBytes(dataPackage.getPayload().getBytes());
if (log.isDebugEnabled()) {
log.debug("[encodeTcpDataPackage][编码成功] 设备地址: {}, 功能码: {}, 消息序号: {}, 总长度: {}",
dataPackage.getAddr(), dataPackage.getCode(), dataPackage.getMid(), buffer.length());
}
return buffer;
} catch (Exception e) {
log.error("[encodeTcpDataPackage][编码失败] 数据包: {}", dataPackage, e);
throw new IllegalArgumentException("数据包编码失败: " + e.getMessage(), e);
}
return buffer;
}
/**
@@ -253,101 +211,49 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
*
* @param buffer 数据缓冲区
* @return 解码后的数据包
* @throws IllegalArgumentException 如果数据包格式不正确
*/
private TcpDataPackage decodeTcpDataPackage(Buffer buffer) {
if (buffer == null || buffer.length() < 8) {
throw new IllegalArgumentException("数据包长度不足");
Assert.isTrue(buffer.length() >= 8, "数据包长度不足");
int index = 0;
// 1. 跳过包头4 字节)
index += 4;
// 2. 获取功能码2 字节)
short code = buffer.getShort(index);
index += 2;
// 3. 获取消息序号2 字节)
short mid = buffer.getShort(index);
index += 2;
// 4. 获取包体数据
String payload = "";
if (index < buffer.length()) {
payload = buffer.getString(index, buffer.length());
}
try {
int index = 0;
// 1.1 跳过包头4字节
index += 4;
// 1.2 获取设备地址长度2字节
short addrLength = buffer.getShort(index);
index += 2;
// 1.3 获取设备地址
String addr = buffer.getBuffer(index, index + addrLength).toString();
index += addrLength;
// 1.4 获取功能码2字节
short code = buffer.getShort(index);
index += 2;
// 1.5 获取消息序号2字节
short mid = buffer.getShort(index);
index += 2;
// 1.6 获取包体数据
String payload = "";
if (index < buffer.length()) {
payload = buffer.getString(index, buffer.length());
}
// 2. 构建数据包对象
TcpDataPackage dataPackage = new TcpDataPackage(addr, code, mid, payload);
if (log.isDebugEnabled()) {
log.debug("[decodeTcpDataPackage][解码成功] 设备地址: {}, 功能码: {}, 消息序号: {}, 包体长度: {}",
addr, code, mid, payload.length());
}
return dataPackage;
} catch (Exception e) {
log.error("[decodeTcpDataPackage][解码失败] 数据长度: {}", buffer.length(), e);
throw new IllegalArgumentException("数据包解码失败: " + e.getMessage(), e);
}
return new TcpDataPackage(code, mid, payload);
}
/**
* 消息方法常量
*/
public static class MessageMethod {
// ==================== 内部类 ====================
public static final String PROPERTY_POST = "thing.property.post"; // 数据上报
public static final String STATE_ONLINE = "thing.state.online"; // 心跳
}
/**
* 负载字段名
*/
private static class PayloadField {
public static final String METHOD = "method";
public static final String PARAMS = "params";
public static final String TIMESTAMP = "timestamp";
public static final String REQUEST_ID = "requestId";
public static final String MESSAGE_ID = "msgId";
}
// ==================== TCP 数据包编解码方法 ====================
// TODO @haohaolombok 简化
/**
* 负载信息类
*/
@Data
@AllArgsConstructor
private static class PayloadInfo {
private String requestId;
private Object params;
public PayloadInfo(String requestId, Object params) {
this.requestId = requestId;
this.params = params;
}
public String getRequestId() { return requestId; }
public Object getParams() { return params; }
}
/**
* TCP 数据包内部类
*/
@Data
@AllArgsConstructor
private static class TcpDataPackage {
// 功能码定义
public static final short CODE_REGISTER = 10;
@@ -357,35 +263,29 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
public static final short CODE_MESSAGE_UP = 30;
public static final short CODE_MESSAGE_DOWN = 40;
private String addr;
private short code;
private short mid;
private String payload;
}
public TcpDataPackage(String addr, short code, short mid, String payload) {
this.addr = addr;
this.code = code;
this.mid = mid;
this.payload = payload;
}
// ==================== 常量定义 ====================
/**
* 消息方法常量
*/
public static class MessageMethod {
public static final String PROPERTY_POST = "thing.property.post"; // 数据上报
public static final String STATE_ONLINE = "thing.state.online"; // 心跳
}
// ==================== 自定义异常 ====================
// TODO @haohao可以搞个全局的
/**
* TCP 编解码异常
*/
public static class TcpCodecException extends RuntimeException {
// TODO @haohao非必要构造方法可以去掉哈。
public TcpCodecException(String message) {
super(message);
}
public TcpCodecException(String message, Throwable cause) {
super(message, cause);
}
}
}

View File

@@ -1,143 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.codec.tcp;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* TCP编解码器管理器简化版
*
* 核心功能:
* - 自动协议检测(二进制 vs JSON
* - 统一编解码接口
* - 默认使用 JSON 协议
*
* @author 芋道源码
*/
@Slf4j
@Component
public class IotTcpCodecManager implements IotDeviceMessageCodec {
public static final String TYPE = "TCP";
// TODO @haohao@Resource
@Autowired
private IotTcpBinaryDeviceMessageCodec binaryCodec;
@Autowired
private IotTcpJsonDeviceMessageCodec jsonCodec;
/**
* 当前默认协议JSON
*/
private boolean useJsonByDefault = true;
@Override
public String type() {
return TYPE;
}
@Override
public byte[] encode(IotDeviceMessage message) {
// 默认使用 JSON 协议编码
return jsonCodec.encode(message);
}
// TODO @haohao要不还是不自动检测用户手动配置哈。简化一些。。。
@Override
public IotDeviceMessage decode(byte[] bytes) {
// 自动检测协议类型并解码
if (isJsonFormat(bytes)) {
if (log.isDebugEnabled()) {
log.debug("[decode][检测到 JSON 协议,数据长度: {} 字节]", bytes.length);
}
return jsonCodec.decode(bytes);
} else {
if (log.isDebugEnabled()) {
log.debug("[decode][检测到二进制协议,数据长度: {} 字节]", bytes.length);
}
return binaryCodec.decode(bytes);
}
}
// ==================== 便捷方法 ====================
/**
* 使用 JSON 协议编码
*/
public byte[] encodeJson(IotDeviceMessage message) {
return jsonCodec.encode(message);
}
/**
* 使用二进制协议编码
*/
public byte[] encodeBinary(IotDeviceMessage message) {
return binaryCodec.encode(message);
}
/**
* 获取当前默认协议
*/
public String getDefaultProtocol() {
return useJsonByDefault ? "JSON" : "BINARY";
}
/**
* 设置默认协议
*/
public void setDefaultProtocol(boolean useJson) {
this.useJsonByDefault = useJson;
log.info("[setDefaultProtocol][设置默认协议] 使用JSON: {}", useJson);
}
// ==================== 内部方法 ====================
/**
* 检测是否为JSON格式
*
* 检测规则:
* 1. 数据以 '{' 开头
* 2. 包含 "method" 或 "id" 字段
*/
private boolean isJsonFormat(byte[] bytes) {
// TODO @haohaoArrayUtil.isEmpty(bytes) 可以简化下
if (bytes == null || bytes.length == 0) {
return useJsonByDefault;
}
try {
// 检测 JSON 格式:以 '{' 开头
if (bytes[0] == '{') {
// TODO @haohao不一定按照顺序写这个可能要看下。
// 进一步验证是否为有效 JSON
String jsonStr = new String(bytes, 0, Math.min(bytes.length, 100));
return jsonStr.contains("\"method\"") || jsonStr.contains("\"id\"");
}
// 检测二进制格式:长度 >= 8 且符合二进制协议结构
if (bytes.length >= 8) {
// 读取包头(前 4 字节表示后续数据长度)
int expectedLength = ((bytes[0] & 0xFF) << 24) |
((bytes[1] & 0xFF) << 16) |
((bytes[2] & 0xFF) << 8) |
(bytes[3] & 0xFF);
// 验证长度是否合理
// TODO @haohaoexpectedLength > 0 多余的貌似;
if (expectedLength == bytes.length - 4 && expectedLength > 0 && expectedLength < 1024 * 1024) {
return false; // 二进制格式
}
}
} catch (Exception e) {
log.warn("[isJsonFormat][协议检测异常,使用默认协议: {}]", getDefaultProtocol(), e);
}
// 默认使用当前设置的协议类型
return useJsonByDefault;
}
}

View File

@@ -1,42 +1,81 @@
package cn.iocoder.yudao.module.iot.gateway.codec.tcp;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.hutool.core.lang.Assert;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import lombok.extern.slf4j.Slf4j;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
* TCP JSON 格式 {@link IotDeviceMessage} 编解码器
*
* 采用纯 JSON 格式传输,参考 EMQX 和 HTTP 模块的数据格式
* 采用纯 JSON 格式传输
*
* JSON消息格式
* JSON 消息格式:
* {
* "id": "消息 ID",
* "method": "消息方法",
* "deviceId": "设备 ID",
* "productKey": "产品 Key",
* "deviceName": "设备名称",
* "params": {...},
* "timestamp": 时间戳
* "id": "消息 ID",
* "method": "消息方法",
* "deviceId": "设备 ID",
* "params": {...},
* "timestamp": 时间戳
* }
*
* @author 芋道源码
*/
@Slf4j
@Component
public class IotTcpJsonDeviceMessageCodec implements IotDeviceMessageCodec {
public static final String TYPE = "TCP_JSON";
// TODO @haohao变量不太对
// ==================== 常量定义 ====================
@Data
@NoArgsConstructor
@AllArgsConstructor
private static class TcpJsonMessage {
/**
* 消息 ID且每个消息 ID 在当前设备具有唯一性
*/
private String id;
/**
* 请求方法
*/
private String method;
/**
* 设备 ID
*/
private Long deviceId;
/**
* 请求参数
*/
private Object params;
/**
* 响应结果
*/
private Object data;
/**
* 响应错误码
*/
private Integer code;
/**
* 响应提示
*/
private String msg;
/**
* 时间戳
*/
private Long timestamp;
}
@Override
public String type() {
@@ -45,208 +84,33 @@ public class IotTcpJsonDeviceMessageCodec implements IotDeviceMessageCodec {
@Override
public byte[] encode(IotDeviceMessage message) {
if (message == null || StrUtil.isEmpty(message.getMethod())) {
throw new IllegalArgumentException("消息或方法不能为空");
}
try {
// 构建JSON消息
JSONObject jsonMessage = buildJsonMessage(message);
// 转换为字节数组
String jsonString = jsonMessage.toString();
byte[] result = jsonString.getBytes(StandardCharsets.UTF_8);
if (log.isDebugEnabled()) {
log.debug("[encode][编码成功] 方法: {}, JSON长度: {}字节, 内容: {}",
message.getMethod(), result.length, jsonString);
}
return result;
} catch (Exception e) {
log.error("[encode][编码失败] 方法: {}", message.getMethod(), e);
throw new RuntimeException("JSON消息编码失败", e);
}
TcpJsonMessage tcpJsonMessage = new TcpJsonMessage(
message.getRequestId(),
message.getMethod(),
message.getDeviceId(),
message.getParams(),
message.getData(),
message.getCode(),
message.getMsg(),
System.currentTimeMillis());
return JsonUtils.toJsonByte(tcpJsonMessage);
}
// ==================== 编解码方法 ====================
@Override
@SuppressWarnings("DataFlowIssue")
public IotDeviceMessage decode(byte[] bytes) {
if (bytes == null || bytes.length == 0) {
throw new IllegalArgumentException("待解码数据不能为空");
}
try {
// 转换为 JSON 字符串
String jsonString = new String(bytes, StandardCharsets.UTF_8);
if (log.isDebugEnabled()) {
log.debug("[decode][开始解码] JSON长度: {}字节, 内容: {}", bytes.length, jsonString);
}
// 解析 JSON 消息
// TODO @haohaoJsonUtils
JSONObject jsonMessage = JSONUtil.parseObj(jsonString);
// 构建IoT设备消息
IotDeviceMessage message = parseJsonMessage(jsonMessage);
if (log.isDebugEnabled()) {
log.debug("[decode][解码成功] 消息ID: {}, 方法: {}, 设备ID: {}",
message.getId(), message.getMethod(), message.getDeviceId());
}
return message;
} catch (Exception e) {
log.error("[decode][解码失败] 数据长度: {}", bytes.length, e);
throw new RuntimeException("JSON消息解码失败", e);
}
TcpJsonMessage tcpJsonMessage = JsonUtils.parseObject(bytes, TcpJsonMessage.class);
Assert.notNull(tcpJsonMessage, "消息不能为空");
Assert.notBlank(tcpJsonMessage.getMethod(), "消息方法不能为空");
IotDeviceMessage iotDeviceMessage = IotDeviceMessage.of(
tcpJsonMessage.getId(),
tcpJsonMessage.getMethod(),
tcpJsonMessage.getParams(),
tcpJsonMessage.getData(),
tcpJsonMessage.getCode(),
tcpJsonMessage.getMsg());
iotDeviceMessage.setDeviceId(tcpJsonMessage.getDeviceId());
return iotDeviceMessage;
}
/**
* 编码数据上报消息
*/
public byte[] encodeDataReport(Object params, Long deviceId, String productKey, String deviceName) {
IotDeviceMessage message = createMessage(MessageMethod.PROPERTY_POST, params, deviceId, productKey, deviceName);
return encode(message);
}
/**
* 编码心跳消息
*/
public byte[] encodeHeartbeat(Long deviceId, String productKey, String deviceName) {
IotDeviceMessage message = createMessage(MessageMethod.STATE_ONLINE, null, deviceId, productKey, deviceName);
return encode(message);
}
// ==================== 便捷方法 ====================
/**
* 编码事件上报消息
*/
public byte[] encodeEventReport(Object params, Long deviceId, String productKey, String deviceName) {
IotDeviceMessage message = createMessage(MessageMethod.EVENT_POST, params, deviceId, productKey, deviceName);
return encode(message);
}
/**
* 构建 JSON 消息
*/
private JSONObject buildJsonMessage(IotDeviceMessage message) {
JSONObject jsonMessage = new JSONObject();
// 基础字段
jsonMessage.set(JsonField.ID, StrUtil.isNotEmpty(message.getId()) ? message.getId() : IdUtil.fastSimpleUUID());
jsonMessage.set(JsonField.METHOD, message.getMethod());
jsonMessage.set(JsonField.TIMESTAMP, System.currentTimeMillis());
// 设备信息
if (message.getDeviceId() != null) {
jsonMessage.set(JsonField.DEVICE_ID, message.getDeviceId());
}
// 参数
if (message.getParams() != null) {
jsonMessage.set(JsonField.PARAMS, message.getParams());
}
// 响应码和消息(用于下行消息)
if (message.getCode() != null) {
jsonMessage.set(JsonField.CODE, message.getCode());
}
if (StrUtil.isNotEmpty(message.getMsg())) {
jsonMessage.set(JsonField.MESSAGE, message.getMsg());
}
return jsonMessage;
}
/**
* 解析JSON消息
*/
private IotDeviceMessage parseJsonMessage(JSONObject jsonMessage) {
// 提取基础字段
String id = jsonMessage.getStr(JsonField.ID);
String method = jsonMessage.getStr(JsonField.METHOD);
Object params = jsonMessage.get(JsonField.PARAMS);
// 创建消息对象
IotDeviceMessage message = IotDeviceMessage.requestOf(id, method, params);
// 设置设备信息
Long deviceId = jsonMessage.getLong(JsonField.DEVICE_ID);
if (deviceId != null) {
message.setDeviceId(deviceId);
}
// 设置响应信息
Integer code = jsonMessage.getInt(JsonField.CODE);
if (code != null) {
message.setCode(code);
}
String msg = jsonMessage.getStr(JsonField.MESSAGE);
if (StrUtil.isNotEmpty(msg)) {
message.setMsg(msg);
}
// 设置服务 ID基于 JSON 格式)
message.setServerId(generateServerId(jsonMessage));
return message;
}
// ==================== 内部辅助方法 ====================
/**
* 创建消息对象
*/
private IotDeviceMessage createMessage(String method, Object params, Long deviceId, String productKey, String deviceName) {
IotDeviceMessage message = IotDeviceMessage.requestOf(method, params);
message.setDeviceId(deviceId);
return message;
}
/**
* 生成服务ID
*/
private String generateServerId(JSONObject jsonMessage) {
String id = jsonMessage.getStr(JsonField.ID);
Long deviceId = jsonMessage.getLong(JsonField.DEVICE_ID);
return String.format("tcp_json_%s_%s", deviceId != null ? deviceId : "unknown",
StrUtil.isNotEmpty(id) ? id.substring(0, Math.min(8, id.length())) : "noId");
}
// TODO @haohao注释格式不对
/**
* 消息方法常量
*/
public static class MessageMethod {
public static final String PROPERTY_POST = "thing.property.post"; // 数据上报
public static final String STATE_ONLINE = "thing.state.online"; // 心跳
public static final String EVENT_POST = "thing.event.post"; // 事件上报
public static final String PROPERTY_SET = "thing.property.set"; // 属性设置
public static final String PROPERTY_GET = "thing.property.get"; // 属性获取
public static final String SERVICE_INVOKE = "thing.service.invoke"; // 服务调用
}
/**
* JSON字段名参考EMQX和HTTP模块格式
*/
private static class JsonField {
public static final String ID = "id";
public static final String METHOD = "method";
public static final String DEVICE_ID = "deviceId";
public static final String PRODUCT_KEY = "productKey";
public static final String DEVICE_NAME = "deviceName";
public static final String PARAMS = "params";
public static final String TIMESTAMP = "timestamp";
public static final String CODE = "code";
public static final String MESSAGE = "message";
}
}

View File

@@ -1,8 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.config;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpCodecManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxAuthEventProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol;
@@ -10,6 +8,7 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpDownstreamSubscr
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpSessionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
@@ -93,18 +92,20 @@ public class IotGatewayConfiguration {
public IotTcpUpstreamProtocol iotTcpUpstreamProtocol(IotGatewayProperties gatewayProperties,
IotDeviceService deviceService,
IotDeviceMessageService messageService,
IotDeviceCommonApi deviceApi,
IotTcpCodecManager codecManager,
IotTcpSessionManager sessionManager,
Vertx tcpVertx) {
return new IotTcpUpstreamProtocol(gatewayProperties.getProtocol().getTcp(),
deviceService, messageService, deviceApi, codecManager, tcpVertx);
deviceService, messageService, sessionManager, tcpVertx);
}
@Bean
public IotTcpDownstreamSubscriber iotTcpDownstreamSubscriber(IotTcpUpstreamProtocol protocolHandler,
IotDeviceMessageService messageService,
IotDeviceService deviceService,
IotTcpSessionManager sessionManager,
IotMessageBus messageBus) {
return new IotTcpDownstreamSubscriber(protocolHandler, messageService, messageBus);
return new IotTcpDownstreamSubscriber(protocolHandler, messageService, deviceService, sessionManager,
messageBus);
}
}

View File

@@ -4,10 +4,13 @@ import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpSessionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router.IotTcpDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* IoT 网关 TCP 下游订阅者:接收下行给设备的消息
@@ -15,6 +18,7 @@ import lombok.extern.slf4j.Slf4j;
* @author 芋道源码
*/
@Slf4j
@Component
public class IotTcpDownstreamSubscriber implements IotMessageSubscriber<IotDeviceMessage> {
private final IotTcpDownstreamHandler downstreamHandler;
@@ -23,17 +27,27 @@ public class IotTcpDownstreamSubscriber implements IotMessageSubscriber<IotDevic
private final IotTcpUpstreamProtocol protocol;
private final IotDeviceService deviceService;
private final IotTcpSessionManager sessionManager;
public IotTcpDownstreamSubscriber(IotTcpUpstreamProtocol protocol,
IotDeviceMessageService messageService,
IotDeviceService deviceService,
IotTcpSessionManager sessionManager,
IotMessageBus messageBus) {
this.protocol = protocol;
this.messageBus = messageBus;
this.downstreamHandler = new IotTcpDownstreamHandler(messageService);
this.deviceService = deviceService;
this.sessionManager = sessionManager;
this.downstreamHandler = new IotTcpDownstreamHandler(messageService, deviceService, sessionManager);
}
@PostConstruct
public void init() {
messageBus.register(this);
log.info("[init][TCP 下游订阅者初始化完成] 服务器 ID: {}, Topic: {}",
protocol.getServerId(), getTopic());
}
@Override
@@ -49,22 +63,11 @@ public class IotTcpDownstreamSubscriber implements IotMessageSubscriber<IotDevic
@Override
public void onMessage(IotDeviceMessage message) {
log.debug("[onMessage][接收到下行消息, messageId: {}, method: {}, deviceId: {}]",
message.getId(), message.getMethod(), message.getDeviceId());
try {
// 1. 校验
String method = message.getMethod();
if (method == null) {
log.warn("[onMessage][消息方法为空, messageId: {}, deviceId: {}]",
message.getId(), message.getDeviceId());
return;
}
// 2. 处理下行消息
downstreamHandler.handle(message);
} catch (Exception e) {
log.error("[onMessage][处理下行消息失败, messageId: {}, method: {}, deviceId: {}]",
message.getId(), message.getMethod(), message.getDeviceId(), e);
log.error("[onMessage][处理下行消息失败] 设备 ID: {}, 方法: {}, 消息 ID: {}",
message.getDeviceId(), message.getMethod(), message.getId(), e);
}
}
}

View File

@@ -1,9 +1,8 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpCodecManager;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpSessionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router.IotTcpUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
@@ -30,10 +29,7 @@ public class IotTcpUpstreamProtocol {
private final IotDeviceMessageService messageService;
// TODO @haohao不用的变量可以删除
private final IotDeviceCommonApi deviceApi;
private final IotTcpCodecManager codecManager;
private final IotTcpSessionManager sessionManager;
private final Vertx vertx;
@@ -45,28 +41,24 @@ public class IotTcpUpstreamProtocol {
public IotTcpUpstreamProtocol(IotGatewayProperties.TcpProperties tcpProperties,
IotDeviceService deviceService,
IotDeviceMessageService messageService,
IotDeviceCommonApi deviceApi,
IotTcpCodecManager codecManager,
IotTcpSessionManager sessionManager,
Vertx vertx) {
this.tcpProperties = tcpProperties;
this.deviceService = deviceService;
this.messageService = messageService;
this.deviceApi = deviceApi;
this.codecManager = codecManager;
this.sessionManager = sessionManager;
this.vertx = vertx;
this.serverId = IotDeviceMessageUtils.generateServerId(tcpProperties.getPort());
}
@PostConstruct
public void start() {
// TODO @haohao类似下面 62 到 75 是处理 options 的,因为中间写了注释,其实可以不用空行;然后 77 到 91 可以中间空喊去掉,更紧凑一点;
// 创建服务器选项
NetServerOptions options = new NetServerOptions()
.setPort(tcpProperties.getPort())
.setTcpKeepAlive(true)
.setTcpNoDelay(true)
.setReuseAddress(true);
// 配置 SSL如果启用
if (Boolean.TRUE.equals(tcpProperties.getSslEnabled())) {
PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions()
@@ -78,7 +70,8 @@ public class IotTcpUpstreamProtocol {
// 创建服务器并设置连接处理器
netServer = vertx.createNetServer(options);
netServer.connectHandler(socket -> {
IotTcpUpstreamHandler handler = new IotTcpUpstreamHandler(this, messageService, codecManager);
IotTcpUpstreamHandler handler = new IotTcpUpstreamHandler(this, messageService, deviceService,
sessionManager);
handler.handle(socket);
});

View File

@@ -0,0 +1,194 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager;
import io.vertx.core.net.NetSocket;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* IoT 网关 TCP 认证信息管理器
* <p>
* 维护 TCP 连接的认证状态,支持认证信息的存储、查询和清理
*
* @author 芋道源码
*/
@Slf4j
@Component
public class IotTcpAuthManager {
/**
* 连接认证状态映射NetSocket -> 认证信息
*/
private final Map<NetSocket, AuthInfo> authStatusMap = new ConcurrentHashMap<>();
/**
* 设备 ID -> NetSocket 的映射(用于快速查找)
*/
private final Map<Long, NetSocket> deviceSocketMap = new ConcurrentHashMap<>();
/**
* 注册认证信息
*
* @param socket TCP 连接
* @param authInfo 认证信息
*/
public void registerAuth(NetSocket socket, AuthInfo authInfo) {
// 如果设备已有其他连接,先清理旧连接
NetSocket oldSocket = deviceSocketMap.get(authInfo.getDeviceId());
if (oldSocket != null && oldSocket != socket) {
log.info("[registerAuth][设备已有其他连接,清理旧连接] 设备 ID: {}, 旧连接: {}",
authInfo.getDeviceId(), oldSocket.remoteAddress());
authStatusMap.remove(oldSocket);
}
// 注册新认证信息
authStatusMap.put(socket, authInfo);
deviceSocketMap.put(authInfo.getDeviceId(), socket);
log.info("[registerAuth][注册认证信息] 设备 ID: {}, 连接: {}, productKey: {}, deviceName: {}",
authInfo.getDeviceId(), socket.remoteAddress(), authInfo.getProductKey(), authInfo.getDeviceName());
}
/**
* 注销认证信息
*
* @param socket TCP 连接
*/
public void unregisterAuth(NetSocket socket) {
AuthInfo authInfo = authStatusMap.remove(socket);
if (authInfo != null) {
deviceSocketMap.remove(authInfo.getDeviceId());
log.info("[unregisterAuth][注销认证信息] 设备 ID: {}, 连接: {}",
authInfo.getDeviceId(), socket.remoteAddress());
}
}
/**
* 注销设备认证信息
*
* @param deviceId 设备 ID
*/
public void unregisterAuth(Long deviceId) {
NetSocket socket = deviceSocketMap.remove(deviceId);
if (socket != null) {
AuthInfo authInfo = authStatusMap.remove(socket);
if (authInfo != null) {
log.info("[unregisterAuth][注销设备认证信息] 设备 ID: {}, 连接: {}",
deviceId, socket.remoteAddress());
}
}
}
/**
* 获取认证信息
*
* @param socket TCP 连接
* @return 认证信息,如果未认证则返回 null
*/
public AuthInfo getAuthInfo(NetSocket socket) {
return authStatusMap.get(socket);
}
/**
* 获取设备的认证信息
*
* @param deviceId 设备 ID
* @return 认证信息,如果设备未认证则返回 null
*/
public AuthInfo getAuthInfo(Long deviceId) {
NetSocket socket = deviceSocketMap.get(deviceId);
return socket != null ? authStatusMap.get(socket) : null;
}
/**
* 检查连接是否已认证
*
* @param socket TCP 连接
* @return 是否已认证
*/
public boolean isAuthenticated(NetSocket socket) {
return authStatusMap.containsKey(socket);
}
/**
* 检查设备是否已认证
*
* @param deviceId 设备 ID
* @return 是否已认证
*/
public boolean isAuthenticated(Long deviceId) {
return deviceSocketMap.containsKey(deviceId);
}
/**
* 获取设备的 TCP 连接
*
* @param deviceId 设备 ID
* @return TCP 连接,如果设备未认证则返回 null
*/
public NetSocket getDeviceSocket(Long deviceId) {
return deviceSocketMap.get(deviceId);
}
/**
* 获取当前已认证设备数量
*
* @return 已认证设备数量
*/
public int getAuthenticatedDeviceCount() {
return deviceSocketMap.size();
}
/**
* 获取所有已认证设备 ID
*
* @return 已认证设备 ID 集合
*/
public java.util.Set<Long> getAuthenticatedDeviceIds() {
return deviceSocketMap.keySet();
}
/**
* 清理所有认证信息
*/
public void clearAll() {
int count = authStatusMap.size();
authStatusMap.clear();
deviceSocketMap.clear();
log.info("[clearAll][清理所有认证信息] 清理数量: {}", count);
}
/**
* 认证信息
*/
@Data
public static class AuthInfo {
/**
* 设备编号
*/
private Long deviceId;
/**
* 产品标识
*/
private String productKey;
/**
* 设备名称
*/
private String deviceName;
/**
* 认证令牌
*/
private String token;
/**
* 客户端 ID
*/
private String clientId;
}
}

View File

@@ -0,0 +1,143 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager;
import io.vertx.core.net.NetSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* IoT 网关 TCP 会话管理器
* <p>
* 维护设备 ID 和 TCP 连接的映射关系,支持下行消息发送
*
* @author 芋道源码
*/
@Slf4j
@Component
public class IotTcpSessionManager {
/**
* 设备 ID -> TCP 连接的映射
*/
private final Map<Long, NetSocket> deviceSocketMap = new ConcurrentHashMap<>();
/**
* TCP 连接 -> 设备 ID 的映射(用于连接断开时清理)
*/
private final Map<NetSocket, Long> socketDeviceMap = new ConcurrentHashMap<>();
/**
* 注册设备会话
*
* @param deviceId 设备 ID
* @param socket TCP 连接
*/
public void registerSession(Long deviceId, NetSocket socket) {
// 如果设备已有连接,先断开旧连接
NetSocket oldSocket = deviceSocketMap.get(deviceId);
if (oldSocket != null && oldSocket != socket) {
log.info("[registerSession][设备已有连接,断开旧连接] 设备 ID: {}, 旧连接: {}", deviceId, oldSocket.remoteAddress());
oldSocket.close();
socketDeviceMap.remove(oldSocket);
}
// 注册新连接
deviceSocketMap.put(deviceId, socket);
socketDeviceMap.put(socket, deviceId);
log.info("[registerSession][注册设备会话] 设备 ID: {}, 连接: {}", deviceId, socket.remoteAddress());
}
/**
* 注销设备会话
*
* @param deviceId 设备 ID
*/
public void unregisterSession(Long deviceId) {
NetSocket socket = deviceSocketMap.remove(deviceId);
if (socket != null) {
socketDeviceMap.remove(socket);
log.info("[unregisterSession][注销设备会话] 设备 ID: {}, 连接: {}", deviceId, socket.remoteAddress());
}
}
/**
* 注销 TCP 连接会话
*
* @param socket TCP 连接
*/
public void unregisterSession(NetSocket socket) {
Long deviceId = socketDeviceMap.remove(socket);
if (deviceId != null) {
deviceSocketMap.remove(deviceId);
log.info("[unregisterSession][注销连接会话] 设备 ID: {}, 连接: {}", deviceId, socket.remoteAddress());
}
}
/**
* 获取设备的 TCP 连接
*
* @param deviceId 设备 ID
* @return TCP 连接,如果设备未连接则返回 null
*/
public NetSocket getDeviceSocket(Long deviceId) {
return deviceSocketMap.get(deviceId);
}
/**
* 检查设备是否在线
*
* @param deviceId 设备 ID
* @return 是否在线
*/
public boolean isDeviceOnline(Long deviceId) {
NetSocket socket = deviceSocketMap.get(deviceId);
return socket != null;
}
/**
* 发送消息到设备
*
* @param deviceId 设备 ID
* @param data 消息数据
* @return 是否发送成功
*/
public boolean sendToDevice(Long deviceId, byte[] data) {
NetSocket socket = deviceSocketMap.get(deviceId);
if (socket == null) {
log.warn("[sendToDevice][设备未连接] 设备 ID: {}", deviceId);
return false;
}
try {
socket.write(io.vertx.core.buffer.Buffer.buffer(data));
log.debug("[sendToDevice][发送消息成功] 设备 ID: {}, 数据长度: {} 字节", deviceId, data.length);
return true;
} catch (Exception e) {
log.error("[sendToDevice][发送消息失败] 设备 ID: {}", deviceId, e);
// 发送失败时清理连接
unregisterSession(deviceId);
return false;
}
}
/**
* 获取当前在线设备数量
*
* @return 在线设备数量
*/
public int getOnlineDeviceCount() {
return deviceSocketMap.size();
}
/**
* 获取所有在线设备 ID
*
* @return 在线设备 ID 集合
*/
public java.util.Set<Long> getOnlineDeviceIds() {
return deviceSocketMap.keySet();
}
}

View File

@@ -1,20 +1,14 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpSessionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 TCP 下行消息处理器
* <p>
* 负责处理从业务系统发送到设备的下行消息,包括:
* 1. 属性设置
* 2. 服务调用
* 3. 属性获取
* 4. 配置下发
* 5. OTA 升级
* <p>
* 注意:由于移除了连接管理器,此处理器主要负责消息的编码和日志记录
*
* @author 芋道源码
*/
@@ -23,12 +17,15 @@ public class IotTcpDownstreamHandler {
private final IotDeviceMessageService messageService;
// TODO @haohao代码没提交全有报错。
// private final IotTcpDeviceMessageCodec codec;
private final IotDeviceService deviceService;
public IotTcpDownstreamHandler(IotDeviceMessageService messageService) {
private final IotTcpSessionManager sessionManager;
public IotTcpDownstreamHandler(IotDeviceMessageService messageService,
IotDeviceService deviceService, IotTcpSessionManager sessionManager) {
this.messageService = messageService;
// this.codec = new IotTcpDeviceMessageCodec();
this.deviceService = deviceService;
this.sessionManager = sessionManager;
}
/**
@@ -38,23 +35,38 @@ public class IotTcpDownstreamHandler {
*/
public void handle(IotDeviceMessage message) {
try {
log.info("[handle][处理下行消息] 设备ID: {}, 方法: {}, 消息ID: {}",
log.info("[handle][处理下行消息] 设备 ID: {}, 方法: {}, 消息 ID: {}",
message.getDeviceId(), message.getMethod(), message.getId());
// 编码消息用于日志记录和验证
byte[] encodedMessage = null;
// codec.encode(message);
log.debug("[handle][消息编码成功] 设备ID: {}, 编码后长度: {} 字节",
message.getDeviceId(), encodedMessage.length);
// 1. 获取设备信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(message.getDeviceId());
if (device == null) {
log.error("[handle][设备不存在] 设备 ID: {}", message.getDeviceId());
return;
}
// 记录下行消息处理
log.info("[handle][下行消息处理完成] 设备ID: {}, 方法: {}, 消息内容: {}",
message.getDeviceId(), message.getMethod(), message.getParams());
// 2. 检查设备是否在线
if (!sessionManager.isDeviceOnline(message.getDeviceId())) {
log.warn("[handle][设备不在线] 设备 ID: {}", message.getDeviceId());
return;
}
// 3. 编码消息
byte[] bytes = messageService.encodeDeviceMessage(message, device.getCodecType());
// 4. 发送消息到设备
boolean success = sessionManager.sendToDevice(message.getDeviceId(), bytes);
if (success) {
log.info("[handle][下行消息发送成功] 设备 ID: {}, 方法: {}, 消息 ID: {}, 数据长度: {} 字节",
message.getDeviceId(), message.getMethod(), message.getId(), bytes.length);
} else {
log.error("[handle][下行消息发送失败] 设备 ID: {}, 方法: {}, 消息 ID: {}",
message.getDeviceId(), message.getMethod(), message.getId());
}
} catch (Exception e) {
log.error("[handle][处理下行消息失败] 设备ID: {}, 方法: {}, 消息内容: {}",
log.error("[handle][处理下行消息失败] 设备 ID: {}, 方法: {}, 消息内容: {}",
message.getDeviceId(), message.getMethod(), message.getParams(), e);
}
}
}

View File

@@ -1,110 +1,330 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpCodecManager;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpAuthManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpSessionManager;
import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import io.vertx.core.parsetools.RecordParser;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 TCP 上行消息处理器
*
* @author 芋道源码
* TCP 上行消息处理器
*/
@Slf4j
public class IotTcpUpstreamHandler implements Handler<NetSocket> {
private static final String CODEC_TYPE_JSON = "TCP_JSON";
private static final String CODEC_TYPE_BINARY = "TCP_BINARY";
private static final String AUTH_METHOD = "auth";
private final IotDeviceMessageService deviceMessageService;
private final IotDeviceService deviceService;
private final IotTcpSessionManager sessionManager;
private final IotTcpAuthManager authManager;
private final IotDeviceTokenService deviceTokenService;
private final IotDeviceCommonApi deviceApi;
private final String serverId;
private final IotTcpCodecManager codecManager;
public IotTcpUpstreamHandler(IotTcpUpstreamProtocol protocol, IotDeviceMessageService deviceMessageService,
IotTcpCodecManager codecManager) {
IotDeviceService deviceService, IotTcpSessionManager sessionManager) {
this.deviceMessageService = deviceMessageService;
this.deviceService = deviceService;
this.sessionManager = sessionManager;
this.authManager = SpringUtil.getBean(IotTcpAuthManager.class);
this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class);
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
this.serverId = protocol.getServerId();
this.codecManager = codecManager;
}
@Override
public void handle(NetSocket socket) {
// 生成客户端ID用于日志标识
String clientId = IdUtil.simpleUUID();
log.info("[handle][收到设备连接] clientId: {}, address: {}", clientId, socket.remoteAddress());
log.info("[handle][收到设备连接] 客户端 ID: {}, 地址: {}", clientId, socket.remoteAddress());
// 设置解析
RecordParser parser = RecordParser.newFixed(1024, buffer -> {
try {
handleDataPackage(clientId, buffer);
} catch (Exception e) {
log.error("[handle][处理数据包异常] clientId: {}", clientId, e);
}
});
// 设置异常处理
// 设置异常和关闭处理
socket.exceptionHandler(ex -> {
log.error("[handle][连接异常] clientId: {}, address: {}", clientId, socket.remoteAddress(), ex);
log.error("[handle][连接异常] 客户端 ID: {}, 地址: {}", clientId, socket.remoteAddress(), ex);
cleanupSession(socket);
});
socket.closeHandler(v -> {
log.info("[handle][连接关闭] clientId: {}, address: {}", clientId, socket.remoteAddress());
log.info("[handle][连接关闭] 客户端 ID: {}, 地址: {}", clientId, socket.remoteAddress());
cleanupSession(socket);
});
// 设置数据处理器
socket.handler(parser);
socket.handler(buffer -> handleDataPackage(clientId, buffer, socket));
}
/**
* 处理数据包
*/
private void handleDataPackage(String clientId, Buffer buffer) {
private void handleDataPackage(String clientId, Buffer buffer, NetSocket socket) {
try {
// 使用编解码器管理器自动检测协议并解码消息
IotDeviceMessage message = codecManager.decode(buffer.getBytes());
log.info("[handleDataPackage][接收数据包] clientId: {}, 方法: {}, 设备ID: {}",
clientId, message.getMethod(), message.getDeviceId());
if (buffer.length() == 0) {
log.warn("[handleDataPackage][数据包为空] 客户端 ID: {}", clientId);
return;
}
// 处理上行消息
handleUpstreamMessage(clientId, message);
// 1. 解码消息
MessageInfo messageInfo = decodeMessage(buffer);
if (messageInfo == null) {
return;
}
// 2. 获取设备信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(messageInfo.message.getDeviceId());
if (device == null) {
sendError(socket, messageInfo.message.getRequestId(), "设备不存在", messageInfo.codecType);
return;
}
// 3. 处理消息
if (!authManager.isAuthenticated(socket)) {
handleAuthRequest(clientId, messageInfo.message, socket, messageInfo.codecType);
} else {
IotTcpAuthManager.AuthInfo authInfo = authManager.getAuthInfo(socket);
handleBusinessMessage(clientId, messageInfo.message, authInfo);
}
} catch (Exception e) {
log.error("[handleDataPackage][处理数据包失败] clientId: {}", clientId, e);
log.error("[handleDataPackage][处理数据包失败] 客户端 ID: {}, 错误: {}", clientId, e.getMessage(), e);
}
}
/**
* 处理上行消息
* 处理认证请求
*/
private void handleUpstreamMessage(String clientId, IotDeviceMessage message) {
private void handleAuthRequest(String clientId, IotDeviceMessage message, NetSocket socket, String codecType) {
try {
log.info("[handleUpstreamMessage][上行消息] clientId: {}, 方法: {}, 设备ID: {}",
clientId, message.getMethod(), message.getDeviceId());
// 1. 验证认证请求
if (!AUTH_METHOD.equals(message.getMethod())) {
sendError(socket, message.getRequestId(), "请先进行认证", codecType);
return;
}
// 解析设备信息(简化处理)
String deviceId = String.valueOf(message.getDeviceId());
String productKey = extractProductKey(deviceId);
String deviceName = deviceId;
// 2. 解析认证参数
AuthParams authParams = parseAuthParams(message.getParams());
if (authParams == null) {
sendError(socket, message.getRequestId(), "认证参数不完整", codecType);
return;
}
// 发送消息到队列
deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId);
// 3. 执行认证流程
if (performAuthentication(authParams, socket, message.getRequestId(), codecType)) {
log.info("[handleAuthRequest][认证成功] 客户端 ID: {}, username: {}", clientId, authParams.username);
}
} catch (Exception e) {
log.error("[handleUpstreamMessage][处理上行消息失败] clientId: {}", clientId, e);
log.error("[handleAuthRequest][认证处理异常] 客户端 ID: {}", clientId, e);
sendError(socket, message.getRequestId(), "认证处理异常: " + e.getMessage(), codecType);
}
}
/**
* 从设备ID中提取产品密钥简化实现
* 处理业务消息
*/
private String extractProductKey(String deviceId) {
// 简化实现假设设备ID格式为 "productKey_deviceName"
if (deviceId != null && deviceId.contains("_")) {
return deviceId.split("_")[0];
private void handleBusinessMessage(String clientId, IotDeviceMessage message,
IotTcpAuthManager.AuthInfo authInfo) {
try {
message.setDeviceId(authInfo.getDeviceId());
message.setServerId(serverId);
deviceMessageService.sendDeviceMessage(message, authInfo.getProductKey(), authInfo.getDeviceName(),
serverId);
log.info("[handleBusinessMessage][业务消息处理完成] 客户端 ID: {}, 消息 ID: {}, 设备 ID: {}, 方法: {}",
clientId, message.getId(), message.getDeviceId(), message.getMethod());
} catch (Exception e) {
log.error("[handleBusinessMessage][处理业务消息失败] 客户端 ID: {}, 错误: {}", clientId, e.getMessage(), e);
}
return "default_product";
}
/**
* 解码消息
*/
private MessageInfo decodeMessage(Buffer buffer) {
try {
String rawData = buffer.toString();
String codecType = isJsonFormat(rawData) ? CODEC_TYPE_JSON : CODEC_TYPE_BINARY;
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(buffer.getBytes(), codecType);
return message != null ? new MessageInfo(message, codecType) : null;
} catch (Exception e) {
log.debug("[decodeMessage][消息解码失败] 错误: {}", e.getMessage());
return null;
}
}
/**
* 执行认证
*/
private boolean performAuthentication(AuthParams authParams, NetSocket socket, String requestId, String codecType) {
// 1. 执行认证
if (!authenticateDevice(authParams)) {
sendError(socket, requestId, "认证失败", codecType);
return false;
}
// 2. 获取设备信息
IotDeviceAuthUtils.DeviceInfo deviceInfo = deviceTokenService.parseUsername(authParams.username);
if (deviceInfo == null) {
sendError(socket, requestId, "解析设备信息失败", codecType);
return false;
}
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(),
deviceInfo.getDeviceName());
if (device == null) {
sendError(socket, requestId, "设备不存在", codecType);
return false;
}
// 3. 注册认证信息
String token = deviceTokenService.createToken(deviceInfo.getProductKey(), deviceInfo.getDeviceName());
registerAuthInfo(socket, device, deviceInfo, token, authParams.clientId);
// 4. 发送上线消息和成功响应
IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
deviceMessageService.sendDeviceMessage(onlineMessage, deviceInfo.getProductKey(), deviceInfo.getDeviceName(),
serverId);
sendSuccess(socket, requestId, "认证成功", codecType);
return true;
}
/**
* 发送响应
*/
private void sendResponse(NetSocket socket, boolean success, String message, String requestId, String codecType) {
try {
Object responseData = buildResponseData(success, message);
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, responseData,
success ? 0 : 401, message);
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType);
socket.write(Buffer.buffer(encodedData));
log.debug("[sendResponse][发送响应] success: {}, message: {}, requestId: {}", success, message, requestId);
} catch (Exception e) {
log.error("[sendResponse][发送响应失败] requestId: {}", requestId, e);
}
}
/**
* 构建响应数据(不返回 token
*/
private Object buildResponseData(boolean success, String message) {
return MapUtil.builder()
.put("success", success)
.put("message", message)
.build();
}
/**
* 清理会话
*/
private void cleanupSession(NetSocket socket) {
// 如果已认证,发送离线消息
IotTcpAuthManager.AuthInfo authInfo = authManager.getAuthInfo(socket);
if (authInfo != null) {
// 发送离线消息
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
deviceMessageService.sendDeviceMessage(offlineMessage, authInfo.getProductKey(), authInfo.getDeviceName(),
serverId);
}
sessionManager.unregisterSession(socket);
authManager.unregisterAuth(socket);
}
/**
* 判断是否为 JSON 格式
*/
private boolean isJsonFormat(String data) {
if (StrUtil.isBlank(data))
return false;
String trimmed = data.trim();
return (trimmed.startsWith("{") && trimmed.endsWith("}")) || (trimmed.startsWith("[") && trimmed.endsWith("]"));
}
/**
* 解析认证参数
*/
private AuthParams parseAuthParams(Object params) {
if (params == null)
return null;
JSONObject paramsJson = params instanceof JSONObject ? (JSONObject) params
: JSONUtil.parseObj(params.toString());
String clientId = paramsJson.getStr("clientId");
String username = paramsJson.getStr("username");
String password = paramsJson.getStr("password");
return StrUtil.hasBlank(clientId, username, password) ? null : new AuthParams(clientId, username, password);
}
/**
* 认证设备
*/
private boolean authenticateDevice(AuthParams authParams) {
CommonResult<Boolean> result = deviceApi
.authDevice(new cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO()
.setClientId(authParams.clientId)
.setUsername(authParams.username)
.setPassword(authParams.password));
return result.isSuccess() && result.getData();
}
/**
* 注册认证信息
*/
private void registerAuthInfo(NetSocket socket, IotDeviceRespDTO device, IotDeviceAuthUtils.DeviceInfo deviceInfo,
String token, String clientId) {
IotTcpAuthManager.AuthInfo auth = new IotTcpAuthManager.AuthInfo();
auth.setDeviceId(device.getId());
auth.setProductKey(deviceInfo.getProductKey());
auth.setDeviceName(deviceInfo.getDeviceName());
auth.setToken(token);
auth.setClientId(clientId);
authManager.registerAuth(socket, auth);
sessionManager.registerSession(device.getId(), socket);
}
/**
* 发送错误响应
*/
private void sendError(NetSocket socket, String requestId, String errorMessage, String codecType) {
sendResponse(socket, false, errorMessage, requestId, codecType);
}
/**
* 发送成功响应(不返回 token
*/
private void sendSuccess(NetSocket socket, String requestId, String message, String codecType) {
sendResponse(socket, true, message, requestId, codecType);
}
/**
* 认证参数
*/
private record AuthParams(String clientId, String username, String password) {
}
/**
* 消息信息
*/
private record MessageInfo(IotDeviceMessage message, String codecType) {
}
}

View File

@@ -20,6 +20,16 @@ public interface IotDeviceMessageService {
byte[] encodeDeviceMessage(IotDeviceMessage message,
String productKey, String deviceName);
/**
* 编码消息
*
* @param message 消息
* @param codecType 编解码器类型
* @return 编码后的消息内容
*/
byte[] encodeDeviceMessage(IotDeviceMessage message,
String codecType);
/**
* 解码消息
*
@@ -31,13 +41,22 @@ public interface IotDeviceMessageService {
IotDeviceMessage decodeDeviceMessage(byte[] bytes,
String productKey, String deviceName);
/**
* 解码消息
*
* @param bytes 消息内容
* @param codecType 编解码器类型
* @return 解码后的消息内容
*/
IotDeviceMessage decodeDeviceMessage(byte[] bytes, String codecType);
/**
* 发送消息
*
* @param message 消息
* @param message 消息
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param serverId 设备连接的 serverId
* @param serverId 设备连接的 serverId
*/
void sendDeviceMessage(IotDeviceMessage message,
String productKey, String deviceName, String serverId);

View File

@@ -61,6 +61,19 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
return codec.encode(message);
}
@Override
public byte[] encodeDeviceMessage(IotDeviceMessage message,
String codecType) {
// 1. 获取编解码器
IotDeviceMessageCodec codec = codes.get(codecType);
if (codec == null) {
throw new IllegalArgumentException(StrUtil.format("编解码器({}) 不存在", codecType));
}
// 2. 编码消息
return codec.encode(message);
}
@Override
public IotDeviceMessage decodeDeviceMessage(byte[] bytes,
String productKey, String deviceName) {
@@ -79,6 +92,18 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
return codec.decode(bytes);
}
@Override
public IotDeviceMessage decodeDeviceMessage(byte[] bytes, String codecType) {
// 1. 获取编解码器
IotDeviceMessageCodec codec = codes.get(codecType);
if (codec == null) {
throw new IllegalArgumentException(StrUtil.format("编解码器({}) 不存在", codecType));
}
// 2. 解码消息
return codec.decode(bytes);
}
@Override
public void sendDeviceMessage(IotDeviceMessage message,
String productKey, String deviceName, String serverId) {