reactor:【IoT 物联网】将 http component 合并到 gateway 里

This commit is contained in:
YunaiV
2025-06-01 07:48:30 +08:00
parent 81cbc61f3c
commit c3485a3f3d
46 changed files with 524 additions and 922 deletions

View File

@@ -1,137 +0,0 @@
# IOT 组件使用说明
## 组件介绍
该模块包含多个 IoT 设备连接组件,提供不同的通信协议支持:
- `yudao-module-iot-net-component-core`: 核心接口和通用类
- `yudao-module-iot-net-component-http`: 基于 HTTP 协议的设备通信组件
- `yudao-module-iot-net-component-emqx`: 基于 MQTT/EMQX 的设备通信组件
## 组件架构
### 架构设计
各组件采用统一的架构设计和命名规范:
- 配置类: `IotComponentXxxAutoConfiguration` - 提供Bean定义和组件初始化逻辑
- 属性类: `IotComponentXxxProperties` - 定义组件的配置属性
- 下行接口: `*DownstreamHandler` - 处理从平台到设备的下行通信
- 上行接口: `*UpstreamServer` - 处理从设备到平台的上行通信
### Bean 命名规范
为避免 Bean 冲突,各个组件中的 Bean 已添加特定前缀:
- HTTP 组件: `httpDeviceUpstreamServer`, `httpDeviceDownstreamHandler`
- EMQX 组件: `emqxDeviceUpstreamServer`, `emqxDeviceDownstreamHandler`
### 组件启用规则
现在系统支持同时使用多个组件,但有以下规则:
1.`yudao.iot.component.emqx.enabled=true`核心模块将优先使用EMQX组件
2. 如果同时启用了多个组件,需要在业务代码中使用`@Qualifier`指定要使用的具体实现
> **重要提示:**
> 1. 组件库内部的默认配置文件**不会**被自动加载。必须将上述配置添加到主应用的配置文件中。
> 2. 所有配置项现在都已增加空值处理,配置缺失时将使用合理的默认值
> 3. `mqtt-host` 是唯一必须配置的参数,其他参数均有默认值
> 4. `mqtt-ssl` 和 `auth-port` 缺失时的默认值分别为 `false` 和 `8080`
> 5. `mqtt-topics` 缺失时将使用默认主题 `/device/#`
### 如何引用特定的 Bean
在其他组件中引用这些 Bean 时,需要使用 `@Qualifier` 注解指定 Bean 名称:
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import cn.iocoder.yudao.module.iot.component.core.downstream.IotDeviceDownstreamHandler;
@Service
public class YourServiceClass {
// 注入 HTTP 组件的下行处理器
@Autowired
@Qualifier("httpDeviceDownstreamHandler")
private IotDeviceDownstreamHandler httpDeviceDownstreamHandler;
// 注入 EMQX 组件的下行处理器
@Autowired
@Qualifier("emqxDeviceDownstreamHandler")
private IotDeviceDownstreamHandler emqxDeviceDownstreamHandler;
// 使用示例
public void example() {
// 使用 HTTP 组件
httpDeviceDownstreamHandler.invokeDeviceService(...);
// 使用 EMQX 组件
emqxDeviceDownstreamHandler.invokeDeviceService(...);
}
}
```
### 组件选择指南
- **HTTP 组件**:适用于简单场景,设备通过 HTTP 接口与平台通信
- **EMQX 组件**:适用于实时性要求高的场景,基于 MQTT 协议,支持发布/订阅模式
## 常见问题
### 1. 配置未加载问题
如果遇到以下日志:
```
MQTT配置: host=null, port=null, username=null, ssl=null
[connectMqtt][MQTT Host为null无法连接]
```
这表明配置没有被正确加载。请确保:
1. 在主应用的配置文件中(如 `application.yml``application-dev.yml`)添加了必要的 EMQX 配置
2. 配置前缀正确:`yudao.iot.component.emqx`
3. 配置了必要的 `mqtt-host` 属性
### 2. mqttSsl 空指针异常
如果遇到以下错误:
```
Cannot invoke "java.lang.Boolean.booleanValue()" because the return value of "cn.iocoder.yudao.module.iot.component.emqx.config.IotEmqxComponentProperties.getMqttSsl()" is null
```
此问题已通过代码修复,现在会自动使用默认值 `false`。同样适用于其他配置项的空值问题。
### 3. authPort 空指针异常
如果遇到以下错误:
```
Cannot invoke "java.lang.Integer.intValue()" because the return value of "cn.iocoder.yudao.module.iot.component.emqx.config.IotEmqxComponentProperties.getAuthPort()" is null
```
此问题已通过代码修复,现在会自动使用默认值 `8080`
### 4. Bean注入问题
如果遇到以下错误:
```
Parameter 1 of method deviceDownstreamServer in IotPluginCommonAutoConfiguration required a single bean, but 2 were found
```
此问题已通过修改核心配置类来解决。现在系统会根据组件的启用状态自动选择合适的实现:
1. 优先使用EMQX组件`yudao.iot.component.emqx.enabled=true`时)
2. 如果EMQX未启用则使用HTTP组件`yudao.iot.component.http.enabled=true`时)
如果需要同时使用两个组件,业务代码中必须使用`@Qualifier`明确指定要使用的Bean。
### 5. 使用默认配置
组件现已加入完善的默认配置和空值处理机制,使配置更加灵活。但需要注意的是,这些默认配置值必须通过在主应用配置文件中设置相应的属性才能生效。
// TODO 芋艿:后续继续完善 README.md

View File

@@ -18,9 +18,8 @@
<modules>
<module>yudao-module-iot-net-component-core</module>
<module>yudao-module-iot-net-component-http</module>
<module>yudao-module-iot-net-component-emqx</module>
<module>yudao-module-iot-net-component-server</module>
</modules>
</project>
</project>

View File

@@ -11,9 +11,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
*
* @author haohao
*/
@AutoConfiguration
@EnableConfigurationProperties(IotNetComponentCommonProperties.class)
@EnableScheduling // 开启定时任务,因为 IotNetComponentInstanceHeartbeatJob 是一个定时任务
public class IotNetComponentCommonAutoConfiguration {
/**

View File

@@ -1,24 +0,0 @@
package cn.iocoder.yudao.module.iot.net.component.core.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;
/**
* IoT 网络组件通用配置属性
*
* @author haohao
*/
@ConfigurationProperties(prefix = "yudao.iot.component")
@Validated
@Data
public class IotNetComponentCommonProperties {
/**
* 组件的唯一标识
* <p>
* 注意:该值将在运行时由各组件设置,不再从配置读取
*/
private String pluginKey;
}

View File

@@ -1,179 +0,0 @@
package cn.iocoder.yudao.module.iot.net.component.core.constants;
import lombok.Getter;
// TODO @haohao要不放到 enums 包下;
/**
* IoT 设备主题枚举
* <p>
* 用于统一管理 MQTT 协议中的主题常量,基于 Alink 协议规范
*
* @author haohao
*/
@Getter
public enum IotDeviceTopicEnum {
// TODO @haohaoSYS_TOPIC_PREFIX、SERVICE_TOPIC_PREFIX、REPLY_SUFFIX 类似这种,要不搞成这个里面的静态变量?不是枚举值
/**
* 系统主题前缀
*/
SYS_TOPIC_PREFIX("/sys/", "系统主题前缀"),
/**
* 服务调用主题前缀
*/
SERVICE_TOPIC_PREFIX("/thing/service/", "服务调用主题前缀"),
// TODO @haohao注释时中英文之间有个空格
/**
* 设备属性设置主题
* 请求Topic/sys/${productKey}/${deviceName}/thing/service/property/set
* 响应Topic/sys/${productKey}/${deviceName}/thing/service/property/set_reply
*/
PROPERTY_SET_TOPIC("/thing/service/property/set", "设备属性设置主题"),
/**
* 设备属性获取主题
* 请求Topic/sys/${productKey}/${deviceName}/thing/service/property/get
* 响应Topic/sys/${productKey}/${deviceName}/thing/service/property/get_reply
*/
PROPERTY_GET_TOPIC("/thing/service/property/get", "设备属性获取主题"),
/**
* 设备配置设置主题
* 请求Topic/sys/${productKey}/${deviceName}/thing/service/config/set
* 响应Topic/sys/${productKey}/${deviceName}/thing/service/config/set_reply
*/
CONFIG_SET_TOPIC("/thing/service/config/set", "设备配置设置主题"),
/**
* 设备OTA升级主题
* 请求Topic/sys/${productKey}/${deviceName}/thing/service/ota/upgrade
* 响应Topic/sys/${productKey}/${deviceName}/thing/service/ota/upgrade_reply
*/
OTA_UPGRADE_TOPIC("/thing/service/ota/upgrade", "设备OTA升级主题"),
/**
* 设备属性上报主题
* 请求Topic/sys/${productKey}/${deviceName}/thing/event/property/post
* 响应Topic/sys/${productKey}/${deviceName}/thing/event/property/post_reply
*/
PROPERTY_POST_TOPIC("/thing/event/property/post", "设备属性上报主题"),
/**
* 设备事件上报主题前缀
*/
EVENT_POST_TOPIC_PREFIX("/thing/event/", "设备事件上报主题前缀"),
/**
* 设备事件上报主题后缀
*/
EVENT_POST_TOPIC_SUFFIX("/post", "设备事件上报主题后缀"),
/**
* 响应主题后缀
*/
REPLY_SUFFIX("_reply", "响应主题后缀");
private final String topic;
private final String description;
// TODO @haohao使用 lombok 去除
IotDeviceTopicEnum(String topic, String description) {
this.topic = topic;
this.description = description;
}
/**
* 构建设备服务调用主题
*
* @param productKey 产品Key
* @param deviceName 设备名称
* @param serviceIdentifier 服务标识符
* @return 完整的主题路径
*/
public static String buildServiceTopic(String productKey, String deviceName, String serviceIdentifier) {
// TODO @haohao貌似 SYS_TOPIC_PREFIX.getTopic() + productKey + "/" + deviceName 是统一的;
return SYS_TOPIC_PREFIX.getTopic() + productKey + "/" + deviceName +
SERVICE_TOPIC_PREFIX.getTopic() + serviceIdentifier;
}
/**
* 构建设备属性设置主题
*
* @param productKey 产品Key
* @param deviceName 设备名称
* @return 完整的主题路径
*/
public static String buildPropertySetTopic(String productKey, String deviceName) {
return SYS_TOPIC_PREFIX.getTopic() + productKey + "/" + deviceName + PROPERTY_SET_TOPIC.getTopic();
}
/**
* 构建设备属性获取主题
*
* @param productKey 产品Key
* @param deviceName 设备名称
* @return 完整的主题路径
*/
public static String buildPropertyGetTopic(String productKey, String deviceName) {
return SYS_TOPIC_PREFIX.getTopic() + productKey + "/" + deviceName + PROPERTY_GET_TOPIC.getTopic();
}
/**
* 构建设备配置设置主题
*
* @param productKey 产品Key
* @param deviceName 设备名称
* @return 完整的主题路径
*/
public static String buildConfigSetTopic(String productKey, String deviceName) {
return SYS_TOPIC_PREFIX.getTopic() + productKey + "/" + deviceName + CONFIG_SET_TOPIC.getTopic();
}
/**
* 构建设备 OTA 升级主题
*
* @param productKey 产品Key
* @param deviceName 设备名称
* @return 完整的主题路径
*/
public static String buildOtaUpgradeTopic(String productKey, String deviceName) {
return SYS_TOPIC_PREFIX.getTopic() + productKey + "/" + deviceName + OTA_UPGRADE_TOPIC.getTopic();
}
/**
* 构建设备属性上报主题
*
* @param productKey 产品Key
* @param deviceName 设备名称
* @return 完整的主题路径
*/
public static String buildPropertyPostTopic(String productKey, String deviceName) {
return SYS_TOPIC_PREFIX.getTopic() + productKey + "/" + deviceName + PROPERTY_POST_TOPIC.getTopic();
}
/**
* 构建设备事件上报主题
*
* @param productKey 产品Key
* @param deviceName 设备名称
* @param eventIdentifier 事件标识符
* @return 完整的主题路径
*/
public static String buildEventPostTopic(String productKey, String deviceName, String eventIdentifier) {
return SYS_TOPIC_PREFIX.getTopic() + productKey + "/" + deviceName +
EVENT_POST_TOPIC_PREFIX.getTopic() + eventIdentifier + EVENT_POST_TOPIC_SUFFIX.getTopic();
}
/**
* 获取响应主题
*
* @param requestTopic 请求主题
* @return 响应主题
*/
public static String getReplyTopic(String requestTopic) {
return requestTopic + REPLY_SUFFIX.getTopic();
}
}

View File

@@ -2,7 +2,6 @@ package cn.iocoder.yudao.module.iot.net.component.core.pojo;
import cn.hutool.core.util.StrUtil;
import lombok.Data;
import lombok.experimental.Accessors;
/**
* IoT 标准协议响应实体类
@@ -12,7 +11,6 @@ import lombok.experimental.Accessors;
* @author haohao
*/
@Data
@Accessors(chain = true) // TODO @haohao貌似不用写 @Accessors(chain = true),我全局加啦,可见 lombok.config
public class IotStandardResponse {
/**

View File

@@ -1,8 +1,5 @@
package cn.iocoder.yudao.module.iot.net.component.core.util;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.system.SystemUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.net.component.core.pojo.IotStandardResponse;
import io.vertx.core.http.HttpHeaders;
@@ -16,40 +13,6 @@ import org.springframework.http.MediaType;
*/
public class IotNetComponentCommonUtils {
/**
* 流程实例的进程编号
*/
private static String processId;
/**
* 获取进程ID
*
* @return 进程ID
*/
public static String getProcessId() {
if (StrUtil.isEmpty(processId)) {
initProcessId();
}
return processId;
}
/**
* 初始化进程ID
*/
private synchronized static void initProcessId() {
processId = String.format("%s@%d@%s", // IP@PID@${uuid}
SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID(), IdUtil.fastSimpleUUID());
}
/**
* 生成请求ID
*
* @return 生成的唯一请求ID
*/
public static String generateRequestId() {
return IdUtil.fastSimpleUUID();
}
/**
* 将对象转换为JSON字符串后写入HTTP响应
*
@@ -89,4 +52,5 @@ public class IotNetComponentCommonUtils {
.putHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE)
.end(JsonUtils.toJsonString(response));
}
}

View File

@@ -1,47 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>yudao-module-iot-net-components</artifactId>
<groupId>cn.iocoder.boot</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>yudao-module-iot-net-component-http</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>
物联网网络组件 HTTP 模块
</description>
<dependencies>
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-net-component-core</artifactId>
<version>${revision}</version>
</dependency>
<!-- 脚本解析相关 -->
<!-- <dependency>-->
<!-- <groupId>cn.iocoder.boot</groupId>-->
<!-- <artifactId>yudao-module-iot-plugin-script</artifactId>-->
<!-- <version>${revision}</version>-->
<!-- </dependency>-->
<!-- 工具类相关 -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
</dependency>
<!-- 参数校验 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
</project>

View File

@@ -1,90 +0,0 @@
package cn.iocoder.yudao.module.iot.net.component.http.config;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBusSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.net.component.http.upstream.IotDeviceUpstreamServer;
import io.vertx.core.Vertx;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.event.EventListener;
/**
* IoT 网络组件 HTTP 的自动配置类
*
* @author haohao
*/
@Slf4j
@AutoConfiguration
@EnableConfigurationProperties(IotNetComponentHttpProperties.class)
@ConditionalOnProperty(prefix = "yudao.iot.component.http", name = "enabled", havingValue = "true", matchIfMissing = false)
@ComponentScan(basePackages = {
"cn.iocoder.yudao.module.iot.net.component.http" // 只扫描 HTTP 组件包
})
public class IotNetComponentHttpAutoConfiguration {
/**
* 初始化 HTTP 组件
*
* @param event 应用启动事件
*/
@EventListener(ApplicationStartedEvent.class)
public void initialize(ApplicationStartedEvent event) {
log.info("[IotNetComponentHttpAutoConfiguration][开始初始化]");
// TODO @芋艿:临时处理
IotMessageBus messageBus = event.getApplicationContext()
.getBean(IotMessageBus.class);
messageBus.register(new IotMessageBusSubscriber<IotDeviceMessage>() {
@Override
public String getTopic() {
return IotDeviceMessage.buildMessageBusGatewayDeviceMessageTopic("yy");
}
@Override
public String getGroup() {
return "test";
}
@Override
public void onMessage(IotDeviceMessage message) {
System.out.println(message);
}
});
}
// TODO @芋艿:貌似这里不用注册 bean
/**
* 创建 Vert.x 实例
*
* @return Vert.x 实例
*/
@Bean(name = "httpVertx")
public Vertx vertx() {
return Vertx.vertx();
}
/**
* 创建设备上行服务器
*/
@Bean(name = "httpDeviceUpstreamServer", initMethod = "start", destroyMethod = "stop")
public IotDeviceUpstreamServer deviceUpstreamServer(
@Lazy @Qualifier("httpVertx") Vertx vertx,
IotDeviceUpstreamApi deviceUpstreamApi,
IotNetComponentHttpProperties properties,
IotDeviceMessageProducer deviceMessageProducer) {
return new IotDeviceUpstreamServer(vertx, properties, deviceUpstreamApi, deviceMessageProducer);
}
}

View File

@@ -1,33 +0,0 @@
package cn.iocoder.yudao.module.iot.net.component.http.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;
/**
* IoT HTTP 网络组件配置属性
*
* @author haohao
*/
@ConfigurationProperties(prefix = "yudao.iot.component.http")
@Validated
@Data
public class IotNetComponentHttpProperties {
/**
* 是否启用 HTTP 组件
*/
private Boolean enabled;
/**
* HTTP 服务端口
*/
private Integer serverPort;
/**
* 连接超时时间(毫秒)
* <p>
* 默认值10000 毫秒
*/
private Integer connectionTimeoutMs = 10000;
}

View File

@@ -1,44 +0,0 @@
package cn.iocoder.yudao.module.iot.net.component.http.downstream;
// TODO @芋艿:实现下;
///**
// * HTTP 网络组件的 {@link IotDeviceDownstreamHandler} 实现类
// * <p>
// * 但是:由于设备通过 HTTP 短链接接入,导致其实无法下行指导给 device 设备,所以基本都是直接返回失败!!!
// * 类似 MQTT、WebSocket、TCP 网络组件,是可以实现下行指令的。
// *
// * @author 芋道源码
// */
//@Slf4j
//public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandler {
//
// /**
// * 不支持的错误消息
// */
// private static final String NOT_SUPPORTED_MSG = "HTTP 不支持设备下行通信";
//
// @Override
// public CommonResult<Boolean> invokeDeviceService(IotDeviceServiceInvokeReqDTO invokeReqDTO) {
// return CommonResult.error(NOT_IMPLEMENTED.getCode(), NOT_SUPPORTED_MSG);
// }
//
// @Override
// public CommonResult<Boolean> getDeviceProperty(IotDevicePropertyGetReqDTO getReqDTO) {
// return CommonResult.error(NOT_IMPLEMENTED.getCode(), NOT_SUPPORTED_MSG);
// }
//
// @Override
// public CommonResult<Boolean> setDeviceProperty(IotDevicePropertySetReqDTO setReqDTO) {
// return CommonResult.error(NOT_IMPLEMENTED.getCode(), NOT_SUPPORTED_MSG);
// }
//
// @Override
// public CommonResult<Boolean> setDeviceConfig(IotDeviceConfigSetReqDTO setReqDTO) {
// return CommonResult.error(NOT_IMPLEMENTED.getCode(), NOT_SUPPORTED_MSG);
// }
//
// @Override
// public CommonResult<Boolean> upgradeDeviceOta(IotDeviceOtaUpgradeReqDTO upgradeReqDTO) {
// return CommonResult.error(NOT_IMPLEMENTED.getCode(), NOT_SUPPORTED_MSG);
// }
//}

View File

@@ -1,73 +0,0 @@
package cn.iocoder.yudao.module.iot.net.component.http.upstream;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.net.component.http.config.IotNetComponentHttpProperties;
import cn.iocoder.yudao.module.iot.net.component.http.upstream.router.IotDeviceUpstreamVertxHandler;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 设备上行服务器
* <p>
* 处理设备通过 HTTP 方式接入的上行消息
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotDeviceUpstreamServer extends AbstractVerticle {
private final Vertx vertx;
private final IotNetComponentHttpProperties httpProperties;
private final IotDeviceUpstreamApi deviceUpstreamApi;
private final IotDeviceMessageProducer deviceMessageProducer;
@Override
public void start() {
start(Promise.promise());
}
// TODO @haohao这样貌似初始化不到我临时拷贝上去了
@Override
public void start(Promise<Void> startPromise) {
// 创建路由
Router router = Router.router(vertx);
router.route().handler(BodyHandler.create());
// 创建处理器
IotDeviceUpstreamVertxHandler upstreamHandler = new IotDeviceUpstreamVertxHandler(
deviceUpstreamApi, deviceMessageProducer);
// 添加路由处理器
router.post(IotDeviceUpstreamVertxHandler.PROPERTY_PATH).handler(upstreamHandler::handle);
router.post(IotDeviceUpstreamVertxHandler.EVENT_PATH).handler(upstreamHandler::handle);
// 启动 HTTP 服务器
vertx.createHttpServer()
.requestHandler(router)
.listen(httpProperties.getServerPort(), result -> {
if (result.succeeded()) {
log.info("[start][IoT 设备上行服务器启动成功,端口:{}]", httpProperties.getServerPort());
startPromise.complete();
} else {
log.error("[start][IoT 设备上行服务器启动失败]", result.cause());
startPromise.fail(result.cause());
}
});
}
@Override
public void stop(Promise<Void> stopPromise) {
log.info("[stop][IoT 设备上行服务器已停止]");
stopPromise.complete();
}
}

View File

@@ -1,50 +0,0 @@
package cn.iocoder.yudao.module.iot.net.component.http.upstream.auth;
import io.vertx.core.Future;
import io.vertx.ext.web.RoutingContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
// TODO @haohao待实现或者不需要
/**
* IoT 设备认证提供者
* <p>
* 用于 HTTP 设备接入时的身份认证
*
* @author haohao
*/
@Slf4j
public class IotDeviceAuthProvider {
private final ApplicationContext applicationContext;
/**
* 构造函数
*
* @param applicationContext Spring 应用上下文
*/
public IotDeviceAuthProvider(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
/**
* 认证设备
*
* @param context 路由上下文
* @param clientId 设备唯一标识
* @return 认证结果 Future 对象
*/
public Future<Void> authenticate(RoutingContext context, String clientId) {
if (clientId == null || clientId.isEmpty()) {
return Future.failedFuture("clientId 不能为空");
}
try {
log.info("[authenticate][设备认证成功clientId={}]", clientId);
return Future.succeededFuture();
} catch (Exception e) {
log.error("[authenticate][设备认证异常clientId={}]", clientId, e);
return Future.failedFuture(e);
}
}
}

View File

@@ -1,330 +0,0 @@
package cn.iocoder.yudao.module.iot.net.component.http.upstream.router;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ObjUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEventReportReqDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.net.component.core.constants.IotDeviceTopicEnum;
import cn.iocoder.yudao.module.iot.net.component.core.pojo.IotStandardResponse;
import cn.iocoder.yudao.module.iot.net.component.core.util.IotNetComponentCommonUtils;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.Map;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.BAD_REQUEST;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR;
/**
* IoT 设备上行统一处理的 Vert.x Handler
* <p>
* 统一处理设备属性上报和事件上报的请求。
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotDeviceUpstreamVertxHandler implements Handler<RoutingContext> {
// TODO @haohao你说咱要不要把 "/sys/:productKey/:deviceName"
// + IotDeviceTopicEnum.PROPERTY_POST_TOPIC.getTopic(),也抽到 IotDeviceTopicEnum 的 build 这种?尽量都收敛掉?
/**
* 属性上报路径
*/
public static final String PROPERTY_PATH = "/sys/:productKey/:deviceName"
+ IotDeviceTopicEnum.PROPERTY_POST_TOPIC.getTopic();
/**
* 事件上报路径
*/
public static final String EVENT_PATH = "/sys/:productKey/:deviceName"
+ IotDeviceTopicEnum.EVENT_POST_TOPIC_PREFIX.getTopic() + ":identifier"
+ IotDeviceTopicEnum.EVENT_POST_TOPIC_SUFFIX.getTopic();
/**
* 属性上报方法标识
*/
private static final String PROPERTY_METHOD = "thing.event.property.post";
/**
* 事件上报方法前缀
*/
private static final String EVENT_METHOD_PREFIX = "thing.event.";
/**
* 事件上报方法后缀
*/
private static final String EVENT_METHOD_SUFFIX = ".post";
/**
* 设备上行 API
*/
private final IotDeviceUpstreamApi deviceUpstreamApi;
/**
* 设备消息生产者
*/
private final IotDeviceMessageProducer deviceMessageProducer;
@Override
public void handle(RoutingContext routingContext) {
String path = routingContext.request().path();
String requestId = IdUtil.fastSimpleUUID();
try {
// 1. 解析通用参数
Map<String, String> params = parseCommonParams(routingContext, requestId);
String productKey = params.get("productKey");
String deviceName = params.get("deviceName");
JsonObject body = routingContext.body().asJsonObject();
requestId = params.get("requestId");
// 2. 根据路径模式处理不同类型的请求
if (isPropertyPostPath(path)) {
// 处理属性上报
handlePropertyPost(routingContext, productKey, deviceName, requestId, body);
return;
}
if (isEventPostPath(path)) {
// 处理事件上报
String identifier = routingContext.pathParam("identifier");
handleEventPost(routingContext, productKey, deviceName, identifier, requestId, body);
return;
}
// 不支持的请求路径
sendErrorResponse(routingContext, requestId, "unknown", BAD_REQUEST.getCode(), "不支持的请求路径");
} catch (Exception e) {
log.error("[handle][处理上行请求异常] path={}", path, e);
String method = determineMethodFromPath(path, routingContext);
sendErrorResponse(routingContext, requestId, method, INTERNAL_SERVER_ERROR.getCode(),
INTERNAL_SERVER_ERROR.getMsg());
}
}
/**
* 解析通用参数
*
* @param routingContext 路由上下文
* @param defaultRequestId 默认请求 ID
* @return 参数映射
*/
private Map<String, String> parseCommonParams(RoutingContext routingContext, String defaultRequestId) {
Map<String, String> params = MapUtil.newHashMap();
params.put("productKey", routingContext.pathParam("productKey"));
params.put("deviceName", routingContext.pathParam("deviceName"));
JsonObject body = routingContext.body().asJsonObject();
String requestId = ObjUtil.defaultIfNull(body.getString("id"), defaultRequestId);
params.put("requestId", requestId);
return params;
}
/**
* 判断是否是属性上报路径
*
* @param path 路径
* @return 是否是属性上报路径
*/
private boolean isPropertyPostPath(String path) {
return StrUtil.endWith(path, IotDeviceTopicEnum.PROPERTY_POST_TOPIC.getTopic());
}
/**
* 判断是否是事件上报路径
*
* @param path 路径
* @return 是否是事件上报路径
*/
private boolean isEventPostPath(String path) {
return StrUtil.contains(path, IotDeviceTopicEnum.EVENT_POST_TOPIC_PREFIX.getTopic())
&& StrUtil.endWith(path, IotDeviceTopicEnum.EVENT_POST_TOPIC_SUFFIX.getTopic());
}
/**
* 处理属性上报请求
*
* @param routingContext 路由上下文
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param requestId 请求 ID
* @param body 请求体
*/
private void handlePropertyPost(RoutingContext routingContext, String productKey, String deviceName,
String requestId, JsonObject body) {
// 1.1 构建设备消息
String deviceKey = "xxx"; // TODO @芋艿:待支持
Long tenantId = 1L; // TODO @芋艿:待支持
IotDeviceMessage message = IotDeviceMessage.of(productKey, deviceName, deviceKey,
requestId, LocalDateTime.now(), IotNetComponentCommonUtils.getProcessId(), tenantId)
.ofPropertyReport(parsePropertiesFromBody(body));
// 1.2 发送消息
deviceMessageProducer.sendDeviceMessage(message);
// 2. 返回响应
sendResponse(routingContext, requestId, PROPERTY_METHOD, null);
}
/**
* 处理事件上报请求
*
* @param routingContext 路由上下文
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param identifier 事件标识符
* @param requestId 请求 ID
* @param body 请求体
*/
private void handleEventPost(RoutingContext routingContext, String productKey, String deviceName,
String identifier, String requestId, JsonObject body) {
// 处理事件上报
IotDeviceEventReportReqDTO reportReqDTO = parseEventReportRequest(productKey, deviceName, identifier,
requestId, body);
// 事件上报
CommonResult<Boolean> result = deviceUpstreamApi.reportDeviceEvent(reportReqDTO);
String method = EVENT_METHOD_PREFIX + identifier + EVENT_METHOD_SUFFIX;
// 返回响应
sendResponse(routingContext, requestId, method, result);
}
/**
* 发送响应
*
* @param routingContext 路由上下文
* @param requestId 请求 ID
* @param method 方法名
* @param result 结果
*/
private void sendResponse(RoutingContext routingContext, String requestId, String method,
CommonResult<Boolean> result) {
// TODO @芋艿:后续再优化
IotStandardResponse response;
if (result == null ) {
response = IotStandardResponse.success(requestId, method, null);
} else if (result.isSuccess()) {
response = IotStandardResponse.success(requestId, method, result.getData());
} else {
response = IotStandardResponse.error(requestId, method, result.getCode(), result.getMsg());
}
IotNetComponentCommonUtils.writeJsonResponse(routingContext, response);
}
/**
* 发送错误响应
*
* @param routingContext 路由上下文
* @param requestId 请求 ID
* @param method 方法名
* @param code 错误代码
* @param message 错误消息
*/
private void sendErrorResponse(RoutingContext routingContext, String requestId, String method, Integer code,
String message) {
IotStandardResponse errorResponse = IotStandardResponse.error(requestId, method, code, message);
IotNetComponentCommonUtils.writeJsonResponse(routingContext, errorResponse);
}
/**
* 从路径确定方法名
*
* @param path 路径
* @param routingContext 路由上下文
* @return 方法名
*/
private String determineMethodFromPath(String path, RoutingContext routingContext) {
if (StrUtil.contains(path, "/property/")) {
return PROPERTY_METHOD;
}
return EVENT_METHOD_PREFIX
+ (routingContext.pathParams().containsKey("identifier")
? routingContext.pathParam("identifier")
: "unknown")
+
EVENT_METHOD_SUFFIX;
}
// TODO @芋艿:这块在看看
/**
* 从请求体解析属性
*
* @param body 请求体
* @return 属性映射
*/
private Map<String, Object> parsePropertiesFromBody(JsonObject body) {
Map<String, Object> properties = MapUtil.newHashMap();
JsonObject params = body.getJsonObject("params");
if (CollUtil.isEmpty(params)) {
return properties;
}
// 将标准格式的 params 转换为平台需要的 properties 格式
for (String key : params.fieldNames()) {
Object valueObj = params.getValue(key);
// 如果是复杂结构(包含 value 和 time
if (valueObj instanceof JsonObject) {
JsonObject valueJson = (JsonObject) valueObj;
properties.put(key, valueJson.containsKey("value") ? valueJson.getValue("value") : valueObj);
} else {
properties.put(key, valueObj);
}
}
return properties;
}
/**
* 解析事件上报请求
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param identifier 事件标识符
* @param requestId 请求 ID
* @param body 请求体
* @return 事件上报请求 DTO
*/
private IotDeviceEventReportReqDTO parseEventReportRequest(String productKey, String deviceName, String identifier,
String requestId, JsonObject body) {
// 解析参数
Map<String, Object> params = parseParamsFromBody(body);
// 构建事件上报请求 DTO
return ((IotDeviceEventReportReqDTO) new IotDeviceEventReportReqDTO()
.setRequestId(requestId)
.setProcessId(IotNetComponentCommonUtils.getProcessId())
.setReportTime(LocalDateTime.now())
.setProductKey(productKey)
.setDeviceName(deviceName)).setIdentifier(identifier).setParams(params);
}
/**
* 从请求体解析参数
*
* @param body 请求体
* @return 参数映射
*/
private Map<String, Object> parseParamsFromBody(JsonObject body) {
Map<String, Object> params = MapUtil.newHashMap();
JsonObject paramsJson = body.getJsonObject("params");
if (CollUtil.isEmpty(paramsJson)) {
return params;
}
for (String key : paramsJson.fieldNames()) {
params.put(key, paramsJson.getValue(key));
}
return params;
}
}

View File

@@ -1,10 +0,0 @@
# HTTP组件默认配置
yudao:
iot:
component:
core:
plugin-key: http # 插件的唯一标识
# http:
# enabled: true # 是否启用HTTP组件默认启用
# server-port: 8092

View File

@@ -17,32 +17,12 @@
</description>
<dependencies>
<!-- Spring Boot 核心 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Web 相关 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Net Component 核心 -->
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-net-component-core</artifactId>
<version>${revision}</version>
</dependency>
<!-- Net Component HTTP -->
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-net-component-http</artifactId>
<version>${revision}</version>
</dependency>
<!-- Net Component EMQX -->
<dependency>
<groupId>cn.iocoder.boot</groupId>
@@ -50,32 +30,8 @@
<version>${revision}</version>
</dependency>
<!-- TODO @芋艿:消息队列,后续可能去掉,默认不使用 rocketmq -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
</dependencies>
<build>
<!-- 设置构建的 jar 包名 -->
<finalName>${project.artifactId}</finalName>
<plugins>
<!-- 打包 -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.boot.version}</version> <!-- 需要确认父 POM 中有定义 -->
<executions>
<execution>
<goals>
<goal>repackage</goal> <!-- 将引入的 jar 打入其中 -->
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -1,18 +0,0 @@
package cn.iocoder.yudao.module.iot.net.component.server;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* IoT 网络组件聚合启动服务
*
* @author haohao
*/
@SpringBootApplication(scanBasePackages = {"${yudao.info.base-package}.module.iot.net.component"})
public class NetComponentServerApplication {
public static void main(String[] args) {
SpringApplication.run(NetComponentServerApplication.class, args);
}
}

View File

@@ -2,7 +2,6 @@ package cn.iocoder.yudao.module.iot.net.component.server.config;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.net.component.server.upstream.IotComponentUpstreamClient;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Bean;
@@ -50,28 +49,4 @@ public class IotNetComponentServerConfiguration {
return new IotComponentUpstreamClient(properties, restTemplate);
}
/**
* 配置默认的设备上行客户端,避免在独立运行模式下的循环依赖问题
*
* @return 设备上行客户端
*/
@Bean
@ConditionalOnMissingBean(name = "serverDeviceUpstreamClient")
public Object serverDeviceUpstreamClient() {
// 返回一个空对象,避免找不到类的问题
return new Object();
}
// TODO @haohao这个是不是木有用呀
/**
* 配置默认的组件实例注册客户端
*
* @return 插件实例注册客户端
*/
@Bean
@ConditionalOnMissingBean(name = "serverPluginInstanceRegistryClient")
public Object serverPluginInstanceRegistryClient() {
// 返回一个空对象,避免找不到类的问题
return new Object();
}
}

View File

@@ -18,8 +18,6 @@ public class IotNetComponentServerProperties {
/**
* 上行 URL用于向主应用程序上报数据
* <p>
* 默认http://127.0.0.1:48080
*/
private String upstreamUrl = "http://127.0.0.1:48080";
@@ -33,18 +31,4 @@ public class IotNetComponentServerProperties {
*/
private Duration upstreamReadTimeout = Duration.ofSeconds(30);
/**
* 下行服务端口,用于接收主应用程序的请求
* <p>
* 默认18888
*/
private Integer downstreamPort = 18888;
/**
* 组件服务器唯一标识
* <p>
* 默认yudao-module-iot-net-component-server
*/
private String serverKey = "yudao-module-iot-net-component-server";
}

View File

@@ -1,33 +0,0 @@
package cn.iocoder.yudao.module.iot.net.component.server.controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
// TODO @haohao这个是必须的哇可以考虑基于 spring boot actuator
/**
* 健康检查接口
*
* @author haohao
*/
@RestController
@RequestMapping("/health")
public class HealthController {
/**
* 健康检查接口
*
* @return 返回服务状态信息
*/
@GetMapping("/status")
public Map<String, Object> status() {
Map<String, Object> result = new HashMap<>();
result.put("status", "UP");
result.put("message", "IoT 网络组件服务运行正常");
result.put("timestamp", System.currentTimeMillis());
return result;
}
}

View File

@@ -1,8 +1,6 @@
package cn.iocoder.yudao.module.iot.net.component.server.upstream;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.*;
import cn.iocoder.yudao.module.iot.net.component.server.config.IotNetComponentServerProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -19,7 +17,7 @@ import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeC
*/
@RequiredArgsConstructor
@Slf4j
public class IotComponentUpstreamClient implements IotDeviceUpstreamApi {
public class IotComponentUpstreamClient {
public static final String URL_PREFIX = "/rpc-api/iot/device/upstream";
@@ -27,47 +25,11 @@ public class IotComponentUpstreamClient implements IotDeviceUpstreamApi {
private final RestTemplate restTemplate;
@Override
public CommonResult<Boolean> updateDeviceState(IotDeviceStateUpdateReqDTO updateReqDTO) {
String url = properties.getUpstreamUrl() + URL_PREFIX + "/update-state";
return doPost(url, updateReqDTO);
}
@Override
public CommonResult<Boolean> reportDeviceEvent(IotDeviceEventReportReqDTO reportReqDTO) {
String url = properties.getUpstreamUrl() + URL_PREFIX + "/report-event";
return doPost(url, reportReqDTO);
}
@Override
public CommonResult<Boolean> registerDevice(IotDeviceRegisterReqDTO registerReqDTO) {
String url = properties.getUpstreamUrl() + URL_PREFIX + "/register-device";
return doPost(url, registerReqDTO);
}
@Override
public CommonResult<Boolean> registerSubDevice(IotDeviceRegisterSubReqDTO registerReqDTO) {
String url = properties.getUpstreamUrl() + URL_PREFIX + "/register-sub-device";
return doPost(url, registerReqDTO);
}
@Override
public CommonResult<Boolean> addDeviceTopology(IotDeviceTopologyAddReqDTO addReqDTO) {
String url = properties.getUpstreamUrl() + URL_PREFIX + "/add-device-topology";
return doPost(url, addReqDTO);
}
@Override
public CommonResult<Boolean> authenticateEmqxConnection(IotDeviceEmqxAuthReqDTO authReqDTO) {
String url = properties.getUpstreamUrl() + URL_PREFIX + "/authenticate-emqx-connection";
return doPost(url, authReqDTO);
}
@Override
public CommonResult<Boolean> reportDeviceProperty(IotDevicePropertyReportReqDTO reportReqDTO) {
String url = properties.getUpstreamUrl() + URL_PREFIX + "/report-property";
return doPost(url, reportReqDTO);
}
// @Override
// public CommonResult<Boolean> updateDeviceState(IotDeviceStateUpdateReqDTO updateReqDTO) {
// String url = properties.getUpstreamUrl() + URL_PREFIX + "/update-state";
// return doPost(url, updateReqDTO);
// }
@SuppressWarnings("unchecked")
private <T> CommonResult<Boolean> doPost(String url, T requestBody) {

View File

@@ -1,70 +0,0 @@
# 服务器配置
server:
port: 18080 # 修改端口避免与主应用的8080端口冲突
# Spring 配置
spring:
application:
name: iot-component-server
# 允许循环引用
main:
allow-circular-references: true
allow-bean-definition-overriding: true
# Yudao 配置
yudao:
info:
base-package: cn.iocoder.yudao # 主项目包路径,确保正确
iot:
component:
# 网络组件服务器专用配置
server:
# 上行通信配置,用于向主程序上报数据
upstream-url: http://127.0.0.1:48080 # 主程序 API 地址
upstream-connect-timeout: 30s # 连接超时
upstream-read-timeout: 30s # 读取超时
# 下行通信配置,用于接收主程序的控制指令
downstream-port: 18888 # 下行服务器端口
# 组件服务唯一标识
server-key: yudao-module-iot-net-component-server
# ====================================
# 针对引入的 HTTP 组件的配置
# ====================================
http:
enabled: true # 启用HTTP组件
server-port: 8092 # HTTP组件服务端口
# ====================================
# 针对引入的 EMQX 组件的配置
# ====================================
emqx:
enabled: true # 启用EMQX组件
mqtt-host: 127.0.0.1 # MQTT服务器主机地址
mqtt-port: 1883 # MQTT服务器端口
mqtt-username: admin # MQTT服务器用户名
mqtt-password: admin123 # MQTT服务器密码
mqtt-ssl: false # 是否启用SSL
mqtt-topics: # 订阅的主题列表
- "/sys/#"
auth-port: 8101 # 认证端口
message-bus:
type: rocketmq # 消息总线的类型
# 日志配置
logging:
level:
cn.iocoder.yudao: INFO
root: INFO
--- #################### 消息队列相关 ####################
# rocketmq 配置项,对应 RocketMQProperties 配置类
rocketmq:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv
# Producer 配置项
producer:
group: ${spring.application.name}_PRODUCER # 生产者分组