review:【iot 物联网】tcp 协议的接入
This commit is contained in:
@@ -20,11 +20,11 @@ import java.nio.charset.StandardCharsets;
|
|||||||
*
|
*
|
||||||
* <pre>
|
* <pre>
|
||||||
* +--------+--------+--------+--------+--------+--------+--------+--------+
|
* +--------+--------+--------+--------+--------+--------+--------+--------+
|
||||||
* | 魔术字 | 版本号 | 消息类型| 消息标志| 消息长度(4字节) |
|
* | 魔术字 | 版本号 | 消息类型| 消息标志| 消息长度(4 字节) |
|
||||||
* +--------+--------+--------+--------+--------+--------+--------+--------+
|
* +--------+--------+--------+--------+--------+--------+--------+--------+
|
||||||
* | 消息 ID 长度(2字节) | 消息 ID (变长字符串) |
|
* | 消息 ID 长度(2 字节) | 消息 ID (变长字符串) |
|
||||||
* +--------+--------+--------+--------+--------+--------+--------+--------+
|
* +--------+--------+--------+--------+--------+--------+--------+--------+
|
||||||
* | 方法名长度(2字节) | 方法名(变长字符串) |
|
* | 方法名长度(2 字节) | 方法名(变长字符串) |
|
||||||
* +--------+--------+--------+--------+--------+--------+--------+--------+
|
* +--------+--------+--------+--------+--------+--------+--------+--------+
|
||||||
* | 消息体数据(变长) |
|
* | 消息体数据(变长) |
|
||||||
* +--------+--------+--------+--------+--------+--------+--------+--------+
|
* +--------+--------+--------+--------+--------+--------+--------+--------+
|
||||||
@@ -56,12 +56,21 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
|
|||||||
*/
|
*/
|
||||||
private static final byte PROTOCOL_VERSION = (byte) 0x01;
|
private static final byte PROTOCOL_VERSION = (byte) 0x01;
|
||||||
|
|
||||||
|
// TODO @haohao:这个要不直接静态枚举,不用 MessageType
|
||||||
/**
|
/**
|
||||||
* 消息类型常量
|
* 消息类型常量
|
||||||
*/
|
*/
|
||||||
public static class MessageType {
|
public static class MessageType {
|
||||||
public static final byte REQUEST = 0x01; // 请求消息
|
|
||||||
public static final byte RESPONSE = 0x02; // 响应消息
|
/**
|
||||||
|
* 请求消息
|
||||||
|
*/
|
||||||
|
public static final byte REQUEST = 0x01;
|
||||||
|
/**
|
||||||
|
* 响应消息
|
||||||
|
*/
|
||||||
|
public static final byte RESPONSE = 0x02;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -83,17 +92,13 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
|
|||||||
public byte[] encode(IotDeviceMessage message) {
|
public byte[] encode(IotDeviceMessage message) {
|
||||||
Assert.notNull(message, "消息不能为空");
|
Assert.notNull(message, "消息不能为空");
|
||||||
Assert.notBlank(message.getMethod(), "消息方法不能为空");
|
Assert.notBlank(message.getMethod(), "消息方法不能为空");
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// 1. 确定消息类型
|
// 1. 确定消息类型
|
||||||
byte messageType = determineMessageType(message);
|
byte messageType = determineMessageType(message);
|
||||||
|
|
||||||
// 2. 构建消息体
|
// 2. 构建消息体
|
||||||
byte[] bodyData = buildMessageBody(message, messageType);
|
byte[] bodyData = buildMessageBody(message, messageType);
|
||||||
|
|
||||||
// 3. 构建完整消息(不包含deviceId,由连接上下文管理)
|
// 3. 构建完整消息(不包含deviceId,由连接上下文管理)
|
||||||
return buildCompleteMessage(message, messageType, bodyData);
|
return buildCompleteMessage(message, messageType, bodyData);
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[encode][TCP 二进制消息编码失败,消息: {}]", message, e);
|
log.error("[encode][TCP 二进制消息编码失败,消息: {}]", message, e);
|
||||||
throw new RuntimeException("TCP 二进制消息编码失败: " + e.getMessage(), e);
|
throw new RuntimeException("TCP 二进制消息编码失败: " + e.getMessage(), e);
|
||||||
@@ -104,16 +109,12 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
|
|||||||
public IotDeviceMessage decode(byte[] bytes) {
|
public IotDeviceMessage decode(byte[] bytes) {
|
||||||
Assert.notNull(bytes, "待解码数据不能为空");
|
Assert.notNull(bytes, "待解码数据不能为空");
|
||||||
Assert.isTrue(bytes.length >= MIN_MESSAGE_LENGTH, "数据包长度不足");
|
Assert.isTrue(bytes.length >= MIN_MESSAGE_LENGTH, "数据包长度不足");
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Buffer buffer = Buffer.buffer(bytes);
|
Buffer buffer = Buffer.buffer(bytes);
|
||||||
|
|
||||||
// 1. 解析协议头部
|
// 1. 解析协议头部
|
||||||
ProtocolHeader header = parseProtocolHeader(buffer);
|
ProtocolHeader header = parseProtocolHeader(buffer);
|
||||||
|
|
||||||
// 2. 解析消息内容(不包含deviceId,由上层连接管理器设置)
|
// 2. 解析消息内容(不包含deviceId,由上层连接管理器设置)
|
||||||
return parseMessageContent(buffer, header);
|
return parseMessageContent(buffer, header);
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[decode][TCP 二进制消息解码失败,数据长度: {}]", bytes.length, e);
|
log.error("[decode][TCP 二进制消息解码失败,数据长度: {}]", bytes.length, e);
|
||||||
throw new RuntimeException("TCP 二进制消息解码失败: " + e.getMessage(), e);
|
throw new RuntimeException("TCP 二进制消息解码失败: " + e.getMessage(), e);
|
||||||
@@ -128,6 +129,7 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
|
|||||||
*/
|
*/
|
||||||
private byte determineMessageType(IotDeviceMessage message) {
|
private byte determineMessageType(IotDeviceMessage message) {
|
||||||
// 判断是否为响应消息:有响应码或响应消息时为响应
|
// 判断是否为响应消息:有响应码或响应消息时为响应
|
||||||
|
// TODO @haohao:感觉只判断 code 更稳妥点?msg 有可能空。。。
|
||||||
if (message.getCode() != null || StrUtil.isNotBlank(message.getMsg())) {
|
if (message.getCode() != null || StrUtil.isNotBlank(message.getMsg())) {
|
||||||
return MessageType.RESPONSE;
|
return MessageType.RESPONSE;
|
||||||
}
|
}
|
||||||
@@ -140,27 +142,26 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
|
|||||||
*/
|
*/
|
||||||
private byte[] buildMessageBody(IotDeviceMessage message, byte messageType) {
|
private byte[] buildMessageBody(IotDeviceMessage message, byte messageType) {
|
||||||
Buffer bodyBuffer = Buffer.buffer();
|
Buffer bodyBuffer = Buffer.buffer();
|
||||||
|
|
||||||
if (messageType == MessageType.RESPONSE) {
|
if (messageType == MessageType.RESPONSE) {
|
||||||
// 响应消息:code + msg长度 + msg + data
|
// code
|
||||||
bodyBuffer.appendInt(message.getCode() != null ? message.getCode() : 0);
|
bodyBuffer.appendInt(message.getCode() != null ? message.getCode() : 0);
|
||||||
|
// msg
|
||||||
String msg = message.getMsg() != null ? message.getMsg() : "";
|
String msg = message.getMsg() != null ? message.getMsg() : "";
|
||||||
byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
|
byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
|
||||||
bodyBuffer.appendShort((short) msgBytes.length);
|
bodyBuffer.appendShort((short) msgBytes.length);
|
||||||
bodyBuffer.appendBytes(msgBytes);
|
bodyBuffer.appendBytes(msgBytes);
|
||||||
|
// data
|
||||||
if (message.getData() != null) {
|
if (message.getData() != null) {
|
||||||
bodyBuffer.appendBytes(JsonUtils.toJsonByte(message.getData()));
|
bodyBuffer.appendBytes(JsonUtils.toJsonByte(message.getData()));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// 请求消息:包含 params 或 data
|
// params
|
||||||
|
// TODO @haohao:请求是不是只处理 message.getParams() 哈?
|
||||||
Object payload = message.getParams() != null ? message.getParams() : message.getData();
|
Object payload = message.getParams() != null ? message.getParams() : message.getData();
|
||||||
if (payload != null) {
|
if (payload != null) {
|
||||||
bodyBuffer.appendBytes(JsonUtils.toJsonByte(payload));
|
bodyBuffer.appendBytes(JsonUtils.toJsonByte(payload));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return bodyBuffer.getBytes();
|
return bodyBuffer.getBytes();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -169,35 +170,30 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
|
|||||||
*/
|
*/
|
||||||
private byte[] buildCompleteMessage(IotDeviceMessage message, byte messageType, byte[] bodyData) {
|
private byte[] buildCompleteMessage(IotDeviceMessage message, byte messageType, byte[] bodyData) {
|
||||||
Buffer buffer = Buffer.buffer();
|
Buffer buffer = Buffer.buffer();
|
||||||
|
|
||||||
// 1. 写入协议头部
|
// 1. 写入协议头部
|
||||||
buffer.appendByte(MAGIC_NUMBER);
|
buffer.appendByte(MAGIC_NUMBER);
|
||||||
buffer.appendByte(PROTOCOL_VERSION);
|
buffer.appendByte(PROTOCOL_VERSION);
|
||||||
buffer.appendByte(messageType);
|
buffer.appendByte(messageType);
|
||||||
buffer.appendByte((byte) 0x00); // 消息标志,预留字段
|
buffer.appendByte((byte) 0x00); // 消息标志,预留字段 TODO @haohao:这个标识的作用是啥呀?
|
||||||
|
// 2. 预留消息长度位置(在 6. 更新消息长度)
|
||||||
// 2. 预留消息长度位置
|
|
||||||
int lengthPosition = buffer.length();
|
int lengthPosition = buffer.length();
|
||||||
buffer.appendInt(0);
|
buffer.appendInt(0);
|
||||||
|
// 3. 写入消息 ID
|
||||||
// 3. 写入消息ID
|
|
||||||
String messageId = StrUtil.isNotBlank(message.getRequestId()) ? message.getRequestId()
|
String messageId = StrUtil.isNotBlank(message.getRequestId()) ? message.getRequestId()
|
||||||
|
// TODO @haohao:复用 IotDeviceMessageUtils 的 generateMessageId 哇?
|
||||||
: generateMessageId(message.getMethod());
|
: generateMessageId(message.getMethod());
|
||||||
|
// TODO @haohao:StrUtil.utf8Bytes()
|
||||||
byte[] messageIdBytes = messageId.getBytes(StandardCharsets.UTF_8);
|
byte[] messageIdBytes = messageId.getBytes(StandardCharsets.UTF_8);
|
||||||
buffer.appendShort((short) messageIdBytes.length);
|
buffer.appendShort((short) messageIdBytes.length);
|
||||||
buffer.appendBytes(messageIdBytes);
|
buffer.appendBytes(messageIdBytes);
|
||||||
|
|
||||||
// 4. 写入方法名
|
// 4. 写入方法名
|
||||||
byte[] methodBytes = message.getMethod().getBytes(StandardCharsets.UTF_8);
|
byte[] methodBytes = message.getMethod().getBytes(StandardCharsets.UTF_8);
|
||||||
buffer.appendShort((short) methodBytes.length);
|
buffer.appendShort((short) methodBytes.length);
|
||||||
buffer.appendBytes(methodBytes);
|
buffer.appendBytes(methodBytes);
|
||||||
|
|
||||||
// 5. 写入消息体
|
// 5. 写入消息体
|
||||||
buffer.appendBytes(bodyData);
|
buffer.appendBytes(bodyData);
|
||||||
|
|
||||||
// 6. 更新消息长度
|
// 6. 更新消息长度
|
||||||
buffer.setInt(lengthPosition, buffer.length());
|
buffer.setInt(lengthPosition, buffer.length());
|
||||||
|
|
||||||
return buffer.getBytes();
|
return buffer.getBytes();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -210,16 +206,15 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
|
|||||||
|
|
||||||
// ==================== 解码相关方法 ====================
|
// ==================== 解码相关方法 ====================
|
||||||
|
|
||||||
|
// TODO @haohao:是不是把 parseProtocolHeader、parseMessageContent 合并?
|
||||||
/**
|
/**
|
||||||
* 解析协议头部
|
* 解析协议头部
|
||||||
*/
|
*/
|
||||||
private ProtocolHeader parseProtocolHeader(Buffer buffer) {
|
private ProtocolHeader parseProtocolHeader(Buffer buffer) {
|
||||||
int index = 0;
|
int index = 0;
|
||||||
|
|
||||||
// 1. 验证魔术字
|
// 1. 验证魔术字
|
||||||
byte magic = buffer.getByte(index++);
|
byte magic = buffer.getByte(index++);
|
||||||
Assert.isTrue(magic == MAGIC_NUMBER, "无效的协议魔术字: " + magic);
|
Assert.isTrue(magic == MAGIC_NUMBER, "无效的协议魔术字: " + magic);
|
||||||
|
|
||||||
// 2. 验证版本号
|
// 2. 验证版本号
|
||||||
byte version = buffer.getByte(index++);
|
byte version = buffer.getByte(index++);
|
||||||
Assert.isTrue(version == PROTOCOL_VERSION, "不支持的协议版本: " + version);
|
Assert.isTrue(version == PROTOCOL_VERSION, "不支持的协议版本: " + version);
|
||||||
@@ -227,7 +222,6 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
|
|||||||
// 3. 读取消息类型
|
// 3. 读取消息类型
|
||||||
byte messageType = buffer.getByte(index++);
|
byte messageType = buffer.getByte(index++);
|
||||||
Assert.isTrue(isValidMessageType(messageType), "无效的消息类型: " + messageType);
|
Assert.isTrue(isValidMessageType(messageType), "无效的消息类型: " + messageType);
|
||||||
|
|
||||||
// 4. 读取消息标志(暂时跳过)
|
// 4. 读取消息标志(暂时跳过)
|
||||||
byte messageFlags = buffer.getByte(index++);
|
byte messageFlags = buffer.getByte(index++);
|
||||||
|
|
||||||
@@ -235,7 +229,8 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
|
|||||||
int messageLength = buffer.getInt(index);
|
int messageLength = buffer.getInt(index);
|
||||||
index += 4;
|
index += 4;
|
||||||
|
|
||||||
Assert.isTrue(messageLength == buffer.length(), "消息长度不匹配,期望: " + messageLength + ", 实际: " + buffer.length());
|
Assert.isTrue(messageLength == buffer.length(),
|
||||||
|
"消息长度不匹配,期望: " + messageLength + ", 实际: " + buffer.length());
|
||||||
|
|
||||||
return new ProtocolHeader(magic, version, messageType, messageFlags, messageLength, index);
|
return new ProtocolHeader(magic, version, messageType, messageFlags, messageLength, index);
|
||||||
}
|
}
|
||||||
@@ -246,7 +241,7 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
|
|||||||
private IotDeviceMessage parseMessageContent(Buffer buffer, ProtocolHeader header) {
|
private IotDeviceMessage parseMessageContent(Buffer buffer, ProtocolHeader header) {
|
||||||
int index = header.getNextIndex();
|
int index = header.getNextIndex();
|
||||||
|
|
||||||
// 1. 读取消息ID
|
// 1. 读取消息 ID
|
||||||
short messageIdLength = buffer.getShort(index);
|
short messageIdLength = buffer.getShort(index);
|
||||||
index += 2;
|
index += 2;
|
||||||
String messageId = buffer.getString(index, index + messageIdLength, StandardCharsets.UTF_8.name());
|
String messageId = buffer.getString(index, index + messageIdLength, StandardCharsets.UTF_8.name());
|
||||||
@@ -314,12 +309,8 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
|
|||||||
if (startIndex >= endIndex) {
|
if (startIndex >= endIndex) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
String jsonStr = buffer.getString(startIndex, endIndex, StandardCharsets.UTF_8.name());
|
String jsonStr = buffer.getString(startIndex, endIndex, StandardCharsets.UTF_8.name());
|
||||||
if (StrUtil.isBlank(jsonStr)) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
return JsonUtils.parseObject(jsonStr, Object.class);
|
return JsonUtils.parseObject(jsonStr, Object.class);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.warn("[parseJsonData][JSON 解析失败,返回原始字符串]", e);
|
log.warn("[parseJsonData][JSON 解析失败,返回原始字符串]", e);
|
||||||
@@ -329,6 +320,7 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
|
|||||||
|
|
||||||
// ==================== 辅助方法 ====================
|
// ==================== 辅助方法 ====================
|
||||||
|
|
||||||
|
// TODO @haohao:这个貌似只用一次,可以考虑不抽小方法哈;
|
||||||
/**
|
/**
|
||||||
* 验证消息类型是否有效
|
* 验证消息类型是否有效
|
||||||
*/
|
*/
|
||||||
@@ -344,11 +336,16 @@ public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec {
|
|||||||
@Data
|
@Data
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
private static class ProtocolHeader {
|
private static class ProtocolHeader {
|
||||||
|
|
||||||
private byte magic;
|
private byte magic;
|
||||||
private byte version;
|
private byte version;
|
||||||
private byte messageType;
|
private byte messageType;
|
||||||
private byte messageFlags;
|
private byte messageFlags;
|
||||||
private int messageLength;
|
private int messageLength;
|
||||||
private int nextIndex; // 指向消息内容开始位置
|
/**
|
||||||
|
* 指向消息内容开始位置
|
||||||
|
*/
|
||||||
|
private int nextIndex;
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -14,13 +14,13 @@ import org.springframework.stereotype.Component;
|
|||||||
*
|
*
|
||||||
* 采用纯 JSON 格式传输,格式如下:
|
* 采用纯 JSON 格式传输,格式如下:
|
||||||
* {
|
* {
|
||||||
* "id": "消息 ID",
|
* "id": "消息 ID",
|
||||||
* "method": "消息方法",
|
* "method": "消息方法",
|
||||||
* "params": {...}, // 请求参数
|
* "params": {...}, // 请求参数
|
||||||
* "data": {...}, // 响应结果
|
* "data": {...}, // 响应结果
|
||||||
* "code": 200, // 响应错误码
|
* "code": 200, // 响应错误码
|
||||||
* "msg": "success", // 响应提示
|
* "msg": "success", // 响应提示
|
||||||
* "timestamp": 时间戳
|
* "timestamp": 时间戳
|
||||||
* }
|
* }
|
||||||
*
|
*
|
||||||
* @author 芋道源码
|
* @author 芋道源码
|
||||||
|
|||||||
@@ -39,10 +39,10 @@ public class IotTcpUpstreamProtocol {
|
|||||||
private NetServer tcpServer;
|
private NetServer tcpServer;
|
||||||
|
|
||||||
public IotTcpUpstreamProtocol(IotGatewayProperties.TcpProperties tcpProperties,
|
public IotTcpUpstreamProtocol(IotGatewayProperties.TcpProperties tcpProperties,
|
||||||
IotDeviceService deviceService,
|
IotDeviceService deviceService,
|
||||||
IotDeviceMessageService messageService,
|
IotDeviceMessageService messageService,
|
||||||
IotTcpConnectionManager connectionManager,
|
IotTcpConnectionManager connectionManager,
|
||||||
Vertx vertx) {
|
Vertx vertx) {
|
||||||
this.tcpProperties = tcpProperties;
|
this.tcpProperties = tcpProperties;
|
||||||
this.deviceService = deviceService;
|
this.deviceService = deviceService;
|
||||||
this.messageService = messageService;
|
this.messageService = messageService;
|
||||||
|
|||||||
@@ -2,7 +2,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager;
|
|||||||
|
|
||||||
import io.vertx.core.net.NetSocket;
|
import io.vertx.core.net.NetSocket;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.experimental.Accessors;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
@@ -62,9 +61,9 @@ public class IotTcpConnectionManager {
|
|||||||
.setDeviceId(deviceId)
|
.setDeviceId(deviceId)
|
||||||
.setAuthInfo(authInfo)
|
.setAuthInfo(authInfo)
|
||||||
.setAuthenticated(true);
|
.setAuthenticated(true);
|
||||||
|
|
||||||
connectionMap.put(socket, connectionInfo);
|
connectionMap.put(socket, connectionInfo);
|
||||||
deviceSocketMap.put(deviceId, socket);
|
deviceSocketMap.put(deviceId, socket);
|
||||||
|
// TODO @haohao:socketDeviceMap 和 connectionMap 会重复哇?connectionMap.get(socket).getDeviceId
|
||||||
socketDeviceMap.put(socket, deviceId);
|
socketDeviceMap.put(socket, deviceId);
|
||||||
|
|
||||||
log.info("[registerConnection][注册设备连接,设备 ID: {},连接: {},product key: {},device name: {}]",
|
log.info("[registerConnection][注册设备连接,设备 ID: {},连接: {},product key: {},device name: {}]",
|
||||||
@@ -79,7 +78,6 @@ public class IotTcpConnectionManager {
|
|||||||
public void unregisterConnection(NetSocket socket) {
|
public void unregisterConnection(NetSocket socket) {
|
||||||
ConnectionInfo connectionInfo = connectionMap.remove(socket);
|
ConnectionInfo connectionInfo = connectionMap.remove(socket);
|
||||||
Long deviceId = socketDeviceMap.remove(socket);
|
Long deviceId = socketDeviceMap.remove(socket);
|
||||||
|
|
||||||
if (connectionInfo != null && deviceId != null) {
|
if (connectionInfo != null && deviceId != null) {
|
||||||
deviceSocketMap.remove(deviceId);
|
deviceSocketMap.remove(deviceId);
|
||||||
log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]",
|
log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]",
|
||||||
@@ -87,6 +85,7 @@ public class IotTcpConnectionManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO @haohao:用不到,要不暂时清理哈。
|
||||||
/**
|
/**
|
||||||
* 注销设备连接(通过设备 ID)
|
* 注销设备连接(通过设备 ID)
|
||||||
*
|
*
|
||||||
@@ -160,26 +159,30 @@ public class IotTcpConnectionManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO @haohao:ConnectionInfo 和 AuthInfo 是不是可以融合哈?
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 连接信息
|
* 连接信息
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
@Accessors(chain = true)
|
|
||||||
public static class ConnectionInfo {
|
public static class ConnectionInfo {
|
||||||
|
|
||||||
private Long deviceId;
|
private Long deviceId;
|
||||||
private AuthInfo authInfo;
|
private AuthInfo authInfo;
|
||||||
private boolean authenticated;
|
private boolean authenticated;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 认证信息
|
* 认证信息
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
@Accessors(chain = true)
|
|
||||||
public static class AuthInfo {
|
public static class AuthInfo {
|
||||||
|
|
||||||
private Long deviceId;
|
private Long deviceId;
|
||||||
private String productKey;
|
private String productKey;
|
||||||
private String deviceName;
|
private String deviceName;
|
||||||
private String clientId;
|
private String clientId;
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -47,7 +47,6 @@ public class IotTcpDownstreamHandler {
|
|||||||
byte[] bytes = deviceMessageService.encodeDeviceMessage(message, deviceInfo.getProductKey(),
|
byte[] bytes = deviceMessageService.encodeDeviceMessage(message, deviceInfo.getProductKey(),
|
||||||
deviceInfo.getDeviceName());
|
deviceInfo.getDeviceName());
|
||||||
boolean success = connectionManager.sendToDevice(message.getDeviceId(), bytes);
|
boolean success = connectionManager.sendToDevice(message.getDeviceId(), bytes);
|
||||||
|
|
||||||
if (success) {
|
if (success) {
|
||||||
log.info("[handle][下行消息发送成功,设备 ID: {},方法: {},消息 ID: {},数据长度: {} 字节]",
|
log.info("[handle][下行消息发送成功,设备 ID: {},方法: {},消息 ID: {},数据长度: {} 字节]",
|
||||||
message.getDeviceId(), message.getMethod(), message.getId(), bytes.length);
|
message.getDeviceId(), message.getMethod(), message.getId(), bytes.length);
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
|||||||
|
|
||||||
private static final String CODEC_TYPE_JSON = IotTcpJsonDeviceMessageCodec.TYPE;
|
private static final String CODEC_TYPE_JSON = IotTcpJsonDeviceMessageCodec.TYPE;
|
||||||
private static final String CODEC_TYPE_BINARY = IotTcpBinaryDeviceMessageCodec.TYPE;
|
private static final String CODEC_TYPE_BINARY = IotTcpBinaryDeviceMessageCodec.TYPE;
|
||||||
|
|
||||||
private static final String AUTH_METHOD = "auth";
|
private static final String AUTH_METHOD = "auth";
|
||||||
|
|
||||||
private final IotDeviceMessageService deviceMessageService;
|
private final IotDeviceMessageService deviceMessageService;
|
||||||
@@ -49,8 +50,10 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
|||||||
|
|
||||||
private final String serverId;
|
private final String serverId;
|
||||||
|
|
||||||
public IotTcpUpstreamHandler(IotTcpUpstreamProtocol protocol, IotDeviceMessageService deviceMessageService,
|
public IotTcpUpstreamHandler(IotTcpUpstreamProtocol protocol,
|
||||||
IotDeviceService deviceService, IotTcpConnectionManager connectionManager) {
|
IotDeviceMessageService deviceMessageService,
|
||||||
|
IotDeviceService deviceService,
|
||||||
|
IotTcpConnectionManager connectionManager) {
|
||||||
this.deviceMessageService = deviceMessageService;
|
this.deviceMessageService = deviceMessageService;
|
||||||
this.deviceService = deviceService;
|
this.deviceService = deviceService;
|
||||||
this.connectionManager = connectionManager;
|
this.connectionManager = connectionManager;
|
||||||
@@ -68,12 +71,12 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
|||||||
log.warn("[handle][连接异常,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
|
log.warn("[handle][连接异常,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
|
||||||
cleanupConnection(socket);
|
cleanupConnection(socket);
|
||||||
});
|
});
|
||||||
|
|
||||||
socket.closeHandler(v -> {
|
socket.closeHandler(v -> {
|
||||||
log.debug("[handle][连接关闭,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
|
log.debug("[handle][连接关闭,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
|
||||||
cleanupConnection(socket);
|
cleanupConnection(socket);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// 设置消息处理器
|
||||||
socket.handler(buffer -> processMessage(clientId, buffer, socket));
|
socket.handler(buffer -> processMessage(clientId, buffer, socket));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -82,26 +85,24 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
|||||||
*/
|
*/
|
||||||
private void processMessage(String clientId, Buffer buffer, NetSocket socket) {
|
private void processMessage(String clientId, Buffer buffer, NetSocket socket) {
|
||||||
try {
|
try {
|
||||||
// 1. 数据包基础检查
|
// 1.1 数据包基础检查
|
||||||
if (buffer.length() == 0) {
|
if (buffer.length() == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
// 1.2 解码消息
|
||||||
// 2. 解码消息
|
|
||||||
MessageInfo messageInfo = decodeMessage(buffer);
|
MessageInfo messageInfo = decodeMessage(buffer);
|
||||||
if (messageInfo == null) {
|
if (messageInfo == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. 根据消息类型路由处理
|
// 2. 根据消息类型路由处理
|
||||||
if (isAuthRequest(messageInfo.message)) {
|
if (isAuthRequest(messageInfo.message)) {
|
||||||
// 认证请求:无需检查认证状态
|
// 认证请求
|
||||||
handleAuthenticationRequest(clientId, messageInfo, socket);
|
handleAuthenticationRequest(clientId, messageInfo, socket);
|
||||||
} else {
|
} else {
|
||||||
// 业务消息:需要检查认证状态
|
// 业务消息
|
||||||
handleBusinessRequest(clientId, messageInfo, socket);
|
handleBusinessRequest(clientId, messageInfo, socket);
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[processMessage][处理消息失败,客户端 ID: {}]", clientId, e);
|
log.error("[processMessage][处理消息失败,客户端 ID: {}]", clientId, e);
|
||||||
}
|
}
|
||||||
@@ -112,16 +113,14 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
|||||||
*/
|
*/
|
||||||
private void handleAuthenticationRequest(String clientId, MessageInfo messageInfo, NetSocket socket) {
|
private void handleAuthenticationRequest(String clientId, MessageInfo messageInfo, NetSocket socket) {
|
||||||
try {
|
try {
|
||||||
|
// 1.1 解析认证参数
|
||||||
IotDeviceMessage message = messageInfo.message;
|
IotDeviceMessage message = messageInfo.message;
|
||||||
|
|
||||||
// 1. 解析认证参数
|
|
||||||
AuthParams authParams = parseAuthParams(message.getParams());
|
AuthParams authParams = parseAuthParams(message.getParams());
|
||||||
if (authParams == null) {
|
if (authParams == null) {
|
||||||
sendError(socket, message.getRequestId(), "认证参数不完整", messageInfo.codecType);
|
sendError(socket, message.getRequestId(), "认证参数不完整", messageInfo.codecType);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
// 1.2 执行认证
|
||||||
// 2. 执行认证
|
|
||||||
if (!authenticateDevice(authParams)) {
|
if (!authenticateDevice(authParams)) {
|
||||||
log.warn("[handleAuthenticationRequest][认证失败,客户端 ID: {},username: {}]",
|
log.warn("[handleAuthenticationRequest][认证失败,客户端 ID: {},username: {}]",
|
||||||
clientId, authParams.username);
|
clientId, authParams.username);
|
||||||
@@ -129,14 +128,13 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. 解析设备信息
|
// 2.1 解析设备信息
|
||||||
IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.username);
|
IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.username);
|
||||||
if (deviceInfo == null) {
|
if (deviceInfo == null) {
|
||||||
sendError(socket, message.getRequestId(), "解析设备信息失败", messageInfo.codecType);
|
sendError(socket, message.getRequestId(), "解析设备信息失败", messageInfo.codecType);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
// 2.2 获取设备信息
|
||||||
// 4. 获取设备信息
|
|
||||||
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(),
|
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(),
|
||||||
deviceInfo.getDeviceName());
|
deviceInfo.getDeviceName());
|
||||||
if (device == null) {
|
if (device == null) {
|
||||||
@@ -144,14 +142,12 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 5. 注册连接并发送成功响应
|
// 3. 注册连接并发送成功响应
|
||||||
registerConnection(socket, device, deviceInfo, authParams.clientId);
|
registerConnection(socket, device, deviceInfo, authParams.clientId);
|
||||||
sendOnlineMessage(deviceInfo);
|
sendOnlineMessage(deviceInfo);
|
||||||
sendSuccess(socket, message.getRequestId(), "认证成功", messageInfo.codecType);
|
sendSuccess(socket, message.getRequestId(), "认证成功", messageInfo.codecType);
|
||||||
|
|
||||||
log.info("[handleAuthenticationRequest][认证成功,设备 ID: {},设备名: {}]",
|
log.info("[handleAuthenticationRequest][认证成功,设备 ID: {},设备名: {}]",
|
||||||
device.getId(), deviceInfo.getDeviceName());
|
device.getId(), deviceInfo.getDeviceName());
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[handleAuthenticationRequest][认证处理异常,客户端 ID: {}]", clientId, e);
|
log.error("[handleAuthenticationRequest][认证处理异常,客户端 ID: {}]", clientId, e);
|
||||||
sendError(socket, messageInfo.message.getRequestId(), "认证处理异常", messageInfo.codecType);
|
sendError(socket, messageInfo.message.getRequestId(), "认证处理异常", messageInfo.codecType);
|
||||||
@@ -173,25 +169,23 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
|||||||
// 2. 获取认证信息并处理业务消息
|
// 2. 获取认证信息并处理业务消息
|
||||||
IotTcpConnectionManager.AuthInfo authInfo = connectionManager.getAuthInfo(socket);
|
IotTcpConnectionManager.AuthInfo authInfo = connectionManager.getAuthInfo(socket);
|
||||||
processBusinessMessage(clientId, messageInfo.message, authInfo);
|
processBusinessMessage(clientId, messageInfo.message, authInfo);
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[handleBusinessRequest][业务请求处理异常,客户端 ID: {}]", clientId, e);
|
log.error("[handleBusinessRequest][业务请求处理异常,客户端 ID: {}]", clientId, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO @haohao:processBusinessMessage 这个小方法,直接融合到 handleBusinessRequest 里?读起来更聚集点
|
||||||
/**
|
/**
|
||||||
* 处理业务消息
|
* 处理业务消息
|
||||||
*/
|
*/
|
||||||
private void processBusinessMessage(String clientId, IotDeviceMessage message,
|
private void processBusinessMessage(String clientId, IotDeviceMessage message,
|
||||||
IotTcpConnectionManager.AuthInfo authInfo) {
|
IotTcpConnectionManager.AuthInfo authInfo) {
|
||||||
try {
|
try {
|
||||||
message.setDeviceId(authInfo.getDeviceId());
|
message.setDeviceId(authInfo.getDeviceId());
|
||||||
message.setServerId(serverId);
|
message.setServerId(serverId);
|
||||||
|
|
||||||
// 发送到消息总线
|
// 发送到消息总线
|
||||||
deviceMessageService.sendDeviceMessage(message, authInfo.getProductKey(),
|
deviceMessageService.sendDeviceMessage(message, authInfo.getProductKey(),
|
||||||
authInfo.getDeviceName(), serverId);
|
authInfo.getDeviceName(), serverId);
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[processBusinessMessage][业务消息处理失败,客户端 ID: {},消息 ID: {}]",
|
log.error("[processBusinessMessage][业务消息处理失败,客户端 ID: {},消息 ID: {}]",
|
||||||
clientId, message.getId(), e);
|
clientId, message.getId(), e);
|
||||||
@@ -200,28 +194,27 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 解码消息
|
* 解码消息
|
||||||
|
*
|
||||||
|
* @param buffer 消息
|
||||||
*/
|
*/
|
||||||
private MessageInfo decodeMessage(Buffer buffer) {
|
private MessageInfo decodeMessage(Buffer buffer) {
|
||||||
if (buffer == null || buffer.length() == 0) {
|
if (buffer == null || buffer.length() == 0) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 1. 快速检测消息格式类型
|
// 1. 快速检测消息格式类型
|
||||||
|
// TODO @haohao:是不是进一步优化?socket 建立认证后,那条消息已经定义了所有消息的格式哈?
|
||||||
String codecType = detectMessageFormat(buffer);
|
String codecType = detectMessageFormat(buffer);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// 2. 使用检测到的格式进行解码
|
// 2. 使用检测到的格式进行解码
|
||||||
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(buffer.getBytes(), codecType);
|
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(buffer.getBytes(), codecType);
|
||||||
|
|
||||||
if (message == null) {
|
if (message == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
return new MessageInfo(message, codecType);
|
return new MessageInfo(message, codecType);
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.warn("[decodeMessage][消息解码失败,格式: {},数据长度: {},错误: {}]",
|
log.warn("[decodeMessage][消息解码失败,格式: {},数据长度: {},错误: {}]",
|
||||||
codecType, buffer.length(), e.getMessage());
|
codecType, buffer.length(), e.getMessage());
|
||||||
|
// TODO @haohao:一般消息格式不对,应该抛出异常,断开连接居多?
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -231,8 +224,10 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
|||||||
* 优化性能:避免不必要的字符串转换
|
* 优化性能:避免不必要的字符串转换
|
||||||
*/
|
*/
|
||||||
private String detectMessageFormat(Buffer buffer) {
|
private String detectMessageFormat(Buffer buffer) {
|
||||||
|
// TODO @haohao:是不是 IotTcpBinaryDeviceMessageCodec 提供一个 isBinaryFormat 方法哈?
|
||||||
|
// 默认使用 JSON
|
||||||
if (buffer.length() == 0) {
|
if (buffer.length() == 0) {
|
||||||
return CODEC_TYPE_JSON; // 默认使用 JSON
|
return CODEC_TYPE_JSON;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 1. 优先检测二进制格式(检查魔术字节 0x7E)
|
// 1. 优先检测二进制格式(检查魔术字节 0x7E)
|
||||||
@@ -241,6 +236,7 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 2. 检测 JSON 格式(检查前几个有效字符)
|
// 2. 检测 JSON 格式(检查前几个有效字符)
|
||||||
|
// TODO @haohao:这个检测去掉?直接 return CODEC_TYPE_JSON 更简洁一点。
|
||||||
if (isJsonFormat(buffer)) {
|
if (isJsonFormat(buffer)) {
|
||||||
return CODEC_TYPE_JSON;
|
return CODEC_TYPE_JSON;
|
||||||
}
|
}
|
||||||
@@ -295,14 +291,14 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
|||||||
* 注册连接信息
|
* 注册连接信息
|
||||||
*/
|
*/
|
||||||
private void registerConnection(NetSocket socket, IotDeviceRespDTO device,
|
private void registerConnection(NetSocket socket, IotDeviceRespDTO device,
|
||||||
IotDeviceAuthUtils.DeviceInfo deviceInfo, String clientId) {
|
IotDeviceAuthUtils.DeviceInfo deviceInfo, String clientId) {
|
||||||
|
// TODO @haohao:AuthInfo 的创建,放在 connectionManager 里构建貌似会更收敛一点?
|
||||||
// 创建认证信息
|
// 创建认证信息
|
||||||
IotTcpConnectionManager.AuthInfo authInfo = new IotTcpConnectionManager.AuthInfo()
|
IotTcpConnectionManager.AuthInfo authInfo = new IotTcpConnectionManager.AuthInfo()
|
||||||
.setDeviceId(device.getId())
|
.setDeviceId(device.getId())
|
||||||
.setProductKey(deviceInfo.getProductKey())
|
.setProductKey(deviceInfo.getProductKey())
|
||||||
.setDeviceName(deviceInfo.getDeviceName())
|
.setDeviceName(deviceInfo.getDeviceName())
|
||||||
.setClientId(clientId);
|
.setClientId(clientId);
|
||||||
|
|
||||||
// 注册连接
|
// 注册连接
|
||||||
connectionManager.registerConnection(socket, device.getId(), authInfo);
|
connectionManager.registerConnection(socket, device.getId(), authInfo);
|
||||||
}
|
}
|
||||||
@@ -377,15 +373,12 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
|||||||
if (params == null) {
|
if (params == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
JSONObject paramsJson = params instanceof JSONObject ? (JSONObject) params
|
JSONObject paramsJson = params instanceof JSONObject ? (JSONObject) params
|
||||||
: JSONUtil.parseObj(params.toString());
|
: JSONUtil.parseObj(params.toString());
|
||||||
|
|
||||||
String clientId = paramsJson.getStr("clientId");
|
String clientId = paramsJson.getStr("clientId");
|
||||||
String username = paramsJson.getStr("username");
|
String username = paramsJson.getStr("username");
|
||||||
String password = paramsJson.getStr("password");
|
String password = paramsJson.getStr("password");
|
||||||
|
|
||||||
return StrUtil.hasBlank(clientId, username, password) ? null
|
return StrUtil.hasBlank(clientId, username, password) ? null
|
||||||
: new AuthParams(clientId, username, password);
|
: new AuthParams(clientId, username, password);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
@@ -410,6 +403,8 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO @haohao:改成 sendErrorResponse sendSuccessResponse 更清晰点?
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 发送错误响应
|
* 发送错误响应
|
||||||
*/
|
*/
|
||||||
@@ -426,15 +421,18 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
|||||||
|
|
||||||
// ==================== 内部类 ====================
|
// ==================== 内部类 ====================
|
||||||
|
|
||||||
|
// TODO @haohao:IotDeviceAuthReqDTO 复用这个?
|
||||||
/**
|
/**
|
||||||
* 认证参数
|
* 认证参数
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
private static class AuthParams {
|
private static class AuthParams {
|
||||||
|
|
||||||
private final String clientId;
|
private final String clientId;
|
||||||
private final String username;
|
private final String username;
|
||||||
private final String password;
|
private final String password;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -443,7 +441,10 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
|||||||
@Data
|
@Data
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
private static class MessageInfo {
|
private static class MessageInfo {
|
||||||
|
|
||||||
private final IotDeviceMessage message;
|
private final IotDeviceMessage message;
|
||||||
|
|
||||||
private final String codecType;
|
private final String codecType;
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user