feat:【IoT 物联网】新增 TCP 连接管理器,优化消息编解码逻辑
This commit is contained in:
@@ -8,86 +8,72 @@ import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
// TODO @haohao:【重要】是不是二进制更彻底哈?
|
||||
// 包头(4 字节)
|
||||
// 消息 ID string;nvarchar(length + string)
|
||||
// version(可选,不要干脆)
|
||||
// method string;nvarchar;为什么不要 opcode?因为 IotTcpJsonDeviceMessageCodec 里面,实际已经没 opcode 了
|
||||
// reply bit;0 请求,1 响应
|
||||
// 请求时:
|
||||
// params;nvarchar;json 处理
|
||||
// 响应时:
|
||||
// code
|
||||
// msg nvarchar
|
||||
// data;nvarchar;json 处理
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
* TCP 二进制格式 {@link IotDeviceMessage} 编解码器
|
||||
* <p>
|
||||
* 二进制协议格式(所有数值使用大端序):
|
||||
*
|
||||
* 使用自定义二进制协议格式:包头(4 字节) | 功能码(2 字节) | 消息序号(2 字节) | 包体数据(变长)
|
||||
* <pre>
|
||||
* +--------+--------+--------+--------+--------+--------+--------+--------+
|
||||
* | 魔术字 | 版本号 | 消息类型| 消息标志| 消息长度(4字节) |
|
||||
* +--------+--------+--------+--------+--------+--------+--------+--------+
|
||||
* | 消息 ID 长度(2字节) | 消息 ID (变长字符串) |
|
||||
* +--------+--------+--------+--------+--------+--------+--------+--------+
|
||||
* | 方法名长度(2字节) | 方法名(变长字符串) |
|
||||
* +--------+--------+--------+--------+--------+--------+--------+--------+
|
||||
* | 消息体数据(变长) |
|
||||
* +--------+--------+--------+--------+--------+--------+--------+--------+
|
||||
* </pre>
|
||||
* <p>
|
||||
* 消息体格式:
|
||||
* - 请求消息:params 数据(JSON)
|
||||
* - 响应消息:code (4字节) + msg 长度(2字节) + msg 字符串 + data 数据(JSON)
|
||||
* <p>
|
||||
* 注意:deviceId 不包含在协议中,由服务器根据连接上下文自动设置
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
|
||||
// TODO @haohao:是不是叫 TCP_Binary 好点哈?
|
||||
public static final String TYPE = "TCP_BINARY";
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
private static class TcpBinaryMessage {
|
||||
// ==================== 协议常量 ====================
|
||||
|
||||
/**
|
||||
* 功能码
|
||||
*/
|
||||
private Short code;
|
||||
/**
|
||||
* 协议魔术字,用于协议识别
|
||||
*/
|
||||
private static final byte MAGIC_NUMBER = (byte) 0x7E;
|
||||
|
||||
// TODO @haohao:这个和 AlinkMessage 里面,是一个东西哇?
|
||||
/**
|
||||
* 消息序号
|
||||
*/
|
||||
private Short mid;
|
||||
|
||||
// TODO @haohao:这个字段,是不是没用到呀?感觉应该也不在消息列哈?
|
||||
/**
|
||||
* 设备 ID
|
||||
*/
|
||||
private Long deviceId;
|
||||
|
||||
/**
|
||||
* 请求方法
|
||||
*/
|
||||
private String method;
|
||||
|
||||
/**
|
||||
* 请求参数
|
||||
*/
|
||||
private Object params;
|
||||
|
||||
/**
|
||||
* 响应结果
|
||||
*/
|
||||
private Object data;
|
||||
|
||||
// TODO @haohao:这个可以改成 code 哇?更好理解一点;
|
||||
/**
|
||||
* 响应错误码
|
||||
*/
|
||||
private Integer responseCode;
|
||||
|
||||
/**
|
||||
* 响应提示
|
||||
*/
|
||||
private String msg;
|
||||
|
||||
// TODO @haohao:TcpBinaryMessage 和 TcpJsonMessage 保持一致哈?
|
||||
/**
|
||||
* 协议版本号
|
||||
*/
|
||||
private static final byte PROTOCOL_VERSION = (byte) 0x01;
|
||||
|
||||
/**
|
||||
* 消息类型常量
|
||||
*/
|
||||
public static class MessageType {
|
||||
public static final byte REQUEST = 0x01; // 请求消息
|
||||
public static final byte RESPONSE = 0x02; // 响应消息
|
||||
}
|
||||
|
||||
/**
|
||||
* 协议头部固定长度(魔术字 + 版本号 + 消息类型 + 消息标志 + 消息长度)
|
||||
*/
|
||||
private static final int HEADER_FIXED_LENGTH = 8;
|
||||
|
||||
/**
|
||||
* 最小消息长度(头部 + 消息ID长度 + 方法名长度)
|
||||
*/
|
||||
private static final int MIN_MESSAGE_LENGTH = HEADER_FIXED_LENGTH + 4;
|
||||
|
||||
@Override
|
||||
public String type() {
|
||||
return TYPE;
|
||||
@@ -99,215 +85,270 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
Assert.notBlank(message.getMethod(), "消息方法不能为空");
|
||||
|
||||
try {
|
||||
// 1. 确定功能码
|
||||
short code = MessageMethod.STATE_ONLINE.equals(message.getMethod())
|
||||
? TcpDataPackage.CODE_HEARTBEAT : TcpDataPackage.CODE_MESSAGE_UP;
|
||||
// 1. 确定消息类型
|
||||
byte messageType = determineMessageType(message);
|
||||
|
||||
// 2. 构建负载数据
|
||||
String payload = buildPayload(message);
|
||||
// 2. 构建消息体
|
||||
byte[] bodyData = buildMessageBody(message, messageType);
|
||||
|
||||
// 3. 构建 TCP 数据包
|
||||
// TODO @haohao:这个和 AlinkMessage.id 是不是一致的哈?
|
||||
short mid = (short) (System.currentTimeMillis() % Short.MAX_VALUE);
|
||||
TcpDataPackage dataPackage = new TcpDataPackage(code, mid, payload);
|
||||
// 3. 构建完整消息(不包含deviceId,由连接上下文管理)
|
||||
return buildCompleteMessage(message, messageType, bodyData);
|
||||
|
||||
// 4. 编码为字节流
|
||||
return encodeTcpDataPackage(dataPackage).getBytes();
|
||||
} catch (Exception e) {
|
||||
throw new TcpCodecException("TCP 消息编码失败", e);
|
||||
log.error("[encode][TCP 二进制消息编码失败,消息: {}]", message, e);
|
||||
throw new RuntimeException("TCP 二进制消息编码失败: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IotDeviceMessage decode(byte[] bytes) {
|
||||
Assert.notNull(bytes, "待解码数据不能为空");
|
||||
Assert.isTrue(bytes.length > 0, "待解码数据不能为空");
|
||||
Assert.isTrue(bytes.length >= MIN_MESSAGE_LENGTH, "数据包长度不足");
|
||||
|
||||
try {
|
||||
// 1. 解码 TCP 数据包
|
||||
TcpDataPackage dataPackage = decodeTcpDataPackage(Buffer.buffer(bytes));
|
||||
Buffer buffer = Buffer.buffer(bytes);
|
||||
|
||||
// 2. 根据功能码确定方法
|
||||
String method = (dataPackage.getCode() == TcpDataPackage.CODE_HEARTBEAT) ? MessageMethod.STATE_ONLINE
|
||||
: MessageMethod.PROPERTY_POST;
|
||||
// 1. 解析协议头部
|
||||
ProtocolHeader header = parseProtocolHeader(buffer);
|
||||
|
||||
// 3. 解析负载数据
|
||||
PayloadInfo payloadInfo = parsePayloadInfo(dataPackage.getPayload());
|
||||
// 2. 解析消息内容(不包含deviceId,由上层连接管理器设置)
|
||||
return parseMessageContent(buffer, header);
|
||||
|
||||
// 4. 构建 IoT 设备消息
|
||||
return IotDeviceMessage.of(
|
||||
payloadInfo.getRequestId(),
|
||||
method,
|
||||
payloadInfo.getParams(),
|
||||
null,
|
||||
null,
|
||||
null);
|
||||
} catch (Exception e) {
|
||||
throw new TcpCodecException("TCP 消息解码失败", e);
|
||||
log.error("[decode][TCP 二进制消息解码失败,数据长度: {}]", bytes.length, e);
|
||||
throw new RuntimeException("TCP 二进制消息解码失败: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== 内部辅助方法 ====================
|
||||
// ==================== 编码相关方法 ====================
|
||||
|
||||
/**
|
||||
* 构建负载数据
|
||||
*
|
||||
* @param message 设备消息
|
||||
* @return 负载字符串
|
||||
* 确定消息类型
|
||||
* 优化后的判断逻辑:有响应字段就是响应消息,否则就是请求消息
|
||||
*/
|
||||
private String buildPayload(IotDeviceMessage message) {
|
||||
TcpBinaryMessage tcpBinaryMessage = new TcpBinaryMessage(
|
||||
null, // code 在数据包中单独处理
|
||||
null, // mid 在数据包中单独处理
|
||||
message.getDeviceId(),
|
||||
message.getMethod(),
|
||||
message.getParams(),
|
||||
message.getData(),
|
||||
message.getCode(),
|
||||
message.getMsg());
|
||||
return JsonUtils.toJsonString(tcpBinaryMessage);
|
||||
private byte determineMessageType(IotDeviceMessage message) {
|
||||
// 判断是否为响应消息:有响应码或响应消息时为响应
|
||||
if (message.getCode() != null || StrUtil.isNotBlank(message.getMsg())) {
|
||||
return MessageType.RESPONSE;
|
||||
}
|
||||
// 默认为请求消息
|
||||
return MessageType.REQUEST;
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析负载信息
|
||||
*
|
||||
* @param payload 负载字符串
|
||||
* @return 负载信息
|
||||
* 构建消息体
|
||||
*/
|
||||
private PayloadInfo parsePayloadInfo(String payload) {
|
||||
if (StrUtil.isBlank(payload)) {
|
||||
return new PayloadInfo(null, null);
|
||||
}
|
||||
private byte[] buildMessageBody(IotDeviceMessage message, byte messageType) {
|
||||
Buffer bodyBuffer = Buffer.buffer();
|
||||
|
||||
try {
|
||||
TcpBinaryMessage tcpBinaryMessage = JsonUtils.parseObject(payload, TcpBinaryMessage.class);
|
||||
if (tcpBinaryMessage != null) {
|
||||
return new PayloadInfo(
|
||||
StrUtil.isNotEmpty(tcpBinaryMessage.getMethod())
|
||||
? tcpBinaryMessage.getMethod() + "_" + System.currentTimeMillis()
|
||||
: null,
|
||||
tcpBinaryMessage.getParams());
|
||||
if (messageType == MessageType.RESPONSE) {
|
||||
// 响应消息:code + msg长度 + msg + data
|
||||
bodyBuffer.appendInt(message.getCode() != null ? message.getCode() : 0);
|
||||
|
||||
String msg = message.getMsg() != null ? message.getMsg() : "";
|
||||
byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
|
||||
bodyBuffer.appendShort((short) msgBytes.length);
|
||||
bodyBuffer.appendBytes(msgBytes);
|
||||
|
||||
if (message.getData() != null) {
|
||||
bodyBuffer.appendBytes(JsonUtils.toJsonByte(message.getData()));
|
||||
}
|
||||
} else {
|
||||
// 请求消息:包含 params 或 data
|
||||
Object payload = message.getParams() != null ? message.getParams() : message.getData();
|
||||
if (payload != null) {
|
||||
bodyBuffer.appendBytes(JsonUtils.toJsonByte(payload));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// 如果解析失败,返回默认值
|
||||
return new PayloadInfo("unknown_" + System.currentTimeMillis(), null);
|
||||
}
|
||||
return null;
|
||||
|
||||
return bodyBuffer.getBytes();
|
||||
}
|
||||
|
||||
/**
|
||||
* 编码 TCP 数据包
|
||||
*
|
||||
* @param dataPackage 数据包对象
|
||||
* @return 编码后的字节流
|
||||
* 构建完整消息
|
||||
*/
|
||||
private Buffer encodeTcpDataPackage(TcpDataPackage dataPackage) {
|
||||
Assert.notNull(dataPackage, "数据包对象不能为空");
|
||||
Assert.notNull(dataPackage.getPayload(), "负载不能为空");
|
||||
|
||||
// 1. 计算包体长度(除了包头 4 字节)
|
||||
int payloadLength = dataPackage.getPayload().getBytes().length;
|
||||
int totalLength = 2 + 2 + payloadLength;
|
||||
|
||||
// 2. 写入数据
|
||||
private byte[] buildCompleteMessage(IotDeviceMessage message, byte messageType, byte[] bodyData) {
|
||||
Buffer buffer = Buffer.buffer();
|
||||
// 2.1 写入包头:总长度(4 字节)
|
||||
buffer.appendInt(totalLength);
|
||||
// 2.2 写入功能码(2 字节)
|
||||
buffer.appendShort(dataPackage.getCode());
|
||||
// 2.3 写入消息序号(2 字节)
|
||||
buffer.appendShort(dataPackage.getMid());
|
||||
// 2.4 写入包体数据(不定长)
|
||||
buffer.appendBytes(dataPackage.getPayload().getBytes());
|
||||
return buffer;
|
||||
|
||||
// 1. 写入协议头部
|
||||
buffer.appendByte(MAGIC_NUMBER);
|
||||
buffer.appendByte(PROTOCOL_VERSION);
|
||||
buffer.appendByte(messageType);
|
||||
buffer.appendByte((byte) 0x00); // 消息标志,预留字段
|
||||
|
||||
// 2. 预留消息长度位置
|
||||
int lengthPosition = buffer.length();
|
||||
buffer.appendInt(0);
|
||||
|
||||
// 3. 写入消息ID
|
||||
String messageId = StrUtil.isNotBlank(message.getRequestId()) ? message.getRequestId()
|
||||
: generateMessageId(message.getMethod());
|
||||
byte[] messageIdBytes = messageId.getBytes(StandardCharsets.UTF_8);
|
||||
buffer.appendShort((short) messageIdBytes.length);
|
||||
buffer.appendBytes(messageIdBytes);
|
||||
|
||||
// 4. 写入方法名
|
||||
byte[] methodBytes = message.getMethod().getBytes(StandardCharsets.UTF_8);
|
||||
buffer.appendShort((short) methodBytes.length);
|
||||
buffer.appendBytes(methodBytes);
|
||||
|
||||
// 5. 写入消息体
|
||||
buffer.appendBytes(bodyData);
|
||||
|
||||
// 6. 更新消息长度
|
||||
buffer.setInt(lengthPosition, buffer.length());
|
||||
|
||||
return buffer.getBytes();
|
||||
}
|
||||
|
||||
/**
|
||||
* 解码 TCP 数据包
|
||||
*
|
||||
* @param buffer 数据缓冲区
|
||||
* @return 解码后的数据包
|
||||
* 生成消息 ID
|
||||
*/
|
||||
private TcpDataPackage decodeTcpDataPackage(Buffer buffer) {
|
||||
Assert.isTrue(buffer.length() >= 8, "数据包长度不足");
|
||||
private String generateMessageId(String method) {
|
||||
return method + "_" + System.currentTimeMillis() + "_" + (int) (Math.random() * 1000);
|
||||
}
|
||||
|
||||
// ==================== 解码相关方法 ====================
|
||||
|
||||
/**
|
||||
* 解析协议头部
|
||||
*/
|
||||
private ProtocolHeader parseProtocolHeader(Buffer buffer) {
|
||||
int index = 0;
|
||||
// 1. 跳过包头(4 字节)
|
||||
|
||||
// 1. 验证魔术字
|
||||
byte magic = buffer.getByte(index++);
|
||||
Assert.isTrue(magic == MAGIC_NUMBER, "无效的协议魔术字: " + magic);
|
||||
|
||||
// 2. 验证版本号
|
||||
byte version = buffer.getByte(index++);
|
||||
Assert.isTrue(version == PROTOCOL_VERSION, "不支持的协议版本: " + version);
|
||||
|
||||
// 3. 读取消息类型
|
||||
byte messageType = buffer.getByte(index++);
|
||||
Assert.isTrue(isValidMessageType(messageType), "无效的消息类型: " + messageType);
|
||||
|
||||
// 4. 读取消息标志(暂时跳过)
|
||||
byte messageFlags = buffer.getByte(index++);
|
||||
|
||||
// 5. 读取消息长度
|
||||
int messageLength = buffer.getInt(index);
|
||||
index += 4;
|
||||
// 2. 获取功能码(2 字节)
|
||||
short code = buffer.getShort(index);
|
||||
|
||||
Assert.isTrue(messageLength == buffer.length(), "消息长度不匹配,期望: " + messageLength + ", 实际: " + buffer.length());
|
||||
|
||||
return new ProtocolHeader(magic, version, messageType, messageFlags, messageLength, index);
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析消息内容
|
||||
*/
|
||||
private IotDeviceMessage parseMessageContent(Buffer buffer, ProtocolHeader header) {
|
||||
int index = header.getNextIndex();
|
||||
|
||||
// 1. 读取消息ID
|
||||
short messageIdLength = buffer.getShort(index);
|
||||
index += 2;
|
||||
// 3. 获取消息序号(2 字节)
|
||||
short mid = buffer.getShort(index);
|
||||
String messageId = buffer.getString(index, index + messageIdLength, StandardCharsets.UTF_8.name());
|
||||
index += messageIdLength;
|
||||
|
||||
// 2. 读取方法名
|
||||
short methodLength = buffer.getShort(index);
|
||||
index += 2;
|
||||
// 4. 获取包体数据
|
||||
String payload = "";
|
||||
if (index < buffer.length()) {
|
||||
payload = buffer.getString(index, buffer.length());
|
||||
String method = buffer.getString(index, index + methodLength, StandardCharsets.UTF_8.name());
|
||||
index += methodLength;
|
||||
|
||||
// 3. 解析消息体
|
||||
return parseMessageBody(buffer, index, header.getMessageType(), messageId, method);
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析消息体
|
||||
*/
|
||||
private IotDeviceMessage parseMessageBody(Buffer buffer, int startIndex, byte messageType,
|
||||
String messageId, String method) {
|
||||
if (startIndex >= buffer.length()) {
|
||||
// 空消息体
|
||||
return IotDeviceMessage.of(messageId, method, null, null, null, null);
|
||||
}
|
||||
|
||||
return new TcpDataPackage(code, mid, payload);
|
||||
if (messageType == MessageType.RESPONSE) {
|
||||
// 响应消息:解析 code + msg + data
|
||||
return parseResponseMessage(buffer, startIndex, messageId, method);
|
||||
} else {
|
||||
// 请求消息:解析 payload(可能是 params 或 data)
|
||||
Object payload = parseJsonData(buffer, startIndex, buffer.length());
|
||||
return IotDeviceMessage.of(messageId, method, payload, null, null, null);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析响应消息
|
||||
*/
|
||||
private IotDeviceMessage parseResponseMessage(Buffer buffer, int startIndex, String messageId, String method) {
|
||||
int index = startIndex;
|
||||
|
||||
// 1. 读取响应码
|
||||
Integer code = buffer.getInt(index);
|
||||
index += 4;
|
||||
|
||||
// 2. 读取响应消息
|
||||
short msgLength = buffer.getShort(index);
|
||||
index += 2;
|
||||
String msg = msgLength > 0 ? buffer.getString(index, index + msgLength, StandardCharsets.UTF_8.name()) : null;
|
||||
index += msgLength;
|
||||
|
||||
// 3. 读取响应数据
|
||||
Object data = null;
|
||||
if (index < buffer.length()) {
|
||||
data = parseJsonData(buffer, index, buffer.length());
|
||||
}
|
||||
|
||||
return IotDeviceMessage.of(messageId, method, null, data, code, msg);
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析JSON数据
|
||||
*/
|
||||
private Object parseJsonData(Buffer buffer, int startIndex, int endIndex) {
|
||||
if (startIndex >= endIndex) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
String jsonStr = buffer.getString(startIndex, endIndex, StandardCharsets.UTF_8.name());
|
||||
if (StrUtil.isBlank(jsonStr)) {
|
||||
return null;
|
||||
}
|
||||
return JsonUtils.parseObject(jsonStr, Object.class);
|
||||
} catch (Exception e) {
|
||||
log.warn("[parseJsonData][JSON 解析失败,返回原始字符串]", e);
|
||||
return buffer.getString(startIndex, endIndex, StandardCharsets.UTF_8.name());
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== 辅助方法 ====================
|
||||
|
||||
/**
|
||||
* 验证消息类型是否有效
|
||||
*/
|
||||
private boolean isValidMessageType(byte messageType) {
|
||||
return messageType == MessageType.REQUEST || messageType == MessageType.RESPONSE;
|
||||
}
|
||||
|
||||
// ==================== 内部类 ====================
|
||||
|
||||
// TODO @haohao:会不会存在 reply 的时候,有 data、msg、code 参数哈。
|
||||
/**
|
||||
* 负载信息类
|
||||
* 协议头部信息
|
||||
*/
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
private static class PayloadInfo {
|
||||
|
||||
private String requestId;
|
||||
private Object params;
|
||||
|
||||
private static class ProtocolHeader {
|
||||
private byte magic;
|
||||
private byte version;
|
||||
private byte messageType;
|
||||
private byte messageFlags;
|
||||
private int messageLength;
|
||||
private int nextIndex; // 指向消息内容开始位置
|
||||
}
|
||||
|
||||
/**
|
||||
* TCP 数据包内部类
|
||||
*/
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
private static class TcpDataPackage {
|
||||
|
||||
// 功能码定义
|
||||
public static final short CODE_REGISTER = 10;
|
||||
public static final short CODE_REGISTER_REPLY = 11;
|
||||
public static final short CODE_HEARTBEAT = 20;
|
||||
public static final short CODE_HEARTBEAT_REPLY = 21;
|
||||
public static final short CODE_MESSAGE_UP = 30;
|
||||
public static final short CODE_MESSAGE_DOWN = 40;
|
||||
|
||||
// TODO @haohao:要不改成 opCode
|
||||
private short code;
|
||||
private short mid;
|
||||
private String payload;
|
||||
|
||||
}
|
||||
|
||||
// ==================== 常量定义 ====================
|
||||
|
||||
/**
|
||||
* 消息方法常量
|
||||
*/
|
||||
public static class MessageMethod {
|
||||
|
||||
public static final String PROPERTY_POST = "thing.property.post"; // 数据上报
|
||||
public static final String STATE_ONLINE = "thing.state.online"; // 心跳
|
||||
|
||||
}
|
||||
|
||||
// ==================== 自定义异常 ====================
|
||||
|
||||
// TODO @haohao:全局异常搞个。看着可以服用哈。
|
||||
/**
|
||||
* TCP 编解码异常
|
||||
*/
|
||||
public static class TcpCodecException extends RuntimeException {
|
||||
public TcpCodecException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -14,12 +14,13 @@ import org.springframework.stereotype.Component;
|
||||
*
|
||||
* 采用纯 JSON 格式传输,格式如下:
|
||||
* {
|
||||
* "id": "消息 ID",
|
||||
* "method": "消息方法",
|
||||
* "deviceId": "设备 ID",
|
||||
* "params": {...},
|
||||
* "timestamp": 时间戳
|
||||
* // TODO @haohao:貌似少了 code、msg、timestamp
|
||||
* "id": "消息 ID",
|
||||
* "method": "消息方法",
|
||||
* "params": {...}, // 请求参数
|
||||
* "data": {...}, // 响应结果
|
||||
* "code": 200, // 响应错误码
|
||||
* "msg": "success", // 响应提示
|
||||
* "timestamp": 时间戳
|
||||
* }
|
||||
*
|
||||
* @author 芋道源码
|
||||
@@ -44,12 +45,6 @@ public class IotTcpJsonDeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
*/
|
||||
private String method;
|
||||
|
||||
// TODO @haohao:这个字段,是不是没用到呀?感觉应该也不在消息列哈?
|
||||
/**
|
||||
* 设备 ID
|
||||
*/
|
||||
private Long deviceId;
|
||||
|
||||
/**
|
||||
* 请求参数
|
||||
*/
|
||||
@@ -84,9 +79,13 @@ public class IotTcpJsonDeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
|
||||
@Override
|
||||
public byte[] encode(IotDeviceMessage message) {
|
||||
TcpJsonMessage tcpJsonMessage = new TcpJsonMessage(message.getRequestId(), message.getMethod(),
|
||||
message.getDeviceId(),
|
||||
message.getParams(), message.getData(), message.getCode(), message.getMsg(),
|
||||
TcpJsonMessage tcpJsonMessage = new TcpJsonMessage(
|
||||
message.getRequestId(),
|
||||
message.getMethod(),
|
||||
message.getParams(),
|
||||
message.getData(),
|
||||
message.getCode(),
|
||||
message.getMsg(),
|
||||
System.currentTimeMillis());
|
||||
return JsonUtils.toJsonByte(tcpJsonMessage);
|
||||
}
|
||||
@@ -97,11 +96,13 @@ public class IotTcpJsonDeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
TcpJsonMessage tcpJsonMessage = JsonUtils.parseObject(bytes, TcpJsonMessage.class);
|
||||
Assert.notNull(tcpJsonMessage, "消息不能为空");
|
||||
Assert.notBlank(tcpJsonMessage.getMethod(), "消息方法不能为空");
|
||||
// TODO @haohao:这个我已经改了哈。一些属性,可以放在一行,好理解一点~
|
||||
IotDeviceMessage iotDeviceMessage = IotDeviceMessage.of(tcpJsonMessage.getId(), tcpJsonMessage.getMethod(),
|
||||
tcpJsonMessage.getParams(), tcpJsonMessage.getData(), tcpJsonMessage.getCode(), tcpJsonMessage.getMsg());
|
||||
iotDeviceMessage.setDeviceId(tcpJsonMessage.getDeviceId());
|
||||
return iotDeviceMessage;
|
||||
return IotDeviceMessage.of(
|
||||
tcpJsonMessage.getId(),
|
||||
tcpJsonMessage.getMethod(),
|
||||
tcpJsonMessage.getParams(),
|
||||
tcpJsonMessage.getData(),
|
||||
tcpJsonMessage.getCode(),
|
||||
tcpJsonMessage.getMsg());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -8,7 +8,7 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpDownstreamSubscr
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpDownstreamSubscriber;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpSessionManager;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import io.vertx.core.Vertx;
|
||||
@@ -92,19 +92,19 @@ public class IotGatewayConfiguration {
|
||||
public IotTcpUpstreamProtocol iotTcpUpstreamProtocol(IotGatewayProperties gatewayProperties,
|
||||
IotDeviceService deviceService,
|
||||
IotDeviceMessageService messageService,
|
||||
IotTcpSessionManager sessionManager,
|
||||
IotTcpConnectionManager connectionManager,
|
||||
Vertx tcpVertx) {
|
||||
return new IotTcpUpstreamProtocol(gatewayProperties.getProtocol().getTcp(),
|
||||
deviceService, messageService, sessionManager, tcpVertx);
|
||||
deviceService, messageService, connectionManager, tcpVertx);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public IotTcpDownstreamSubscriber iotTcpDownstreamSubscriber(IotTcpUpstreamProtocol protocolHandler,
|
||||
IotDeviceMessageService messageService,
|
||||
IotDeviceService deviceService,
|
||||
IotTcpSessionManager sessionManager,
|
||||
IotTcpConnectionManager connectionManager,
|
||||
IotMessageBus messageBus) {
|
||||
return new IotTcpDownstreamSubscriber(protocolHandler, messageService, deviceService, sessionManager,
|
||||
return new IotTcpDownstreamSubscriber(protocolHandler, messageService, deviceService, connectionManager,
|
||||
messageBus);
|
||||
}
|
||||
|
||||
|
||||
@@ -4,13 +4,13 @@ import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpSessionManager;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router.IotTcpDownstreamHandler;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* IoT 网关 TCP 下游订阅者:接收下行给设备的消息
|
||||
@@ -18,37 +18,28 @@ import org.springframework.stereotype.Component;
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class IotTcpDownstreamSubscriber implements IotMessageSubscriber<IotDeviceMessage> {
|
||||
|
||||
private final IotTcpDownstreamHandler downstreamHandler;
|
||||
|
||||
private final IotMessageBus messageBus;
|
||||
|
||||
private final IotTcpUpstreamProtocol protocol;
|
||||
|
||||
// todo @haohao:不用的变量,可以去掉哈
|
||||
private final IotDeviceMessageService messageService;
|
||||
|
||||
private final IotDeviceService deviceService;
|
||||
|
||||
private final IotTcpSessionManager sessionManager;
|
||||
private final IotTcpConnectionManager connectionManager;
|
||||
|
||||
// TODO @haohao:lombok 简化
|
||||
public IotTcpDownstreamSubscriber(IotTcpUpstreamProtocol protocol,
|
||||
IotDeviceMessageService messageService,
|
||||
IotDeviceService deviceService,
|
||||
IotTcpSessionManager sessionManager,
|
||||
IotMessageBus messageBus) {
|
||||
this.protocol = protocol;
|
||||
this.messageBus = messageBus;
|
||||
this.deviceService = deviceService;
|
||||
this.sessionManager = sessionManager;
|
||||
this.downstreamHandler = new IotTcpDownstreamHandler(messageService, deviceService, sessionManager);
|
||||
}
|
||||
private final IotMessageBus messageBus;
|
||||
|
||||
private IotTcpDownstreamHandler downstreamHandler;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
// 初始化下游处理器
|
||||
this.downstreamHandler = new IotTcpDownstreamHandler(messageService, deviceService, connectionManager);
|
||||
|
||||
messageBus.register(this);
|
||||
log.info("[init][TCP 下游订阅者初始化完成] 服务器 ID: {}, Topic: {}",
|
||||
log.info("[init][TCP 下游订阅者初始化完成,服务器 ID: {},Topic: {}]",
|
||||
protocol.getServerId(), getTopic());
|
||||
}
|
||||
|
||||
@@ -68,8 +59,9 @@ public class IotTcpDownstreamSubscriber implements IotMessageSubscriber<IotDevic
|
||||
try {
|
||||
downstreamHandler.handle(message);
|
||||
} catch (Exception e) {
|
||||
log.error("[onMessage][处理下行消息失败] 设备 ID: {}, 方法: {}, 消息 ID: {}",
|
||||
log.error("[onMessage][处理下行消息失败,设备 ID: {},方法: {},消息 ID: {}]",
|
||||
message.getDeviceId(), message.getMethod(), message.getId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -2,7 +2,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.tcp;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpSessionManager;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router.IotTcpUpstreamHandler;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
@@ -29,24 +29,24 @@ public class IotTcpUpstreamProtocol {
|
||||
|
||||
private final IotDeviceMessageService messageService;
|
||||
|
||||
private final IotTcpSessionManager sessionManager;
|
||||
private final IotTcpConnectionManager connectionManager;
|
||||
|
||||
private final Vertx vertx;
|
||||
|
||||
@Getter
|
||||
private final String serverId;
|
||||
|
||||
private NetServer netServer;
|
||||
private NetServer tcpServer;
|
||||
|
||||
public IotTcpUpstreamProtocol(IotGatewayProperties.TcpProperties tcpProperties,
|
||||
IotDeviceService deviceService,
|
||||
IotDeviceMessageService messageService,
|
||||
IotTcpSessionManager sessionManager,
|
||||
IotTcpConnectionManager connectionManager,
|
||||
Vertx vertx) {
|
||||
this.tcpProperties = tcpProperties;
|
||||
this.deviceService = deviceService;
|
||||
this.messageService = messageService;
|
||||
this.sessionManager = sessionManager;
|
||||
this.connectionManager = connectionManager;
|
||||
this.vertx = vertx;
|
||||
this.serverId = IotDeviceMessageUtils.generateServerId(tcpProperties.getPort());
|
||||
}
|
||||
@@ -68,16 +68,16 @@ public class IotTcpUpstreamProtocol {
|
||||
}
|
||||
|
||||
// 创建服务器并设置连接处理器
|
||||
netServer = vertx.createNetServer(options);
|
||||
netServer.connectHandler(socket -> {
|
||||
tcpServer = vertx.createNetServer(options);
|
||||
tcpServer.connectHandler(socket -> {
|
||||
IotTcpUpstreamHandler handler = new IotTcpUpstreamHandler(this, messageService, deviceService,
|
||||
sessionManager);
|
||||
connectionManager);
|
||||
handler.handle(socket);
|
||||
});
|
||||
|
||||
// 启动服务器
|
||||
try {
|
||||
netServer.listen().result();
|
||||
tcpServer.listen().result();
|
||||
log.info("[start][IoT 网关 TCP 协议启动成功,端口:{}]", tcpProperties.getPort());
|
||||
} catch (Exception e) {
|
||||
log.error("[start][IoT 网关 TCP 协议启动失败]", e);
|
||||
@@ -87,9 +87,9 @@ public class IotTcpUpstreamProtocol {
|
||||
|
||||
@PreDestroy
|
||||
public void stop() {
|
||||
if (netServer != null) {
|
||||
if (tcpServer != null) {
|
||||
try {
|
||||
netServer.close().result();
|
||||
tcpServer.close().result();
|
||||
log.info("[stop][IoT 网关 TCP 协议已停止]");
|
||||
} catch (Exception e) {
|
||||
log.error("[stop][IoT 网关 TCP 协议停止失败]", e);
|
||||
|
||||
@@ -1,202 +0,0 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager;
|
||||
|
||||
import io.vertx.core.net.NetSocket;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* IoT 网关 TCP 认证信息管理器
|
||||
* <p>
|
||||
* 维护 TCP 连接的认证状态,支持认证信息的存储、查询和清理
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class IotTcpAuthManager {
|
||||
|
||||
/**
|
||||
* 连接认证状态映射:NetSocket -> 认证信息
|
||||
*/
|
||||
private final Map<NetSocket, AuthInfo> authStatusMap = new ConcurrentHashMap<>();
|
||||
|
||||
// TODO @haohao:得考虑,一个设备连接多次?
|
||||
/**
|
||||
* 设备 ID -> NetSocket 的映射(用于快速查找)
|
||||
*/
|
||||
private final Map<Long, NetSocket> deviceSocketMap = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 注册认证信息
|
||||
*
|
||||
* @param socket TCP 连接
|
||||
* @param authInfo 认证信息
|
||||
*/
|
||||
public void registerAuth(NetSocket socket, AuthInfo authInfo) {
|
||||
// 如果设备已有其他连接,先清理旧连接
|
||||
// TODO @haohao:是不是允许同时连接?就像 mqtt 应该也允许重复连接哈?
|
||||
NetSocket oldSocket = deviceSocketMap.get(authInfo.getDeviceId());
|
||||
if (oldSocket != null && oldSocket != socket) {
|
||||
log.info("[registerAuth][设备已有其他连接,清理旧连接] 设备 ID: {}, 旧连接: {}",
|
||||
authInfo.getDeviceId(), oldSocket.remoteAddress());
|
||||
authStatusMap.remove(oldSocket);
|
||||
}
|
||||
|
||||
// 注册新认证信息
|
||||
authStatusMap.put(socket, authInfo);
|
||||
deviceSocketMap.put(authInfo.getDeviceId(), socket);
|
||||
|
||||
log.info("[registerAuth][注册认证信息] 设备 ID: {}, 连接: {}, productKey: {}, deviceName: {}",
|
||||
authInfo.getDeviceId(), socket.remoteAddress(), authInfo.getProductKey(), authInfo.getDeviceName());
|
||||
}
|
||||
|
||||
/**
|
||||
* 注销认证信息
|
||||
*
|
||||
* @param socket TCP 连接
|
||||
*/
|
||||
public void unregisterAuth(NetSocket socket) {
|
||||
AuthInfo authInfo = authStatusMap.remove(socket);
|
||||
if (authInfo != null) {
|
||||
deviceSocketMap.remove(authInfo.getDeviceId());
|
||||
log.info("[unregisterAuth][注销认证信息] 设备 ID: {}, 连接: {}",
|
||||
authInfo.getDeviceId(), socket.remoteAddress());
|
||||
}
|
||||
}
|
||||
|
||||
// TODO @haohao:建议暂时没用的方法,可以删除掉;整体聚焦!
|
||||
/**
|
||||
* 注销设备认证信息
|
||||
*
|
||||
* @param deviceId 设备 ID
|
||||
*/
|
||||
public void unregisterAuth(Long deviceId) {
|
||||
NetSocket socket = deviceSocketMap.remove(deviceId);
|
||||
if (socket != null) {
|
||||
AuthInfo authInfo = authStatusMap.remove(socket);
|
||||
if (authInfo != null) {
|
||||
log.info("[unregisterAuth][注销设备认证信息] 设备 ID: {}, 连接: {}",
|
||||
deviceId, socket.remoteAddress());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取认证信息
|
||||
*
|
||||
* @param socket TCP 连接
|
||||
* @return 认证信息,如果未认证则返回 null
|
||||
*/
|
||||
public AuthInfo getAuthInfo(NetSocket socket) {
|
||||
return authStatusMap.get(socket);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取设备的认证信息
|
||||
*
|
||||
* @param deviceId 设备 ID
|
||||
* @return 认证信息,如果设备未认证则返回 null
|
||||
*/
|
||||
public AuthInfo getAuthInfo(Long deviceId) {
|
||||
NetSocket socket = deviceSocketMap.get(deviceId);
|
||||
return socket != null ? authStatusMap.get(socket) : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查连接是否已认证
|
||||
*
|
||||
* @param socket TCP 连接
|
||||
* @return 是否已认证
|
||||
*/
|
||||
public boolean isAuthenticated(NetSocket socket) {
|
||||
return authStatusMap.containsKey(socket);
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查设备是否已认证
|
||||
*
|
||||
* @param deviceId 设备 ID
|
||||
* @return 是否已认证
|
||||
*/
|
||||
public boolean isAuthenticated(Long deviceId) {
|
||||
return deviceSocketMap.containsKey(deviceId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取设备的 TCP 连接
|
||||
*
|
||||
* @param deviceId 设备 ID
|
||||
* @return TCP 连接,如果设备未认证则返回 null
|
||||
*/
|
||||
public NetSocket getDeviceSocket(Long deviceId) {
|
||||
return deviceSocketMap.get(deviceId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取当前已认证设备数量
|
||||
*
|
||||
* @return 已认证设备数量
|
||||
*/
|
||||
public int getAuthenticatedDeviceCount() {
|
||||
return deviceSocketMap.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取所有已认证设备 ID
|
||||
*
|
||||
* @return 已认证设备 ID 集合
|
||||
*/
|
||||
public java.util.Set<Long> getAuthenticatedDeviceIds() {
|
||||
return deviceSocketMap.keySet();
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理所有认证信息
|
||||
*/
|
||||
public void clearAll() {
|
||||
int count = authStatusMap.size();
|
||||
authStatusMap.clear();
|
||||
deviceSocketMap.clear();
|
||||
// TODO @haohao:第一个括号是方法,第二个括号是明细日志;其它日志,也可以检查下哈。
|
||||
log.info("[clearAll][清理所有认证信息] 清理数量: {}", count);
|
||||
}
|
||||
|
||||
/**
|
||||
* 认证信息
|
||||
*/
|
||||
@Data
|
||||
public static class AuthInfo {
|
||||
|
||||
/**
|
||||
* 设备编号
|
||||
*/
|
||||
private Long deviceId;
|
||||
|
||||
/**
|
||||
* 产品标识
|
||||
*/
|
||||
private String productKey;
|
||||
|
||||
/**
|
||||
* 设备名称
|
||||
*/
|
||||
private String deviceName;
|
||||
|
||||
// TODO @haohao:令牌不要存储,万一有安全问题哈;
|
||||
/**
|
||||
* 认证令牌
|
||||
*/
|
||||
private String token;
|
||||
|
||||
/**
|
||||
* 客户端 ID
|
||||
*/
|
||||
private String clientId;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,185 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager;
|
||||
|
||||
import io.vertx.core.net.NetSocket;
|
||||
import lombok.Data;
|
||||
import lombok.experimental.Accessors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* IoT 网关 TCP 连接管理器
|
||||
* <p>
|
||||
* 统一管理 TCP 连接的认证状态、设备会话和消息发送功能:
|
||||
* 1. 管理 TCP 连接的认证状态
|
||||
* 2. 管理设备会话和在线状态
|
||||
* 3. 管理消息发送到设备
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class IotTcpConnectionManager {
|
||||
|
||||
/**
|
||||
* 连接信息映射:NetSocket -> 连接信息
|
||||
*/
|
||||
private final Map<NetSocket, ConnectionInfo> connectionMap = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 设备 ID -> NetSocket 的映射(用于快速查找)
|
||||
*/
|
||||
private final Map<Long, NetSocket> deviceSocketMap = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* NetSocket -> 设备 ID 的映射(用于连接断开时清理)
|
||||
*/
|
||||
private final Map<NetSocket, Long> socketDeviceMap = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 注册设备连接(包含认证信息)
|
||||
*
|
||||
* @param socket TCP 连接
|
||||
* @param deviceId 设备 ID
|
||||
* @param authInfo 认证信息
|
||||
*/
|
||||
public void registerConnection(NetSocket socket, Long deviceId, AuthInfo authInfo) {
|
||||
// 如果设备已有其他连接,先清理旧连接
|
||||
NetSocket oldSocket = deviceSocketMap.get(deviceId);
|
||||
if (oldSocket != null && oldSocket != socket) {
|
||||
log.info("[registerConnection][设备已有其他连接,断开旧连接,设备 ID: {},旧连接: {}]",
|
||||
deviceId, oldSocket.remoteAddress());
|
||||
oldSocket.close();
|
||||
// 清理所有相关映射
|
||||
connectionMap.remove(oldSocket);
|
||||
socketDeviceMap.remove(oldSocket);
|
||||
}
|
||||
|
||||
// 注册新连接 - 更新所有映射关系
|
||||
ConnectionInfo connectionInfo = new ConnectionInfo()
|
||||
.setDeviceId(deviceId)
|
||||
.setAuthInfo(authInfo)
|
||||
.setAuthenticated(true);
|
||||
|
||||
connectionMap.put(socket, connectionInfo);
|
||||
deviceSocketMap.put(deviceId, socket);
|
||||
socketDeviceMap.put(socket, deviceId);
|
||||
|
||||
log.info("[registerConnection][注册设备连接,设备 ID: {},连接: {},product key: {},device name: {}]",
|
||||
deviceId, socket.remoteAddress(), authInfo.getProductKey(), authInfo.getDeviceName());
|
||||
}
|
||||
|
||||
/**
|
||||
* 注销设备连接
|
||||
*
|
||||
* @param socket TCP 连接
|
||||
*/
|
||||
public void unregisterConnection(NetSocket socket) {
|
||||
ConnectionInfo connectionInfo = connectionMap.remove(socket);
|
||||
Long deviceId = socketDeviceMap.remove(socket);
|
||||
|
||||
if (connectionInfo != null && deviceId != null) {
|
||||
deviceSocketMap.remove(deviceId);
|
||||
log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]",
|
||||
deviceId, socket.remoteAddress());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 注销设备连接(通过设备 ID)
|
||||
*
|
||||
* @param deviceId 设备 ID
|
||||
*/
|
||||
public void unregisterConnection(Long deviceId) {
|
||||
NetSocket socket = deviceSocketMap.remove(deviceId);
|
||||
if (socket != null) {
|
||||
connectionMap.remove(socket);
|
||||
socketDeviceMap.remove(socket);
|
||||
log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]", deviceId, socket.remoteAddress());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查连接是否已认证
|
||||
*/
|
||||
public boolean isAuthenticated(NetSocket socket) {
|
||||
ConnectionInfo info = connectionMap.get(socket);
|
||||
return info != null && info.isAuthenticated();
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查连接是否未认证
|
||||
*/
|
||||
public boolean isNotAuthenticated(NetSocket socket) {
|
||||
return !isAuthenticated(socket);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取连接的认证信息
|
||||
*/
|
||||
public AuthInfo getAuthInfo(NetSocket socket) {
|
||||
ConnectionInfo info = connectionMap.get(socket);
|
||||
return info != null ? info.getAuthInfo() : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查设备是否在线
|
||||
*/
|
||||
public boolean isDeviceOnline(Long deviceId) {
|
||||
return deviceSocketMap.containsKey(deviceId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查设备是否离线
|
||||
*/
|
||||
public boolean isDeviceOffline(Long deviceId) {
|
||||
return !isDeviceOnline(deviceId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送消息到设备
|
||||
*/
|
||||
public boolean sendToDevice(Long deviceId, byte[] data) {
|
||||
NetSocket socket = deviceSocketMap.get(deviceId);
|
||||
if (socket == null) {
|
||||
log.warn("[sendToDevice][设备未连接,设备 ID: {}]", deviceId);
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
socket.write(io.vertx.core.buffer.Buffer.buffer(data));
|
||||
log.debug("[sendToDevice][发送消息成功,设备 ID: {},数据长度: {} 字节]", deviceId, data.length);
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
log.error("[sendToDevice][发送消息失败,设备 ID: {}]", deviceId, e);
|
||||
// 发送失败时清理连接
|
||||
unregisterConnection(socket);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 连接信息
|
||||
*/
|
||||
@Data
|
||||
@Accessors(chain = true)
|
||||
public static class ConnectionInfo {
|
||||
private Long deviceId;
|
||||
private AuthInfo authInfo;
|
||||
private boolean authenticated;
|
||||
}
|
||||
|
||||
/**
|
||||
* 认证信息
|
||||
*/
|
||||
@Data
|
||||
@Accessors(chain = true)
|
||||
public static class AuthInfo {
|
||||
private Long deviceId;
|
||||
private String productKey;
|
||||
private String deviceName;
|
||||
private String clientId;
|
||||
}
|
||||
}
|
||||
@@ -1,144 +0,0 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager;
|
||||
|
||||
import io.vertx.core.net.NetSocket;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
// TODO @haohao:IotTcpSessionManager、IotTcpAuthManager 是不是融合哈?
|
||||
/**
|
||||
* IoT 网关 TCP 会话管理器
|
||||
* <p>
|
||||
* 维护设备 ID 和 TCP 连接的映射关系,支持下行消息发送
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class IotTcpSessionManager {
|
||||
|
||||
/**
|
||||
* 设备 ID -> TCP 连接的映射
|
||||
*/
|
||||
private final Map<Long, NetSocket> deviceSocketMap = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* TCP 连接 -> 设备 ID 的映射(用于连接断开时清理)
|
||||
*/
|
||||
private final Map<NetSocket, Long> socketDeviceMap = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 注册设备会话
|
||||
*
|
||||
* @param deviceId 设备 ID
|
||||
* @param socket TCP 连接
|
||||
*/
|
||||
public void registerSession(Long deviceId, NetSocket socket) {
|
||||
// 如果设备已有连接,先断开旧连接
|
||||
NetSocket oldSocket = deviceSocketMap.get(deviceId);
|
||||
if (oldSocket != null && oldSocket != socket) {
|
||||
log.info("[registerSession][设备已有连接,断开旧连接] 设备 ID: {}, 旧连接: {}", deviceId, oldSocket.remoteAddress());
|
||||
oldSocket.close();
|
||||
socketDeviceMap.remove(oldSocket);
|
||||
}
|
||||
|
||||
// 注册新连接
|
||||
deviceSocketMap.put(deviceId, socket);
|
||||
socketDeviceMap.put(socket, deviceId);
|
||||
|
||||
log.info("[registerSession][注册设备会话] 设备 ID: {}, 连接: {}", deviceId, socket.remoteAddress());
|
||||
}
|
||||
|
||||
/**
|
||||
* 注销设备会话
|
||||
*
|
||||
* @param deviceId 设备 ID
|
||||
*/
|
||||
public void unregisterSession(Long deviceId) {
|
||||
NetSocket socket = deviceSocketMap.remove(deviceId);
|
||||
if (socket != null) {
|
||||
socketDeviceMap.remove(socket);
|
||||
log.info("[unregisterSession][注销设备会话] 设备 ID: {}, 连接: {}", deviceId, socket.remoteAddress());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 注销 TCP 连接会话
|
||||
*
|
||||
* @param socket TCP 连接
|
||||
*/
|
||||
public void unregisterSession(NetSocket socket) {
|
||||
Long deviceId = socketDeviceMap.remove(socket);
|
||||
if (deviceId != null) {
|
||||
deviceSocketMap.remove(deviceId);
|
||||
log.info("[unregisterSession][注销连接会话] 设备 ID: {}, 连接: {}", deviceId, socket.remoteAddress());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取设备的 TCP 连接
|
||||
*
|
||||
* @param deviceId 设备 ID
|
||||
* @return TCP 连接,如果设备未连接则返回 null
|
||||
*/
|
||||
public NetSocket getDeviceSocket(Long deviceId) {
|
||||
return deviceSocketMap.get(deviceId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查设备是否在线
|
||||
*
|
||||
* @param deviceId 设备 ID
|
||||
* @return 是否在线
|
||||
*/
|
||||
public boolean isDeviceOnline(Long deviceId) {
|
||||
NetSocket socket = deviceSocketMap.get(deviceId);
|
||||
return socket != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送消息到设备
|
||||
*
|
||||
* @param deviceId 设备 ID
|
||||
* @param data 消息数据
|
||||
* @return 是否发送成功
|
||||
*/
|
||||
public boolean sendToDevice(Long deviceId, byte[] data) {
|
||||
NetSocket socket = deviceSocketMap.get(deviceId);
|
||||
if (socket == null) {
|
||||
log.warn("[sendToDevice][设备未连接] 设备 ID: {}", deviceId);
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
socket.write(io.vertx.core.buffer.Buffer.buffer(data));
|
||||
log.debug("[sendToDevice][发送消息成功] 设备 ID: {}, 数据长度: {} 字节", deviceId, data.length);
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
log.error("[sendToDevice][发送消息失败] 设备 ID: {}", deviceId, e);
|
||||
// 发送失败时清理连接
|
||||
unregisterSession(deviceId);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取当前在线设备数量
|
||||
*
|
||||
* @return 在线设备数量
|
||||
*/
|
||||
public int getOnlineDeviceCount() {
|
||||
return deviceSocketMap.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取所有在线设备 ID
|
||||
*
|
||||
* @return 在线设备 ID 集合
|
||||
*/
|
||||
public java.util.Set<Long> getOnlineDeviceIds() {
|
||||
return deviceSocketMap.keySet();
|
||||
}
|
||||
}
|
||||
@@ -2,9 +2,10 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpSessionManager;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
@@ -13,62 +14,50 @@ import lombok.extern.slf4j.Slf4j;
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class IotTcpDownstreamHandler {
|
||||
|
||||
private final IotDeviceMessageService messageService;
|
||||
private final IotDeviceMessageService deviceMessageService;
|
||||
|
||||
private final IotDeviceService deviceService;
|
||||
|
||||
private final IotTcpSessionManager sessionManager;
|
||||
|
||||
// TODO @haohao:这个可以使用 lombok 简化构造方法
|
||||
public IotTcpDownstreamHandler(IotDeviceMessageService messageService,
|
||||
IotDeviceService deviceService, IotTcpSessionManager sessionManager) {
|
||||
this.messageService = messageService;
|
||||
this.deviceService = deviceService;
|
||||
this.sessionManager = sessionManager;
|
||||
}
|
||||
private final IotTcpConnectionManager connectionManager;
|
||||
|
||||
/**
|
||||
* 处理下行消息
|
||||
*
|
||||
* @param message 设备消息
|
||||
*/
|
||||
public void handle(IotDeviceMessage message) {
|
||||
try {
|
||||
log.info("[handle][处理下行消息] 设备 ID: {}, 方法: {}, 消息 ID: {}",
|
||||
log.info("[handle][处理下行消息,设备 ID: {},方法: {},消息 ID: {}]",
|
||||
message.getDeviceId(), message.getMethod(), message.getId());
|
||||
|
||||
// TODO @haohao 1. 和 2. 可以合成 1.1 1.2 并且中间可以不空行;
|
||||
// 1. 获取设备信息
|
||||
IotDeviceRespDTO device = deviceService.getDeviceFromCache(message.getDeviceId());
|
||||
if (device == null) {
|
||||
log.error("[handle][设备不存在] 设备 ID: {}", message.getDeviceId());
|
||||
// 1.1 获取设备信息
|
||||
IotDeviceRespDTO deviceInfo = deviceService.getDeviceFromCache(message.getDeviceId());
|
||||
if (deviceInfo == null) {
|
||||
log.error("[handle][设备不存在,设备 ID: {}]", message.getDeviceId());
|
||||
return;
|
||||
}
|
||||
// 1.2 检查设备是否在线
|
||||
if (connectionManager.isDeviceOffline(message.getDeviceId())) {
|
||||
log.warn("[handle][设备不在线,设备 ID: {}]", message.getDeviceId());
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 检查设备是否在线
|
||||
if (!sessionManager.isDeviceOnline(message.getDeviceId())) {
|
||||
log.warn("[handle][设备不在线] 设备 ID: {}", message.getDeviceId());
|
||||
return;
|
||||
}
|
||||
// 2. 根据产品 Key 和设备名称编码消息并发送到设备
|
||||
byte[] bytes = deviceMessageService.encodeDeviceMessage(message, deviceInfo.getProductKey(),
|
||||
deviceInfo.getDeviceName());
|
||||
boolean success = connectionManager.sendToDevice(message.getDeviceId(), bytes);
|
||||
|
||||
// 3. 编码消息
|
||||
byte[] bytes = messageService.encodeDeviceMessage(message, device.getCodecType());
|
||||
|
||||
// 4. 发送消息到设备
|
||||
boolean success = sessionManager.sendToDevice(message.getDeviceId(), bytes);
|
||||
if (success) {
|
||||
log.info("[handle][下行消息发送成功] 设备 ID: {}, 方法: {}, 消息 ID: {}, 数据长度: {} 字节",
|
||||
log.info("[handle][下行消息发送成功,设备 ID: {},方法: {},消息 ID: {},数据长度: {} 字节]",
|
||||
message.getDeviceId(), message.getMethod(), message.getId(), bytes.length);
|
||||
} else {
|
||||
log.error("[handle][下行消息发送失败] 设备 ID: {}, 方法: {}, 消息 ID: {}",
|
||||
log.error("[handle][下行消息发送失败,设备 ID: {},方法: {},消息 ID: {}]",
|
||||
message.getDeviceId(), message.getMethod(), message.getId());
|
||||
} // TODO @haohao:下面这个空行,可以考虑去掉的哈。
|
||||
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[handle][处理下行消息失败] 设备 ID: {}, 方法: {}, 消息内容: {}",
|
||||
message.getDeviceId(), message.getMethod(), message.getParams(), e);
|
||||
log.error("[handle][处理下行消息失败,设备 ID: {},方法: {},消息内容: {}]",
|
||||
message.getDeviceId(), message.getMethod(), message, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,50 +12,48 @@ import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec;
|
||||
import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpAuthManager;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpSessionManager;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import io.vertx.core.Handler;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.net.NetSocket;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
* TCP 上行消息处理器
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
||||
|
||||
// TODO @haohao:这两个变量,可以复用 IotTcpBinaryDeviceMessageCodec 的 TYPE
|
||||
private static final String CODEC_TYPE_JSON = "TCP_JSON";
|
||||
private static final String CODEC_TYPE_BINARY = "TCP_BINARY";
|
||||
|
||||
private static final String CODEC_TYPE_JSON = IotTcpJsonDeviceMessageCodec.TYPE;
|
||||
private static final String CODEC_TYPE_BINARY = IotTcpBinaryDeviceMessageCodec.TYPE;
|
||||
private static final String AUTH_METHOD = "auth";
|
||||
|
||||
private final IotDeviceMessageService deviceMessageService;
|
||||
|
||||
private final IotDeviceService deviceService;
|
||||
|
||||
private final IotTcpSessionManager sessionManager;
|
||||
|
||||
private final IotTcpAuthManager authManager;
|
||||
|
||||
private final IotDeviceTokenService deviceTokenService;
|
||||
private final IotTcpConnectionManager connectionManager;
|
||||
|
||||
private final IotDeviceCommonApi deviceApi;
|
||||
|
||||
private final String serverId;
|
||||
|
||||
public IotTcpUpstreamHandler(IotTcpUpstreamProtocol protocol, IotDeviceMessageService deviceMessageService,
|
||||
IotDeviceService deviceService, IotTcpSessionManager sessionManager) {
|
||||
IotDeviceService deviceService, IotTcpConnectionManager connectionManager) {
|
||||
this.deviceMessageService = deviceMessageService;
|
||||
this.deviceService = deviceService;
|
||||
this.sessionManager = sessionManager;
|
||||
this.authManager = SpringUtil.getBean(IotTcpAuthManager.class);
|
||||
this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class);
|
||||
this.connectionManager = connectionManager;
|
||||
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
|
||||
this.serverId = protocol.getServerId();
|
||||
}
|
||||
@@ -63,207 +61,313 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
||||
@Override
|
||||
public void handle(NetSocket socket) {
|
||||
String clientId = IdUtil.simpleUUID();
|
||||
log.info("[handle][收到设备连接] 客户端 ID: {}, 地址: {}", clientId, socket.remoteAddress());
|
||||
log.debug("[handle][设备连接,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
|
||||
|
||||
// 设置异常和关闭处理器
|
||||
socket.exceptionHandler(ex -> {
|
||||
log.error("[handle][连接异常] 客户端 ID: {}, 地址: {}", clientId, socket.remoteAddress(), ex);
|
||||
cleanupSession(socket);
|
||||
log.warn("[handle][连接异常,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
|
||||
cleanupConnection(socket);
|
||||
});
|
||||
|
||||
socket.closeHandler(v -> {
|
||||
log.info("[handle][连接关闭] 客户端 ID: {}, 地址: {}", clientId, socket.remoteAddress());
|
||||
cleanupSession(socket);
|
||||
log.debug("[handle][连接关闭,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
|
||||
cleanupConnection(socket);
|
||||
});
|
||||
|
||||
socket.handler(buffer -> handleDataPackage(clientId, buffer, socket));
|
||||
socket.handler(buffer -> processMessage(clientId, buffer, socket));
|
||||
}
|
||||
|
||||
private void handleDataPackage(String clientId, Buffer buffer, NetSocket socket) {
|
||||
/**
|
||||
* 处理消息
|
||||
*/
|
||||
private void processMessage(String clientId, Buffer buffer, NetSocket socket) {
|
||||
try {
|
||||
// 1. 数据包基础检查
|
||||
if (buffer.length() == 0) {
|
||||
log.warn("[handleDataPackage][数据包为空] 客户端 ID: {}", clientId);
|
||||
return;
|
||||
}
|
||||
|
||||
// 1. 解码消息
|
||||
// 2. 解码消息
|
||||
MessageInfo messageInfo = decodeMessage(buffer);
|
||||
if (messageInfo == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO @haohao:2. 和 3. 可以合并成 2.1 2.2 ,都是异常的情况。然后 3. 可以 return 直接;
|
||||
// 2. 获取设备信息
|
||||
IotDeviceRespDTO device = deviceService.getDeviceFromCache(messageInfo.message.getDeviceId());
|
||||
if (device == null) {
|
||||
sendError(socket, messageInfo.message.getRequestId(), "设备不存在", messageInfo.codecType);
|
||||
return;
|
||||
// 3. 根据消息类型路由处理
|
||||
if (isAuthRequest(messageInfo.message)) {
|
||||
// 认证请求:无需检查认证状态
|
||||
handleAuthenticationRequest(clientId, messageInfo, socket);
|
||||
} else {
|
||||
// 业务消息:需要检查认证状态
|
||||
handleBusinessRequest(clientId, messageInfo, socket);
|
||||
}
|
||||
|
||||
// 3. 处理消息
|
||||
if (!authManager.isAuthenticated(socket)) {
|
||||
handleAuthRequest(clientId, messageInfo.message, socket, messageInfo.codecType);
|
||||
} else {
|
||||
IotTcpAuthManager.AuthInfo authInfo = authManager.getAuthInfo(socket);
|
||||
handleBusinessMessage(clientId, messageInfo.message, authInfo);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[handleDataPackage][处理数据包失败] 客户端 ID: {}, 错误: {}", clientId, e.getMessage(), e);
|
||||
log.error("[processMessage][处理消息失败,客户端 ID: {}]", clientId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理认证请求
|
||||
*/
|
||||
private void handleAuthRequest(String clientId, IotDeviceMessage message, NetSocket socket, String codecType) {
|
||||
private void handleAuthenticationRequest(String clientId, MessageInfo messageInfo, NetSocket socket) {
|
||||
try {
|
||||
// 1. 验证认证请求
|
||||
// TODO @haohao:ObjUtil.notEquals。减少取反
|
||||
if (!AUTH_METHOD.equals(message.getMethod())) {
|
||||
sendError(socket, message.getRequestId(), "请先进行认证", codecType);
|
||||
return;
|
||||
}
|
||||
IotDeviceMessage message = messageInfo.message;
|
||||
|
||||
// 2. 解析认证参数 // TODO @haohao:1. 和 2. 可以合并成 1.1 1.2 都是参数校验
|
||||
// 1. 解析认证参数
|
||||
AuthParams authParams = parseAuthParams(message.getParams());
|
||||
if (authParams == null) {
|
||||
sendError(socket, message.getRequestId(), "认证参数不完整", codecType);
|
||||
sendError(socket, message.getRequestId(), "认证参数不完整", messageInfo.codecType);
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 执行认证流程
|
||||
// TODO @haohao:成功失败、都大哥日志,会不会更好哈?
|
||||
if (performAuthentication(authParams, socket, message.getRequestId(), codecType)) {
|
||||
log.info("[handleAuthRequest][认证成功] 客户端 ID: {}, username: {}", clientId, authParams.username);
|
||||
// 2. 执行认证
|
||||
if (!authenticateDevice(authParams)) {
|
||||
log.warn("[handleAuthenticationRequest][认证失败,客户端 ID: {},username: {}]",
|
||||
clientId, authParams.username);
|
||||
sendError(socket, message.getRequestId(), "认证失败", messageInfo.codecType);
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 解析设备信息
|
||||
IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.username);
|
||||
if (deviceInfo == null) {
|
||||
sendError(socket, message.getRequestId(), "解析设备信息失败", messageInfo.codecType);
|
||||
return;
|
||||
}
|
||||
|
||||
// 4. 获取设备信息
|
||||
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(),
|
||||
deviceInfo.getDeviceName());
|
||||
if (device == null) {
|
||||
sendError(socket, message.getRequestId(), "设备不存在", messageInfo.codecType);
|
||||
return;
|
||||
}
|
||||
|
||||
// 5. 注册连接并发送成功响应
|
||||
registerConnection(socket, device, deviceInfo, authParams.clientId);
|
||||
sendOnlineMessage(deviceInfo);
|
||||
sendSuccess(socket, message.getRequestId(), "认证成功", messageInfo.codecType);
|
||||
|
||||
log.info("[handleAuthenticationRequest][认证成功,设备 ID: {},设备名: {}]",
|
||||
device.getId(), deviceInfo.getDeviceName());
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[handleAuthRequest][认证处理异常] 客户端 ID: {}", clientId, e);
|
||||
sendError(socket, message.getRequestId(), "认证处理异常: " + e.getMessage(), codecType);
|
||||
log.error("[handleAuthenticationRequest][认证处理异常,客户端 ID: {}]", clientId, e);
|
||||
sendError(socket, messageInfo.message.getRequestId(), "认证处理异常", messageInfo.codecType);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理业务请求
|
||||
*/
|
||||
private void handleBusinessRequest(String clientId, MessageInfo messageInfo, NetSocket socket) {
|
||||
try {
|
||||
// 1. 检查认证状态
|
||||
if (connectionManager.isNotAuthenticated(socket)) {
|
||||
log.warn("[handleBusinessRequest][设备未认证,客户端 ID: {}]", clientId);
|
||||
sendError(socket, messageInfo.message.getRequestId(), "请先进行认证", messageInfo.codecType);
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 获取认证信息并处理业务消息
|
||||
IotTcpConnectionManager.AuthInfo authInfo = connectionManager.getAuthInfo(socket);
|
||||
processBusinessMessage(clientId, messageInfo.message, authInfo);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[handleBusinessRequest][业务请求处理异常,客户端 ID: {}]", clientId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理业务消息
|
||||
*/
|
||||
private void handleBusinessMessage(String clientId, IotDeviceMessage message,
|
||||
IotTcpAuthManager.AuthInfo authInfo) {
|
||||
private void processBusinessMessage(String clientId, IotDeviceMessage message,
|
||||
IotTcpConnectionManager.AuthInfo authInfo) {
|
||||
try {
|
||||
message.setDeviceId(authInfo.getDeviceId());
|
||||
message.setServerId(serverId);
|
||||
|
||||
deviceMessageService.sendDeviceMessage(message, authInfo.getProductKey(), authInfo.getDeviceName(),
|
||||
serverId);
|
||||
log.info("[handleBusinessMessage][业务消息处理完成] 客户端 ID: {}, 消息 ID: {}, 设备 ID: {}, 方法: {}",
|
||||
clientId, message.getId(), message.getDeviceId(), message.getMethod());
|
||||
// 发送到消息总线
|
||||
deviceMessageService.sendDeviceMessage(message, authInfo.getProductKey(),
|
||||
authInfo.getDeviceName(), serverId);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[handleBusinessMessage][处理业务消息失败] 客户端 ID: {}, 错误: {}", clientId, e.getMessage(), e);
|
||||
log.error("[processBusinessMessage][业务消息处理失败,客户端 ID: {},消息 ID: {}]",
|
||||
clientId, message.getId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 解码消息
|
||||
*/
|
||||
// TODO @haohao:是不是还是直接管理后台配置协议,然后直接使用就好啦。暂时不考虑动态解析哈。保持一致,降低理解成本哈。
|
||||
private MessageInfo decodeMessage(Buffer buffer) {
|
||||
if (buffer == null || buffer.length() == 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// 1. 快速检测消息格式类型
|
||||
String codecType = detectMessageFormat(buffer);
|
||||
|
||||
try {
|
||||
String rawData = buffer.toString();
|
||||
String codecType = isJsonFormat(rawData) ? CODEC_TYPE_JSON : CODEC_TYPE_BINARY;
|
||||
// 2. 使用检测到的格式进行解码
|
||||
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(buffer.getBytes(), codecType);
|
||||
return message != null ? new MessageInfo(message, codecType) : null;
|
||||
|
||||
if (message == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return new MessageInfo(message, codecType);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.debug("[decodeMessage][消息解码失败] 错误: {}", e.getMessage());
|
||||
log.warn("[decodeMessage][消息解码失败,格式: {},数据长度: {},错误: {}]",
|
||||
codecType, buffer.length(), e.getMessage());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行认证
|
||||
* 检测消息格式类型
|
||||
* 优化性能:避免不必要的字符串转换
|
||||
*/
|
||||
// TODO @haohao:下面的 1. 2. 可以合并下,本质也是校验哈。
|
||||
private boolean performAuthentication(AuthParams authParams, NetSocket socket, String requestId, String codecType) {
|
||||
// 1. 执行认证
|
||||
if (!authenticateDevice(authParams)) {
|
||||
sendError(socket, requestId, "认证失败", codecType);
|
||||
return false;
|
||||
private String detectMessageFormat(Buffer buffer) {
|
||||
if (buffer.length() == 0) {
|
||||
return CODEC_TYPE_JSON; // 默认使用 JSON
|
||||
}
|
||||
|
||||
// 2. 获取设备信息
|
||||
IotDeviceAuthUtils.DeviceInfo deviceInfo = deviceTokenService.parseUsername(authParams.username);
|
||||
if (deviceInfo == null) {
|
||||
sendError(socket, requestId, "解析设备信息失败", codecType);
|
||||
return false;
|
||||
// 1. 优先检测二进制格式(检查魔术字节 0x7E)
|
||||
if (isBinaryFormat(buffer)) {
|
||||
return CODEC_TYPE_BINARY;
|
||||
}
|
||||
|
||||
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(),
|
||||
deviceInfo.getDeviceName());
|
||||
if (device == null) {
|
||||
sendError(socket, requestId, "设备不存在", codecType);
|
||||
return false;
|
||||
// 2. 检测 JSON 格式(检查前几个有效字符)
|
||||
if (isJsonFormat(buffer)) {
|
||||
return CODEC_TYPE_JSON;
|
||||
}
|
||||
|
||||
// 3. 注册认证信息
|
||||
String token = deviceTokenService.createToken(deviceInfo.getProductKey(), deviceInfo.getDeviceName());
|
||||
registerAuthInfo(socket, device, deviceInfo, token, authParams.clientId);
|
||||
|
||||
// 4. 发送上线消息和成功响应
|
||||
IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
|
||||
deviceMessageService.sendDeviceMessage(onlineMessage, deviceInfo.getProductKey(), deviceInfo.getDeviceName(),
|
||||
serverId);
|
||||
sendSuccess(socket, requestId, "认证成功", codecType);
|
||||
return true;
|
||||
// 3. 默认尝试 JSON 格式
|
||||
return CODEC_TYPE_JSON;
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送响应
|
||||
* 检测二进制格式
|
||||
* 通过检查魔术字节快速识别,避免完整字符串转换
|
||||
*/
|
||||
private boolean isBinaryFormat(Buffer buffer) {
|
||||
// 二进制协议最小长度检查
|
||||
if (buffer.length() < 8) {
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
// 检查魔术字节 0x7E(二进制协议的第一个字节)
|
||||
byte firstByte = buffer.getByte(0);
|
||||
return firstByte == (byte) 0x7E;
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 检测 JSON 格式
|
||||
* 只检查前几个有效字符,避免完整字符串转换
|
||||
*/
|
||||
private boolean isJsonFormat(Buffer buffer) {
|
||||
try {
|
||||
// 检查前 64 个字节或整个缓冲区(取较小值)
|
||||
int checkLength = Math.min(buffer.length(), 64);
|
||||
String prefix = buffer.getString(0, checkLength, StandardCharsets.UTF_8.name());
|
||||
|
||||
if (StrUtil.isBlank(prefix)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
String trimmed = prefix.trim();
|
||||
// JSON 格式必须以 { 或 [ 开头
|
||||
return trimmed.startsWith("{") || trimmed.startsWith("[");
|
||||
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 注册连接信息
|
||||
*/
|
||||
private void registerConnection(NetSocket socket, IotDeviceRespDTO device,
|
||||
IotDeviceAuthUtils.DeviceInfo deviceInfo, String clientId) {
|
||||
// 创建认证信息
|
||||
IotTcpConnectionManager.AuthInfo authInfo = new IotTcpConnectionManager.AuthInfo()
|
||||
.setDeviceId(device.getId())
|
||||
.setProductKey(deviceInfo.getProductKey())
|
||||
.setDeviceName(deviceInfo.getDeviceName())
|
||||
.setClientId(clientId);
|
||||
|
||||
// 注册连接
|
||||
connectionManager.registerConnection(socket, device.getId(), authInfo);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送设备上线消息
|
||||
*/
|
||||
private void sendOnlineMessage(IotDeviceAuthUtils.DeviceInfo deviceInfo) {
|
||||
try {
|
||||
IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
|
||||
deviceMessageService.sendDeviceMessage(onlineMessage, deviceInfo.getProductKey(),
|
||||
deviceInfo.getDeviceName(), serverId);
|
||||
} catch (Exception e) {
|
||||
log.error("[sendOnlineMessage][发送上线消息失败,设备: {}]", deviceInfo.getDeviceName(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理连接
|
||||
*/
|
||||
private void cleanupConnection(NetSocket socket) {
|
||||
try {
|
||||
// 发送离线消息(如果已认证)
|
||||
IotTcpConnectionManager.AuthInfo authInfo = connectionManager.getAuthInfo(socket);
|
||||
if (authInfo != null) {
|
||||
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
|
||||
deviceMessageService.sendDeviceMessage(offlineMessage, authInfo.getProductKey(),
|
||||
authInfo.getDeviceName(), serverId);
|
||||
}
|
||||
|
||||
// 注销连接
|
||||
connectionManager.unregisterConnection(socket);
|
||||
} catch (Exception e) {
|
||||
log.error("[cleanupConnection][清理连接失败]", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送响应消息
|
||||
*/
|
||||
private void sendResponse(NetSocket socket, boolean success, String message, String requestId, String codecType) {
|
||||
try {
|
||||
Object responseData = buildResponseData(success, message);
|
||||
Object responseData = MapUtil.builder()
|
||||
.put("success", success)
|
||||
.put("message", message)
|
||||
.build();
|
||||
|
||||
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, responseData,
|
||||
success ? 0 : 401, message);
|
||||
|
||||
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType);
|
||||
socket.write(Buffer.buffer(encodedData));
|
||||
log.debug("[sendResponse][发送响应] success: {}, message: {}, requestId: {}", success, message, requestId);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[sendResponse][发送响应失败] requestId: {}", requestId, e);
|
||||
log.error("[sendResponse][发送响应失败,requestId: {}]", requestId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建响应数据(不返回 token)
|
||||
*/
|
||||
private Object buildResponseData(boolean success, String message) {
|
||||
return MapUtil.builder()
|
||||
.put("success", success)
|
||||
.put("message", message)
|
||||
.build();
|
||||
}
|
||||
// ==================== 辅助方法 ====================
|
||||
|
||||
/**
|
||||
* 清理会话
|
||||
* 判断是否为认证请求
|
||||
*/
|
||||
private void cleanupSession(NetSocket socket) {
|
||||
// 如果已认证,发送离线消息
|
||||
IotTcpAuthManager.AuthInfo authInfo = authManager.getAuthInfo(socket);
|
||||
if (authInfo != null) {
|
||||
// 发送离线消息
|
||||
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
|
||||
deviceMessageService.sendDeviceMessage(offlineMessage, authInfo.getProductKey(), authInfo.getDeviceName(),
|
||||
serverId);
|
||||
}
|
||||
sessionManager.unregisterSession(socket);
|
||||
authManager.unregisterAuth(socket);
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否为 JSON 格式
|
||||
*/
|
||||
private boolean isJsonFormat(String data) {
|
||||
if (StrUtil.isBlank(data)) {
|
||||
return false;
|
||||
}
|
||||
String trimmed = data.trim();
|
||||
return (trimmed.startsWith("{") && trimmed.endsWith("}")) || (trimmed.startsWith("[") && trimmed.endsWith("]"));
|
||||
private boolean isAuthRequest(IotDeviceMessage message) {
|
||||
return AUTH_METHOD.equals(message.getMethod());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -273,38 +377,37 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
||||
if (params == null) {
|
||||
return null;
|
||||
}
|
||||
JSONObject paramsJson = params instanceof JSONObject ? (JSONObject) params
|
||||
: JSONUtil.parseObj(params.toString());
|
||||
String clientId = paramsJson.getStr("clientId");
|
||||
String username = paramsJson.getStr("username");
|
||||
String password = paramsJson.getStr("password");
|
||||
return StrUtil.hasBlank(clientId, username, password) ? null : new AuthParams(clientId, username, password);
|
||||
|
||||
try {
|
||||
JSONObject paramsJson = params instanceof JSONObject ? (JSONObject) params
|
||||
: JSONUtil.parseObj(params.toString());
|
||||
|
||||
String clientId = paramsJson.getStr("clientId");
|
||||
String username = paramsJson.getStr("username");
|
||||
String password = paramsJson.getStr("password");
|
||||
|
||||
return StrUtil.hasBlank(clientId, username, password) ? null
|
||||
: new AuthParams(clientId, username, password);
|
||||
} catch (Exception e) {
|
||||
log.warn("[parseAuthParams][解析认证参数失败]", e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 认证设备
|
||||
*/
|
||||
private boolean authenticateDevice(AuthParams authParams) {
|
||||
CommonResult<Boolean> result = deviceApi.authDevice(new IotDeviceAuthReqDTO()
|
||||
.setClientId(authParams.clientId).setUsername(authParams.username).setPassword(authParams.password));
|
||||
return result.isSuccess() && result.getData();
|
||||
}
|
||||
|
||||
/**
|
||||
* 注册认证信息
|
||||
*/
|
||||
private void registerAuthInfo(NetSocket socket, IotDeviceRespDTO device, IotDeviceAuthUtils.DeviceInfo deviceInfo,
|
||||
String token, String clientId) {
|
||||
// TODO @haohao:可以链式调用;
|
||||
IotTcpAuthManager.AuthInfo auth = new IotTcpAuthManager.AuthInfo();
|
||||
auth.setDeviceId(device.getId());
|
||||
auth.setProductKey(deviceInfo.getProductKey());
|
||||
auth.setDeviceName(deviceInfo.getDeviceName());
|
||||
auth.setToken(token);
|
||||
auth.setClientId(clientId);
|
||||
|
||||
authManager.registerAuth(socket, auth);
|
||||
sessionManager.registerSession(device.getId(), socket);
|
||||
try {
|
||||
CommonResult<Boolean> result = deviceApi.authDevice(new IotDeviceAuthReqDTO()
|
||||
.setClientId(authParams.clientId)
|
||||
.setUsername(authParams.username)
|
||||
.setPassword(authParams.password));
|
||||
return result.isSuccess() && Boolean.TRUE.equals(result.getData());
|
||||
} catch (Exception e) {
|
||||
log.error("[authenticateDevice][设备认证异常,username: {}]", authParams.username, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -315,24 +418,32 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送成功响应(不返回 token)
|
||||
* 发送成功响应
|
||||
*/
|
||||
private void sendSuccess(NetSocket socket, String requestId, String message, String codecType) {
|
||||
sendResponse(socket, true, message, requestId, codecType);
|
||||
}
|
||||
|
||||
// TODO @haohao:使用 lombok,方便 jdk8 兼容
|
||||
// ==================== 内部类 ====================
|
||||
|
||||
/**
|
||||
* 认证参数
|
||||
*/
|
||||
private record AuthParams(String clientId, String username, String password) {
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
private static class AuthParams {
|
||||
private final String clientId;
|
||||
private final String username;
|
||||
private final String password;
|
||||
}
|
||||
|
||||
/**
|
||||
* 消息信息
|
||||
*/
|
||||
private record MessageInfo(IotDeviceMessage message, String codecType) {
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
private static class MessageInfo {
|
||||
private final IotDeviceMessage message;
|
||||
private final String codecType;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -4,6 +4,15 @@ spring:
|
||||
profiles:
|
||||
active: local # 默认激活本地开发环境
|
||||
|
||||
# Redis 配置
|
||||
data:
|
||||
redis:
|
||||
host: 127.0.0.1 # Redis 服务器地址
|
||||
port: 6379 # Redis 服务器端口
|
||||
database: 0 # Redis 数据库索引
|
||||
# password: # Redis 密码,如果有的话
|
||||
timeout: 30000ms # 连接超时时间
|
||||
|
||||
--- #################### 消息队列相关 ####################
|
||||
|
||||
# rocketmq 配置项,对应 RocketMQProperties 配置类
|
||||
@@ -45,7 +54,7 @@ yudao:
|
||||
# 针对引入的 EMQX 组件的配置
|
||||
# ====================================
|
||||
emqx:
|
||||
enabled: false
|
||||
enabled: true
|
||||
http-port: 8090 # MQTT HTTP 服务端口
|
||||
mqtt-host: 127.0.0.1 # MQTT Broker 地址
|
||||
mqtt-port: 1883 # MQTT Broker 端口
|
||||
|
||||
Reference in New Issue
Block a user