feat:【IOT】新增 MQTT 协议支持及相关消息解析器,完善协议转换器功能

This commit is contained in:
haohao
2025-05-24 17:30:32 +08:00
parent fbb664026d
commit af37176d50
15 changed files with 558 additions and 109 deletions

View File

@@ -4,8 +4,8 @@ import cn.iocoder.yudao.module.iot.protocol.convert.IotProtocolConverter;
import cn.iocoder.yudao.module.iot.protocol.convert.impl.DefaultIotProtocolConverter;
import cn.iocoder.yudao.module.iot.protocol.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.protocol.message.IotMessageParser;
import cn.iocoder.yudao.module.iot.protocol.message.impl.IotAlinkMessageParser;
import cn.iocoder.yudao.module.iot.protocol.message.impl.IotHttpMessageParser;
import cn.iocoder.yudao.module.iot.protocol.message.impl.IotMqttMessageParser;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -21,20 +21,21 @@ public class IotProtocolAutoConfiguration {
/**
* Bean 名称常量
*/
public static final String IOT_ALINK_MESSAGE_PARSER_BEAN_NAME = "iotAlinkMessageParser";
public static final String IOT_MQTT_MESSAGE_PARSER_BEAN_NAME = "iotMqttMessageParser";
public static final String IOT_HTTP_MESSAGE_PARSER_BEAN_NAME = "iotHttpMessageParser";
/**
* 注册 Alink 协议消息解析器
* 注册 MQTT 协议消息解析器
*
* @return Alink 协议消息解析器
* @return MQTT 协议消息解析器
*/
@Bean
@ConditionalOnMissingBean(name = IOT_ALINK_MESSAGE_PARSER_BEAN_NAME)
public IotMessageParser iotAlinkMessageParser() {
return new IotAlinkMessageParser();
@ConditionalOnMissingBean(name = IOT_MQTT_MESSAGE_PARSER_BEAN_NAME)
public IotMessageParser iotMqttMessageParser() {
return new IotMqttMessageParser();
}
/**
* 注册 HTTP 协议消息解析器
*
@@ -50,26 +51,24 @@ public class IotProtocolAutoConfiguration {
* 注册默认协议转换器
* <p>
* 如果用户没有自定义协议转换器,则使用默认实现
* 默认会注册 Alink 和 HTTP 协议解析器
* 默认会注册 MQTT 和 HTTP 协议解析器
*
* @param iotAlinkMessageParser Alink 协议解析器
* @param iotHttpMessageParser HTTP 协议解析器
* @param iotMqttMessageParser MQTT 协议解析器
* @param iotHttpMessageParser HTTP 协议解析器
* @return 默认协议转换器
*/
@Bean
@ConditionalOnMissingBean
public IotProtocolConverter iotProtocolConverter(IotMessageParser iotAlinkMessageParser,
public IotProtocolConverter iotProtocolConverter(IotMessageParser iotMqttMessageParser,
IotMessageParser iotHttpMessageParser) {
DefaultIotProtocolConverter converter = new DefaultIotProtocolConverter();
// 注册 MQTT 协议解析器(默认实现)
converter.registerParser(IotProtocolTypeEnum.MQTT.getCode(), iotMqttMessageParser);
// 注册 HTTP 协议解析器
converter.registerParser(IotProtocolTypeEnum.HTTP.getCode(), iotHttpMessageParser);
// 注意Alink 协议解析器已经在 DefaultIotProtocolConverter 构造函数中注册
// 如果需要使用自定义的 Alink 解析器实例,可以重新注册
// converter.registerParser(IotProtocolTypeEnum.ALINK.getCode(),
// iotAlinkMessageParser);
return converter;
}
}

View File

@@ -1,6 +1,6 @@
package cn.iocoder.yudao.module.iot.protocol.convert;
import cn.iocoder.yudao.module.iot.protocol.message.IotAlinkMessage;
import cn.iocoder.yudao.module.iot.protocol.message.IotMqttMessage;
import cn.iocoder.yudao.module.iot.protocol.message.IotStandardResponse;
/**
@@ -20,7 +20,7 @@ public interface IotProtocolConverter {
* @param protocol 协议类型
* @return 标准消息对象,转换失败返回 null
*/
IotAlinkMessage convertToStandardMessage(String topic, byte[] payload, String protocol);
IotMqttMessage convertToStandardMessage(String topic, byte[] payload, String protocol);
/**
* 将标准响应转换为字节数组

View File

@@ -3,10 +3,10 @@ package cn.iocoder.yudao.module.iot.protocol.convert.impl;
import cn.iocoder.yudao.module.iot.protocol.constants.IotLogConstants;
import cn.iocoder.yudao.module.iot.protocol.convert.IotProtocolConverter;
import cn.iocoder.yudao.module.iot.protocol.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.protocol.message.IotAlinkMessage;
import cn.iocoder.yudao.module.iot.protocol.message.IotMessageParser;
import cn.iocoder.yudao.module.iot.protocol.message.IotMqttMessage;
import cn.iocoder.yudao.module.iot.protocol.message.IotStandardResponse;
import cn.iocoder.yudao.module.iot.protocol.message.impl.IotAlinkMessageParser;
import cn.iocoder.yudao.module.iot.protocol.message.impl.IotMqttMessageParser;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
@@ -33,8 +33,9 @@ public class DefaultIotProtocolConverter implements IotProtocolConverter {
* 构造函数,初始化默认支持的协议
*/
public DefaultIotProtocolConverter() {
// 注册 Alink 协议解析器
registerParser(IotProtocolTypeEnum.ALINK.getCode(), new IotAlinkMessageParser());
// 注册 MQTT 协议解析器作为默认实现
IotMqttMessageParser mqttParser = new IotMqttMessageParser();
registerParser(IotProtocolTypeEnum.MQTT.getCode(), mqttParser);
}
/**
@@ -59,7 +60,7 @@ public class DefaultIotProtocolConverter implements IotProtocolConverter {
}
@Override
public IotAlinkMessage convertToStandardMessage(String topic, byte[] payload, String protocol) {
public IotMqttMessage convertToStandardMessage(String topic, byte[] payload, String protocol) {
IotMessageParser parser = parsers.get(protocol);
if (parser == null) {
log.warn(IotLogConstants.Converter.UNSUPPORTED_PROTOCOL, protocol);
@@ -108,13 +109,13 @@ public class DefaultIotProtocolConverter implements IotProtocolConverter {
* @param payload 消息负载
* @return 解析后的标准消息,如果无法解析返回 null
*/
public IotAlinkMessage autoConvert(String topic, byte[] payload) {
public IotMqttMessage autoConvert(String topic, byte[] payload) {
// 遍历所有解析器,找到能处理该主题的解析器
for (Map.Entry<String, IotMessageParser> entry : parsers.entrySet()) {
IotMessageParser parser = entry.getValue();
if (parser.canHandle(topic)) {
try {
IotAlinkMessage message = parser.parse(topic, payload);
IotMqttMessage message = parser.parse(topic, payload);
if (message != null) {
log.debug(IotLogConstants.Converter.AUTO_SELECT_PROTOCOL, entry.getKey(), topic);
return message;

View File

@@ -13,9 +13,9 @@ import lombok.Getter;
public enum IotProtocolTypeEnum {
/**
* Alink 协议(阿里云物联网协议
* MQTT 协议(默认实现
*/
ALINK("alink", "Alink 协议"),
MQTT("mqtt", "MQTT 协议"),
/**
* MQTT 原始协议

View File

@@ -16,7 +16,7 @@ public interface IotMessageParser {
* @param payload 消息负载
* @return 解析后的标准消息,如果解析失败返回 null
*/
IotAlinkMessage parse(String topic, byte[] payload);
IotMqttMessage parse(String topic, byte[] payload);
/**
* 格式化响应消息

View File

@@ -8,16 +8,16 @@ import lombok.Data;
import java.util.Map;
/**
* IoT Alink 消息模型
* IoT MQTT 消息模型
* <p>
* 基于阿里云 Alink 协议规范实现的标准消息格式
* 基于 MQTT 协议规范实现的标准消息格式支持设备属性事件服务调用等标准功能
*
* @author haohao
* @see <a href="https://help.aliyun.com/zh/iot/user-guide/alink-protocol-1">阿里云物联网 Alink 协议</a>
* @see <a href="https://mqtt.org/">MQTT 协议官方规范</a>
*/
@Data
@Builder
public class IotAlinkMessage {
public class IotMqttMessage {
/**
* 消息 ID
@@ -69,11 +69,11 @@ public class IotAlinkMessage {
* @param requestId 请求 ID为空时自动生成
* @param serviceIdentifier 服务标识符
* @param params 服务参数
* @return Alink 消息对象
* @return MQTT 消息对象
*/
public static IotAlinkMessage createServiceInvokeMessage(String requestId, String serviceIdentifier,
Map<String, Object> params) {
return IotAlinkMessage.builder()
public static IotMqttMessage createServiceInvokeMessage(String requestId, String serviceIdentifier,
Map<String, Object> params) {
return IotMqttMessage.builder()
.id(requestId != null ? requestId : generateRequestId())
.method("thing.service." + serviceIdentifier)
.params(params)
@@ -85,10 +85,10 @@ public class IotAlinkMessage {
*
* @param requestId 请求 ID为空时自动生成
* @param properties 设备属性
* @return Alink 消息对象
* @return MQTT 消息对象
*/
public static IotAlinkMessage createPropertySetMessage(String requestId, Map<String, Object> properties) {
return IotAlinkMessage.builder()
public static IotMqttMessage createPropertySetMessage(String requestId, Map<String, Object> properties) {
return IotMqttMessage.builder()
.id(requestId != null ? requestId : generateRequestId())
.method("thing.service.property.set")
.params(properties)
@@ -100,13 +100,13 @@ public class IotAlinkMessage {
*
* @param requestId 请求 ID为空时自动生成
* @param identifiers 要获取的属性标识符列表
* @return Alink 消息对象
* @return MQTT 消息对象
*/
public static IotAlinkMessage createPropertyGetMessage(String requestId, String[] identifiers) {
public static IotMqttMessage createPropertyGetMessage(String requestId, String[] identifiers) {
JSONObject params = new JSONObject();
params.set("identifiers", identifiers);
return IotAlinkMessage.builder()
return IotMqttMessage.builder()
.id(requestId != null ? requestId : generateRequestId())
.method("thing.service.property.get")
.params(params)
@@ -118,10 +118,10 @@ public class IotAlinkMessage {
*
* @param requestId 请求 ID为空时自动生成
* @param configs 设备配置
* @return Alink 消息对象
* @return MQTT 消息对象
*/
public static IotAlinkMessage createConfigSetMessage(String requestId, Map<String, Object> configs) {
return IotAlinkMessage.builder()
public static IotMqttMessage createConfigSetMessage(String requestId, Map<String, Object> configs) {
return IotMqttMessage.builder()
.id(requestId != null ? requestId : generateRequestId())
.method("thing.service.config.set")
.params(configs)
@@ -133,10 +133,10 @@ public class IotAlinkMessage {
*
* @param requestId 请求 ID为空时自动生成
* @param otaInfo OTA 升级信息
* @return Alink 消息对象
* @return MQTT 消息对象
*/
public static IotAlinkMessage createOtaUpgradeMessage(String requestId, Map<String, Object> otaInfo) {
return IotAlinkMessage.builder()
public static IotMqttMessage createOtaUpgradeMessage(String requestId, Map<String, Object> otaInfo) {
return IotMqttMessage.builder()
.id(requestId != null ? requestId : generateRequestId())
.method("thing.service.ota.upgrade")
.params(otaInfo)

View File

@@ -6,8 +6,8 @@ import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.module.iot.protocol.constants.IotHttpConstants;
import cn.iocoder.yudao.module.iot.protocol.constants.IotLogConstants;
import cn.iocoder.yudao.module.iot.protocol.constants.IotTopicConstants;
import cn.iocoder.yudao.module.iot.protocol.message.IotAlinkMessage;
import cn.iocoder.yudao.module.iot.protocol.message.IotMessageParser;
import cn.iocoder.yudao.module.iot.protocol.message.IotMqttMessage;
import cn.iocoder.yudao.module.iot.protocol.message.IotStandardResponse;
import lombok.extern.slf4j.Slf4j;
@@ -61,7 +61,7 @@ public class IotHttpMessageParser implements IotMessageParser {
public static final String TOPIC_PATH_PREFIX = IotHttpConstants.Path.TOPIC_PREFIX;
@Override
public IotAlinkMessage parse(String topic, byte[] payload) {
public IotMqttMessage parse(String topic, byte[] payload) {
if (payload == null || payload.length == 0) {
log.warn(IotLogConstants.Http.RECEIVED_EMPTY_MESSAGE, topic);
return null;
@@ -92,7 +92,7 @@ public class IotHttpMessageParser implements IotMessageParser {
* @param message 认证消息JSON
* @return 标准消息格式
*/
private IotAlinkMessage parseAuthMessage(String message) {
private IotMqttMessage parseAuthMessage(String message) {
if (!JSONUtil.isTypeJSON(message)) {
log.warn(IotLogConstants.Http.AUTH_MESSAGE_NOT_JSON, message);
return null;
@@ -121,7 +121,7 @@ public class IotHttpMessageParser implements IotMessageParser {
params.put(IotHttpConstants.AuthField.SIGN_METHOD,
json.getStr(IotHttpConstants.AuthField.SIGN_METHOD, IotHttpConstants.DefaultValue.SIGN_METHOD));
return IotAlinkMessage.builder()
return IotMqttMessage.builder()
.id(generateMessageId())
.method(IotHttpConstants.Method.DEVICE_AUTH)
.version(json.getStr(IotHttpConstants.AuthField.VERSION, IotHttpConstants.DefaultValue.VERSION))
@@ -136,7 +136,7 @@ public class IotHttpMessageParser implements IotMessageParser {
* @param message 消息内容
* @return 标准消息格式
*/
private IotAlinkMessage parseDataMessage(String topic, String message) {
private IotMqttMessage parseDataMessage(String topic, String message) {
// 提取实际的主题,去掉 /topic 前缀
String actualTopic = topic.substring(TOPIC_PATH_PREFIX.length()); // 直接移除/topic前缀
@@ -156,7 +156,7 @@ public class IotHttpMessageParser implements IotMessageParser {
* @param message JSON消息
* @return 标准消息格式
*/
private IotAlinkMessage parseJsonDataMessage(String topic, String message) {
private IotMqttMessage parseJsonDataMessage(String topic, String message) {
JSONObject json = JSONUtil.parseObj(message);
// 生成消息ID
@@ -181,7 +181,7 @@ public class IotHttpMessageParser implements IotMessageParser {
paramsMap.put(IotHttpConstants.MessageField.DATA, params);
}
return IotAlinkMessage.builder()
return IotMqttMessage.builder()
.id(messageId)
.method(method)
.version(json.getStr(IotHttpConstants.MessageField.VERSION,
@@ -197,11 +197,11 @@ public class IotHttpMessageParser implements IotMessageParser {
* @param message 原始消息
* @return 标准消息格式
*/
private IotAlinkMessage parseRawDataMessage(String topic, String message) {
private IotMqttMessage parseRawDataMessage(String topic, String message) {
Map<String, Object> params = new HashMap<>();
params.put(IotHttpConstants.MessageField.DATA, message);
return IotAlinkMessage.builder()
return IotMqttMessage.builder()
.id(generateMessageId())
.method(inferMethodFromTopic(topic))
.version(IotHttpConstants.DefaultValue.MESSAGE_VERSION)
@@ -263,7 +263,7 @@ public class IotHttpMessageParser implements IotMessageParser {
* @return 消息ID
*/
private String generateMessageId() {
return IotAlinkMessage.generateRequestId();
return IotMqttMessage.generateRequestId();
}
@Override

View File

@@ -4,8 +4,8 @@ import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.protocol.message.IotAlinkMessage;
import cn.iocoder.yudao.module.iot.protocol.message.IotMessageParser;
import cn.iocoder.yudao.module.iot.protocol.message.IotMqttMessage;
import cn.iocoder.yudao.module.iot.protocol.message.IotStandardResponse;
import cn.iocoder.yudao.module.iot.protocol.util.IotTopicUtils;
import lombok.extern.slf4j.Slf4j;
@@ -14,26 +14,26 @@ import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
* IoT Alink 协议消息解析器实现
* IoT MQTT 协议消息解析器实现
* <p>
* 基于阿里云 Alink 协议规范实现的消息解析器
* 基于 MQTT 协议规范实现的消息解析器支持设备属性事件服务调用等标准功能
*
* @author haohao
*/
@Slf4j
public class IotAlinkMessageParser implements IotMessageParser {
public class IotMqttMessageParser implements IotMessageParser {
@Override
public IotAlinkMessage parse(String topic, byte[] payload) {
public IotMqttMessage parse(String topic, byte[] payload) {
if (payload == null || payload.length == 0) {
log.warn("[Alink] 收到空消息内容, topic={}", topic);
log.warn("[MQTT] 收到空消息内容, topic={}", topic);
return null;
}
try {
String message = new String(payload, StandardCharsets.UTF_8);
if (!JSONUtil.isTypeJSON(message)) {
log.warn("[Alink] 收到非JSON格式消息, topic={}, message={}", topic, message);
log.warn("[MQTT] 收到非JSON格式消息, topic={}, message={}", topic, message);
return null;
}
@@ -45,20 +45,21 @@ public class IotAlinkMessageParser implements IotMessageParser {
// 尝试从 topic 中解析方法
method = IotTopicUtils.parseMethodFromTopic(topic);
if (StrUtil.isBlank(method)) {
log.warn("[Alink] 无法确定消息方法, topic={}, message={}", topic, message);
log.warn("[MQTT] 无法确定消息方法, topic={}, message={}", topic, message);
return null;
}
}
@SuppressWarnings("unchecked")
Map<String, Object> params = (Map<String, Object>) json.getObj("params", Map.class);
return IotAlinkMessage.builder()
return IotMqttMessage.builder()
.id(id)
.method(method)
.version(json.getStr("version", "1.0"))
.params(params)
.build();
} catch (Exception e) {
log.error("[Alink] 解析消息失败, topic={}", topic, e);
log.error("[MQTT] 解析消息失败, topic={}", topic, e);
return null;
}
}
@@ -69,14 +70,18 @@ public class IotAlinkMessageParser implements IotMessageParser {
String json = JsonUtils.toJsonString(response);
return json.getBytes(StandardCharsets.UTF_8);
} catch (Exception e) {
log.error("[Alink] 格式化响应失败", e);
log.error("[MQTT] 格式化响应失败", e);
return new byte[0];
}
}
@Override
public boolean canHandle(String topic) {
// Alink 协议处理所有系统主题
return topic != null && topic.startsWith("/sys/");
// MQTT 协议支持更多主题格式
return topic != null && (
topic.startsWith("/sys/") || // 兼容现有系统主题
topic.startsWith("/mqtt/") || // 新的通用 MQTT 主题
topic.startsWith("/device/") // 设备主题
);
}
}