review:【IoT 物联网】mqtt 协议的实现(整体没问题了)

This commit is contained in:
YunaiV
2025-06-16 12:22:58 +08:00
parent 9805cf2463
commit a3fc0730e9
2 changed files with 6 additions and 10 deletions

View File

@@ -67,6 +67,7 @@ public class IotEmqxUpstreamProtocol {
stop(); stop();
// 异步关闭应用 // 异步关闭应用
// TODO haohao是不是不用 sleep 也行哈?
Thread shutdownThread = new Thread(() -> { Thread shutdownThread = new Thread(() -> {
ThreadUtil.sleep(1000); ThreadUtil.sleep(1000);
log.error("[start][由于 MQTT 连接失败,正在关闭应用]"); log.error("[start][由于 MQTT 连接失败,正在关闭应用]");
@@ -128,12 +129,9 @@ public class IotEmqxUpstreamProtocol {
private void connectMqttSync() { private void connectMqttSync() {
String host = emqxProperties.getMqttHost(); String host = emqxProperties.getMqttHost();
int port = emqxProperties.getMqttPort(); int port = emqxProperties.getMqttPort();
// 1. 连接 MQTT Broker
// 1. 创建同步等待对象
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean success = new AtomicBoolean(false); AtomicBoolean success = new AtomicBoolean(false);
// 2. 连接 MQTT Broker
mqttClient.connect(port, host, connectResult -> { mqttClient.connect(port, host, connectResult -> {
if (connectResult.succeeded()) { if (connectResult.succeeded()) {
log.info("[connectMqttSync][MQTT 客户端连接成功, host: {}, port: {}]", host, port); log.info("[connectMqttSync][MQTT 客户端连接成功, host: {}, port: {}]", host, port);
@@ -147,16 +145,16 @@ public class IotEmqxUpstreamProtocol {
latch.countDown(); latch.countDown();
}); });
// 3. 等待连接结果 // 2. 等待连接结果
try { try {
// TODO @haohao想了下timeout 可以不设置,全靠 mqttclient 的超时时间?
boolean awaitResult = latch.await(10, java.util.concurrent.TimeUnit.SECONDS); boolean awaitResult = latch.await(10, java.util.concurrent.TimeUnit.SECONDS);
if (!awaitResult) { if (!awaitResult) {
log.error("[connectMqttSync][等待连接结果超时]"); log.error("[connectMqttSync][等待连接结果超时]");
throw new RuntimeException("连接 MQTT Broker 超时"); throw new RuntimeException("连接 MQTT Broker 超时");
} }
if (!success.get()) { if (!success.get()) {
throw new RuntimeException(String.format("首次连接 MQTT Broker 失败,地址: %s, 端口: %d", throw new RuntimeException(String.format("首次连接 MQTT Broker 失败,地址: %s, 端口: %d", host, port));
host, port));
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@@ -171,7 +169,6 @@ public class IotEmqxUpstreamProtocol {
private void connectMqttAsync() { private void connectMqttAsync() {
String host = emqxProperties.getMqttHost(); String host = emqxProperties.getMqttHost();
int port = emqxProperties.getMqttPort(); int port = emqxProperties.getMqttPort();
mqttClient.connect(port, host, connectResult -> { mqttClient.connect(port, host, connectResult -> {
if (connectResult.succeeded()) { if (connectResult.succeeded()) {
log.info("[connectMqttAsync][MQTT 客户端重连成功]"); log.info("[connectMqttAsync][MQTT 客户端重连成功]");
@@ -199,7 +196,6 @@ public class IotEmqxUpstreamProtocol {
long delay = emqxProperties.getReconnectDelayMs(); long delay = emqxProperties.getReconnectDelayMs();
log.info("[reconnectWithDelay][将在 {} 毫秒后尝试重连 MQTT Broker]", delay); log.info("[reconnectWithDelay][将在 {} 毫秒后尝试重连 MQTT Broker]", delay);
vertx.setTimer(delay, timerId -> { vertx.setTimer(delay, timerId -> {
if (!isRunning) { if (!isRunning) {
return; return;
@@ -226,7 +222,6 @@ public class IotEmqxUpstreamProtocol {
if (mqttClient == null) { if (mqttClient == null) {
return; return;
} }
try { try {
if (mqttClient.isConnected()) { if (mqttClient.isConnected()) {
// 1. 取消订阅所有主题 // 1. 取消订阅所有主题

View File

@@ -79,6 +79,7 @@ public class IotEmqxDownstreamHandler {
boolean isReply = IotDeviceMessageUtils.isReplyMessage(message); boolean isReply = IotDeviceMessageUtils.isReplyMessage(message);
// TODO @芋艿:需要添加对应的 Topic所以需要先判断消息方法类型 // TODO @芋艿:需要添加对应的 Topic所以需要先判断消息方法类型
// TODO @haohao基于 method然后逆推对应的 topic可以哇约定好~
// 根据消息方法和回复状态构建对应的主题 // 根据消息方法和回复状态构建对应的主题
switch (methodEnum) { switch (methodEnum) {
case PROPERTY_POST: case PROPERTY_POST: