review:【IoT 物联网】网关 TCP 的接入代码

This commit is contained in:
YunaiV
2025-06-30 00:04:58 +08:00
parent a5a3aea522
commit bf41d47fa8
7 changed files with 21 additions and 10 deletions

View File

@@ -85,6 +85,7 @@ public class IotGatewayConfiguration {
@Slf4j
public static class TcpProtocolConfiguration {
// TODO @haohaoclose
@Bean
public Vertx tcpVertx() {
return Vertx.vertx();
@@ -92,16 +93,17 @@ public class IotGatewayConfiguration {
@Bean
public IotTcpUpstreamProtocol iotTcpUpstreamProtocol(Vertx tcpVertx, IotGatewayProperties gatewayProperties,
IotTcpConnectionManager connectionManager, IotDeviceMessageService messageService,
IotDeviceService deviceService, IotDeviceCommonApi deviceApi) {
IotTcpConnectionManager connectionManager,
IotDeviceMessageService messageService,
IotDeviceService deviceService, IotDeviceCommonApi deviceApi) {
return new IotTcpUpstreamProtocol(tcpVertx, gatewayProperties, connectionManager,
messageService, deviceService, deviceApi);
}
@Bean
public IotTcpDownstreamSubscriber iotTcpDownstreamSubscriber(IotTcpUpstreamProtocol tcpUpstreamProtocol,
IotMessageBus messageBus,
IotTcpDownstreamHandler downstreamHandler) {
IotMessageBus messageBus,
IotTcpDownstreamHandler downstreamHandler) {
return new IotTcpDownstreamSubscriber(tcpUpstreamProtocol, messageBus, downstreamHandler);
}

View File

@@ -108,10 +108,12 @@ public class IotGatewayProperties {
*/
@NotNull(message = "是否开启不能为空")
private Boolean enabled;
// TODO @haohao加个默认值
/**
* 服务端口
*/
private Integer serverPort;
// TODO @haohao应该不用一般都监听 0.0.0.0 哈;
/**
* 服务主机
*/

View File

@@ -79,7 +79,7 @@ public class IotEmqxDownstreamHandler {
// 2. 根据消息方法和回复状态,构建 topic
boolean isReply = IotDeviceMessageUtils.isReplyMessage(message);
// TODO @芋艿:看看基于 message 的 method 去反向推导;
// TODO @haohao:看看基于 message 的 method 去反向推导;
// 3. 根据消息方法类型构建对应的主题
switch (methodEnum) {
case PROPERTY_POST:
@@ -88,21 +88,18 @@ public class IotEmqxDownstreamHandler {
return IotMqttTopicUtils.buildPropertyPostReplyTopic(productKey, deviceName);
}
break;
case PROPERTY_SET:
// 属性设置:只支持非回复消息(下行)
if (!isReply) {
return IotMqttTopicUtils.buildPropertySetTopic(productKey, deviceName);
}
break;
case EVENT_POST:
// 事件上报:只支持回复消息(下行)
if (isReply) {
return IotMqttTopicUtils.buildEventPostReplyTopicGeneric(productKey, deviceName);
}
break;
case SERVICE_INVOKE:
// 服务调用:支持请求和回复
if (isReply) {
@@ -110,14 +107,12 @@ public class IotEmqxDownstreamHandler {
} else {
return IotMqttTopicUtils.buildServiceTopicGeneric(productKey, deviceName);
}
case CONFIG_PUSH:
// 配置推送:平台向设备推送配置(下行请求),设备回复确认(上行回复)
if (!isReply) {
return IotMqttTopicUtils.buildConfigPushTopic(productKey, deviceName);
}
break;
default:
log.warn("[buildTopicByMethod][未处理的消息方法: {}]", methodEnum);
break;

View File

@@ -16,6 +16,7 @@ import java.util.concurrent.ConcurrentMap;
@Slf4j
public class IotTcpConnectionManager {
// TODO @haohao要考虑相同设备多次连接的情况哇
/**
* 连接集合
*
@@ -50,6 +51,7 @@ public class IotTcpConnectionManager {
* @param socket Netty Channel
*/
public void removeConnection(NetSocket socket) {
// TODO @haohaovertx 的 socket有没办法设置一些属性类似 netty 的;目的是,避免遍历 connectionMap 去操作哈;
connectionMap.entrySet().stream()
.filter(entry -> entry.getValue().equals(socket))
.findFirst()

View File

@@ -64,6 +64,7 @@ public class IotTcpConnectionHandler implements Handler<Buffer> {
public void handle(Buffer buffer) {
log.info("[handle][接收到数据: {}]", buffer);
try {
// TODO @haohao可以调研下做个对比表格哈
// 1. 处理认证
if (!authenticated) {
handleAuthentication(buffer);
@@ -80,6 +81,11 @@ public class IotTcpConnectionHandler implements Handler<Buffer> {
private void handleAuthentication(Buffer buffer) {
// 1. 解析认证信息
// TODO @芋艿:这里的认证协议,需要和设备端约定。默认为 productKey,deviceName,password
// TODO @haohao这里要不也 json 解析?类似 http 是 {
// "clientId": "4aymZgOTOOCrDKRT.small",
// "username": "small&4aymZgOTOOCrDKRT",
// "password": "509e2b08f7598eb139d276388c600435913ba4c94cd0d50aebc5c0d1855bcb75"
//}
String[] parts = buffer.toString().split(",");
if (parts.length != 3) {
log.error("[handleAuthentication][认证信息({})格式不正确]", buffer);

View File

@@ -22,6 +22,7 @@ import org.springframework.stereotype.Component;
public class IotTcpDownstreamHandler {
private final IotTcpConnectionManager connectionManager;
private final IotDeviceMessageService messageService;
/**
@@ -43,6 +44,7 @@ public class IotTcpDownstreamHandler {
// 3. 发送消息
socket.write(Buffer.buffer(bytes));
// TODO @芋艿:这里的换行符,需要和设备端约定
// TODO @haohaotcp 要不定长?很少 \n 哈。然后有个 magic number可以参考 dubbo rpc
socket.write("\n");
}

View File

@@ -16,6 +16,7 @@ public final class IotMqttTopicUtils {
*/
private static final String SYS_TOPIC_PREFIX = "/sys/";
// TODO @haohao这个要删除哇
/**
* 服务调用主题前缀
*/
@@ -36,6 +37,7 @@ public final class IotMqttTopicUtils {
*/
public static final String MQTT_EVENT_PATH = "/mqtt/event";
// TODO @haohao这个要删除哇
/**
* MQTT 授权接口路径(预留)
* 对应 EMQX HTTP 授权插件的授权检查接口