feat:【IoT 物联网】优化 MQTT 客户端连接逻辑

This commit is contained in:
haohao
2025-06-16 09:40:07 +08:00
parent 2cf5bf5348
commit 9805cf2463
2 changed files with 158 additions and 138 deletions

View File

@@ -10,6 +10,7 @@ import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jodd.util.ThreadUtil;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -65,19 +66,15 @@ public class IotEmqxUpstreamProtocol {
log.error("[start][IoT 网关 EMQX 协议服务启动失败,应用将关闭]", e);
stop();
// 异步关闭应用,避免阻塞当前线程
// TODO @haohao是不是阻塞也没关系哈
new Thread(() -> {
try {
// TODO @haohao可以考虑用 ThreadUtil.sleep 更简洁
Thread.sleep(1000); // 等待 1 秒让日志输出完成
log.error("[start][由于 MQTT 连接失败,正在关闭应用]");
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} finally {
System.exit(1); // 直接关闭 JVM
}
}).start();
// 异步关闭应用
Thread shutdownThread = new Thread(() -> {
ThreadUtil.sleep(1000);
log.error("[start][由于 MQTT 连接失败,正在关闭应用]");
System.exit(1);
});
shutdownThread.setDaemon(true);
shutdownThread.setName("emergency-shutdown");
shutdownThread.start();
throw e;
}
@@ -114,114 +111,149 @@ public class IotEmqxUpstreamProtocol {
// 1. 初始化消息处理器
this.upstreamHandler = new IotEmqxUpstreamHandler(this);
// 2. 创建 MQTT 客户端,连接 MQTT Broker
boolean connected = connectMqttSync();
if (!connected) {
throw new RuntimeException("首次连接 MQTT Broker 失败");
}
// 2. 创建 MQTT 客户端
createMqttClient();
// 3. 同步连接 MQTT Broker
connectMqttSync();
} catch (Exception e) {
log.error("[startMqttClient][MQTT 客户端启动失败]", e);
throw new RuntimeException("MQTT 客户端启动失败", e);
throw new RuntimeException("MQTT 客户端启动失败: " + e.getMessage(), e);
}
}
/**
* 连接 MQTT Broker
*
* @param isReconnect 是否为重连
* @param isSync 是否同步等待连接结果
* @return 当 isSync 为 true 时返回连接是否成功,否则返回 null
*/
// TODO @haohao是不是不用结果结束后直接判断 this.mqttClient.isConnected()
private Boolean connectMqtt(boolean isReconnect, boolean isSync) {
// TODO @haohao这块代码是不是放到
String host = emqxProperties.getMqttHost();
Integer port = emqxProperties.getMqttPort();
// 2.3.1. 如果是重连,则需要重新创建 MQTT 客户端
// TODO @hoahao疑惑为啥这里要重新创建对象呀另外创建是不是拿到 reconnectWithDelay 会更合适?这样和 startMqttClient 一样呢;
if (isReconnect) {
createMqttClient();
}
// 2.3.2. 连接 MQTT Broker
CountDownLatch latch = isSync ? new CountDownLatch(1) : null;
AtomicBoolean success = isSync
? new AtomicBoolean(false)
: null;
mqttClient.connect(port, host, connectResult -> {
if (connectResult.succeeded()) {
if (isReconnect) {
log.info("[connectMqtt][MQTT 客户端重连成功]");
} else {
log.info("[connectMqtt][MQTT 客户端连接成功, host: {}, port: {}]", host, port);
}
// 设置处理器和订阅主题
setupMqttHandlers();
subscribeToTopics();
if (success != null) {
success.set(true);
}
} else {
log.error("[connectMqtt][连接 MQTT Broker 失败, host: {}, port: {}]", host, port, connectResult.cause());
if (isReconnect) {
reconnectWithDelay();
} else {
log.error("[connectMqtt][首次连接失败,连接终止]");
}
}
if (latch != null) {
latch.countDown();
}
});
// 2.3.3. 如果需要同步等待连接结果,则等待
if (isSync) {
try {
latch.await(10, java.util.concurrent.TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("[connectMqtt][等待连接结果被中断]", e);
}
return success.get();
}
return null;
}
/**
* 同步连接 MQTT Broker
*
* @return 是否连接成功
*/
private boolean connectMqttSync() {
Boolean result = connectMqtt(false, true);
return result != null ? result : false;
private void connectMqttSync() {
String host = emqxProperties.getMqttHost();
int port = emqxProperties.getMqttPort();
// 1. 创建同步等待对象
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean success = new AtomicBoolean(false);
// 2. 连接 MQTT Broker
mqttClient.connect(port, host, connectResult -> {
if (connectResult.succeeded()) {
log.info("[connectMqttSync][MQTT 客户端连接成功, host: {}, port: {}]", host, port);
setupMqttHandlers();
subscribeToTopics();
success.set(true);
} else {
log.error("[connectMqttSync][连接 MQTT Broker 失败, host: {}, port: {}]",
host, port, connectResult.cause());
}
latch.countDown();
});
// 3. 等待连接结果
try {
boolean awaitResult = latch.await(10, java.util.concurrent.TimeUnit.SECONDS);
if (!awaitResult) {
log.error("[connectMqttSync][等待连接结果超时]");
throw new RuntimeException("连接 MQTT Broker 超时");
}
if (!success.get()) {
throw new RuntimeException(String.format("首次连接 MQTT Broker 失败,地址: %s, 端口: %d",
host, port));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("[connectMqttSync][等待连接结果被中断]", e);
throw new RuntimeException("连接 MQTT Broker 被中断", e);
}
}
/**
* 异步连接 MQTT Broker
*/
private void connectMqttAsync() {
String host = emqxProperties.getMqttHost();
int port = emqxProperties.getMqttPort();
mqttClient.connect(port, host, connectResult -> {
if (connectResult.succeeded()) {
log.info("[connectMqttAsync][MQTT 客户端重连成功]");
setupMqttHandlers();
subscribeToTopics();
} else {
log.error("[connectMqttAsync][连接 MQTT Broker 失败, host: {}, port: {}]",
host, port, connectResult.cause());
log.warn("[connectMqttAsync][重连失败,将再次尝试]");
reconnectWithDelay();
}
});
}
/**
* 延迟重连
*/
private void reconnectWithDelay() {
if (!isRunning) {
return;
}
if (mqttClient != null && mqttClient.isConnected()) {
return;
}
long delay = emqxProperties.getReconnectDelayMs();
log.info("[reconnectWithDelay][将在 {} 毫秒后尝试重连 MQTT Broker]", delay);
vertx.setTimer(delay, timerId -> {
if (!isRunning) {
return;
}
if (mqttClient != null && mqttClient.isConnected()) {
return;
}
log.info("[reconnectWithDelay][开始重连 MQTT Broker]");
try {
createMqttClient();
connectMqttAsync();
} catch (Exception e) {
log.error("[reconnectWithDelay][重连过程中发生异常]", e);
vertx.setTimer(delay, t -> reconnectWithDelay());
}
});
}
/**
* 停止 MQTT 客户端
*/
private void stopMqttClient() {
// 1. 取消订阅所有主题
if (mqttClient != null && mqttClient.isConnected()) {
List<String> topicList = emqxProperties.getMqttTopics();
for (String topic : topicList) {
if (mqttClient == null) {
return;
}
try {
if (mqttClient.isConnected()) {
// 1. 取消订阅所有主题
List<String> topicList = emqxProperties.getMqttTopics();
for (String topic : topicList) {
try {
mqttClient.unsubscribe(topic);
} catch (Exception e) {
log.warn("[stopMqttClient][取消订阅主题({})异常]", topic, e);
}
}
// 2. 断开 MQTT 客户端连接
try {
mqttClient.unsubscribe(topic);
CountDownLatch disconnectLatch = new CountDownLatch(1);
mqttClient.disconnect(ar -> disconnectLatch.countDown());
if (!disconnectLatch.await(5, java.util.concurrent.TimeUnit.SECONDS)) {
log.warn("[stopMqttClient][断开 MQTT 连接超时]");
}
} catch (Exception e) {
log.warn("[stopMqttClient][取消订阅主题({})异常]", topic, e);
log.warn("[stopMqttClient][关闭 MQTT 客户端异常]", e);
}
}
}
// 2. 断开 MQTT 客户端连接
if (mqttClient != null && mqttClient.isConnected()) {
try {
mqttClient.disconnect();
} catch (Exception e) {
log.warn("[stopMqttClient][关闭 MQTT 客户端异常]", e);
}
} catch (Exception e) {
log.warn("[stopMqttClient][停止 MQTT 客户端过程中发生异常]", e);
} finally {
mqttClient = null;
}
}
@@ -241,7 +273,7 @@ public class IotEmqxUpstreamProtocol {
* 设置 MQTT 处理器
*/
private void setupMqttHandlers() {
// 1.1 设置断开重连监听器
// 1. 设置断开重连监听器
mqttClient.closeHandler(closeEvent -> {
if (!isRunning) {
return;
@@ -249,10 +281,12 @@ public class IotEmqxUpstreamProtocol {
log.warn("[closeHandler][MQTT 连接已断开, 准备重连]");
reconnectWithDelay();
});
// 1.2 设置异常处理器
mqttClient.exceptionHandler(exception -> log.error("[exceptionHandler][MQTT 客户端异常]", exception));
// 2. 设置消息处理器
// 2. 设置异常处理器
mqttClient.exceptionHandler(exception ->
log.error("[exceptionHandler][MQTT 客户端异常]", exception));
// 3. 设置消息处理器
mqttClient.publishHandler(upstreamHandler::handle);
}
@@ -283,35 +317,13 @@ public class IotEmqxUpstreamProtocol {
});
}
/**
* 延迟重连
*/
private void reconnectWithDelay() {
long delay = emqxProperties.getReconnectDelayMs();
vertx.setTimer(delay, timerId -> {
if (!isRunning) {
return;
}
if (mqttClient != null && mqttClient.isConnected()) {
return;
}
log.info("[reconnectWithDelay][开始重连 MQTT Broker]");
try {
connectMqtt(true, false);
} catch (Exception e) {
log.error("[reconnectWithDelay][重连失败, 将继续尝试]", e);
}
// TODO @haohao是不是把如果连接失败放到这里处理继续发起。。。这样connect 逻辑更简单纯粹1首次连接失败就退出2重新连接如果失败继续重试
});
}
/**
* 发布消息到 MQTT Broker
*
* @param topic 主题
* @param payload 消息内容
*/
public void publishMessage(String topic, String payload) {
public void publishMessage(String topic, byte[] payload) {
if (mqttClient == null || !mqttClient.isConnected()) {
log.warn("[publishMessage][MQTT 客户端未连接, 无法发布消息]");
return;

View File

@@ -56,8 +56,7 @@ public class IotEmqxDownstreamHandler {
// 2.2 构建载荷
byte[] payload = deviceMessageService.encodeDeviceMessage(message, deviceInfo.getProductKey(), deviceInfo.getDeviceName());
// 2.3 发布消息
// TODO @haohao可以直接使用 bytes 作为 payload 么?
protocol.publishMessage(topic, new String(payload));
protocol.publishMessage(topic, payload);
}
/**
@@ -78,13 +77,22 @@ public class IotEmqxDownstreamHandler {
// 2. 根据消息方法和回复状态,构建 topic
boolean isReply = IotDeviceMessageUtils.isReplyMessage(message);
// TODO @haohao这里判断要不去掉 methodEnum == IotDeviceMessageMethodEnum.PROPERTY_POST直接构建如果未来有不回复的在特殊搞开关
if (methodEnum == IotDeviceMessageMethodEnum.PROPERTY_POST && isReply) {
return IotMqttTopicUtils.buildPropertyPostReplyTopic(productKey, deviceName);
}
if (methodEnum == IotDeviceMessageMethodEnum.PROPERTY_SET && !isReply) {
return IotMqttTopicUtils.buildPropertySetTopic(productKey, deviceName);
// TODO @芋艿:需要添加对应的 Topic所以需要先判断消息方法类型
// 根据消息方法和回复状态构建对应的主题
switch (methodEnum) {
case PROPERTY_POST:
if (isReply) {
return IotMqttTopicUtils.buildPropertyPostReplyTopic(productKey, deviceName);
}
break;
case PROPERTY_SET:
if (!isReply) {
return IotMqttTopicUtils.buildPropertySetTopic(productKey, deviceName);
}
break;
}
log.warn("[buildTopicByMethod][暂时不支持的下行消息: method={}, isReply={}]",
message.getMethod(), isReply);
return null;