review:【IoT 物联网】TCP 网络接入的实现

This commit is contained in:
YunaiV
2025-07-19 10:18:29 +08:00
parent bb1210a17a
commit 6a117c9d55
11 changed files with 114 additions and 121 deletions

View File

@@ -54,6 +54,7 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec {
static {
// 初始化方法映射
// TODO @haohao有没可能去掉这个 code 到 method 的映射哈?
initializeMethodMappings();
}
@@ -75,6 +76,7 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec {
* 负载字段名
*/
public static class PayloadField {
public static final String TIMESTAMP = "timestamp";
public static final String MESSAGE_ID = "msgId";
public static final String DEVICE_ID = "deviceId";
@@ -82,12 +84,14 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec {
public static final String DATA = "data";
public static final String CODE = "code";
public static final String MESSAGE = "message";
}
/**
* 消息方法映射
*/
public static class MessageMethod {
public static final String PROPERTY_POST = "thing.property.post";
public static final String PROPERTY_SET = "thing.property.set";
public static final String PROPERTY_GET = "thing.property.get";
@@ -97,6 +101,7 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec {
public static final String OTA_UPGRADE = "thing.ota.upgrade";
public static final String STATE_ONLINE = "thing.state.online";
public static final String STATE_OFFLINE = "thing.state.offline";
}
// ==================== 初始化方法 ====================
@@ -139,9 +144,9 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec {
// 3. 构建 TCP 数据包
TcpDataPackage dataPackage = TcpDataPackage.builder()
.addr("") // 地址在发送时由调用方设置
.addr("")
.code(code)
.mid((short) 0) // 消息序号在发送时由调用方设置
.mid((short) 0)
.payload(payload)
.build();
@@ -154,9 +159,7 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec {
log.debug("[encode][TCP 消息编码成功] 方法: {}, 数据长度: {}",
message.getMethod(), result.length);
}
return result;
} catch (Exception e) {
log.error("[encode][TCP 消息编码失败] 消息: {}", message, e);
throw new TcpCodecException("TCP 消息编码失败", e);
@@ -175,13 +178,10 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec {
// 1. 解码 TCP 数据包
Buffer buffer = Buffer.buffer(bytes);
TcpDataPackage dataPackage = TcpDataDecoder.decode(buffer);
// 2. 获取消息方法
String method = getMethodByCodeSafely(dataPackage.getCode());
// 3. 解析负载数据
Object params = parsePayloadOptimized(dataPackage.getPayload());
// 4. 构建 IoT 设备消息
IotDeviceMessage message = IotDeviceMessage.requestOf(method, params);
@@ -190,9 +190,7 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec {
log.debug("[decode][TCP 消息解码成功] 方法: {}, 功能码: {}",
method, dataPackage.getCode());
}
return message;
} catch (Exception e) {
log.error("[decode][TCP 消息解码失败] 数据长度: {}, 数据内容: {}",
bytes.length, truncateData(bytes, 100), e);
@@ -226,8 +224,8 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec {
if (Objects.isNull(bytes) || bytes.length == 0) {
throw new IllegalArgumentException("待解码数据不能为空");
}
if (bytes.length > 1024 * 1024) { // 1MB 限制
throw new IllegalArgumentException("数据包过大超过1MB限制");
if (bytes.length > 1024 * 1024) {
throw new IllegalArgumentException("数据包过大,超过 1MB 限制");
}
}
@@ -236,9 +234,10 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec {
*/
private short getCodeByMethodSafely(String method) {
Short code = METHOD_TO_CODE_MAP.get(method);
// 默认为数据上报
if (code == null) {
log.warn("[getCodeByMethodSafely][未知的消息方法: {},使用默认功能码]", method);
return TcpDataPackage.CODE_DATA_UP; // 默认为数据上报
return TcpDataPackage.CODE_DATA_UP;
}
return code;
}
@@ -260,6 +259,7 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec {
*/
private String buildPayloadOptimized(IotDeviceMessage message) {
// 使用缓存键
// TODO @haohao是不是不用缓存哈
String cacheKey = message.getMethod() + "_" + message.getRequestId();
JSONObject cachedPayload = jsonCache.get(cacheKey);
@@ -271,7 +271,6 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec {
// 创建新的负载
JSONObject payload = new JSONObject();
// 添加基础字段
addToPayloadIfNotNull(payload, PayloadField.MESSAGE_ID, message.getRequestId());
addToPayloadIfNotNull(payload, PayloadField.DEVICE_ID, message.getDeviceId());
@@ -279,7 +278,6 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec {
addToPayloadIfNotNull(payload, PayloadField.DATA, message.getData());
addToPayloadIfNotNull(payload, PayloadField.CODE, message.getCode());
addToPayloadIfNotEmpty(payload, PayloadField.MESSAGE, message.getMsg());
// 添加时间戳
payload.set(PayloadField.TIMESTAMP, System.currentTimeMillis());
@@ -317,7 +315,6 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec {
}
return jsonObject.containsKey(PayloadField.PARAMS) ? jsonObject.get(PayloadField.PARAMS) : jsonObject;
} catch (JSONException e) {
log.warn("[parsePayloadOptimized][负载解析为JSON失败返回原始字符串] 负载: {}", payload);
return payload;
@@ -379,6 +376,7 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec {
* TCP 编解码异常
*/
public static class TcpCodecException extends RuntimeException {
public TcpCodecException(String message) {
super(message);
}
@@ -386,5 +384,6 @@ public class IotTcpDeviceMessageCodec implements IotDeviceMessageCodec {
public TcpCodecException(String message, Throwable cause) {
super(message, cause);
}
}
}

View File

@@ -108,16 +108,14 @@ public class IotTcpDownstreamSubscriber implements IotMessageSubscriber<IotDevic
processedMessages.incrementAndGet();
if (log.isDebugEnabled()) {
log.debug("[onMessage][收到下行消息] 设备ID: {}, 方法: {}, 消息ID: {}",
log.debug("[onMessage][收到下行消息] 设备 ID: {}, 方法: {}, 消息ID: {}",
message.getDeviceId(), message.getMethod(), message.getId());
}
// 参数校验
if (message.getDeviceId() == null) {
log.warn("[onMessage][下行消息设备ID为空跳过处理] 消息: {}", message);
log.warn("[onMessage][下行消息设备 ID 为空,跳过处理] 消息: {}", message);
return;
}
// 检查连接状态
if (connectionManager.getClientByDeviceId(message.getDeviceId()) == null) {
log.warn("[onMessage][设备({})离线,跳过下行消息] 方法: {}",
@@ -130,11 +128,11 @@ public class IotTcpDownstreamSubscriber implements IotMessageSubscriber<IotDevic
// 性能监控
long processTime = System.currentTimeMillis() - startTime;
if (processTime > 1000) { // 超过1秒的慢消息
// TODO @haohao1000 搞成静态变量;
if (processTime > 1000) { // 超过 1 秒的慢消息
log.warn("[onMessage][慢消息处理] 设备ID: {}, 方法: {}, 耗时: {}ms",
message.getDeviceId(), message.getMethod(), processTime);
}
} catch (Exception e) {
failedMessages.incrementAndGet();
log.error("[onMessage][处理下行消息失败] 设备ID: {}, 方法: {}, 消息: {}",
@@ -142,6 +140,8 @@ public class IotTcpDownstreamSubscriber implements IotMessageSubscriber<IotDevic
}
}
// TODO @haohao多余的要不先清理掉
/**
* 获取订阅者统计信息
*/
@@ -160,4 +160,5 @@ public class IotTcpDownstreamSubscriber implements IotMessageSubscriber<IotDevic
public boolean isHealthy() {
return initialized.get() && downstreamHandler != null;
}
}

View File

@@ -92,6 +92,7 @@ public class IotTcpUpstreamProtocol {
* 启动 TCP 服务器
*/
private void startTcpServer() {
// TODO @haohao同类的最好使用相同序号前缀一个方法看起来有段落感。包括同类可以去掉之间的空格。例如说这里的1. 2. 3. 4. 是初始化5. 6. 是管理启动
// 1. 创建服务器选项
NetServerOptions options = new NetServerOptions()
.setPort(tcpProperties.getPort())

View File

@@ -10,9 +10,8 @@ import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* TCP 设备客户端
* TCP 设备客户端:封装设备连接的基本信息和操作
* <p>
* 封装设备连接的基本信息和操作。
* 该类中的状态变更(如 authenticated, closed使用 AtomicBoolean 保证原子性。
* 对 socket 的操作应在 Vert.x Event Loop 线程中执行,以避免并发问题。
*
@@ -48,7 +47,7 @@ public class TcpDeviceClient {
private RecordParser parser;
@Getter
private final long keepAliveTimeoutMs; // 改为 final通过构造函数注入
private final long keepAliveTimeoutMs;
private volatile long lastKeepAliveTime;
@@ -58,7 +57,7 @@ public class TcpDeviceClient {
/**
* 构造函数
*
* @param clientId 客户端ID全局唯一
* @param clientId 客户端 ID全局唯一
* @param keepAliveTimeoutMs 心跳超时时间(毫秒),从配置中读取
*/
public TcpDeviceClient(String clientId, long keepAliveTimeoutMs) {
@@ -69,19 +68,19 @@ public class TcpDeviceClient {
/**
* 绑定网络套接字,并设置相关处理器。
* 此方法应在 Vert.x Event Loop 线程中调用
* 此方法应在 Vert.x Event Loop 线程中调用
*
* @param socket 网络套接字
*/
public void setSocket(NetSocket socket) {
// 无需 synchronizedVert.x 保证了同一个 socket 的事件在同一个 Event Loop 中处理
if (this.socket != null && this.socket != socket) {
log.warn("[setSocket][客户端({})] 正在用新的 socket 替换旧的,旧 socket 将被关闭", clientId);
log.warn("[setSocket][客户端({}) 正在用新的 socket 替换旧的,旧 socket 将被关闭]", clientId);
this.socket.close();
}
this.socket = socket;
// 注册处理器
if (socket != null) {
// 1. 设置关闭处理器
socket.closeHandler(v -> {
@@ -103,22 +102,22 @@ public class TcpDeviceClient {
if (parser != null) {
parser.handle(buffer);
} else {
log.warn("[setSocket][设备客户端({})] 未设置解析器(parser),原始数据被忽略: {}", clientId, buffer.toString());
log.warn("[setSocket][设备客户端({}) 未设置解析器(parser),原始数据被忽略: {}]", clientId, buffer.toString());
}
});
}
}
/**
* 更新心跳时间,表示设备仍然活跃
* 更新心跳时间,表示设备仍然活跃
*/
public void keepAlive() {
this.lastKeepAliveTime = System.currentTimeMillis();
}
/**
* 检查连接是否在线
* 判断标准未被主动关闭、socket 存在、且在心跳超时时间内
* 检查连接是否在线
* 判断标准未被主动关闭、socket 存在、且在心跳超时时间内
*
* @return 是否在线
*/
@@ -130,6 +129,8 @@ public class TcpDeviceClient {
return idleTime < keepAliveTimeoutMs;
}
// TODO @haohao1是不是简化下productKey 和 deviceName 非空就认为是已认证2如果是的话productKey 和 deviceName 搞成一个设置方法setAuthenticatedproductKey、deviceName
public boolean isAuthenticated() {
return authenticated.get();
}
@@ -139,7 +140,7 @@ public class TcpDeviceClient {
}
/**
* 向设备发送消息
* 向设备发送消息
*
* @param buffer 消息内容
*/
@@ -151,18 +152,22 @@ public class TcpDeviceClient {
// Vert.x 的 write 是异步的,不会阻塞
socket.write(buffer, result -> {
if (result.succeeded()) {
log.debug("[sendMessage][设备客户端({})发送消息成功]", clientId);
// 发送成功也更新心跳,表示连接活跃
keepAlive();
} else {
// 发送失败可能意味着连接已断开,主动关闭
if (!result.succeeded()) {
log.error("[sendMessage][设备客户端({})发送消息失败]", clientId, result.cause());
// 发送失败可能意味着连接已断开,主动关闭
shutdown();
return;
}
// 发送成功也更新心跳,表示连接活跃
if (log.isDebugEnabled()) {
log.debug("[sendMessage][设备客户端({})发送消息成功]", clientId);
}
keepAlive();
});
}
// TODO @haohao是不是叫 close 好点?或者问问大模型
/**
* 关闭客户端连接并清理资源。
* 这是一个幂等操作,可以被多次安全调用。
@@ -200,10 +205,6 @@ public class TcpDeviceClient {
return "disconnected";
}
public long getLastKeepAliveTime() {
return lastKeepAliveTime;
}
@Override
public String toString() {
return "TcpDeviceClient{" +
@@ -215,4 +216,5 @@ public class TcpDeviceClient {
", connection=" + getConnectionInfo() +
'}';
}
}

View File

@@ -16,8 +16,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* 参考 EMQX 设计理念:
* 1. 高性能连接管理
* 2. 连接池和资源管理
* 3. 流量控制
* 4. 监控统计
* 3. 流量控制 TODO @haohao这个要不先去掉
* 4. 监控统计 TODO @haohao这个要不先去掉
* 5. 自动清理和容错
*
* @author 芋道源码
@@ -106,6 +106,7 @@ public class TcpDeviceConnectionManager {
* 添加设备客户端
*/
public boolean addClient(String deviceAddr, TcpDeviceClient client) {
// TODO @haohao这个要不去掉目前看着没做 result 的处理;
if (clientMap.size() >= MAX_CONNECTIONS) {
log.warn("[addClient][连接数已达上限({}),拒绝新连接: {}]", MAX_CONNECTIONS, deviceAddr);
return false;
@@ -130,14 +131,13 @@ public class TcpDeviceConnectionManager {
socketToAddrMap.put(client.getSocket(), deviceAddr);
}
// 如果客户端已设置设备ID更新映射
// 如果客户端已设置设备 ID更新映射
if (client.getDeviceId() != null) {
deviceIdToAddrMap.put(client.getDeviceId(), deviceAddr);
}
totalConnections.incrementAndGet();
return true;
} finally {
writeLock.unlock();
}
@@ -196,7 +196,7 @@ public class TcpDeviceConnectionManager {
}
/**
* 通过设备ID获取客户端
* 通过设备 ID 获取客户端
*/
public TcpDeviceClient getClientByDeviceId(Long deviceId) {
readLock.lock();
@@ -208,6 +208,8 @@ public class TcpDeviceConnectionManager {
}
}
// TODO @haohaogetClientBySocket、isDeviceOnline、sendMessage、sendMessageByDeviceId、broadcastMessage 用不到的方法,要不先暂时不提供?保持简洁、更容易理解哈。
/**
* 通过网络连接获取客户端
*/
@@ -230,7 +232,7 @@ public class TcpDeviceConnectionManager {
}
/**
* 设置设备ID映射
* 设置设备 ID 映射
*/
public void setDeviceIdMapping(String deviceAddr, Long deviceId) {
writeLock.lock();
@@ -349,12 +351,12 @@ public class TcpDeviceConnectionManager {
}
}
// TODO @haohao心跳超时需要 close 么?
/**
* 心跳检查任务
*/
private void checkHeartbeat() {
try {
long currentTime = System.currentTimeMillis();
int offlineCount = 0;
readLock.lock();
@@ -369,7 +371,7 @@ public class TcpDeviceConnectionManager {
}
if (offlineCount > 0) {
log.info("[checkHeartbeat][发现{}个离线设备,将在清理任务中处理]", offlineCount);
log.info("[checkHeartbeat][发现 {} 个离线设备,将在清理任务中处理]", offlineCount);
}
} catch (Exception e) {
log.error("[checkHeartbeat][心跳检查任务异常]", e);
@@ -424,14 +426,14 @@ public class TcpDeviceConnectionManager {
private void logStatistics() {
try {
long totalConn = totalConnections.get();
long totalDisconn = totalDisconnections.get();
long totalDisconnections = this.totalDisconnections.get();
long totalMsg = totalMessages.get();
long totalFailedMsg = totalFailedMessages.get();
long totalBytesValue = totalBytes.get();
log.info("[logStatistics][连接统计] 总连接: {}, 总断开: {}, 当前在线: {}, 认证设备: {}, " +
"总消息: {}, 失败消息: {}, 总字节: {}",
totalConn, totalDisconn, getOnlineCount(), getAuthenticatedCount(),
totalConn, totalDisconnections, getOnlineCount(), getAuthenticatedCount(),
totalMsg, totalFailedMsg, totalBytesValue);
} catch (Exception e) {
log.error("[logStatistics][统计日志任务异常]", e);
@@ -500,4 +502,5 @@ public class TcpDeviceConnectionManager {
? (double) (totalMessages.get() - totalFailedMessages.get()) / totalMessages.get() * 100
: 0.0);
}
}

View File

@@ -3,13 +3,14 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.protocol;
import io.vertx.core.buffer.Buffer;
import lombok.extern.slf4j.Slf4j;
// TODO @haohao“设备地址长度”是不是不需要。
/**
* TCP 数据解码器
* <p>
* 负责将字节流解码为 TcpDataPackage 对象
* <p>
* 数据包格式:
* 包头(4字节长度) | 设备地址长度(2字节) | 设备地址(不定长) | 功能码(2字节) | 消息序号(2字节) | 包体(不定长)
* 包头(4 字节长度) | 设备地址长度(2 字节) | 设备地址(不定长) | 功能码(2 字节) | 消息序号(2 字节) | 包体(不定长)
*
* @author 芋道源码
*/
@@ -31,35 +32,35 @@ public class TcpDataDecoder {
try {
int index = 0;
// 1. 获取设备地址长度2字节
// 1.1 获取设备地址长度2字节
short addrLength = buffer.getShort(index);
index += 2;
// 2. 校验数据包长度
// 1.2 校验数据包长度
int expectedLength = 2 + addrLength + 2 + 2; // 地址长度 + 地址 + 功能码 + 消息序号
if (buffer.length() < expectedLength) {
throw new IllegalArgumentException("数据包长度不足,期望至少 " + expectedLength + " 字节");
}
// 3. 获取设备地址
// 1.3 获取设备地址
String addr = buffer.getBuffer(index, index + addrLength).toString();
index += addrLength;
// 4. 获取功能码2字节
// 1.4 获取功能码2字节
short code = buffer.getShort(index);
index += 2;
// 5. 获取消息序号2字节
// 1.5 获取消息序号2字节
short mid = buffer.getShort(index);
index += 2;
// 6. 获取包体数据
// 1.6 获取包体数据
String payload = "";
if (index < buffer.length()) {
payload = buffer.getString(index, buffer.length());
}
// 7. 构建数据包对象
// 2. 构建数据包对象
TcpDataPackage dataPackage = TcpDataPackage.builder()
.addrLength((int) addrLength)
.addr(addr)
@@ -70,15 +71,14 @@ public class TcpDataDecoder {
log.debug("[decode][解码成功] 设备地址: {}, 功能码: {}, 消息序号: {}, 包体长度: {}",
addr, dataPackage.getCodeDescription(), mid, payload.length());
return dataPackage;
} catch (Exception e) {
log.error("[decode][解码失败] 数据: {}", buffer.toString(), e);
throw new IllegalArgumentException("数据包解码失败: " + e.getMessage(), e);
}
}
// TODO @haohao这个要不去掉暂时没用到
/**
* 校验数据包格式
*
@@ -94,4 +94,5 @@ public class TcpDataDecoder {
return false;
}
}
}

View File

@@ -27,11 +27,9 @@ public class TcpDataEncoder {
if (dataPackage == null) {
throw new IllegalArgumentException("数据包对象不能为空");
}
if (dataPackage.getAddr() == null || dataPackage.getAddr().isEmpty()) {
throw new IllegalArgumentException("设备地址不能为空");
}
if (dataPackage.getPayload() == null) {
dataPackage.setPayload("");
}
@@ -39,34 +37,27 @@ public class TcpDataEncoder {
try {
Buffer buffer = Buffer.buffer();
// 1. 计算包体长度(除了包头4字节)
// 1. 计算包体长度(除了包头 4 字节)
int payloadLength = dataPackage.getPayload().getBytes().length;
int totalLength = 2 + dataPackage.getAddr().length() + 2 + 2 + payloadLength;
// 2. 写入包头总长度4字节
// 2.1 写入包头总长度4 字节)
buffer.appendInt(totalLength);
// 3. 写入设备地址长度2字节
// 2.2 写入设备地址长度2 字节)
buffer.appendShort((short) dataPackage.getAddr().length());
// 4. 写入设备地址(不定长)
// 2.3 写入设备地址(不定长)
buffer.appendBytes(dataPackage.getAddr().getBytes());
// 5. 写入功能码2字节
// 2.4 写入功能码2 字节)
buffer.appendShort(dataPackage.getCode());
// 6. 写入消息序号2字节
// 2.5 写入消息序号2 字节)
buffer.appendShort(dataPackage.getMid());
// 7. 写入包体数据(不定长)
// 2.6 写入包体数据(不定长)
buffer.appendBytes(dataPackage.getPayload().getBytes());
log.debug("[encode][编码成功] 设备地址: {}, 功能码: {}, 消息序号: {}, 总长度: {}",
dataPackage.getAddr(), dataPackage.getCodeDescription(),
dataPackage.getMid(), buffer.length());
return buffer;
} catch (Exception e) {
log.error("[encode][编码失败] 数据包: {}", dataPackage, e);
throw new IllegalArgumentException("数据包编码失败: " + e.getMessage(), e);
@@ -82,15 +73,14 @@ public class TcpDataEncoder {
* @return 编码后的数据包
*/
public static Buffer createRegisterReply(String addr, short mid, boolean success) {
String payload = success ? "0" : "1"; // 0表示成功1表示失败
// TODO @haohaopayload 默认成功、失败,最好讴有个枚举
String payload = success ? "0" : "1"; // 0 表示成功1 表示失败
TcpDataPackage dataPackage = TcpDataPackage.builder()
.addr(addr)
.code(TcpDataPackage.CODE_REGISTER_REPLY)
.mid(mid)
.payload(payload)
.build();
return encode(dataPackage);
}
@@ -109,7 +99,6 @@ public class TcpDataEncoder {
.mid(mid)
.payload(data)
.build();
return encode(dataPackage);
}
@@ -128,7 +117,6 @@ public class TcpDataEncoder {
.mid(mid)
.payload(serviceData)
.build();
return encode(dataPackage);
}
@@ -147,7 +135,6 @@ public class TcpDataEncoder {
.mid(mid)
.payload(propertyData)
.build();
return encode(dataPackage);
}
@@ -166,7 +153,7 @@ public class TcpDataEncoder {
.mid(mid)
.payload(propertyNames)
.build();
return encode(dataPackage);
}
}

View File

@@ -9,7 +9,7 @@ import lombok.NoArgsConstructor;
* TCP 数据包协议定义
* <p>
* 数据包格式:
* 包头(4字节长度) | 设备地址长度(2字节) | 设备地址(不定长) | 功能码(2字节) | 消息序号(2字节) | 包体(不定长)
* 包头(4 字节长度) | 设备地址长度(2 字节) | 设备地址(不定长) | 功能码(2 字节) | 消息序号(2 字节) | 包体(不定长)
*
* @author 芋道源码
*/
@@ -29,10 +29,12 @@ public class TcpDataPackage {
* 注册回复
*/
public static final short CODE_REGISTER_REPLY = 11;
// TODO @haohao【重要】一般心跳服务端会回复一条回复要搞独立的 code 码,还是继续用原来的,因为 requestId 可以映射;
/**
* 心跳
*/
public static final short CODE_HEARTBEAT = 20;
// TODO @haohao【重要】下面的是不是融合成消息上行client -> server消息下行server -> client然后把 method 放到 body 里?
/**
* 数据上报
*/
@@ -60,6 +62,8 @@ public class TcpDataPackage {
// ==================== 数据包字段 ====================
// TODO @haohao设备 addrLength、addr 是不是非必要呀?
/**
* 设备地址长度
*/
@@ -87,6 +91,8 @@ public class TcpDataPackage {
// ==================== 辅助方法 ====================
// TODO @haohao用不到的方法可以清理掉哈
/**
* 是否为注册消息
*/
@@ -123,6 +129,7 @@ public class TcpDataPackage {
code == CODE_PROPERTY_SET || code == CODE_PROPERTY_GET;
}
// TODO @haohao这个是不是去掉呀多了一些维护成本
/**
* 获取功能码描述
*/

View File

@@ -13,7 +13,7 @@ import java.util.function.Consumer;
* 负责从 TCP 流中读取完整的数据包
* <p>
* 数据包格式:
* 包头(4字节长度) | 设备地址长度(2字节) | 设备地址(不定长) | 功能码(2字节) | 消息序号(2字节) | 包体(不定长)
* 包头(4 字节长度) | 设备地址长度(2 字节) | 设备地址(不定长) | 功能码(2 字节) | 消息序号(2 字节) | 包体(不定长)
*
* @author 芋道源码
*/
@@ -27,12 +27,12 @@ public class TcpDataReader {
* @return RecordParser 解析器
*/
public static RecordParser createParser(Consumer<Buffer> receiveHandler) {
// 首先读取4字节的长度信息
// 首先读取 4 字节的长度信息
RecordParser parser = RecordParser.newFixed(4);
// 设置处理器
parser.setOutput(new Handler<Buffer>() {
// 当前数据包的长度,-1表示还没有读取到长度信息
// 当前数据包的长度,-1 表示还没有读取到长度信息
private int dataLength = -1;
@Override
@@ -43,8 +43,9 @@ public class TcpDataReader {
// 从包头中读取数据长度
dataLength = buffer.getInt(0);
// 校验数据长度
if (dataLength <= 0 || dataLength > 1024 * 1024) { // 最大1MB
// 校验数据长度(最大 1 MB
// TODO @haohao1m 蛮多地方在写死,最好配置管理下。或者有个全局的枚举;
if (dataLength <= 0 || dataLength > 1024 * 1024) {
log.error("[handle][无效的数据包长度: {}]", dataLength);
reset();
return;
@@ -86,6 +87,8 @@ public class TcpDataReader {
return parser;
}
// TODO @haohao用不到的方法可以清理掉哈
/**
* 创建带异常处理的数据包解析器
*

View File

@@ -55,6 +55,7 @@ public class IotTcpDownstreamHandler {
}
// 2. 根据消息方法处理不同类型的下行消息
// TODO @芋艿、@haohao看看有没什么办法减少这样的编码。拓展新消息类型成本高
switch (message.getMethod()) {
case "thing.property.set":
handlePropertySet(client, message);
@@ -75,8 +76,8 @@ public class IotTcpDownstreamHandler {
log.warn("[handle][未知的下行消息方法: {}]", message.getMethod());
break;
}
} catch (Exception e) {
// TODO @haohao最好消息的内容打印下
log.error("[handle][处理下行消息失败]", e);
}
}
@@ -104,7 +105,6 @@ public class IotTcpDownstreamHandler {
log.debug("[handlePropertySet][属性设置消息已发送(降级)] 设备地址: {}, 消息序号: {}",
client.getDeviceAddr(), mid);
});
} catch (Exception e) {
log.error("[handlePropertySet][属性设置失败]", e);
}
@@ -133,7 +133,6 @@ public class IotTcpDownstreamHandler {
log.debug("[handlePropertyGet][属性获取消息已发送(降级)] 设备地址: {}, 消息序号: {}",
client.getDeviceAddr(), mid);
});
} catch (Exception e) {
log.error("[handlePropertyGet][属性获取失败]", e);
}
@@ -162,7 +161,6 @@ public class IotTcpDownstreamHandler {
log.debug("[handleServiceInvoke][服务调用消息已发送] 设备地址: {}, 消息序号: {}",
client.getDeviceAddr(), mid);
} catch (Exception e) {
log.error("[handleServiceInvoke][服务调用失败]", e);
}
@@ -191,7 +189,6 @@ public class IotTcpDownstreamHandler {
log.debug("[handleConfigPush][配置推送消息已发送] 设备地址: {}, 消息序号: {}",
client.getDeviceAddr(), mid);
} catch (Exception e) {
log.error("[handleConfigPush][配置推送失败]", e);
}
@@ -262,6 +259,7 @@ public class IotTcpDownstreamHandler {
}
}
// TODO @haohao用不到的要不暂时不提供
/**
* 批量发送下行消息
*
@@ -287,7 +285,6 @@ public class IotTcpDownstreamHandler {
// 处理单个设备消息
handle(copyMessage);
}
} catch (Exception e) {
log.error("[broadcastMessage][批量发送消息失败]", e);
}
@@ -341,7 +338,6 @@ public class IotTcpDownstreamHandler {
log.debug("[{}][消息已发送] 设备地址: {}, 消息序号: {}",
methodName, client.getDeviceAddr(), dataPackage.getMid());
} catch (Exception e) {
log.warn("[{}][使用编解码器编码失败,降级使用原始编码] 错误: {}",
methodName, e.getMessage());
@@ -353,6 +349,7 @@ public class IotTcpDownstreamHandler {
}
}
// TODO @haohao看看这个要不要删除掉
/**
* 获取连接统计信息
*

View File

@@ -39,6 +39,7 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
private final IotGatewayProperties.TcpProperties tcpConfig;
// TODO @haohao可以把 TcpDeviceConnectionManager 能力放大一点1handle 里的 client 初始化,可以拿到 TcpDeviceConnectionManager 里2handleDeviceRegister 也是;
private final TcpDeviceConnectionManager connectionManager;
private final IotDeviceService deviceService;
@@ -53,26 +54,25 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
public void handle(NetSocket socket) {
log.info("[handle][收到设备连接: {}]", socket.remoteAddress());
// 创建客户端ID和设备客户端
// 创建客户端 ID 和设备客户端
// TODO @haohaoclientid 给 TcpDeviceClient 生成会简洁一点;减少 upsteramhanlder 的非核心逻辑;
String clientId = IdUtil.simpleUUID() + "_" + socket.remoteAddress();
TcpDeviceClient client = new TcpDeviceClient(clientId, tcpConfig.getKeepAliveTimeoutMs());
try {
// 设置连接异常和关闭处理
socket.exceptionHandler(ex -> {
// TODO @haohao这里的日志可能把 clientid 都打上?因为 address 会重复么?
log.error("[handle][连接({})异常]", socket.remoteAddress(), ex);
handleConnectionClose(client);
});
socket.closeHandler(v -> {
log.info("[handle][连接({})关闭]", socket.remoteAddress());
handleConnectionClose(client);
});
// 设置网络连接
client.setSocket(socket);
// 创建数据解析器
// 设置解析器
RecordParser parser = TcpDataReader.createParser(buffer -> {
try {
handleDataPackage(client, buffer);
@@ -80,13 +80,12 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
log.error("[handle][处理数据包异常]", e);
}
});
// 设置解析器
client.setParser(parser);
// TODO @haohaosocket.remoteAddress()) 打印进去
log.info("[handle][设备连接处理器初始化完成: {}]", clientId);
} catch (Exception e) {
// TODO @haohaosocket.remoteAddress()) 打印进去
log.error("[handle][初始化连接处理器失败]", e);
client.shutdown();
}
@@ -102,12 +101,12 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
try {
// 解码数据包
TcpDataPackage dataPackage = TcpDataDecoder.decode(buffer);
log.info("[handleDataPackage][接收数据包] 设备地址: {}, 功能码: {}, 消息序号: {}",
dataPackage.getAddr(), dataPackage.getCodeDescription(), dataPackage.getMid());
// 根据功能码处理不同类型的消息
switch (dataPackage.getCode()) {
// TODO @haohao【重要】code 要不要改成 opCode。这样和 data 里的 code 好区分;
case TcpDataPackage.CODE_REGISTER:
handleDeviceRegister(client, dataPackage);
break;
@@ -124,8 +123,8 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
log.warn("[handleDataPackage][未知功能码: {}]", dataPackage.getCode());
break;
}
} catch (Exception e) {
// TODO @haohao最好有 client 标识;
log.error("[handleDataPackage][处理数据包失败]", e);
}
}
@@ -140,7 +139,6 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
try {
String deviceAddr = dataPackage.getAddr();
String productKey = dataPackage.getPayload();
log.info("[handleDeviceRegister][设备注册] 设备地址: {}, 产品密钥: {}", deviceAddr, productKey);
// 获取设备信息
@@ -152,6 +150,7 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
}
// 更新客户端信息
// TODO @haohao一个 set 方法,统一处理掉会好点哈;
client.setProductKey(productKey);
client.setDeviceName(deviceAddr);
client.setDeviceId(device.getId());
@@ -169,7 +168,6 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
sendRegisterReply(client, dataPackage, true);
log.info("[handleDeviceRegister][设备注册成功] 设备地址: {}, 设备ID: {}", deviceAddr, device.getId());
} catch (Exception e) {
log.error("[handleDeviceRegister][设备注册失败]", e);
sendRegisterReply(client, dataPackage, false);
@@ -185,7 +183,6 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
private void handleHeartbeat(TcpDeviceClient client, TcpDataPackage dataPackage) {
try {
String deviceAddr = dataPackage.getAddr();
log.debug("[handleHeartbeat][收到心跳] 设备地址: {}", deviceAddr);
// 更新心跳时间
@@ -230,7 +227,6 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
// 3. 发送解码后的消息
messageService.sendDeviceMessage(message, client.getProductKey(), client.getDeviceName(), serverId);
} catch (Exception e) {
log.warn("[handleDataUp][使用编解码器解码失败,降级使用原始解析] 错误: {}", e.getMessage());
@@ -242,7 +238,6 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
// 发送数据上报回复
sendDataUpReply(client, dataPackage);
} catch (Exception e) {
log.error("[handleDataUp][处理数据上报失败]", e);
}
@@ -279,11 +274,11 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
// 3. 发送解码后的消息
messageService.sendDeviceMessage(message, client.getProductKey(), client.getDeviceName(), serverId);
} catch (Exception e) {
log.warn("[handleEventUp][使用编解码器解码失败,降级使用原始解析] 错误: {}", e.getMessage());
// 降级处理:使用原始方式解析数据
// TODO @芋艿:降级处理逻辑;
JSONObject eventJson = JSONUtil.parseObj(payload);
IotDeviceMessage message = IotDeviceMessage.requestOf("thing.event.post", eventJson);
messageService.sendDeviceMessage(message, client.getProductKey(), client.getDeviceName(), serverId);
@@ -291,7 +286,6 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
// 发送事件上报回复
sendEventUpReply(client, dataPackage);
} catch (Exception e) {
log.error("[handleEventUp][处理事件上报失败]", e);
}
@@ -329,13 +323,13 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
.addr(dataPackage.getAddr())
.code(TcpDataPackage.CODE_DATA_UP)
.mid(dataPackage.getMid())
.payload("0") // 0表示成功
.payload("0") // 0 表示成功 TODO @haohao最好枚举到 TcpDataPackage 里?
.build();
io.vertx.core.buffer.Buffer replyBuffer = TcpDataEncoder.encode(replyPackage);
client.sendMessage(replyBuffer);
} catch (Exception e) {
// TODO @haohao可以有个 client id
log.error("[sendDataUpReply][发送数据上报回复失败]", e);
}
}
@@ -352,12 +346,11 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
.addr(dataPackage.getAddr())
.code(TcpDataPackage.CODE_EVENT_UP)
.mid(dataPackage.getMid())
.payload("0") // 0表示成功
.payload("0") // 0 表示成功
.build();
io.vertx.core.buffer.Buffer replyBuffer = TcpDataEncoder.encode(replyPackage);
client.sendMessage(replyBuffer);
} catch (Exception e) {
log.error("[sendEventUpReply][发送事件上报回复失败]", e);
}
@@ -385,7 +378,6 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
}
log.info("[handleConnectionClose][处理连接关闭完成] 设备地址: {}", deviceAddr);
} catch (Exception e) {
log.error("[handleConnectionClose][处理连接关闭失败]", e);
}