reactor:【IoT 物联网】重新梳理下行消息的逻辑(未测试,用于相互 review 作用)

This commit is contained in:
YunaiV
2025-06-11 09:56:59 +08:00
parent 4ea6e08f99
commit 66b42367cb
48 changed files with 734 additions and 1536 deletions

View File

@@ -1,6 +1,7 @@
package cn.iocoder.yudao.module.iot.gateway.codec.alink;
import cn.hutool.core.lang.Assert;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
@@ -48,18 +49,23 @@ public class IotAlinkDeviceMessageCodec implements IotDeviceMessageCodec {
* 响应结果
*/
private Object data;
/**
* 响应错误码
*/
private Integer code;
/**
* 响应提示
*
* 特殊:这里阿里云是 message为了保持和项目的 {@link CommonResult#getMsg()} 一致。
*/
private String msg;
}
@Override
public byte[] encode(IotDeviceMessage message) {
AlinkMessage alinkMessage = new AlinkMessage(message.getRequestId(), AlinkMessage.VERSION_1,
message.getMethod(), message.getParams(), message.getData(), message.getCode());
message.getMethod(), message.getParams(), message.getData(), message.getCode(), message.getMsg());
return JsonUtils.toJsonByte(alinkMessage);
}
@@ -69,8 +75,8 @@ public class IotAlinkDeviceMessageCodec implements IotDeviceMessageCodec {
AlinkMessage alinkMessage = JsonUtils.parseObject(bytes, AlinkMessage.class);
Assert.notNull(alinkMessage, "消息不能为空");
Assert.equals(alinkMessage.getVersion(), AlinkMessage.VERSION_1, "消息版本号必须是 1.0");
return IotDeviceMessage.of(alinkMessage.getId(),
alinkMessage.getMethod(), alinkMessage.getParams(), alinkMessage.getData(), alinkMessage.getCode());
return IotDeviceMessage.of(alinkMessage.getId(), alinkMessage.getMethod(), alinkMessage.getParams(),
alinkMessage.getData(), alinkMessage.getCode(), alinkMessage.getMsg());
}
@Override

View File

@@ -3,8 +3,6 @@ package cn.iocoder.yudao.module.iot.gateway.config;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -44,16 +42,17 @@ public class IotGatewayConfiguration {
@Slf4j
public static class MqttProtocolConfiguration {
@Bean
public IotMqttUpstreamProtocol iotMqttUpstreamProtocol(IotGatewayProperties gatewayProperties) {
return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getEmqx());
}
@Bean
public IotMqttDownstreamSubscriber iotMqttDownstreamSubscriber(IotMqttUpstreamProtocol mqttUpstreamProtocol,
IotMessageBus messageBus) {
return new IotMqttDownstreamSubscriber(mqttUpstreamProtocol, messageBus);
}
// TODO @haohao临时注释避免报错
// @Bean
// public IotMqttUpstreamProtocol iotMqttUpstreamProtocol(IotGatewayProperties gatewayProperties) {
// return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getEmqx());
// }
//
// @Bean
// public IotMqttDownstreamSubscriber iotMqttDownstreamSubscriber(IotMqttUpstreamProtocol mqttUpstreamProtocol,
// IotMessageBus messageBus) {
// return new IotMqttDownstreamSubscriber(mqttUpstreamProtocol, messageBus);
// }
}
}

View File

@@ -1,24 +1,18 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.http.router;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.text.StrPool;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.service.message.IotDeviceMessageService;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
/**
* IoT 网关 HTTP 协议的【上行】处理器
*
@@ -61,135 +55,4 @@ public class IotHttpUpstreamHandler extends IotHttpAbstractHandler {
return CommonResult.success(MapUtil.of("messageId", message.getId()));
}
/**
* 判断是否是属性上报路径
*
* @param path 路径
* @return 是否是属性上报路径
*/
private boolean isPropertyPostPath(String path) {
return StrUtil.endWith(path, IotDeviceTopicEnum.PROPERTY_POST_TOPIC.getTopic());
}
/**
* 判断是否是事件上报路径
*
* @param path 路径
* @return 是否是事件上报路径
*/
private boolean isEventPostPath(String path) {
return StrUtil.contains(path, IotDeviceTopicEnum.EVENT_POST_TOPIC_PREFIX.getTopic())
&& StrUtil.endWith(path, IotDeviceTopicEnum.EVENT_POST_TOPIC_SUFFIX.getTopic());
}
/**
* 处理属性上报请求
*
* @param routingContext 路由上下文
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param body 请求体
*/
private void handlePropertyPost(RoutingContext routingContext, String productKey, String deviceName,
JsonObject body) {
// 1.1 构建设备消息
IotDeviceMessage message = IotDeviceMessage.of(productKey, deviceName, protocol.getServerId())
// .ofPropertyReport(parsePropertiesFromBody(body))
;
// 1.2 发送消息
deviceMessageProducer.sendDeviceMessage(message);
// // 2. 返回响应
// sendResponse(routingContext, null);
}
/**
* 处理事件上报请求
*
* @param routingContext 路由上下文
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param identifier 事件标识符
* @param body 请求体
*/
private void handleEventPost(RoutingContext routingContext, String productKey, String deviceName,
String identifier, JsonObject body) {
// // 处理事件上报
// IotDeviceEventReportReqDTO reportReqDTO = parseEventReportRequest(productKey, deviceName, identifier,
// requestId, body);
//
// // 事件上报
// CommonResult<Boolean> result = deviceUpstreamApi.reportDeviceEvent(reportReqDTO);
}
// TODO @芋艿:这块在看看
/**
* 从请求体解析属性
*
* @param body 请求体
* @return 属性映射
*/
private Map<String, Object> parsePropertiesFromBody(JsonObject body) {
Map<String, Object> properties = MapUtil.newHashMap();
JsonObject params = body.getJsonObject("params");
if (CollUtil.isEmpty(params)) {
return properties;
}
// 将标准格式的 params 转换为平台需要的 properties 格式
for (String key : params.fieldNames()) {
Object valueObj = params.getValue(key);
// 如果是复杂结构(包含 value 和 time
if (valueObj instanceof JsonObject) {
JsonObject valueJson = (JsonObject) valueObj;
properties.put(key, valueJson.containsKey("value") ? valueJson.getValue("value") : valueObj);
} else {
properties.put(key, valueObj);
}
}
return properties;
}
// /**
// * 解析事件上报请求
// *
// * @param productKey 产品 Key
// * @param deviceName 设备名称
// * @param identifier 事件标识符
// * @param requestId 请求 ID
// * @param body 请求体
// * @return 事件上报请求 DTO
// */
// private IotDeviceEventReportReqDTO parseEventReportRequest(String productKey, String deviceName, String identifier,
// String requestId, JsonObject body) {
// // 解析参数
// Map<String, Object> params = parseParamsFromBody(body);
//
// // 构建事件上报请求 DTO
// return ((IotDeviceEventReportReqDTO) new IotDeviceEventReportReqDTO()
// .setRequestId(requestId)
// .setProcessId(IotNetComponentCommonUtils.getProcessId())
// .setReportTime(LocalDateTime.now())
// .setProductKey(productKey)
// .setDeviceName(deviceName)).setIdentifier(identifier).setParams(params);
// }
/**
* 从请求体解析参数
*
* @param body 请求体
* @return 参数映射
*/
private Map<String, Object> parseParamsFromBody(JsonObject body) {
Map<String, Object> params = MapUtil.newHashMap();
JsonObject paramsJson = body.getJsonObject("params");
if (CollUtil.isEmpty(paramsJson)) {
return params;
}
for (String key : paramsJson.fieldNames()) {
params.put(key, paramsJson.getValue(key));
}
return params;
}
}

View File

@@ -96,7 +96,7 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
return null;
}
IotDeviceMessage message = IotDeviceMessage.of(null,
IotDeviceMessage message = IotDeviceMessage.requestOf(null,
IotDeviceMessageMethodEnum.STATE_ONLINE.getMethod(), null);
return appendDeviceMessage(message, deviceInfo, serverId);
@@ -112,9 +112,8 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
return null;
}
IotDeviceMessage message = IotDeviceMessage.of(null,
IotDeviceMessageMethodEnum.STATE_OFFLINE.getMethod(), null);
IotDeviceMessage message = IotDeviceMessage.requestOf(IotDeviceMessageMethodEnum.STATE_OFFLINE.getMethod(),
null);
return appendDeviceMessage(message, deviceInfo, serverId);
}
@@ -122,23 +121,18 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
* 补充消息的后端字段
*
* @param message 消息
* @param deviceInfo 设备信息
* @param device 设备信息
* @param serverId 设备连接的 serverId
* @return 消息
*/
private IotDeviceMessage appendDeviceMessage(IotDeviceMessage message,
IotDeviceCacheService.DeviceInfo deviceInfo, String serverId) {
IotDeviceCacheService.DeviceInfo device, String serverId) {
message.setId(IotDeviceMessageUtils.generateMessageId()).setReportTime(LocalDateTime.now())
.setDeviceId(deviceInfo.getDeviceId()).setTenantId(deviceInfo.getTenantId()).setServerId(serverId);
.setDeviceId(device.getDeviceId()).setTenantId(device.getTenantId()).setServerId(serverId);
// 特殊:如果设备没有指定 requestId则使用 messageId
if (StrUtil.isEmpty(message.getRequestId())) {
message.setRequestId(message.getId());
}
log.debug("[appendDeviceMessage][消息字段补充完成][deviceId: {}][tenantId: {}]",
deviceInfo.getDeviceId(), deviceInfo.getTenantId());
return message;
}