feat:【IoT 物联网】新增 TCP 协议支持,添加 TCP 连接管理、上下行消息处理及相关配置

This commit is contained in:
haohao
2025-06-29 22:46:38 +08:00
parent 0593dbc9a0
commit a5a3aea522
11 changed files with 510 additions and 56 deletions

View File

@@ -1,11 +1,18 @@
package cn.iocoder.yudao.module.iot.gateway.config;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxAuthEventProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router.IotTcpDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -33,7 +40,7 @@ public class IotGatewayConfiguration {
@Bean
public IotHttpDownstreamSubscriber iotHttpDownstreamSubscriber(IotHttpUpstreamProtocol httpUpstreamProtocol,
IotMessageBus messageBus) {
IotMessageBus messageBus) {
return new IotHttpDownstreamSubscriber(httpUpstreamProtocol, messageBus);
}
}
@@ -53,21 +60,51 @@ public class IotGatewayConfiguration {
@Bean
public IotEmqxAuthEventProtocol iotEmqxAuthEventProtocol(IotGatewayProperties gatewayProperties,
Vertx emqxVertx) {
Vertx emqxVertx) {
return new IotEmqxAuthEventProtocol(gatewayProperties.getProtocol().getEmqx(), emqxVertx);
}
@Bean
public IotEmqxUpstreamProtocol iotEmqxUpstreamProtocol(IotGatewayProperties gatewayProperties,
Vertx emqxVertx) {
Vertx emqxVertx) {
return new IotEmqxUpstreamProtocol(gatewayProperties.getProtocol().getEmqx(), emqxVertx);
}
@Bean
public IotEmqxDownstreamSubscriber iotEmqxDownstreamSubscriber(IotEmqxUpstreamProtocol mqttUpstreamProtocol,
IotMessageBus messageBus) {
IotMessageBus messageBus) {
return new IotEmqxDownstreamSubscriber(mqttUpstreamProtocol, messageBus);
}
}
/**
* IoT 网关 TCP 协议配置类
*/
@Configuration
@ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.tcp", name = "enabled", havingValue = "true")
@Slf4j
public static class TcpProtocolConfiguration {
@Bean
public Vertx tcpVertx() {
return Vertx.vertx();
}
@Bean
public IotTcpUpstreamProtocol iotTcpUpstreamProtocol(Vertx tcpVertx, IotGatewayProperties gatewayProperties,
IotTcpConnectionManager connectionManager, IotDeviceMessageService messageService,
IotDeviceService deviceService, IotDeviceCommonApi deviceApi) {
return new IotTcpUpstreamProtocol(tcpVertx, gatewayProperties, connectionManager,
messageService, deviceService, deviceApi);
}
@Bean
public IotTcpDownstreamSubscriber iotTcpDownstreamSubscriber(IotTcpUpstreamProtocol tcpUpstreamProtocol,
IotMessageBus messageBus,
IotTcpDownstreamHandler downstreamHandler) {
return new IotTcpDownstreamSubscriber(tcpUpstreamProtocol, messageBus, downstreamHandler);
}
}
}

View File

@@ -78,6 +78,11 @@ public class IotGatewayProperties {
*/
private EmqxProperties emqx;
/**
* TCP 组件配置
*/
private TcpProperties tcp;
}
@Data
@@ -95,6 +100,25 @@ public class IotGatewayProperties {
}
@Data
public static class TcpProperties {
/**
* 是否开启
*/
@NotNull(message = "是否开启不能为空")
private Boolean enabled;
/**
* 服务端口
*/
private Integer serverPort;
/**
* 服务主机
*/
private String serverHost;
}
@Data
public static class EmqxProperties {

View File

@@ -97,28 +97,19 @@ public class IotEmqxDownstreamHandler {
break;
case EVENT_POST:
// TODO @haohao不用 eventIdentifier 拼接哈,直接 data 里面,有 identifier 字段
// 事件上报:只支持回复消息(下行)
if (isReply) {
String identifier = IotDeviceMessageUtils.getIdentifier(message);
if (StrUtil.isNotBlank(identifier)) {
return IotMqttTopicUtils.buildEventPostReplyTopic(productKey, deviceName, identifier);
}
return IotMqttTopicUtils.buildEventPostReplyTopicGeneric(productKey, deviceName);
}
break;
case SERVICE_INVOKE:
// 服务调用:支持请求和回复
// TODO @haohao不用 serviceIdentifier 拼接哈,直接 data 里面,有 identifier 字段
String serviceIdentifier = IotDeviceMessageUtils.getIdentifier(message);
if (StrUtil.isNotBlank(serviceIdentifier)) {
if (isReply) {
return IotMqttTopicUtils.buildServiceReplyTopic(productKey, deviceName, serviceIdentifier);
} else {
return IotMqttTopicUtils.buildServiceTopic(productKey, deviceName, serviceIdentifier);
}
if (isReply) {
return IotMqttTopicUtils.buildServiceReplyTopicGeneric(productKey, deviceName);
} else {
return IotMqttTopicUtils.buildServiceTopicGeneric(productKey, deviceName);
}
break;
case CONFIG_PUSH:
// 配置推送:平台向设备推送配置(下行请求),设备回复确认(上行回复)

View File

@@ -0,0 +1,62 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp;
import io.vertx.core.net.NetSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* IoT TCP 连接管理器
*
* @author 芋道源码
*/
@Component
@Slf4j
public class IotTcpConnectionManager {
/**
* 连接集合
*
* key设备唯一标识
*/
private final ConcurrentMap<String, NetSocket> connectionMap = new ConcurrentHashMap<>();
/**
* 添加一个新连接
*
* @param deviceId 设备唯一标识
* @param socket Netty Channel
*/
public void addConnection(String deviceId, NetSocket socket) {
log.info("[addConnection][设备({}) 连接({})]", deviceId, socket.remoteAddress());
connectionMap.put(deviceId, socket);
}
/**
* 根据设备 ID 获取连接
*
* @param deviceId 设备 ID
* @return 连接
*/
public NetSocket getConnection(String deviceId) {
return connectionMap.get(deviceId);
}
/**
* 移除指定连接
*
* @param socket Netty Channel
*/
public void removeConnection(NetSocket socket) {
connectionMap.entrySet().stream()
.filter(entry -> entry.getValue().equals(socket))
.findFirst()
.ifPresent(entry -> {
log.info("[removeConnection][设备({}) 断开连接({})]", entry.getKey(), socket.remoteAddress());
connectionMap.remove(entry.getKey());
});
}
}

View File

@@ -0,0 +1,64 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router.IotTcpDownstreamHandler;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 TCP 订阅者:接收下行给设备的消息
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotTcpDownstreamSubscriber implements IotMessageSubscriber<IotDeviceMessage> {
private final IotTcpUpstreamProtocol protocol;
private final IotMessageBus messageBus;
private final IotTcpDownstreamHandler downstreamHandler;
@PostConstruct
public void init() {
messageBus.register(this);
}
@Override
public String getTopic() {
return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(protocol.getServerId());
}
@Override
public String getGroup() {
// 保证点对点消费,需要保证独立的 Group所以使用 Topic 作为 Group
return getTopic();
}
@Override
public void onMessage(IotDeviceMessage message) {
log.debug("[onMessage][接收到下行消息, messageId: {}, method: {}, deviceId: {}]",
message.getId(), message.getMethod(), message.getDeviceId());
try {
// 1. 校验
String method = message.getMethod();
if (method == null) {
log.warn("[onMessage][消息方法为空, messageId: {}, deviceId: {}]",
message.getId(), message.getDeviceId());
return;
}
// 2. 处理下行消息
downstreamHandler.handle(message);
} catch (Exception e) {
log.error("[onMessage][处理下行消息失败, messageId: {}, method: {}, deviceId: {}]",
message.getId(), message.getMethod(), message.getDeviceId(), e);
}
}
}

View File

@@ -0,0 +1,71 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router.IotTcpConnectionHandler;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
import io.vertx.core.net.NetServer;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 TCP 协议:接收设备上行消息
*
* @author 芋道源码
*/
@Slf4j
@RequiredArgsConstructor
public class IotTcpUpstreamProtocol {
private final Vertx vertx;
private final IotGatewayProperties gatewayProperties;
private final IotTcpConnectionManager connectionManager;
private final IotDeviceMessageService messageService;
private final IotDeviceService deviceService;
private final IotDeviceCommonApi deviceApi;
@Getter
private String serverId;
private NetServer netServer;
@PostConstruct
public void start() {
// 1. 初始化参数
IotGatewayProperties.TcpProperties tcpProperties = gatewayProperties.getProtocol().getTcp();
this.serverId = IotDeviceMessageUtils.generateServerId(tcpProperties.getServerPort());
// 2. 创建 TCP 服务器
netServer = vertx.createNetServer();
netServer.connectHandler(socket -> {
new IotTcpConnectionHandler(socket, connectionManager,
messageService, deviceService, deviceApi, serverId).start();
});
// 3. 启动 TCP 服务器
netServer.listen(tcpProperties.getServerPort(), tcpProperties.getServerHost())
.onSuccess(server -> log.info("[start][IoT 网关 TCP 服务启动成功,端口:{}]", server.actualPort()))
.onFailure(e -> log.error("[start][IoT 网关 TCP 服务启动失败]", e));
}
@PreDestroy
public void stop() {
if (netServer != null) {
netServer.close()
.onSuccess(v -> log.info("[stop][IoT 网关 TCP 服务已停止]"))
.onFailure(e -> log.error("[stop][IoT 网关 TCP 服务停止失败]", e));
}
}
}

View File

@@ -0,0 +1,142 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router;
import cn.hutool.core.util.BooleanUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import io.vertx.core.parsetools.RecordParser;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* IoT TCP 连接处理器
* <p>
* 核心负责:
* 1. 【认证】创建连接后,设备需要发送认证消息,认证通过后,才能进行后续的通信
* 2. 【消息处理】接收设备发送的消息,解码后,发送到消息队列
* 3. 【断开】设备断开连接后,清理资源
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotTcpConnectionHandler implements Handler<Buffer> {
private final NetSocket socket;
/**
* 是否已认证
*/
private boolean authenticated = false;
/**
* 设备信息
*/
private IotDeviceRespDTO device;
private final IotTcpConnectionManager connectionManager;
private final IotDeviceMessageService messageService;
private final IotDeviceService deviceService;
private final IotDeviceCommonApi deviceApi;
private final String serverId;
public void start() {
// 1. 设置解析器
final RecordParser parser = RecordParser.newDelimited("\n", this);
socket.handler(parser);
// 2. 设置处理器
socket.closeHandler(v -> handleConnectionClose());
socket.exceptionHandler(this::handleException);
}
@Override
public void handle(Buffer buffer) {
log.info("[handle][接收到数据: {}]", buffer);
try {
// 1. 处理认证
if (!authenticated) {
handleAuthentication(buffer);
return;
}
// 2. 处理消息
handleMessage(buffer);
} catch (Exception e) {
log.error("[handle][处理异常]", e);
socket.close();
}
}
private void handleAuthentication(Buffer buffer) {
// 1. 解析认证信息
// TODO @芋艿:这里的认证协议,需要和设备端约定。默认为 productKey,deviceName,password
String[] parts = buffer.toString().split(",");
if (parts.length != 3) {
log.error("[handleAuthentication][认证信息({})格式不正确]", buffer);
socket.close();
return;
}
String productKey = parts[0];
String deviceName = parts[1];
String password = parts[2];
// 2. 执行认证
CommonResult<Boolean> authResult = deviceApi.authDevice(new IotDeviceAuthReqDTO()
.setClientId(socket.remoteAddress().toString()).setUsername(productKey + "/" + deviceName)
.setPassword(password));
if (authResult.isError() || !BooleanUtil.isTrue(authResult.getData())) {
log.error("[handleAuthentication][认证失败productKey({}) deviceName({}) password({})]", productKey, deviceName,
password);
socket.close();
return;
}
// 3. 认证成功
this.authenticated = true;
this.device = deviceService.getDeviceFromCache(productKey, deviceName);
connectionManager.addConnection(String.valueOf(device.getId()), socket);
// 4. 发送上线消息
IotDeviceMessage message = IotDeviceMessage.buildStateUpdateOnline();
messageService.sendDeviceMessage(message, productKey, deviceName, serverId);
log.info("[handleAuthentication][认证成功]");
}
private void handleMessage(Buffer buffer) {
// 1. 解码消息
IotDeviceMessage message = messageService.decodeDeviceMessage(buffer.getBytes(),
device.getProductKey(), device.getDeviceName());
if (message == null) {
log.warn("[handleMessage][解码消息失败]");
return;
}
// 2. 发送消息到队列
messageService.sendDeviceMessage(message, device.getProductKey(), device.getDeviceName(), serverId);
}
private void handleConnectionClose() {
// 1. 移除连接
connectionManager.removeConnection(socket);
// 2. 发送离线消息
if (device != null) {
IotDeviceMessage message = IotDeviceMessage.buildStateOffline();
messageService.sendDeviceMessage(message, device.getProductKey(), device.getDeviceName(), serverId);
}
}
private void handleException(Throwable e) {
log.error("[handleException][连接({}) 发生异常]", socket.remoteAddress(), e);
socket.close();
}
}

View File

@@ -0,0 +1,49 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* IoT 网关 TCP 下行消息处理器
* <p>
* 从消息总线接收到下行消息,然后发布到 TCP 连接,从而被设备所接收
*
* @author 芋道源码
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class IotTcpDownstreamHandler {
private final IotTcpConnectionManager connectionManager;
private final IotDeviceMessageService messageService;
/**
* 处理下行消息
*
* @param message 设备消息
*/
public void handle(IotDeviceMessage message) {
// 1. 获取设备对应的连接
NetSocket socket = connectionManager.getConnection(String.valueOf(message.getDeviceId()));
if (socket == null) {
log.error("[handle][设备({})的连接不存在]", message.getDeviceId());
return;
}
// 2. 编码消息
byte[] bytes = messageService.encodeDeviceMessage(message, null, null);
// 3. 发送消息
socket.write(Buffer.buffer(bytes));
// TODO @芋艿:这里的换行符,需要和设备端约定
socket.write("\n");
}
}

View File

@@ -79,42 +79,6 @@ public final class IotMqttTopicUtils {
return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/property/post_reply";
}
/**
* 构建设备服务调用主题
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param serviceIdentifier 服务标识符
* @return 完整的主题路径
*/
public static String buildServiceTopic(String productKey, String deviceName, String serviceIdentifier) {
return buildDeviceTopicPrefix(productKey, deviceName) + SERVICE_TOPIC_PREFIX + serviceIdentifier;
}
/**
* 构建设备服务调用回复主题
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param serviceIdentifier 服务标识符
* @return 完整的主题路径
*/
public static String buildServiceReplyTopic(String productKey, String deviceName, String serviceIdentifier) {
return buildDeviceTopicPrefix(productKey, deviceName) + SERVICE_TOPIC_PREFIX + serviceIdentifier + "_reply";
}
/**
* 构建设备事件上报回复主题
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param eventIdentifier 事件标识符
* @return 完整的主题路径
*/
public static String buildEventPostReplyTopic(String productKey, String deviceName, String eventIdentifier) {
return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/event/" + eventIdentifier + "_reply";
}
/**
* 构建设备配置推送主题
*
@@ -126,4 +90,43 @@ public final class IotMqttTopicUtils {
return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/config/push";
}
/**
* 构建设备事件上报通用回复主题
* <p>
* 不包含具体的事件标识符,事件标识符通过消息 data 中的 identifier 字段传递
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @return 完整的主题路径
*/
public static String buildEventPostReplyTopicGeneric(String productKey, String deviceName) {
return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/event/post_reply";
}
/**
* 构建设备服务调用通用主题
* <p>
* 不包含具体的服务标识符,服务标识符通过消息 data 中的 identifier 字段传递
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @return 完整的主题路径
*/
public static String buildServiceTopicGeneric(String productKey, String deviceName) {
return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/service/invoke";
}
/**
* 构建设备服务调用通用回复主题
* <p>
* 不包含具体的服务标识符,服务标识符通过消息 data 中的 identifier 字段传递
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @return 完整的主题路径
*/
public static String buildServiceReplyTopicGeneric(String productKey, String deviceName) {
return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/service/invoke_reply";
}
}

View File

@@ -1,5 +1,4 @@
# ==================== IoT 网关本地开发环境配置 ====================
--- #################### 消息队列相关 ####################
# rocketmq 配置项,对应 RocketMQProperties 配置类
@@ -41,6 +40,13 @@ yudao:
mqtt-ssl: false # 是否开启 SSL
mqtt-topics:
- "/sys/#" # 系统主题
# ====================================
# 针对引入的 TCP 组件的配置
# ====================================
tcp:
enabled: true
server-port: 8093
server-host: 0.0.0.0
# 消息总线配置
message-bus:
@@ -52,7 +58,7 @@ yudao:
logging:
level:
# 开发环境详细日志
cn.iocoder.yudao.module.iot.gateway.protocol.mqtt: DEBUG
cn.iocoder.yudao.module.iot.gateway.protocol.emqx: DEBUG
cn.iocoder.yudao.module.iot.gateway.protocol.http: DEBUG
# MQTT 客户端日志
# io.vertx.mqtt: DEBUG

View File

@@ -41,6 +41,11 @@ yudao:
mqtt-ssl: false
mqtt-topics:
- "/sys/#" # 系统主题
# ====================================
# 针对引入的 TCP 组件的配置
# ====================================
tcp:
enabled: false
# 消息总线配置
message-bus: