diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java index 11cbfca270..273a55f91f 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java @@ -1,11 +1,18 @@ package cn.iocoder.yudao.module.iot.gateway.config; +import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxAuthEventProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router.IotTcpDownstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.core.Vertx; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -33,7 +40,7 @@ public class IotGatewayConfiguration { @Bean public IotHttpDownstreamSubscriber iotHttpDownstreamSubscriber(IotHttpUpstreamProtocol httpUpstreamProtocol, - IotMessageBus messageBus) { + IotMessageBus messageBus) { return new IotHttpDownstreamSubscriber(httpUpstreamProtocol, messageBus); } } @@ -53,21 +60,51 @@ public class IotGatewayConfiguration { @Bean public IotEmqxAuthEventProtocol iotEmqxAuthEventProtocol(IotGatewayProperties gatewayProperties, - Vertx emqxVertx) { + Vertx emqxVertx) { return new IotEmqxAuthEventProtocol(gatewayProperties.getProtocol().getEmqx(), emqxVertx); } @Bean public IotEmqxUpstreamProtocol iotEmqxUpstreamProtocol(IotGatewayProperties gatewayProperties, - Vertx emqxVertx) { + Vertx emqxVertx) { return new IotEmqxUpstreamProtocol(gatewayProperties.getProtocol().getEmqx(), emqxVertx); } @Bean public IotEmqxDownstreamSubscriber iotEmqxDownstreamSubscriber(IotEmqxUpstreamProtocol mqttUpstreamProtocol, - IotMessageBus messageBus) { + IotMessageBus messageBus) { return new IotEmqxDownstreamSubscriber(mqttUpstreamProtocol, messageBus); } } + /** + * IoT 网关 TCP 协议配置类 + */ + @Configuration + @ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.tcp", name = "enabled", havingValue = "true") + @Slf4j + public static class TcpProtocolConfiguration { + + @Bean + public Vertx tcpVertx() { + return Vertx.vertx(); + } + + @Bean + public IotTcpUpstreamProtocol iotTcpUpstreamProtocol(Vertx tcpVertx, IotGatewayProperties gatewayProperties, + IotTcpConnectionManager connectionManager, IotDeviceMessageService messageService, + IotDeviceService deviceService, IotDeviceCommonApi deviceApi) { + return new IotTcpUpstreamProtocol(tcpVertx, gatewayProperties, connectionManager, + messageService, deviceService, deviceApi); + } + + @Bean + public IotTcpDownstreamSubscriber iotTcpDownstreamSubscriber(IotTcpUpstreamProtocol tcpUpstreamProtocol, + IotMessageBus messageBus, + IotTcpDownstreamHandler downstreamHandler) { + return new IotTcpDownstreamSubscriber(tcpUpstreamProtocol, messageBus, downstreamHandler); + } + + } + } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java index 852b2e67b4..4d1d67afe1 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java @@ -78,6 +78,11 @@ public class IotGatewayProperties { */ private EmqxProperties emqx; + /** + * TCP 组件配置 + */ + private TcpProperties tcp; + } @Data @@ -95,6 +100,25 @@ public class IotGatewayProperties { } + @Data + public static class TcpProperties { + + /** + * 是否开启 + */ + @NotNull(message = "是否开启不能为空") + private Boolean enabled; + /** + * 服务端口 + */ + private Integer serverPort; + /** + * 服务主机 + */ + private String serverHost; + + } + @Data public static class EmqxProperties { diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java index 14995c4384..c5d77d2f46 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java @@ -97,28 +97,19 @@ public class IotEmqxDownstreamHandler { break; case EVENT_POST: - // TODO @haohao:不用 eventIdentifier 拼接哈,直接 data 里面,有 identifier 字段 // 事件上报:只支持回复消息(下行) if (isReply) { - String identifier = IotDeviceMessageUtils.getIdentifier(message); - if (StrUtil.isNotBlank(identifier)) { - return IotMqttTopicUtils.buildEventPostReplyTopic(productKey, deviceName, identifier); - } + return IotMqttTopicUtils.buildEventPostReplyTopicGeneric(productKey, deviceName); } break; case SERVICE_INVOKE: // 服务调用:支持请求和回复 - // TODO @haohao:不用 serviceIdentifier 拼接哈,直接 data 里面,有 identifier 字段 - String serviceIdentifier = IotDeviceMessageUtils.getIdentifier(message); - if (StrUtil.isNotBlank(serviceIdentifier)) { - if (isReply) { - return IotMqttTopicUtils.buildServiceReplyTopic(productKey, deviceName, serviceIdentifier); - } else { - return IotMqttTopicUtils.buildServiceTopic(productKey, deviceName, serviceIdentifier); - } + if (isReply) { + return IotMqttTopicUtils.buildServiceReplyTopicGeneric(productKey, deviceName); + } else { + return IotMqttTopicUtils.buildServiceTopicGeneric(productKey, deviceName); } - break; case CONFIG_PUSH: // 配置推送:平台向设备推送配置(下行请求),设备回复确认(上行回复) diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpConnectionManager.java new file mode 100644 index 0000000000..97d1e43b3d --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpConnectionManager.java @@ -0,0 +1,62 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp; + +import io.vertx.core.net.NetSocket; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * IoT TCP 连接管理器 + * + * @author 芋道源码 + */ +@Component +@Slf4j +public class IotTcpConnectionManager { + + /** + * 连接集合 + * + * key:设备唯一标识 + */ + private final ConcurrentMap connectionMap = new ConcurrentHashMap<>(); + + /** + * 添加一个新连接 + * + * @param deviceId 设备唯一标识 + * @param socket Netty Channel + */ + public void addConnection(String deviceId, NetSocket socket) { + log.info("[addConnection][设备({}) 连接({})]", deviceId, socket.remoteAddress()); + connectionMap.put(deviceId, socket); + } + + /** + * 根据设备 ID 获取连接 + * + * @param deviceId 设备 ID + * @return 连接 + */ + public NetSocket getConnection(String deviceId) { + return connectionMap.get(deviceId); + } + + /** + * 移除指定连接 + * + * @param socket Netty Channel + */ + public void removeConnection(NetSocket socket) { + connectionMap.entrySet().stream() + .filter(entry -> entry.getValue().equals(socket)) + .findFirst() + .ifPresent(entry -> { + log.info("[removeConnection][设备({}) 断开连接({})]", entry.getKey(), socket.remoteAddress()); + connectionMap.remove(entry.getKey()); + }); + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpDownstreamSubscriber.java new file mode 100644 index 0000000000..f324d45438 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpDownstreamSubscriber.java @@ -0,0 +1,64 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp; + +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router.IotTcpDownstreamHandler; +import jakarta.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT 网关 TCP 订阅者:接收下行给设备的消息 + * + * @author 芋道源码 + */ +@RequiredArgsConstructor +@Slf4j +public class IotTcpDownstreamSubscriber implements IotMessageSubscriber { + + private final IotTcpUpstreamProtocol protocol; + + private final IotMessageBus messageBus; + + private final IotTcpDownstreamHandler downstreamHandler; + + @PostConstruct + public void init() { + messageBus.register(this); + } + + @Override + public String getTopic() { + return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(protocol.getServerId()); + } + + @Override + public String getGroup() { + // 保证点对点消费,需要保证独立的 Group,所以使用 Topic 作为 Group + return getTopic(); + } + + @Override + public void onMessage(IotDeviceMessage message) { + log.debug("[onMessage][接收到下行消息, messageId: {}, method: {}, deviceId: {}]", + message.getId(), message.getMethod(), message.getDeviceId()); + try { + // 1. 校验 + String method = message.getMethod(); + if (method == null) { + log.warn("[onMessage][消息方法为空, messageId: {}, deviceId: {}]", + message.getId(), message.getDeviceId()); + return; + } + + // 2. 处理下行消息 + downstreamHandler.handle(message); + } catch (Exception e) { + log.error("[onMessage][处理下行消息失败, messageId: {}, method: {}, deviceId: {}]", + message.getId(), message.getMethod(), message.getDeviceId(), e); + } + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpUpstreamProtocol.java new file mode 100644 index 0000000000..f6bee94b5a --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpUpstreamProtocol.java @@ -0,0 +1,71 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp; + +import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router.IotTcpConnectionHandler; +import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.vertx.core.Vertx; +import io.vertx.core.net.NetServer; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT 网关 TCP 协议:接收设备上行消息 + * + * @author 芋道源码 + */ +@Slf4j +@RequiredArgsConstructor +public class IotTcpUpstreamProtocol { + + private final Vertx vertx; + + private final IotGatewayProperties gatewayProperties; + + private final IotTcpConnectionManager connectionManager; + + private final IotDeviceMessageService messageService; + + private final IotDeviceService deviceService; + + private final IotDeviceCommonApi deviceApi; + + @Getter + private String serverId; + + private NetServer netServer; + + @PostConstruct + public void start() { + // 1. 初始化参数 + IotGatewayProperties.TcpProperties tcpProperties = gatewayProperties.getProtocol().getTcp(); + this.serverId = IotDeviceMessageUtils.generateServerId(tcpProperties.getServerPort()); + + // 2. 创建 TCP 服务器 + netServer = vertx.createNetServer(); + netServer.connectHandler(socket -> { + new IotTcpConnectionHandler(socket, connectionManager, + messageService, deviceService, deviceApi, serverId).start(); + }); + + // 3. 启动 TCP 服务器 + netServer.listen(tcpProperties.getServerPort(), tcpProperties.getServerHost()) + .onSuccess(server -> log.info("[start][IoT 网关 TCP 服务启动成功,端口:{}]", server.actualPort())) + .onFailure(e -> log.error("[start][IoT 网关 TCP 服务启动失败]", e)); + } + + @PreDestroy + public void stop() { + if (netServer != null) { + netServer.close() + .onSuccess(v -> log.info("[stop][IoT 网关 TCP 服务已停止]")) + .onFailure(e -> log.error("[stop][IoT 网关 TCP 服务停止失败]", e)); + } + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpConnectionHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpConnectionHandler.java new file mode 100644 index 0000000000..c1d9c9e301 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpConnectionHandler.java @@ -0,0 +1,142 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router; + +import cn.hutool.core.util.BooleanUtil; +import cn.iocoder.yudao.framework.common.pojo.CommonResult; +import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.net.NetSocket; +import io.vertx.core.parsetools.RecordParser; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT TCP 连接处理器 + *

+ * 核心负责: + * 1. 【认证】创建连接后,设备需要发送认证消息,认证通过后,才能进行后续的通信 + * 2. 【消息处理】接收设备发送的消息,解码后,发送到消息队列 + * 3. 【断开】设备断开连接后,清理资源 + * + * @author 芋道源码 + */ +@RequiredArgsConstructor +@Slf4j +public class IotTcpConnectionHandler implements Handler { + + private final NetSocket socket; + /** + * 是否已认证 + */ + private boolean authenticated = false; + /** + * 设备信息 + */ + private IotDeviceRespDTO device; + + private final IotTcpConnectionManager connectionManager; + + private final IotDeviceMessageService messageService; + + private final IotDeviceService deviceService; + + private final IotDeviceCommonApi deviceApi; + + private final String serverId; + + public void start() { + // 1. 设置解析器 + final RecordParser parser = RecordParser.newDelimited("\n", this); + socket.handler(parser); + + // 2. 设置处理器 + socket.closeHandler(v -> handleConnectionClose()); + socket.exceptionHandler(this::handleException); + } + + @Override + public void handle(Buffer buffer) { + log.info("[handle][接收到数据: {}]", buffer); + try { + // 1. 处理认证 + if (!authenticated) { + handleAuthentication(buffer); + return; + } + // 2. 处理消息 + handleMessage(buffer); + } catch (Exception e) { + log.error("[handle][处理异常]", e); + socket.close(); + } + } + + private void handleAuthentication(Buffer buffer) { + // 1. 解析认证信息 + // TODO @芋艿:这里的认证协议,需要和设备端约定。默认为 productKey,deviceName,password + String[] parts = buffer.toString().split(","); + if (parts.length != 3) { + log.error("[handleAuthentication][认证信息({})格式不正确]", buffer); + socket.close(); + return; + } + String productKey = parts[0]; + String deviceName = parts[1]; + String password = parts[2]; + + // 2. 执行认证 + CommonResult authResult = deviceApi.authDevice(new IotDeviceAuthReqDTO() + .setClientId(socket.remoteAddress().toString()).setUsername(productKey + "/" + deviceName) + .setPassword(password)); + if (authResult.isError() || !BooleanUtil.isTrue(authResult.getData())) { + log.error("[handleAuthentication][认证失败,productKey({}) deviceName({}) password({})]", productKey, deviceName, + password); + socket.close(); + return; + } + + // 3. 认证成功 + this.authenticated = true; + this.device = deviceService.getDeviceFromCache(productKey, deviceName); + connectionManager.addConnection(String.valueOf(device.getId()), socket); + + // 4. 发送上线消息 + IotDeviceMessage message = IotDeviceMessage.buildStateUpdateOnline(); + messageService.sendDeviceMessage(message, productKey, deviceName, serverId); + log.info("[handleAuthentication][认证成功]"); + } + + private void handleMessage(Buffer buffer) { + // 1. 解码消息 + IotDeviceMessage message = messageService.decodeDeviceMessage(buffer.getBytes(), + device.getProductKey(), device.getDeviceName()); + if (message == null) { + log.warn("[handleMessage][解码消息失败]"); + return; + } + // 2. 发送消息到队列 + messageService.sendDeviceMessage(message, device.getProductKey(), device.getDeviceName(), serverId); + } + + private void handleConnectionClose() { + // 1. 移除连接 + connectionManager.removeConnection(socket); + // 2. 发送离线消息 + if (device != null) { + IotDeviceMessage message = IotDeviceMessage.buildStateOffline(); + messageService.sendDeviceMessage(message, device.getProductKey(), device.getDeviceName(), serverId); + } + } + + private void handleException(Throwable e) { + log.error("[handleException][连接({}) 发生异常]", socket.remoteAddress(), e); + socket.close(); + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpDownstreamHandler.java new file mode 100644 index 0000000000..cb7e7c0665 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/router/IotTcpDownstreamHandler.java @@ -0,0 +1,49 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router; + +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.net.NetSocket; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * IoT 网关 TCP 下行消息处理器 + *

+ * 从消息总线接收到下行消息,然后发布到 TCP 连接,从而被设备所接收 + * + * @author 芋道源码 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class IotTcpDownstreamHandler { + + private final IotTcpConnectionManager connectionManager; + private final IotDeviceMessageService messageService; + + /** + * 处理下行消息 + * + * @param message 设备消息 + */ + public void handle(IotDeviceMessage message) { + // 1. 获取设备对应的连接 + NetSocket socket = connectionManager.getConnection(String.valueOf(message.getDeviceId())); + if (socket == null) { + log.error("[handle][设备({})的连接不存在]", message.getDeviceId()); + return; + } + + // 2. 编码消息 + byte[] bytes = messageService.encodeDeviceMessage(message, null, null); + + // 3. 发送消息 + socket.write(Buffer.buffer(bytes)); + // TODO @芋艿:这里的换行符,需要和设备端约定 + socket.write("\n"); + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java index 1faf6aeeb8..d1f1621264 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java @@ -79,42 +79,6 @@ public final class IotMqttTopicUtils { return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/property/post_reply"; } - /** - * 构建设备服务调用主题 - * - * @param productKey 产品 Key - * @param deviceName 设备名称 - * @param serviceIdentifier 服务标识符 - * @return 完整的主题路径 - */ - public static String buildServiceTopic(String productKey, String deviceName, String serviceIdentifier) { - return buildDeviceTopicPrefix(productKey, deviceName) + SERVICE_TOPIC_PREFIX + serviceIdentifier; - } - - /** - * 构建设备服务调用回复主题 - * - * @param productKey 产品 Key - * @param deviceName 设备名称 - * @param serviceIdentifier 服务标识符 - * @return 完整的主题路径 - */ - public static String buildServiceReplyTopic(String productKey, String deviceName, String serviceIdentifier) { - return buildDeviceTopicPrefix(productKey, deviceName) + SERVICE_TOPIC_PREFIX + serviceIdentifier + "_reply"; - } - - /** - * 构建设备事件上报回复主题 - * - * @param productKey 产品 Key - * @param deviceName 设备名称 - * @param eventIdentifier 事件标识符 - * @return 完整的主题路径 - */ - public static String buildEventPostReplyTopic(String productKey, String deviceName, String eventIdentifier) { - return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/event/" + eventIdentifier + "_reply"; - } - /** * 构建设备配置推送主题 * @@ -126,4 +90,43 @@ public final class IotMqttTopicUtils { return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/config/push"; } + /** + * 构建设备事件上报通用回复主题 + *

+ * 不包含具体的事件标识符,事件标识符通过消息 data 中的 identifier 字段传递 + * + * @param productKey 产品 Key + * @param deviceName 设备名称 + * @return 完整的主题路径 + */ + public static String buildEventPostReplyTopicGeneric(String productKey, String deviceName) { + return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/event/post_reply"; + } + + /** + * 构建设备服务调用通用主题 + *

+ * 不包含具体的服务标识符,服务标识符通过消息 data 中的 identifier 字段传递 + * + * @param productKey 产品 Key + * @param deviceName 设备名称 + * @return 完整的主题路径 + */ + public static String buildServiceTopicGeneric(String productKey, String deviceName) { + return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/service/invoke"; + } + + /** + * 构建设备服务调用通用回复主题 + *

+ * 不包含具体的服务标识符,服务标识符通过消息 data 中的 identifier 字段传递 + * + * @param productKey 产品 Key + * @param deviceName 设备名称 + * @return 完整的主题路径 + */ + public static String buildServiceReplyTopicGeneric(String productKey, String deviceName) { + return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/service/invoke_reply"; + } + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application-local.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application-local.yaml index ab3eda8155..1ad0e6f9e2 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application-local.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application-local.yaml @@ -1,5 +1,4 @@ # ==================== IoT 网关本地开发环境配置 ==================== - --- #################### 消息队列相关 #################### # rocketmq 配置项,对应 RocketMQProperties 配置类 @@ -41,6 +40,13 @@ yudao: mqtt-ssl: false # 是否开启 SSL mqtt-topics: - "/sys/#" # 系统主题 + # ==================================== + # 针对引入的 TCP 组件的配置 + # ==================================== + tcp: + enabled: true + server-port: 8093 + server-host: 0.0.0.0 # 消息总线配置 message-bus: @@ -52,7 +58,7 @@ yudao: logging: level: # 开发环境详细日志 - cn.iocoder.yudao.module.iot.gateway.protocol.mqtt: DEBUG + cn.iocoder.yudao.module.iot.gateway.protocol.emqx: DEBUG cn.iocoder.yudao.module.iot.gateway.protocol.http: DEBUG # MQTT 客户端日志 # io.vertx.mqtt: DEBUG \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml index b12b2f73d7..e028d5ce7c 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml @@ -41,6 +41,11 @@ yudao: mqtt-ssl: false mqtt-topics: - "/sys/#" # 系统主题 + # ==================================== + # 针对引入的 TCP 组件的配置 + # ==================================== + tcp: + enabled: false # 消息总线配置 message-bus: