feat:【IoT 物联网】为 EMQX 协议添加 共享 的 Vertx

This commit is contained in:
haohao
2025-06-29 19:45:47 +08:00
parent 801a6b970e
commit 18c27196f1
5 changed files with 107 additions and 45 deletions

View File

@@ -6,6 +6,7 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxDownstreamSubscr
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol;
import io.vertx.core.Vertx;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -45,14 +46,21 @@ public class IotGatewayConfiguration {
@Slf4j
public static class MqttProtocolConfiguration {
@Bean
public IotEmqxAuthEventProtocol iotEmqxAuthEventProtocol(IotGatewayProperties gatewayProperties) {
return new IotEmqxAuthEventProtocol(gatewayProperties.getProtocol().getEmqx());
@Bean(destroyMethod = "close")
public Vertx emqxVertx() {
return Vertx.vertx();
}
@Bean
public IotEmqxUpstreamProtocol iotEmqxUpstreamProtocol(IotGatewayProperties gatewayProperties) {
return new IotEmqxUpstreamProtocol(gatewayProperties.getProtocol().getEmqx());
public IotEmqxAuthEventProtocol iotEmqxAuthEventProtocol(IotGatewayProperties gatewayProperties,
Vertx emqxVertx) {
return new IotEmqxAuthEventProtocol(gatewayProperties.getProtocol().getEmqx(), emqxVertx);
}
@Bean
public IotEmqxUpstreamProtocol iotEmqxUpstreamProtocol(IotGatewayProperties gatewayProperties,
Vertx emqxVertx) {
return new IotEmqxUpstreamProtocol(gatewayProperties.getProtocol().getEmqx(), emqxVertx);
}
@Bean

View File

@@ -28,20 +28,20 @@ public class IotEmqxAuthEventProtocol {
private final String serverId;
private Vertx vertx;
private final Vertx vertx;
private HttpServer httpServer;
public IotEmqxAuthEventProtocol(IotGatewayProperties.EmqxProperties emqxProperties) {
public IotEmqxAuthEventProtocol(IotGatewayProperties.EmqxProperties emqxProperties,
Vertx vertx) {
this.emqxProperties = emqxProperties;
this.vertx = vertx;
this.serverId = IotDeviceMessageUtils.generateServerId(emqxProperties.getMqttPort());
}
@PostConstruct
public void start() {
try {
// 创建 Vertx 实例
this.vertx = Vertx.vertx();
startHttpServer();
log.info("[start][IoT 网关 EMQX 认证事件协议服务启动成功, 端口: {}]", emqxProperties.getHttpPort());
} catch (Exception e) {
@@ -53,17 +53,6 @@ public class IotEmqxAuthEventProtocol {
@PreDestroy
public void stop() {
stopHttpServer();
// 关闭 Vertx 实例
if (vertx != null) {
try {
vertx.close();
log.debug("[stop][Vertx 实例已关闭]");
} catch (Exception e) {
log.warn("[stop][关闭 Vertx 实例失败]", e);
}
}
log.info("[stop][IoT 网关 EMQX 认证事件协议服务已停止]");
}

View File

@@ -10,7 +10,6 @@ import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jodd.util.ThreadUtil;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -41,9 +40,11 @@ public class IotEmqxUpstreamProtocol {
private IotEmqxUpstreamHandler upstreamHandler;
public IotEmqxUpstreamProtocol(IotGatewayProperties.EmqxProperties emqxProperties) {
public IotEmqxUpstreamProtocol(IotGatewayProperties.EmqxProperties emqxProperties,
Vertx vertx) {
this.emqxProperties = emqxProperties;
this.serverId = IotDeviceMessageUtils.generateServerId(emqxProperties.getMqttPort());
this.vertx = vertx;
}
@PostConstruct
@@ -53,13 +54,10 @@ public class IotEmqxUpstreamProtocol {
}
try {
// 1. 初始化 Vertx 实例
this.vertx = Vertx.vertx();
// 2. 启动 MQTT 客户端
// 1. 启动 MQTT 客户端
startMqttClient();
// 3. 标记服务为运行状态
// 2. 标记服务为运行状态
isRunning = true;
log.info("[start][IoT 网关 EMQX 协议启动成功]");
} catch (Exception e) {
@@ -67,10 +65,16 @@ public class IotEmqxUpstreamProtocol {
stop();
// 异步关闭应用
// TODO haohao是不是不用 sleep 也行哈?
Thread shutdownThread = new Thread(() -> {
ThreadUtil.sleep(1000);
log.error("[start][由于 MQTT 连接失败,正在关闭应用]");
try {
// 确保日志输出完成,使用更优雅的方式
log.error("[start][由于 MQTT 连接失败,正在关闭应用]");
// 等待日志输出完成
Thread.sleep(1000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
log.warn("[start][应用关闭被中断]");
}
System.exit(1);
});
shutdownThread.setDaemon(true);
@@ -90,16 +94,7 @@ public class IotEmqxUpstreamProtocol {
// 1. 停止 MQTT 客户端
stopMqttClient();
// 2. 关闭 Vertx 实例
if (vertx != null) {
try {
vertx.close();
} catch (Exception e) {
log.warn("[stop][关闭 Vertx 实例失败]", e);
}
}
// 3. 标记服务为停止状态
// 2. 标记服务为停止状态
isRunning = false;
log.info("[stop][IoT 网关 MQTT 协议服务已停止]");
}
@@ -147,7 +142,7 @@ public class IotEmqxUpstreamProtocol {
// 2. 等待连接结果
try {
// TODO @haohao想了下timeout 可以不设置,全靠 mqttclient 的超时时间?
// 应用层超时控制防止启动过程无限阻塞与MQTT客户端的网络超时是不同层次的控制
boolean awaitResult = latch.await(10, java.util.concurrent.TimeUnit.SECONDS);
if (!awaitResult) {
log.error("[connectMqttSync][等待连接结果超时]");

View File

@@ -54,7 +54,8 @@ public class IotEmqxDownstreamHandler {
return;
}
// 2.2 构建载荷
byte[] payload = deviceMessageService.encodeDeviceMessage(message, deviceInfo.getProductKey(), deviceInfo.getDeviceName());
byte[] payload = deviceMessageService.encodeDeviceMessage(message, deviceInfo.getProductKey(),
deviceInfo.getDeviceName());
// 2.3 发布消息
protocol.publishMessage(topic, payload);
}
@@ -78,20 +79,54 @@ public class IotEmqxDownstreamHandler {
// 2. 根据消息方法和回复状态,构建 topic
boolean isReply = IotDeviceMessageUtils.isReplyMessage(message);
// TODO @芋艿:需要添加对应的 Topic所以需要先判断消息方法类型
// TODO @haohao基于 method然后逆推对应的 topic可以哇约定好~
// 根据消息方法和回复状态构建对应的主题
// 3. 根据消息方法类型构建对应的主题
switch (methodEnum) {
case PROPERTY_POST:
// 属性上报:只支持回复消息(下行)
if (isReply) {
return IotMqttTopicUtils.buildPropertyPostReplyTopic(productKey, deviceName);
}
break;
case PROPERTY_SET:
// 属性设置:只支持非回复消息(下行)
if (!isReply) {
return IotMqttTopicUtils.buildPropertySetTopic(productKey, deviceName);
}
break;
case EVENT_POST:
// 事件上报:只支持回复消息(下行)
if (isReply) {
String identifier = IotDeviceMessageUtils.getIdentifier(message);
if (StrUtil.isNotBlank(identifier)) {
return IotMqttTopicUtils.buildEventPostReplyTopic(productKey, deviceName, identifier);
}
}
break;
case SERVICE_INVOKE:
// 服务调用:支持请求和回复
String serviceIdentifier = IotDeviceMessageUtils.getIdentifier(message);
if (StrUtil.isNotBlank(serviceIdentifier)) {
if (isReply) {
return IotMqttTopicUtils.buildServiceReplyTopic(productKey, deviceName, serviceIdentifier);
} else {
return IotMqttTopicUtils.buildServiceTopic(productKey, deviceName, serviceIdentifier);
}
}
break;
case CONFIG_PUSH:
// 配置推送:平台向设备推送配置(下行请求),设备回复确认(上行回复)
if (!isReply) {
return IotMqttTopicUtils.buildConfigPushTopic(productKey, deviceName);
}
break;
default:
log.warn("[buildTopicByMethod][未处理的消息方法: {}]", methodEnum);
break;
}
log.warn("[buildTopicByMethod][暂时不支持的下行消息: method={}, isReply={}]",

View File

@@ -91,4 +91,39 @@ public final class IotMqttTopicUtils {
return buildDeviceTopicPrefix(productKey, deviceName) + SERVICE_TOPIC_PREFIX + serviceIdentifier;
}
/**
* 构建设备服务调用回复主题
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param serviceIdentifier 服务标识符
* @return 完整的主题路径
*/
public static String buildServiceReplyTopic(String productKey, String deviceName, String serviceIdentifier) {
return buildDeviceTopicPrefix(productKey, deviceName) + SERVICE_TOPIC_PREFIX + serviceIdentifier + "_reply";
}
/**
* 构建设备事件上报回复主题
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param eventIdentifier 事件标识符
* @return 完整的主题路径
*/
public static String buildEventPostReplyTopic(String productKey, String deviceName, String eventIdentifier) {
return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/event/" + eventIdentifier + "_reply";
}
/**
* 构建设备配置推送主题
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @return 完整的主题路径
*/
public static String buildConfigPushTopic(String productKey, String deviceName) {
return buildDeviceTopicPrefix(productKey, deviceName) + "/thing/config/push";
}
}