feat:【IOT】新增 HTTP 协议支持及相关解析器,完善协议转换器功能
This commit is contained in:
@@ -1,7 +1,12 @@
|
||||
package cn.iocoder.yudao.module.iot.protocol.config;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.protocol.convert.IotProtocolConverter;
|
||||
import cn.iocoder.yudao.module.iot.protocol.convert.impl.DefaultIotProtocolConverter;
|
||||
import cn.iocoder.yudao.module.iot.protocol.enums.IotProtocolTypeEnum;
|
||||
import cn.iocoder.yudao.module.iot.protocol.message.IotMessageParser;
|
||||
import cn.iocoder.yudao.module.iot.protocol.message.impl.IotAlinkMessageParser;
|
||||
import cn.iocoder.yudao.module.iot.protocol.message.impl.IotHttpMessageParser;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@@ -13,13 +18,58 @@ import org.springframework.context.annotation.Configuration;
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
public class IotProtocolAutoConfiguration {
|
||||
|
||||
/**
|
||||
* Bean 名称常量
|
||||
*/
|
||||
public static final String IOT_ALINK_MESSAGE_PARSER_BEAN_NAME = "iotAlinkMessageParser";
|
||||
public static final String IOT_HTTP_MESSAGE_PARSER_BEAN_NAME = "iotHttpMessageParser";
|
||||
|
||||
/**
|
||||
* 注册 Alink 协议消息解析器
|
||||
*
|
||||
* @return Alink 协议消息解析器
|
||||
*/
|
||||
@Bean
|
||||
@ConditionalOnMissingBean(name = IOT_ALINK_MESSAGE_PARSER_BEAN_NAME)
|
||||
public IotMessageParser iotAlinkMessageParser() {
|
||||
return new IotAlinkMessageParser();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 注册 HTTP 协议消息解析器
|
||||
*
|
||||
* @return HTTP 协议消息解析器
|
||||
*/
|
||||
@Bean
|
||||
@ConditionalOnMissingBean(name = IOT_HTTP_MESSAGE_PARSER_BEAN_NAME)
|
||||
public IotMessageParser iotHttpMessageParser() {
|
||||
return new IotHttpMessageParser();
|
||||
}
|
||||
|
||||
/**
|
||||
* 注册默认协议转换器
|
||||
* <p>
|
||||
* 如果用户没有自定义协议转换器,则使用默认实现
|
||||
* 默认会注册 Alink 和 HTTP 协议解析器
|
||||
*
|
||||
* @param iotAlinkMessageParser Alink 协议解析器
|
||||
* @param iotHttpMessageParser HTTP 协议解析器
|
||||
* @return 默认协议转换器
|
||||
*/
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public IotProtocolConverter iotProtocolConverter(IotMessageParser iotAlinkMessageParser,
|
||||
IotMessageParser iotHttpMessageParser) {
|
||||
DefaultIotProtocolConverter converter = new DefaultIotProtocolConverter();
|
||||
|
||||
// 注册 HTTP 协议解析器
|
||||
converter.registerParser(IotProtocolTypeEnum.HTTP.getCode(), iotHttpMessageParser);
|
||||
|
||||
// 注意:Alink 协议解析器已经在 DefaultIotProtocolConverter 构造函数中注册
|
||||
// 如果需要使用自定义的 Alink 解析器实例,可以重新注册
|
||||
// converter.registerParser(IotProtocolTypeEnum.ALINK.getCode(),
|
||||
// iotAlinkMessageParser);
|
||||
|
||||
return converter;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,166 @@
|
||||
package cn.iocoder.yudao.module.iot.protocol.constants;
|
||||
|
||||
/**
|
||||
* IoT HTTP 协议常量类
|
||||
* <p>
|
||||
* 用于统一管理 HTTP 协议中的常量,包括路径、字段名、默认值等
|
||||
*
|
||||
* @author haohao
|
||||
*/
|
||||
public class IotHttpConstants {
|
||||
|
||||
/**
|
||||
* 路径常量
|
||||
*/
|
||||
public static class Path {
|
||||
/**
|
||||
* 认证路径
|
||||
*/
|
||||
public static final String AUTH = "/auth";
|
||||
|
||||
/**
|
||||
* 主题路径前缀
|
||||
*/
|
||||
public static final String TOPIC_PREFIX = "/topic";
|
||||
}
|
||||
|
||||
/**
|
||||
* 认证字段常量
|
||||
*/
|
||||
public static class AuthField {
|
||||
/**
|
||||
* 产品Key
|
||||
*/
|
||||
public static final String PRODUCT_KEY = "productKey";
|
||||
|
||||
/**
|
||||
* 设备名称
|
||||
*/
|
||||
public static final String DEVICE_NAME = "deviceName";
|
||||
|
||||
/**
|
||||
* 客户端ID
|
||||
*/
|
||||
public static final String CLIENT_ID = "clientId";
|
||||
|
||||
/**
|
||||
* 时间戳
|
||||
*/
|
||||
public static final String TIMESTAMP = "timestamp";
|
||||
|
||||
/**
|
||||
* 签名
|
||||
*/
|
||||
public static final String SIGN = "sign";
|
||||
|
||||
/**
|
||||
* 签名方法
|
||||
*/
|
||||
public static final String SIGN_METHOD = "signmethod";
|
||||
|
||||
/**
|
||||
* 版本
|
||||
*/
|
||||
public static final String VERSION = "version";
|
||||
}
|
||||
|
||||
/**
|
||||
* 消息字段常量
|
||||
*/
|
||||
public static class MessageField {
|
||||
/**
|
||||
* 消息ID
|
||||
*/
|
||||
public static final String ID = "id";
|
||||
|
||||
/**
|
||||
* 方法名
|
||||
*/
|
||||
public static final String METHOD = "method";
|
||||
|
||||
/**
|
||||
* 版本
|
||||
*/
|
||||
public static final String VERSION = "version";
|
||||
|
||||
/**
|
||||
* 参数
|
||||
*/
|
||||
public static final String PARAMS = "params";
|
||||
|
||||
/**
|
||||
* 数据
|
||||
*/
|
||||
public static final String DATA = "data";
|
||||
}
|
||||
|
||||
/**
|
||||
* 响应字段常量
|
||||
*/
|
||||
public static class ResponseField {
|
||||
/**
|
||||
* 状态码
|
||||
*/
|
||||
public static final String CODE = "code";
|
||||
|
||||
/**
|
||||
* 消息
|
||||
*/
|
||||
public static final String MESSAGE = "message";
|
||||
|
||||
/**
|
||||
* 信息
|
||||
*/
|
||||
public static final String INFO = "info";
|
||||
|
||||
/**
|
||||
* 令牌
|
||||
*/
|
||||
public static final String TOKEN = "token";
|
||||
|
||||
/**
|
||||
* 消息ID
|
||||
*/
|
||||
public static final String MESSAGE_ID = "messageId";
|
||||
}
|
||||
|
||||
/**
|
||||
* 默认值常量
|
||||
*/
|
||||
public static class DefaultValue {
|
||||
/**
|
||||
* 默认签名方法
|
||||
*/
|
||||
public static final String SIGN_METHOD = "hmacmd5";
|
||||
|
||||
/**
|
||||
* 默认版本
|
||||
*/
|
||||
public static final String VERSION = "default";
|
||||
|
||||
/**
|
||||
* 默认消息版本
|
||||
*/
|
||||
public static final String MESSAGE_VERSION = "1.0";
|
||||
|
||||
/**
|
||||
* 未知方法名
|
||||
*/
|
||||
public static final String UNKNOWN_METHOD = "unknown";
|
||||
}
|
||||
|
||||
/**
|
||||
* 方法名常量
|
||||
*/
|
||||
public static class Method {
|
||||
/**
|
||||
* 设备认证
|
||||
*/
|
||||
public static final String DEVICE_AUTH = "device.auth";
|
||||
|
||||
/**
|
||||
* 自定义消息
|
||||
*/
|
||||
public static final String CUSTOM_MESSAGE = "custom.message";
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,91 @@
|
||||
package cn.iocoder.yudao.module.iot.protocol.constants;
|
||||
|
||||
/**
|
||||
* IoT 协议日志消息常量类
|
||||
* <p>
|
||||
* 用于统一管理协议模块中的日志消息常量
|
||||
*
|
||||
* @author haohao
|
||||
*/
|
||||
public class IotLogConstants {
|
||||
|
||||
/**
|
||||
* HTTP 协议日志消息
|
||||
*/
|
||||
public static class Http {
|
||||
/**
|
||||
* 收到空消息内容
|
||||
*/
|
||||
public static final String RECEIVED_EMPTY_MESSAGE = "[HTTP] 收到空消息内容, topic={}";
|
||||
|
||||
/**
|
||||
* 不支持的路径格式
|
||||
*/
|
||||
public static final String UNSUPPORTED_PATH_FORMAT = "[HTTP] 不支持的路径格式, topic={}";
|
||||
|
||||
/**
|
||||
* 解析消息失败
|
||||
*/
|
||||
public static final String PARSE_MESSAGE_FAILED = "[HTTP] 解析消息失败, topic={}";
|
||||
|
||||
/**
|
||||
* 认证消息非JSON格式
|
||||
*/
|
||||
public static final String AUTH_MESSAGE_NOT_JSON = "[HTTP] 认证消息非JSON格式, message={}";
|
||||
|
||||
/**
|
||||
* 认证消息缺少必需字段
|
||||
*/
|
||||
public static final String AUTH_MESSAGE_MISSING_REQUIRED_FIELDS = "[HTTP] 认证消息缺少必需字段, message={}";
|
||||
|
||||
/**
|
||||
* 格式化响应失败
|
||||
*/
|
||||
public static final String FORMAT_RESPONSE_FAILED = "[HTTP] 格式化响应失败";
|
||||
}
|
||||
|
||||
/**
|
||||
* 协议转换器日志消息
|
||||
*/
|
||||
public static class Converter {
|
||||
/**
|
||||
* 注册协议解析器
|
||||
*/
|
||||
public static final String REGISTER_PARSER = "[协议转换器] 注册协议解析器: protocol={}, parser={}";
|
||||
|
||||
/**
|
||||
* 移除协议解析器
|
||||
*/
|
||||
public static final String REMOVE_PARSER = "[协议转换器] 移除协议解析器: protocol={}";
|
||||
|
||||
/**
|
||||
* 不支持的协议类型
|
||||
*/
|
||||
public static final String UNSUPPORTED_PROTOCOL = "[协议转换器] 不支持的协议类型: protocol={}";
|
||||
|
||||
/**
|
||||
* 转换消息失败
|
||||
*/
|
||||
public static final String CONVERT_MESSAGE_FAILED = "[协议转换器] 转换消息失败: protocol={}, topic={}";
|
||||
|
||||
/**
|
||||
* 格式化响应失败
|
||||
*/
|
||||
public static final String FORMAT_RESPONSE_FAILED = "[协议转换器] 格式化响应失败: protocol={}";
|
||||
|
||||
/**
|
||||
* 自动选择协议
|
||||
*/
|
||||
public static final String AUTO_SELECT_PROTOCOL = "[协议转换器] 自动选择协议: protocol={}, topic={}";
|
||||
|
||||
/**
|
||||
* 协议解析失败,尝试下一个
|
||||
*/
|
||||
public static final String PROTOCOL_PARSE_FAILED_TRY_NEXT = "[协议转换器] 协议解析失败,尝试下一个: protocol={}, topic={}";
|
||||
|
||||
/**
|
||||
* 无法自动识别协议
|
||||
*/
|
||||
public static final String CANNOT_AUTO_RECOGNIZE_PROTOCOL = "[协议转换器] 无法自动识别协议: topic={}";
|
||||
}
|
||||
}
|
||||
@@ -69,4 +69,89 @@ public class IotTopicConstants {
|
||||
*/
|
||||
public static final String REPLY_SUFFIX = "_reply";
|
||||
|
||||
}
|
||||
/**
|
||||
* 方法名前缀常量
|
||||
*/
|
||||
public static class MethodPrefix {
|
||||
/**
|
||||
* 物模型服务前缀
|
||||
*/
|
||||
public static final String THING_SERVICE = "thing.service.";
|
||||
|
||||
/**
|
||||
* 物模型事件前缀
|
||||
*/
|
||||
public static final String THING_EVENT = "thing.event.";
|
||||
}
|
||||
|
||||
/**
|
||||
* 完整方法名常量
|
||||
*/
|
||||
public static class Method {
|
||||
/**
|
||||
* 属性设置方法
|
||||
*/
|
||||
public static final String PROPERTY_SET = "thing.service.property.set";
|
||||
|
||||
/**
|
||||
* 属性获取方法
|
||||
*/
|
||||
public static final String PROPERTY_GET = "thing.service.property.get";
|
||||
|
||||
/**
|
||||
* 属性上报方法
|
||||
*/
|
||||
public static final String PROPERTY_POST = "thing.event.property.post";
|
||||
|
||||
/**
|
||||
* 配置设置方法
|
||||
*/
|
||||
public static final String CONFIG_SET = "thing.service.config.set";
|
||||
|
||||
/**
|
||||
* OTA升级方法
|
||||
*/
|
||||
public static final String OTA_UPGRADE = "thing.service.ota.upgrade";
|
||||
|
||||
/**
|
||||
* 设备上线方法
|
||||
*/
|
||||
public static final String DEVICE_ONLINE = "device.online";
|
||||
|
||||
/**
|
||||
* 设备下线方法
|
||||
*/
|
||||
public static final String DEVICE_OFFLINE = "device.offline";
|
||||
|
||||
/**
|
||||
* 心跳方法
|
||||
*/
|
||||
public static final String HEARTBEAT = "heartbeat";
|
||||
}
|
||||
|
||||
/**
|
||||
* 主题关键字常量
|
||||
*/
|
||||
public static class Keyword {
|
||||
/**
|
||||
* 事件关键字
|
||||
*/
|
||||
public static final String EVENT = "event";
|
||||
|
||||
/**
|
||||
* 服务关键字
|
||||
*/
|
||||
public static final String SERVICE = "service";
|
||||
|
||||
/**
|
||||
* 属性关键字
|
||||
*/
|
||||
public static final String PROPERTY = "property";
|
||||
|
||||
/**
|
||||
* 上报关键字
|
||||
*/
|
||||
public static final String POST = "post";
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
package cn.iocoder.yudao.module.iot.protocol.convert;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.protocol.message.IotAlinkMessage;
|
||||
import cn.iocoder.yudao.module.iot.protocol.message.IotStandardResponse;
|
||||
|
||||
/**
|
||||
* IoT 协议转换器接口
|
||||
* <p>
|
||||
* 用于在不同协议之间进行转换
|
||||
*
|
||||
* @author haohao
|
||||
*/
|
||||
public interface IotProtocolConverter {
|
||||
|
||||
/**
|
||||
* 将字节数组转换为标准消息
|
||||
*
|
||||
* @param topic 主题
|
||||
* @param payload 消息负载
|
||||
* @param protocol 协议类型
|
||||
* @return 标准消息对象,转换失败返回 null
|
||||
*/
|
||||
IotAlinkMessage convertToStandardMessage(String topic, byte[] payload, String protocol);
|
||||
|
||||
/**
|
||||
* 将标准响应转换为字节数组
|
||||
*
|
||||
* @param response 标准响应
|
||||
* @param protocol 协议类型
|
||||
* @return 字节数组,转换失败返回空数组
|
||||
*/
|
||||
byte[] convertFromStandardResponse(IotStandardResponse response, String protocol);
|
||||
|
||||
/**
|
||||
* 检查是否支持指定协议
|
||||
*
|
||||
* @param protocol 协议类型
|
||||
* @return 如果支持返回 true,否则返回 false
|
||||
*/
|
||||
boolean supportsProtocol(String protocol);
|
||||
|
||||
/**
|
||||
* 获取支持的协议类型列表
|
||||
*
|
||||
* @return 协议类型数组
|
||||
*/
|
||||
String[] getSupportedProtocols();
|
||||
}
|
||||
@@ -0,0 +1,131 @@
|
||||
package cn.iocoder.yudao.module.iot.protocol.convert.impl;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.protocol.constants.IotLogConstants;
|
||||
import cn.iocoder.yudao.module.iot.protocol.convert.IotProtocolConverter;
|
||||
import cn.iocoder.yudao.module.iot.protocol.enums.IotProtocolTypeEnum;
|
||||
import cn.iocoder.yudao.module.iot.protocol.message.IotAlinkMessage;
|
||||
import cn.iocoder.yudao.module.iot.protocol.message.IotMessageParser;
|
||||
import cn.iocoder.yudao.module.iot.protocol.message.IotStandardResponse;
|
||||
import cn.iocoder.yudao.module.iot.protocol.message.impl.IotAlinkMessageParser;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* 默认 IoT 协议转换器实现
|
||||
* <p>
|
||||
* 支持多种协议的转换,可以通过注册不同的消息解析器来扩展支持的协议
|
||||
*
|
||||
* @author haohao
|
||||
*/
|
||||
@Slf4j
|
||||
public class DefaultIotProtocolConverter implements IotProtocolConverter {
|
||||
|
||||
/**
|
||||
* 消息解析器映射
|
||||
* Key: 协议类型,Value: 消息解析器
|
||||
*/
|
||||
private final Map<String, IotMessageParser> parsers = new HashMap<>();
|
||||
|
||||
/**
|
||||
* 构造函数,初始化默认支持的协议
|
||||
*/
|
||||
public DefaultIotProtocolConverter() {
|
||||
// 注册 Alink 协议解析器
|
||||
registerParser(IotProtocolTypeEnum.ALINK.getCode(), new IotAlinkMessageParser());
|
||||
}
|
||||
|
||||
/**
|
||||
* 注册消息解析器
|
||||
*
|
||||
* @param protocol 协议类型
|
||||
* @param parser 消息解析器
|
||||
*/
|
||||
public void registerParser(String protocol, IotMessageParser parser) {
|
||||
parsers.put(protocol, parser);
|
||||
log.info(IotLogConstants.Converter.REGISTER_PARSER, protocol, parser.getClass().getSimpleName());
|
||||
}
|
||||
|
||||
/**
|
||||
* 移除消息解析器
|
||||
*
|
||||
* @param protocol 协议类型
|
||||
*/
|
||||
public void removeParser(String protocol) {
|
||||
parsers.remove(protocol);
|
||||
log.info(IotLogConstants.Converter.REMOVE_PARSER, protocol);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IotAlinkMessage convertToStandardMessage(String topic, byte[] payload, String protocol) {
|
||||
IotMessageParser parser = parsers.get(protocol);
|
||||
if (parser == null) {
|
||||
log.warn(IotLogConstants.Converter.UNSUPPORTED_PROTOCOL, protocol);
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
return parser.parse(topic, payload);
|
||||
} catch (Exception e) {
|
||||
log.error(IotLogConstants.Converter.CONVERT_MESSAGE_FAILED, protocol, topic, e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] convertFromStandardResponse(IotStandardResponse response, String protocol) {
|
||||
IotMessageParser parser = parsers.get(protocol);
|
||||
if (parser == null) {
|
||||
log.warn(IotLogConstants.Converter.UNSUPPORTED_PROTOCOL, protocol);
|
||||
return new byte[0];
|
||||
}
|
||||
|
||||
try {
|
||||
return parser.formatResponse(response);
|
||||
} catch (Exception e) {
|
||||
log.error(IotLogConstants.Converter.FORMAT_RESPONSE_FAILED, protocol, e);
|
||||
return new byte[0];
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean supportsProtocol(String protocol) {
|
||||
return parsers.containsKey(protocol);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] getSupportedProtocols() {
|
||||
Set<String> protocols = parsers.keySet();
|
||||
return protocols.toArray(new String[0]);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据主题自动选择合适的协议解析器
|
||||
*
|
||||
* @param topic 主题
|
||||
* @param payload 消息负载
|
||||
* @return 解析后的标准消息,如果无法解析返回 null
|
||||
*/
|
||||
public IotAlinkMessage autoConvert(String topic, byte[] payload) {
|
||||
// 遍历所有解析器,找到能处理该主题的解析器
|
||||
for (Map.Entry<String, IotMessageParser> entry : parsers.entrySet()) {
|
||||
IotMessageParser parser = entry.getValue();
|
||||
if (parser.canHandle(topic)) {
|
||||
try {
|
||||
IotAlinkMessage message = parser.parse(topic, payload);
|
||||
if (message != null) {
|
||||
log.debug(IotLogConstants.Converter.AUTO_SELECT_PROTOCOL, entry.getKey(), topic);
|
||||
return message;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.debug(IotLogConstants.Converter.PROTOCOL_PARSE_FAILED_TRY_NEXT, entry.getKey(), topic);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.warn(IotLogConstants.Converter.CANNOT_AUTO_RECOGNIZE_PROTOCOL, topic);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,49 @@
|
||||
package cn.iocoder.yudao.module.iot.protocol.enums;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* IoT 消息方向枚举
|
||||
*
|
||||
* @author haohao
|
||||
*/
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public enum IotMessageDirectionEnum {
|
||||
|
||||
/**
|
||||
* 上行消息(设备到平台)
|
||||
*/
|
||||
UPSTREAM("upstream", "上行"),
|
||||
|
||||
/**
|
||||
* 下行消息(平台到设备)
|
||||
*/
|
||||
DOWNSTREAM("downstream", "下行");
|
||||
|
||||
/**
|
||||
* 方向编码
|
||||
*/
|
||||
private final String code;
|
||||
|
||||
/**
|
||||
* 方向名称
|
||||
*/
|
||||
private final String name;
|
||||
|
||||
/**
|
||||
* 根据编码获取消息方向
|
||||
*
|
||||
* @param code 方向编码
|
||||
* @return 消息方向枚举,如果未找到返回 null
|
||||
*/
|
||||
public static IotMessageDirectionEnum getByCode(String code) {
|
||||
for (IotMessageDirectionEnum direction : values()) {
|
||||
if (direction.getCode().equals(code)) {
|
||||
return direction;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,140 @@
|
||||
package cn.iocoder.yudao.module.iot.protocol.enums;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.protocol.constants.IotTopicConstants;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* IoT 消息类型枚举
|
||||
*
|
||||
* @author haohao
|
||||
*/
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public enum IotMessageTypeEnum {
|
||||
|
||||
/**
|
||||
* 属性上报
|
||||
*/
|
||||
PROPERTY_POST("property.post", "属性上报"),
|
||||
|
||||
/**
|
||||
* 属性设置
|
||||
*/
|
||||
PROPERTY_SET("property.set", "属性设置"),
|
||||
|
||||
/**
|
||||
* 属性获取
|
||||
*/
|
||||
PROPERTY_GET("property.get", "属性获取"),
|
||||
|
||||
/**
|
||||
* 事件上报
|
||||
*/
|
||||
EVENT_POST("event.post", "事件上报"),
|
||||
|
||||
/**
|
||||
* 服务调用
|
||||
*/
|
||||
SERVICE_INVOKE("service.invoke", "服务调用"),
|
||||
|
||||
/**
|
||||
* 配置设置
|
||||
*/
|
||||
CONFIG_SET("config.set", "配置设置"),
|
||||
|
||||
/**
|
||||
* OTA 升级
|
||||
*/
|
||||
OTA_UPGRADE("ota.upgrade", "OTA升级"),
|
||||
|
||||
/**
|
||||
* 设备上线
|
||||
*/
|
||||
DEVICE_ONLINE("device.online", "设备上线"),
|
||||
|
||||
/**
|
||||
* 设备下线
|
||||
*/
|
||||
DEVICE_OFFLINE("device.offline", "设备下线"),
|
||||
|
||||
/**
|
||||
* 心跳
|
||||
*/
|
||||
HEARTBEAT("heartbeat", "心跳");
|
||||
|
||||
/**
|
||||
* 消息类型编码
|
||||
*/
|
||||
private final String code;
|
||||
|
||||
/**
|
||||
* 消息类型名称
|
||||
*/
|
||||
private final String name;
|
||||
|
||||
/**
|
||||
* 根据编码获取消息类型
|
||||
*
|
||||
* @param code 消息类型编码
|
||||
* @return 消息类型枚举,如果未找到返回 null
|
||||
*/
|
||||
public static IotMessageTypeEnum getByCode(String code) {
|
||||
for (IotMessageTypeEnum type : values()) {
|
||||
if (type.getCode().equals(code)) {
|
||||
return type;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据方法名获取消息类型
|
||||
*
|
||||
* @param method 方法名
|
||||
* @return 消息类型枚举,如果未找到返回 null
|
||||
*/
|
||||
public static IotMessageTypeEnum getByMethod(String method) {
|
||||
if (method == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// 处理 thing.service.xxx 格式
|
||||
if (method.startsWith(IotTopicConstants.MethodPrefix.THING_SERVICE)) {
|
||||
String servicePart = method.substring(IotTopicConstants.MethodPrefix.THING_SERVICE.length());
|
||||
if ("property.set".equals(servicePart)) {
|
||||
return PROPERTY_SET;
|
||||
} else if ("property.get".equals(servicePart)) {
|
||||
return PROPERTY_GET;
|
||||
} else if ("config.set".equals(servicePart)) {
|
||||
return CONFIG_SET;
|
||||
} else if ("ota.upgrade".equals(servicePart)) {
|
||||
return OTA_UPGRADE;
|
||||
} else {
|
||||
return SERVICE_INVOKE;
|
||||
}
|
||||
}
|
||||
|
||||
// 处理 thing.event.xxx 格式
|
||||
if (method.startsWith(IotTopicConstants.MethodPrefix.THING_EVENT)) {
|
||||
String eventPart = method.substring(IotTopicConstants.MethodPrefix.THING_EVENT.length());
|
||||
if ("property.post".equals(eventPart)) {
|
||||
return PROPERTY_POST;
|
||||
} else {
|
||||
return EVENT_POST;
|
||||
}
|
||||
}
|
||||
|
||||
// 其他类型
|
||||
switch (method) {
|
||||
case IotTopicConstants.Method.DEVICE_ONLINE:
|
||||
return DEVICE_ONLINE;
|
||||
case IotTopicConstants.Method.DEVICE_OFFLINE:
|
||||
return DEVICE_OFFLINE;
|
||||
case IotTopicConstants.Method.HEARTBEAT:
|
||||
return HEARTBEAT;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,79 @@
|
||||
package cn.iocoder.yudao.module.iot.protocol.enums;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* IoT 协议类型枚举
|
||||
*
|
||||
* @author haohao
|
||||
*/
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public enum IotProtocolTypeEnum {
|
||||
|
||||
/**
|
||||
* Alink 协议(阿里云物联网协议)
|
||||
*/
|
||||
ALINK("alink", "Alink 协议"),
|
||||
|
||||
/**
|
||||
* MQTT 原始协议
|
||||
*/
|
||||
MQTT_RAW("mqtt_raw", "MQTT 原始协议"),
|
||||
|
||||
/**
|
||||
* HTTP 协议
|
||||
*/
|
||||
HTTP("http", "HTTP 协议"),
|
||||
|
||||
/**
|
||||
* TCP 协议
|
||||
*/
|
||||
TCP("tcp", "TCP 协议"),
|
||||
|
||||
/**
|
||||
* UDP 协议
|
||||
*/
|
||||
UDP("udp", "UDP 协议"),
|
||||
|
||||
/**
|
||||
* 自定义协议
|
||||
*/
|
||||
CUSTOM("custom", "自定义协议");
|
||||
|
||||
/**
|
||||
* 协议编码
|
||||
*/
|
||||
private final String code;
|
||||
|
||||
/**
|
||||
* 协议名称
|
||||
*/
|
||||
private final String name;
|
||||
|
||||
/**
|
||||
* 根据编码获取协议类型
|
||||
*
|
||||
* @param code 协议编码
|
||||
* @return 协议类型枚举,如果未找到返回 null
|
||||
*/
|
||||
public static IotProtocolTypeEnum getByCode(String code) {
|
||||
for (IotProtocolTypeEnum type : values()) {
|
||||
if (type.getCode().equals(code)) {
|
||||
return type;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查是否为有效的协议编码
|
||||
*
|
||||
* @param code 协议编码
|
||||
* @return 如果有效返回 true,否则返回 false
|
||||
*/
|
||||
public static boolean isValidCode(String code) {
|
||||
return getByCode(code) != null;
|
||||
}
|
||||
}
|
||||
@@ -11,9 +11,9 @@ import java.util.Map;
|
||||
* IoT Alink 消息模型
|
||||
* <p>
|
||||
* 基于阿里云 Alink 协议规范实现的标准消息格式
|
||||
* @see <a href="https://help.aliyun.com/zh/iot/user-guide/alink-protocol-1">阿里云物联网 —— Alink 协议</a>
|
||||
*
|
||||
* @author haohao
|
||||
* @see <a href="https://help.aliyun.com/zh/iot/user-guide/alink-protocol-1">阿里云物联网 —— Alink 协议</a>
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
|
||||
@@ -40,7 +40,7 @@ public class IotAlinkMessageParser implements IotMessageParser {
|
||||
JSONObject json = JSONUtil.parseObj(message);
|
||||
String id = json.getStr("id");
|
||||
String method = json.getStr("method");
|
||||
|
||||
|
||||
if (StrUtil.isBlank(method)) {
|
||||
// 尝试从 topic 中解析方法
|
||||
method = IotTopicUtils.parseMethodFromTopic(topic);
|
||||
|
||||
@@ -0,0 +1,348 @@
|
||||
package cn.iocoder.yudao.module.iot.protocol.message.impl;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.json.JSONObject;
|
||||
import cn.hutool.json.JSONUtil;
|
||||
import cn.iocoder.yudao.module.iot.protocol.constants.IotHttpConstants;
|
||||
import cn.iocoder.yudao.module.iot.protocol.constants.IotLogConstants;
|
||||
import cn.iocoder.yudao.module.iot.protocol.constants.IotTopicConstants;
|
||||
import cn.iocoder.yudao.module.iot.protocol.message.IotAlinkMessage;
|
||||
import cn.iocoder.yudao.module.iot.protocol.message.IotMessageParser;
|
||||
import cn.iocoder.yudao.module.iot.protocol.message.IotStandardResponse;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* IoT HTTP 协议消息解析器实现
|
||||
* <p>
|
||||
* 参考阿里云IoT平台HTTPS协议标准,支持设备认证和数据上报两种消息类型:
|
||||
* <p>
|
||||
* 1. 设备认证消息格式:
|
||||
*
|
||||
* <pre>
|
||||
* POST /auth HTTP/1.1
|
||||
* Content-Type: application/json
|
||||
* {
|
||||
* "productKey": "a1AbC***",
|
||||
* "deviceName": "device01",
|
||||
* "clientId": "device01_001",
|
||||
* "timestamp": "1501668289957",
|
||||
* "sign": "xxxxx",
|
||||
* "signmethod": "hmacsha1",
|
||||
* "version": "default"
|
||||
* }
|
||||
* </pre>
|
||||
* <p>
|
||||
* 2. 数据上报消息格式:
|
||||
*
|
||||
* <pre>
|
||||
* POST /topic/${topic} HTTP/1.1
|
||||
* password: ${token}
|
||||
* Content-Type: application/octet-stream
|
||||
* ${payload}
|
||||
* </pre>
|
||||
*
|
||||
* @author haohao
|
||||
*/
|
||||
@Slf4j
|
||||
public class IotHttpMessageParser implements IotMessageParser {
|
||||
|
||||
/**
|
||||
* 认证路径
|
||||
*/
|
||||
public static final String AUTH_PATH = IotHttpConstants.Path.AUTH;
|
||||
|
||||
/**
|
||||
* 主题路径前缀
|
||||
*/
|
||||
public static final String TOPIC_PATH_PREFIX = IotHttpConstants.Path.TOPIC_PREFIX;
|
||||
|
||||
@Override
|
||||
public IotAlinkMessage parse(String topic, byte[] payload) {
|
||||
if (payload == null || payload.length == 0) {
|
||||
log.warn(IotLogConstants.Http.RECEIVED_EMPTY_MESSAGE, topic);
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
String message = new String(payload, StandardCharsets.UTF_8);
|
||||
|
||||
// 判断是认证请求还是数据上报
|
||||
if (AUTH_PATH.equals(topic)) {
|
||||
return parseAuthMessage(message);
|
||||
} else if (topic.startsWith(TOPIC_PATH_PREFIX)) {
|
||||
return parseDataMessage(topic, message);
|
||||
} else {
|
||||
log.warn(IotLogConstants.Http.UNSUPPORTED_PATH_FORMAT, topic);
|
||||
return null;
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error(IotLogConstants.Http.PARSE_MESSAGE_FAILED, topic, e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析设备认证消息
|
||||
*
|
||||
* @param message 认证消息JSON
|
||||
* @return 标准消息格式
|
||||
*/
|
||||
private IotAlinkMessage parseAuthMessage(String message) {
|
||||
if (!JSONUtil.isTypeJSON(message)) {
|
||||
log.warn(IotLogConstants.Http.AUTH_MESSAGE_NOT_JSON, message);
|
||||
return null;
|
||||
}
|
||||
|
||||
JSONObject json = JSONUtil.parseObj(message);
|
||||
|
||||
// 验证必需字段
|
||||
String productKey = json.getStr(IotHttpConstants.AuthField.PRODUCT_KEY);
|
||||
String deviceName = json.getStr(IotHttpConstants.AuthField.DEVICE_NAME);
|
||||
String clientId = json.getStr(IotHttpConstants.AuthField.CLIENT_ID);
|
||||
String sign = json.getStr(IotHttpConstants.AuthField.SIGN);
|
||||
|
||||
if (StrUtil.hasBlank(productKey, deviceName, clientId, sign)) {
|
||||
log.warn(IotLogConstants.Http.AUTH_MESSAGE_MISSING_REQUIRED_FIELDS, message);
|
||||
return null;
|
||||
}
|
||||
|
||||
// 构建认证消息
|
||||
Map<String, Object> params = new HashMap<>();
|
||||
params.put(IotHttpConstants.AuthField.PRODUCT_KEY, productKey);
|
||||
params.put(IotHttpConstants.AuthField.DEVICE_NAME, deviceName);
|
||||
params.put(IotHttpConstants.AuthField.CLIENT_ID, clientId);
|
||||
params.put(IotHttpConstants.AuthField.TIMESTAMP, json.getStr(IotHttpConstants.AuthField.TIMESTAMP));
|
||||
params.put(IotHttpConstants.AuthField.SIGN, sign);
|
||||
params.put(IotHttpConstants.AuthField.SIGN_METHOD,
|
||||
json.getStr(IotHttpConstants.AuthField.SIGN_METHOD, IotHttpConstants.DefaultValue.SIGN_METHOD));
|
||||
|
||||
return IotAlinkMessage.builder()
|
||||
.id(generateMessageId())
|
||||
.method(IotHttpConstants.Method.DEVICE_AUTH)
|
||||
.version(json.getStr(IotHttpConstants.AuthField.VERSION, IotHttpConstants.DefaultValue.VERSION))
|
||||
.params(params)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析数据上报消息
|
||||
*
|
||||
* @param topic 主题路径,格式:/topic/${actualTopic}
|
||||
* @param message 消息内容
|
||||
* @return 标准消息格式
|
||||
*/
|
||||
private IotAlinkMessage parseDataMessage(String topic, String message) {
|
||||
// 提取实际的主题,去掉 /topic 前缀
|
||||
String actualTopic = topic.substring(TOPIC_PATH_PREFIX.length()); // 直接移除/topic前缀
|
||||
|
||||
// 尝试解析为JSON格式
|
||||
if (JSONUtil.isTypeJSON(message)) {
|
||||
return parseJsonDataMessage(actualTopic, message);
|
||||
} else {
|
||||
// 原始数据格式
|
||||
return parseRawDataMessage(actualTopic, message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析JSON格式的数据消息
|
||||
*
|
||||
* @param topic 实际主题
|
||||
* @param message JSON消息
|
||||
* @return 标准消息格式
|
||||
*/
|
||||
private IotAlinkMessage parseJsonDataMessage(String topic, String message) {
|
||||
JSONObject json = JSONUtil.parseObj(message);
|
||||
|
||||
// 生成消息ID
|
||||
String messageId = json.getStr(IotHttpConstants.MessageField.ID);
|
||||
if (StrUtil.isBlank(messageId)) {
|
||||
messageId = generateMessageId();
|
||||
}
|
||||
|
||||
// 获取方法名
|
||||
String method = json.getStr(IotHttpConstants.MessageField.METHOD);
|
||||
if (StrUtil.isBlank(method)) {
|
||||
// 根据主题推断方法名
|
||||
method = inferMethodFromTopic(topic);
|
||||
}
|
||||
|
||||
// 获取参数
|
||||
Object params = json.get(IotHttpConstants.MessageField.PARAMS);
|
||||
Map<String, Object> paramsMap = new HashMap<>();
|
||||
if (params instanceof Map) {
|
||||
paramsMap.putAll((Map<String, Object>) params);
|
||||
} else if (params != null) {
|
||||
paramsMap.put(IotHttpConstants.MessageField.DATA, params);
|
||||
}
|
||||
|
||||
return IotAlinkMessage.builder()
|
||||
.id(messageId)
|
||||
.method(method)
|
||||
.version(json.getStr(IotHttpConstants.MessageField.VERSION,
|
||||
IotHttpConstants.DefaultValue.MESSAGE_VERSION))
|
||||
.params(paramsMap)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析原始数据消息
|
||||
*
|
||||
* @param topic 实际主题
|
||||
* @param message 原始消息
|
||||
* @return 标准消息格式
|
||||
*/
|
||||
private IotAlinkMessage parseRawDataMessage(String topic, String message) {
|
||||
Map<String, Object> params = new HashMap<>();
|
||||
params.put(IotHttpConstants.MessageField.DATA, message);
|
||||
|
||||
return IotAlinkMessage.builder()
|
||||
.id(generateMessageId())
|
||||
.method(inferMethodFromTopic(topic))
|
||||
.version(IotHttpConstants.DefaultValue.MESSAGE_VERSION)
|
||||
.params(params)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据主题推断方法名
|
||||
*
|
||||
* @param topic 主题
|
||||
* @return 方法名
|
||||
*/
|
||||
private String inferMethodFromTopic(String topic) {
|
||||
if (StrUtil.isBlank(topic)) {
|
||||
return IotHttpConstants.DefaultValue.UNKNOWN_METHOD;
|
||||
}
|
||||
|
||||
// 标准系统主题解析
|
||||
if (topic.startsWith(IotTopicConstants.SYS_TOPIC_PREFIX)) {
|
||||
if (topic.contains(IotTopicConstants.PROPERTY_SET_TOPIC)) {
|
||||
return IotTopicConstants.Method.PROPERTY_SET;
|
||||
} else if (topic.contains(IotTopicConstants.PROPERTY_GET_TOPIC)) {
|
||||
return IotTopicConstants.Method.PROPERTY_GET;
|
||||
} else if (topic.contains(IotTopicConstants.PROPERTY_POST_TOPIC)) {
|
||||
return IotTopicConstants.Method.PROPERTY_POST;
|
||||
} else if (topic.contains(IotTopicConstants.EVENT_POST_TOPIC_PREFIX)
|
||||
&& topic.endsWith(IotTopicConstants.EVENT_POST_TOPIC_SUFFIX)) {
|
||||
// 自定义事件上报
|
||||
String[] parts = topic.split("/");
|
||||
// 查找event关键字的位置
|
||||
for (int i = 0; i < parts.length; i++) {
|
||||
if (IotTopicConstants.Keyword.EVENT.equals(parts[i]) && i + 1 < parts.length) {
|
||||
String eventId = parts[i + 1];
|
||||
return IotTopicConstants.MethodPrefix.THING_EVENT + eventId + ".post";
|
||||
}
|
||||
}
|
||||
} else if (topic.contains(IotTopicConstants.SERVICE_TOPIC_PREFIX)
|
||||
&& !topic.contains(IotTopicConstants.Keyword.PROPERTY)) {
|
||||
// 自定义服务调用
|
||||
String[] parts = topic.split("/");
|
||||
// 查找service关键字的位置
|
||||
for (int i = 0; i < parts.length; i++) {
|
||||
if (IotTopicConstants.Keyword.SERVICE.equals(parts[i]) && i + 1 < parts.length) {
|
||||
String serviceId = parts[i + 1];
|
||||
return IotTopicConstants.MethodPrefix.THING_SERVICE + serviceId;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 自定义主题
|
||||
return IotHttpConstants.Method.CUSTOM_MESSAGE;
|
||||
}
|
||||
|
||||
/**
|
||||
* 生成消息ID
|
||||
*
|
||||
* @return 消息ID
|
||||
*/
|
||||
private String generateMessageId() {
|
||||
return IotAlinkMessage.generateRequestId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] formatResponse(IotStandardResponse response) {
|
||||
try {
|
||||
JSONObject httpResponse = new JSONObject();
|
||||
|
||||
// 判断是否为认证响应
|
||||
if (IotHttpConstants.Method.DEVICE_AUTH.equals(response.getMethod())) {
|
||||
// 认证响应格式
|
||||
httpResponse.set(IotHttpConstants.ResponseField.CODE, response.getCode());
|
||||
httpResponse.set(IotHttpConstants.ResponseField.MESSAGE, response.getMessage());
|
||||
|
||||
if (response.getCode() == 200 && response.getData() != null) {
|
||||
JSONObject info = new JSONObject();
|
||||
if (response.getData() instanceof Map) {
|
||||
Map<String, Object> dataMap = (Map<String, Object>) response.getData();
|
||||
info.putAll(dataMap);
|
||||
} else {
|
||||
info.set(IotHttpConstants.ResponseField.TOKEN, response.getData().toString());
|
||||
}
|
||||
httpResponse.set(IotHttpConstants.ResponseField.INFO, info);
|
||||
}
|
||||
} else {
|
||||
// 数据上报响应格式
|
||||
httpResponse.set(IotHttpConstants.ResponseField.CODE, response.getCode());
|
||||
httpResponse.set(IotHttpConstants.ResponseField.MESSAGE, response.getMessage());
|
||||
|
||||
if (response.getCode() == 200) {
|
||||
JSONObject info = new JSONObject();
|
||||
info.set(IotHttpConstants.ResponseField.MESSAGE_ID, response.getId());
|
||||
httpResponse.set(IotHttpConstants.ResponseField.INFO, info);
|
||||
}
|
||||
}
|
||||
|
||||
String json = httpResponse.toString();
|
||||
return json.getBytes(StandardCharsets.UTF_8);
|
||||
} catch (Exception e) {
|
||||
log.error(IotLogConstants.Http.FORMAT_RESPONSE_FAILED, e);
|
||||
return new byte[0];
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canHandle(String topic) {
|
||||
// 支持认证路径和主题路径
|
||||
return topic != null && (AUTH_PATH.equals(topic) || topic.startsWith(TOPIC_PATH_PREFIX));
|
||||
}
|
||||
|
||||
/**
|
||||
* 从设备标识中解析产品Key和设备名称
|
||||
*
|
||||
* @param deviceKey 设备标识,格式:productKey/deviceName
|
||||
* @return 包含产品Key和设备名称的数组,[0]为产品Key,[1]为设备名称
|
||||
*/
|
||||
public static String[] parseDeviceKey(String deviceKey) {
|
||||
if (StrUtil.isBlank(deviceKey)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
String[] parts = deviceKey.split("/");
|
||||
if (parts.length != 2) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return new String[]{parts[0], parts[1]};
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建设备标识
|
||||
*
|
||||
* @param productKey 产品Key
|
||||
* @param deviceName 设备名称
|
||||
* @return 设备标识,格式:productKey/deviceName
|
||||
*/
|
||||
public static String buildDeviceKey(String productKey, String deviceName) {
|
||||
if (StrUtil.isBlank(productKey) || StrUtil.isBlank(deviceName)) {
|
||||
return null;
|
||||
}
|
||||
return productKey + "/" + deviceName;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,279 @@
|
||||
package cn.iocoder.yudao.module.iot.protocol.util;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
|
||||
/**
|
||||
* IoT HTTP 协议主题工具类
|
||||
* <p>
|
||||
* 参考阿里云IoT平台HTTPS协议标准,支持以下路径格式:
|
||||
* 1. 设备认证:/auth
|
||||
* 2. 数据上报:/topic/${actualTopic}
|
||||
* <p>
|
||||
* 其中 actualTopic 遵循MQTT主题规范,例如:
|
||||
* - /sys/{productKey}/{deviceName}/thing/service/property/set
|
||||
* - /{productKey}/{deviceName}/user/get
|
||||
*
|
||||
* @author haohao
|
||||
*/
|
||||
public class IotHttpTopicUtils {
|
||||
|
||||
/**
|
||||
* 设备认证路径
|
||||
*/
|
||||
public static final String AUTH_PATH = "/auth";
|
||||
|
||||
/**
|
||||
* 数据上报路径前缀
|
||||
*/
|
||||
public static final String TOPIC_PATH_PREFIX = "/topic";
|
||||
|
||||
/**
|
||||
* 系统主题前缀
|
||||
*/
|
||||
public static final String SYS_TOPIC_PREFIX = "/sys";
|
||||
|
||||
/**
|
||||
* 构建设备认证路径
|
||||
*
|
||||
* @return 认证路径
|
||||
*/
|
||||
public static String buildAuthPath() {
|
||||
return AUTH_PATH;
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建数据上报路径
|
||||
*
|
||||
* @param actualTopic 实际的MQTT主题
|
||||
* @return HTTP数据上报路径
|
||||
*/
|
||||
public static String buildTopicPath(String actualTopic) {
|
||||
if (StrUtil.isBlank(actualTopic)) {
|
||||
return null;
|
||||
}
|
||||
return TOPIC_PATH_PREFIX + actualTopic;
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建系统属性设置路径
|
||||
*
|
||||
* @param productKey 产品Key
|
||||
* @param deviceName 设备名称
|
||||
* @return HTTP路径
|
||||
*/
|
||||
public static String buildPropertySetPath(String productKey, String deviceName) {
|
||||
if (StrUtil.hasBlank(productKey, deviceName)) {
|
||||
return null;
|
||||
}
|
||||
String actualTopic = SYS_TOPIC_PREFIX + "/" + productKey + "/" + deviceName + "/thing/service/property/set";
|
||||
return buildTopicPath(actualTopic);
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建系统属性获取路径
|
||||
*
|
||||
* @param productKey 产品Key
|
||||
* @param deviceName 设备名称
|
||||
* @return HTTP路径
|
||||
*/
|
||||
public static String buildPropertyGetPath(String productKey, String deviceName) {
|
||||
if (StrUtil.hasBlank(productKey, deviceName)) {
|
||||
return null;
|
||||
}
|
||||
String actualTopic = SYS_TOPIC_PREFIX + "/" + productKey + "/" + deviceName + "/thing/service/property/get";
|
||||
return buildTopicPath(actualTopic);
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建系统属性上报路径
|
||||
*
|
||||
* @param productKey 产品Key
|
||||
* @param deviceName 设备名称
|
||||
* @return HTTP路径
|
||||
*/
|
||||
public static String buildPropertyPostPath(String productKey, String deviceName) {
|
||||
if (StrUtil.hasBlank(productKey, deviceName)) {
|
||||
return null;
|
||||
}
|
||||
String actualTopic = SYS_TOPIC_PREFIX + "/" + productKey + "/" + deviceName + "/thing/event/property/post";
|
||||
return buildTopicPath(actualTopic);
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建系统事件上报路径
|
||||
*
|
||||
* @param productKey 产品Key
|
||||
* @param deviceName 设备名称
|
||||
* @param eventIdentifier 事件标识符
|
||||
* @return HTTP路径
|
||||
*/
|
||||
public static String buildEventPostPath(String productKey, String deviceName, String eventIdentifier) {
|
||||
if (StrUtil.hasBlank(productKey, deviceName, eventIdentifier)) {
|
||||
return null;
|
||||
}
|
||||
String actualTopic = SYS_TOPIC_PREFIX + "/" + productKey + "/" + deviceName + "/thing/event/" + eventIdentifier
|
||||
+ "/post";
|
||||
return buildTopicPath(actualTopic);
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建系统服务调用路径
|
||||
*
|
||||
* @param productKey 产品Key
|
||||
* @param deviceName 设备名称
|
||||
* @param serviceIdentifier 服务标识符
|
||||
* @return HTTP路径
|
||||
*/
|
||||
public static String buildServiceInvokePath(String productKey, String deviceName, String serviceIdentifier) {
|
||||
if (StrUtil.hasBlank(productKey, deviceName, serviceIdentifier)) {
|
||||
return null;
|
||||
}
|
||||
String actualTopic = SYS_TOPIC_PREFIX + "/" + productKey + "/" + deviceName + "/thing/service/"
|
||||
+ serviceIdentifier;
|
||||
return buildTopicPath(actualTopic);
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建自定义主题路径
|
||||
*
|
||||
* @param productKey 产品Key
|
||||
* @param deviceName 设备名称
|
||||
* @param customPath 自定义路径
|
||||
* @return HTTP路径
|
||||
*/
|
||||
public static String buildCustomTopicPath(String productKey, String deviceName, String customPath) {
|
||||
if (StrUtil.hasBlank(productKey, deviceName, customPath)) {
|
||||
return null;
|
||||
}
|
||||
String actualTopic = "/" + productKey + "/" + deviceName + "/" + customPath;
|
||||
return buildTopicPath(actualTopic);
|
||||
}
|
||||
|
||||
/**
|
||||
* 从HTTP路径中提取实际主题
|
||||
*
|
||||
* @param httpPath HTTP路径,格式:/topic/${actualTopic}
|
||||
* @return 实际主题,如果解析失败返回null
|
||||
*/
|
||||
public static String extractActualTopic(String httpPath) {
|
||||
if (StrUtil.isBlank(httpPath) || !httpPath.startsWith(TOPIC_PATH_PREFIX)) {
|
||||
return null;
|
||||
}
|
||||
return httpPath.substring(TOPIC_PATH_PREFIX.length()); // 直接移除/topic前缀
|
||||
}
|
||||
|
||||
/**
|
||||
* 从主题中解析产品Key
|
||||
*
|
||||
* @param topic 主题,支持系统主题和自定义主题
|
||||
* @return 产品Key,如果无法解析则返回null
|
||||
*/
|
||||
public static String parseProductKeyFromTopic(String topic) {
|
||||
if (StrUtil.isBlank(topic)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
String[] parts = topic.split("/");
|
||||
|
||||
// 系统主题格式:/sys/{productKey}/{deviceName}/...
|
||||
if (parts.length >= 4 && "sys".equals(parts[1])) {
|
||||
return parts[2];
|
||||
}
|
||||
|
||||
// 自定义主题格式:/{productKey}/{deviceName}/...
|
||||
// 确保不是不完整的系统主题格式
|
||||
if (parts.length >= 3 && StrUtil.isNotBlank(parts[1]) && !"sys".equals(parts[1])) {
|
||||
return parts[1];
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 从主题中解析设备名称
|
||||
*
|
||||
* @param topic 主题,支持系统主题和自定义主题
|
||||
* @return 设备名称,如果无法解析则返回null
|
||||
*/
|
||||
public static String parseDeviceNameFromTopic(String topic) {
|
||||
if (StrUtil.isBlank(topic)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
String[] parts = topic.split("/");
|
||||
|
||||
// 系统主题格式:/sys/{productKey}/{deviceName}/...
|
||||
if (parts.length >= 4 && "sys".equals(parts[1])) {
|
||||
return parts[3];
|
||||
}
|
||||
|
||||
// 自定义主题格式:/{productKey}/{deviceName}/...
|
||||
// 确保不是不完整的系统主题格式
|
||||
if (parts.length >= 3 && StrUtil.isNotBlank(parts[2]) && !"sys".equals(parts[1])) {
|
||||
return parts[2];
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查是否为认证路径
|
||||
*
|
||||
* @param path 路径
|
||||
* @return 如果是认证路径返回true,否则返回false
|
||||
*/
|
||||
public static boolean isAuthPath(String path) {
|
||||
return AUTH_PATH.equals(path);
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查是否为数据上报路径
|
||||
*
|
||||
* @param path 路径
|
||||
* @return 如果是数据上报路径返回true,否则返回false
|
||||
*/
|
||||
public static boolean isTopicPath(String path) {
|
||||
return path != null && path.startsWith(TOPIC_PATH_PREFIX);
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查是否为有效的HTTP路径
|
||||
*
|
||||
* @param path 路径
|
||||
* @return 如果是有效的HTTP路径返回true,否则返回false
|
||||
*/
|
||||
public static boolean isValidHttpPath(String path) {
|
||||
return isAuthPath(path) || isTopicPath(path);
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查是否为系统主题
|
||||
*
|
||||
* @param topic 主题
|
||||
* @return 如果是系统主题返回true,否则返回false
|
||||
*/
|
||||
public static boolean isSystemTopic(String topic) {
|
||||
return topic != null && topic.startsWith(SYS_TOPIC_PREFIX);
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建响应主题路径
|
||||
*
|
||||
* @param requestPath 请求路径
|
||||
* @return 响应路径,如果无法构建返回null
|
||||
*/
|
||||
public static String buildReplyPath(String requestPath) {
|
||||
String actualTopic = extractActualTopic(requestPath);
|
||||
if (actualTopic == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// 为系统主题添加_reply后缀
|
||||
if (isSystemTopic(actualTopic)) {
|
||||
String replyTopic = actualTopic + "_reply";
|
||||
return buildTopicPath(replyTopic);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,237 @@
|
||||
package cn.iocoder.yudao.module.iot.protocol.util;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.iocoder.yudao.module.iot.protocol.constants.IotTopicConstants;
|
||||
import cn.iocoder.yudao.module.iot.protocol.enums.IotMessageDirectionEnum;
|
||||
import cn.iocoder.yudao.module.iot.protocol.enums.IotMessageTypeEnum;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* IoT 主题解析器
|
||||
* <p>
|
||||
* 用于解析各种格式的 IoT 主题,提取其中的关键信息
|
||||
*
|
||||
* @author haohao
|
||||
*/
|
||||
public class IotTopicParser {
|
||||
|
||||
/**
|
||||
* 主题解析结果
|
||||
*/
|
||||
@Data
|
||||
public static class TopicInfo {
|
||||
/**
|
||||
* 产品Key
|
||||
*/
|
||||
private String productKey;
|
||||
|
||||
/**
|
||||
* 设备名称
|
||||
*/
|
||||
private String deviceName;
|
||||
|
||||
/**
|
||||
* 消息类型
|
||||
*/
|
||||
private IotMessageTypeEnum messageType;
|
||||
|
||||
/**
|
||||
* 消息方向
|
||||
*/
|
||||
private IotMessageDirectionEnum direction;
|
||||
|
||||
/**
|
||||
* 服务标识符(仅服务调用时有效)
|
||||
*/
|
||||
private String serviceIdentifier;
|
||||
|
||||
/**
|
||||
* 事件标识符(仅事件上报时有效)
|
||||
*/
|
||||
private String eventIdentifier;
|
||||
|
||||
/**
|
||||
* 是否为响应主题
|
||||
*/
|
||||
private boolean isReply;
|
||||
|
||||
/**
|
||||
* 原始主题
|
||||
*/
|
||||
private String originalTopic;
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析主题
|
||||
*
|
||||
* @param topic 主题字符串
|
||||
* @return 解析结果,如果解析失败返回 null
|
||||
*/
|
||||
public static TopicInfo parse(String topic) {
|
||||
if (StrUtil.isBlank(topic)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
TopicInfo info = new TopicInfo();
|
||||
info.setOriginalTopic(topic);
|
||||
|
||||
// 检查是否为响应主题
|
||||
boolean isReply = topic.endsWith(IotTopicConstants.REPLY_SUFFIX);
|
||||
info.setReply(isReply);
|
||||
|
||||
// 移除响应后缀,便于后续解析
|
||||
String normalizedTopic = isReply ? topic.substring(0, topic.length() - IotTopicConstants.REPLY_SUFFIX.length())
|
||||
: topic;
|
||||
|
||||
// 解析系统主题
|
||||
if (normalizedTopic.startsWith(IotTopicConstants.SYS_TOPIC_PREFIX)) {
|
||||
return parseSystemTopic(info, normalizedTopic);
|
||||
}
|
||||
|
||||
// 解析自定义主题
|
||||
return parseCustomTopic(info, normalizedTopic);
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析系统主题
|
||||
* 格式:/sys/{productKey}/{deviceName}/thing/service/{identifier}
|
||||
* 或:/sys/{productKey}/{deviceName}/thing/event/{identifier}/post
|
||||
*/
|
||||
private static TopicInfo parseSystemTopic(TopicInfo info, String topic) {
|
||||
String[] parts = topic.split("/");
|
||||
if (parts.length < 6) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// 解析产品Key和设备名称
|
||||
info.setProductKey(parts[2]);
|
||||
info.setDeviceName(parts[3]);
|
||||
|
||||
// 判断消息方向:包含 /post 通常是上行,其他是下行
|
||||
info.setDirection(topic.contains("/post") || topic.contains("/reply") ? IotMessageDirectionEnum.UPSTREAM
|
||||
: IotMessageDirectionEnum.DOWNSTREAM);
|
||||
|
||||
// 解析具体的消息类型
|
||||
if (topic.contains("/thing/service/")) {
|
||||
return parseServiceTopic(info, topic, parts);
|
||||
} else if (topic.contains("/thing/event/")) {
|
||||
return parseEventTopic(info, topic, parts);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析服务相关主题
|
||||
*/
|
||||
private static TopicInfo parseServiceTopic(TopicInfo info, String topic, String[] parts) {
|
||||
// 查找 service 关键字的位置
|
||||
int serviceIndex = -1;
|
||||
for (int i = 0; i < parts.length; i++) {
|
||||
if ("service".equals(parts[i])) {
|
||||
serviceIndex = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (serviceIndex == -1 || serviceIndex + 1 >= parts.length) {
|
||||
return null;
|
||||
}
|
||||
|
||||
String serviceType = parts[serviceIndex + 1];
|
||||
|
||||
// 根据服务类型确定消息类型
|
||||
switch (serviceType) {
|
||||
case "property":
|
||||
if (serviceIndex + 2 < parts.length) {
|
||||
String operation = parts[serviceIndex + 2];
|
||||
if ("set".equals(operation)) {
|
||||
info.setMessageType(IotMessageTypeEnum.PROPERTY_SET);
|
||||
} else if ("get".equals(operation)) {
|
||||
info.setMessageType(IotMessageTypeEnum.PROPERTY_GET);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case "config":
|
||||
if (serviceIndex + 2 < parts.length && "set".equals(parts[serviceIndex + 2])) {
|
||||
info.setMessageType(IotMessageTypeEnum.CONFIG_SET);
|
||||
}
|
||||
break;
|
||||
case "ota":
|
||||
if (serviceIndex + 2 < parts.length && "upgrade".equals(parts[serviceIndex + 2])) {
|
||||
info.setMessageType(IotMessageTypeEnum.OTA_UPGRADE);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
// 自定义服务
|
||||
info.setMessageType(IotMessageTypeEnum.SERVICE_INVOKE);
|
||||
info.setServiceIdentifier(serviceType);
|
||||
break;
|
||||
}
|
||||
|
||||
return info;
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析事件相关主题
|
||||
*/
|
||||
private static TopicInfo parseEventTopic(TopicInfo info, String topic, String[] parts) {
|
||||
// 查找 event 关键字的位置
|
||||
int eventIndex = -1;
|
||||
for (int i = 0; i < parts.length; i++) {
|
||||
if ("event".equals(parts[i])) {
|
||||
eventIndex = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (eventIndex == -1 || eventIndex + 1 >= parts.length) {
|
||||
return null;
|
||||
}
|
||||
|
||||
String eventType = parts[eventIndex + 1];
|
||||
|
||||
if ("property".equals(eventType) && eventIndex + 2 < parts.length && "post".equals(parts[eventIndex + 2])) {
|
||||
info.setMessageType(IotMessageTypeEnum.PROPERTY_POST);
|
||||
} else {
|
||||
// 自定义事件
|
||||
info.setMessageType(IotMessageTypeEnum.EVENT_POST);
|
||||
info.setEventIdentifier(eventType);
|
||||
}
|
||||
|
||||
return info;
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析自定义主题
|
||||
* 这里可以根据实际需求扩展自定义主题的解析逻辑
|
||||
*/
|
||||
private static TopicInfo parseCustomTopic(TopicInfo info, String topic) {
|
||||
// TODO: 根据业务需要实现自定义主题解析逻辑
|
||||
return info;
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查主题是否为有效的系统主题
|
||||
*
|
||||
* @param topic 主题
|
||||
* @return 如果是有效的系统主题返回 true,否则返回 false
|
||||
*/
|
||||
public static boolean isValidSystemTopic(String topic) {
|
||||
TopicInfo info = parse(topic);
|
||||
return info != null &&
|
||||
StrUtil.isNotBlank(info.getProductKey()) &&
|
||||
StrUtil.isNotBlank(info.getDeviceName()) &&
|
||||
info.getMessageType() != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查主题是否为响应主题
|
||||
*
|
||||
* @param topic 主题
|
||||
* @return 如果是响应主题返回 true,否则返回 false
|
||||
*/
|
||||
public static boolean isReplyTopic(String topic) {
|
||||
return topic != null && topic.endsWith(IotTopicConstants.REPLY_SUFFIX);
|
||||
}
|
||||
}
|
||||
@@ -126,12 +126,12 @@ public class IotTopicUtils {
|
||||
if (StrUtil.isBlank(topic) || !topic.startsWith(IotTopicConstants.SYS_TOPIC_PREFIX)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
String[] parts = topic.split("/");
|
||||
if (parts.length < 4) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
return parts[2];
|
||||
}
|
||||
|
||||
@@ -146,12 +146,12 @@ public class IotTopicUtils {
|
||||
if (StrUtil.isBlank(topic) || !topic.startsWith(IotTopicConstants.SYS_TOPIC_PREFIX)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
String[] parts = topic.split("/");
|
||||
if (parts.length < 4) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
return parts[3];
|
||||
}
|
||||
|
||||
@@ -166,19 +166,19 @@ public class IotTopicUtils {
|
||||
if (StrUtil.isBlank(topic) || !topic.startsWith(IotTopicConstants.SYS_TOPIC_PREFIX)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
// 服务调用主题
|
||||
if (topic.contains("/thing/service/")) {
|
||||
String servicePart = topic.substring(topic.indexOf("/thing/service/") + "/thing/service/".length());
|
||||
return servicePart.replace("/", ".");
|
||||
}
|
||||
|
||||
|
||||
// 事件上报主题
|
||||
if (topic.contains("/thing/event/")) {
|
||||
String eventPart = topic.substring(topic.indexOf("/thing/event/") + "/thing/event/".length());
|
||||
return "event." + eventPart.replace("/", ".");
|
||||
}
|
||||
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user