diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java index 7be33571b4..6c451fd5c0 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java @@ -3,7 +3,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.emqx.router; import cn.hutool.core.util.StrUtil; import cn.hutool.extra.spring.SpringUtil; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; -import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol; @@ -69,58 +68,11 @@ public class IotEmqxDownstreamHandler { * @return 构建的主题,如果方法不支持返回 null */ private String buildTopicByMethod(IotDeviceMessage message, String productKey, String deviceName) { - // 1. 解析消息方法 - IotDeviceMessageMethodEnum methodEnum = IotDeviceMessageMethodEnum.of(message.getMethod()); - if (methodEnum == null) { - log.warn("[buildTopicByMethod][未知的消息方法: {}]", message.getMethod()); - return null; - } - - // 2. 根据消息方法和回复状态,构建 topic + // 1. 判断是否为回复消息 boolean isReply = IotDeviceMessageUtils.isReplyMessage(message); - // TODO @haohao:看看基于 message 的 method 去反向推导; - // 3. 根据消息方法类型构建对应的主题 - switch (methodEnum) { - case PROPERTY_POST: - // 属性上报:只支持回复消息(下行) - if (isReply) { - return IotMqttTopicUtils.buildPropertyPostReplyTopic(productKey, deviceName); - } - break; - case PROPERTY_SET: - // 属性设置:只支持非回复消息(下行) - if (!isReply) { - return IotMqttTopicUtils.buildPropertySetTopic(productKey, deviceName); - } - break; - case EVENT_POST: - // 事件上报:只支持回复消息(下行) - if (isReply) { - return IotMqttTopicUtils.buildEventPostReplyTopicGeneric(productKey, deviceName); - } - break; - case SERVICE_INVOKE: - // 服务调用:支持请求和回复 - if (isReply) { - return IotMqttTopicUtils.buildServiceReplyTopicGeneric(productKey, deviceName); - } else { - return IotMqttTopicUtils.buildServiceTopicGeneric(productKey, deviceName); - } - case CONFIG_PUSH: - // 配置推送:平台向设备推送配置(下行请求),设备回复确认(上行回复) - if (!isReply) { - return IotMqttTopicUtils.buildConfigPushTopic(productKey, deviceName); - } - break; - default: - log.warn("[buildTopicByMethod][未处理的消息方法: {}]", methodEnum); - break; - } - - log.warn("[buildTopicByMethod][暂时不支持的下行消息: method={}, isReply={}]", - message.getMethod(), isReply); - return null; + // 2. 根据消息方法类型构建对应的主题 + return IotMqttTopicUtils.buildTopicByMethod(message.getMethod(), productKey, deviceName, isReply); } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java index f2861581f5..957b7003d8 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java @@ -1,5 +1,7 @@ package cn.iocoder.yudao.module.iot.gateway.util; +import cn.hutool.core.util.StrUtil; + /** * IoT 网关 MQTT 主题工具类 *
@@ -16,12 +18,6 @@ public final class IotMqttTopicUtils { */ private static final String SYS_TOPIC_PREFIX = "/sys/"; - // TODO @haohao:这个要删除哇? - /** - * 服务调用主题前缀 - */ - private static final String SERVICE_TOPIC_PREFIX = "/thing/"; - // ========== MQTT HTTP 接口路径常量 ========== /** @@ -37,98 +33,32 @@ public final class IotMqttTopicUtils { */ public static final String MQTT_EVENT_PATH = "/mqtt/event"; - // TODO @haohao:这个要删除哇? - /** - * MQTT 授权接口路径(预留) - * 对应 EMQX HTTP 授权插件的授权检查接口 - */ - public static final String MQTT_AUTHZ_PATH = "/mqtt/authz"; - // ========== 工具方法 ========== /** - * 构建设备主题前缀 - * - * @param productKey 产品 Key - * @param deviceName 设备名称 - * @return 设备主题前缀:/sys/{productKey}/{deviceName} - */ - private static String buildDeviceTopicPrefix(String productKey, String deviceName) { - return SYS_TOPIC_PREFIX + productKey + "/" + deviceName; - } - - /** - * 构建设备属性设置主题 + * 根据消息方法构建对应的主题 * + * @param method 消息方法,例如 thing.property.post * @param productKey 产品 Key * @param deviceName 设备名称 + * @param isReply 是否为回复消息 * @return 完整的主题路径 */ - public static String buildPropertySetTopic(String productKey, String deviceName) { - return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/property/set"; - } + public static String buildTopicByMethod(String method, String productKey, String deviceName, boolean isReply) { + if (StrUtil.isBlank(method)) { + return null; + } - /** - * 构建设备属性上报回复主题 - *
- * 当设备上报属性时,会收到该主题的回复 - * - * @param productKey 产品 Key - * @param deviceName 设备名称 - * @return 完整的主题路径 - */ - public static String buildPropertyPostReplyTopic(String productKey, String deviceName) { - return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/property/post_reply"; - } + // 1. 将点分隔符转换为斜杠 + String topicSuffix = method.replace('.', '/'); - /** - * 构建设备配置推送主题 - * - * @param productKey 产品 Key - * @param deviceName 设备名称 - * @return 完整的主题路径 - */ - public static String buildConfigPushTopic(String productKey, String deviceName) { - return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/config/push"; - } + // 2. 对于回复消息,添加 _reply 后缀 + if (isReply) { + topicSuffix += "_reply"; + } - /** - * 构建设备事件上报通用回复主题 - *
- * 不包含具体的事件标识符,事件标识符通过消息 data 中的 identifier 字段传递 - * - * @param productKey 产品 Key - * @param deviceName 设备名称 - * @return 完整的主题路径 - */ - public static String buildEventPostReplyTopicGeneric(String productKey, String deviceName) { - return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/event/post_reply"; - } - - /** - * 构建设备服务调用通用主题 - *
- * 不包含具体的服务标识符,服务标识符通过消息 data 中的 identifier 字段传递 - * - * @param productKey 产品 Key - * @param deviceName 设备名称 - * @return 完整的主题路径 - */ - public static String buildServiceTopicGeneric(String productKey, String deviceName) { - return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/service/invoke"; - } - - /** - * 构建设备服务调用通用回复主题 - *
- * 不包含具体的服务标识符,服务标识符通过消息 data 中的 identifier 字段传递 - * - * @param productKey 产品 Key - * @param deviceName 设备名称 - * @return 完整的主题路径 - */ - public static String buildServiceReplyTopicGeneric(String productKey, String deviceName) { - return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/service/invoke_reply"; + // 3. 构建完整主题 + return SYS_TOPIC_PREFIX + productKey + "/" + deviceName + "/" + topicSuffix; } } \ No newline at end of file