feat:【IoT 物联网】优化根据消息方法和回复状态构建主题逻辑

This commit is contained in:
安浩浩
2025-06-30 09:50:18 +08:00
parent bf41d47fa8
commit 3ca4cf265a
2 changed files with 20 additions and 138 deletions

View File

@@ -3,7 +3,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.emqx.router;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol;
@@ -69,58 +68,11 @@ public class IotEmqxDownstreamHandler {
* @return 构建的主题,如果方法不支持返回 null
*/
private String buildTopicByMethod(IotDeviceMessage message, String productKey, String deviceName) {
// 1. 解析消息方法
IotDeviceMessageMethodEnum methodEnum = IotDeviceMessageMethodEnum.of(message.getMethod());
if (methodEnum == null) {
log.warn("[buildTopicByMethod][未知的消息方法: {}]", message.getMethod());
return null;
}
// 2. 根据消息方法和回复状态,构建 topic
// 1. 判断是否为回复消息
boolean isReply = IotDeviceMessageUtils.isReplyMessage(message);
// TODO @haohao看看基于 message 的 method 去反向推导;
// 3. 根据消息方法类型构建对应的主题
switch (methodEnum) {
case PROPERTY_POST:
// 属性上报:只支持回复消息(下行)
if (isReply) {
return IotMqttTopicUtils.buildPropertyPostReplyTopic(productKey, deviceName);
}
break;
case PROPERTY_SET:
// 属性设置:只支持非回复消息(下行)
if (!isReply) {
return IotMqttTopicUtils.buildPropertySetTopic(productKey, deviceName);
}
break;
case EVENT_POST:
// 事件上报:只支持回复消息(下行)
if (isReply) {
return IotMqttTopicUtils.buildEventPostReplyTopicGeneric(productKey, deviceName);
}
break;
case SERVICE_INVOKE:
// 服务调用:支持请求和回复
if (isReply) {
return IotMqttTopicUtils.buildServiceReplyTopicGeneric(productKey, deviceName);
} else {
return IotMqttTopicUtils.buildServiceTopicGeneric(productKey, deviceName);
}
case CONFIG_PUSH:
// 配置推送:平台向设备推送配置(下行请求),设备回复确认(上行回复)
if (!isReply) {
return IotMqttTopicUtils.buildConfigPushTopic(productKey, deviceName);
}
break;
default:
log.warn("[buildTopicByMethod][未处理的消息方法: {}]", methodEnum);
break;
}
log.warn("[buildTopicByMethod][暂时不支持的下行消息: method={}, isReply={}]",
message.getMethod(), isReply);
return null;
// 2. 根据消息方法类型构建对应的主题
return IotMqttTopicUtils.buildTopicByMethod(message.getMethod(), productKey, deviceName, isReply);
}
}

View File

@@ -1,5 +1,7 @@
package cn.iocoder.yudao.module.iot.gateway.util;
import cn.hutool.core.util.StrUtil;
/**
* IoT 网关 MQTT 主题工具类
* <p>
@@ -16,12 +18,6 @@ public final class IotMqttTopicUtils {
*/
private static final String SYS_TOPIC_PREFIX = "/sys/";
// TODO @haohao这个要删除哇
/**
* 服务调用主题前缀
*/
private static final String SERVICE_TOPIC_PREFIX = "/thing/";
// ========== MQTT HTTP 接口路径常量 ==========
/**
@@ -37,98 +33,32 @@ public final class IotMqttTopicUtils {
*/
public static final String MQTT_EVENT_PATH = "/mqtt/event";
// TODO @haohao这个要删除哇
/**
* MQTT 授权接口路径(预留)
* 对应 EMQX HTTP 授权插件的授权检查接口
*/
public static final String MQTT_AUTHZ_PATH = "/mqtt/authz";
// ========== 工具方法 ==========
/**
* 构建设备主题前缀
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @return 设备主题前缀:/sys/{productKey}/{deviceName}
*/
private static String buildDeviceTopicPrefix(String productKey, String deviceName) {
return SYS_TOPIC_PREFIX + productKey + "/" + deviceName;
}
/**
* 构建设备属性设置主题
* 根据消息方法构建对应的主题
*
* @param method 消息方法,例如 thing.property.post
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param isReply 是否为回复消息
* @return 完整的主题路径
*/
public static String buildPropertySetTopic(String productKey, String deviceName) {
return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/property/set";
}
public static String buildTopicByMethod(String method, String productKey, String deviceName, boolean isReply) {
if (StrUtil.isBlank(method)) {
return null;
}
/**
* 构建设备属性上报回复主题
* <p>
* 当设备上报属性时,会收到该主题的回复
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @return 完整的主题路径
*/
public static String buildPropertyPostReplyTopic(String productKey, String deviceName) {
return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/property/post_reply";
}
// 1. 将点分隔符转换为斜杠
String topicSuffix = method.replace('.', '/');
/**
* 构建设备配置推送主题
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @return 完整的主题路径
*/
public static String buildConfigPushTopic(String productKey, String deviceName) {
return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/config/push";
}
// 2. 对于回复消息,添加 _reply 后缀
if (isReply) {
topicSuffix += "_reply";
}
/**
* 构建设备事件上报通用回复主题
* <p>
* 不包含具体的事件标识符,事件标识符通过消息 data 中的 identifier 字段传递
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @return 完整的主题路径
*/
public static String buildEventPostReplyTopicGeneric(String productKey, String deviceName) {
return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/event/post_reply";
}
/**
* 构建设备服务调用通用主题
* <p>
* 不包含具体的服务标识符,服务标识符通过消息 data 中的 identifier 字段传递
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @return 完整的主题路径
*/
public static String buildServiceTopicGeneric(String productKey, String deviceName) {
return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/service/invoke";
}
/**
* 构建设备服务调用通用回复主题
* <p>
* 不包含具体的服务标识符,服务标识符通过消息 data 中的 identifier 字段传递
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @return 完整的主题路径
*/
public static String buildServiceReplyTopicGeneric(String productKey, String deviceName) {
return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/service/invoke_reply";
// 3. 构建完整主题
return SYS_TOPIC_PREFIX + productKey + "/" + deviceName + "/" + topicSuffix;
}
}