feat:【IoT 物联网】优化 MQTT 连接逻辑,更新 HTTP 认证端口,简化消息处理流程

This commit is contained in:
haohao
2025-06-14 18:32:23 +08:00
parent d70c6986d5
commit 8b4bee69f2
6 changed files with 179 additions and 215 deletions

View File

@@ -1,6 +1,5 @@
package cn.iocoder.yudao.module.iot.gateway.config;
import cn.hutool.core.util.StrUtil;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
@@ -105,46 +104,53 @@ public class IotGatewayProperties {
@NotNull(message = "是否开启不能为空")
private Boolean enabled;
// TODO @haohao是不是改成 httpPort不只认证目前看。
/**
* HTTP 认证端口默认8090
* HTTP 服务端口默认8090
*/
private Integer httpAuthPort = 8090;
private Integer httpPort = 8090;
/**
* MQTT 服务器地址
*/
@NotEmpty(message = "MQTT 服务器地址不能为空")
private String mqttHost;
/**
* MQTT 服务器端口默认1883
*/
@NotNull(message = "MQTT 服务器端口不能为空")
private Integer mqttPort = 1883;
/**
* MQTT 用户名
*/
@NotEmpty(message = "MQTT 用户名不能为空")
private String mqttUsername;
/**
* MQTT 密码
*/
@NotEmpty(message = "MQTT 密码不能为空")
private String mqttPassword;
/**
* MQTT 客户端的 SSL 开关
*/
@NotNull(message = "MQTT 是否开启 SSL 不能为空")
private Boolean mqttSsl = false;
/**
* MQTT 客户端 ID如果为空系统将自动生成
*/
@NotEmpty(message = "MQTT 客户端 ID 不能为空")
private String mqttClientId;
/**
* MQTT 订阅的主题
*/
@NotEmpty(message = "MQTT 主题不能为空")
private List<@NotEmpty(message = "MQTT 主题不能为空") String> mqttTopics;
/**
* 默认 QoS 级别
* <p>
@@ -158,24 +164,12 @@ public class IotGatewayProperties {
* 连接超时时间(秒)
*/
private Integer connectTimeoutSeconds = 10;
/**
* 重连延迟时间(毫秒)
*/
private Long reconnectDelayMs = 5000L;
// TODO @haohao貌似可以通过配置文件 + el 表达式;尽量还是配置文件;
/**
* 获取 MQTT 客户端 ID如果未配置则自动生成
*
* @return MQTT 客户端 ID
*/
public String getMqttClientId() {
if (StrUtil.isBlank(mqttClientId)) {
mqttClientId = "iot-gateway-mqtt-" + System.currentTimeMillis();
}
return mqttClientId;
}
}
}

View File

@@ -1,7 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttHttpAuthHandler;
@@ -20,7 +19,10 @@ import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
* IoT 网关 MQTT 协议:接收设备上行消息
@@ -127,7 +129,7 @@ public class IotMqttUpstreamProtocol {
router.post(IotMqttTopicUtils.MQTT_EVENT_PATH).handler(authHandler::handleEvent);
// 2. 启动 HTTP 服务器
int authPort = emqxProperties.getHttpAuthPort();
int authPort = emqxProperties.getHttpPort();
try {
httpAuthServer = vertx.createHttpServer()
.requestHandler(router)
@@ -169,16 +171,61 @@ public class IotMqttUpstreamProtocol {
log.info("[startMqttClient][使用 MQTT 客户端 ID: {}]", emqxProperties.getMqttClientId());
createMqttClient();
// 3. 连接 MQTT Broker异步连接,不会抛出异常
connectMqtt(false);
// 3. 连接 MQTT Broker同步等待首次连接结果
boolean connected = connectMqttSync();
if (!connected) {
throw new RuntimeException("首次连接 MQTT Broker 失败");
}
log.info("[startMqttClient][MQTT 客户端启动完成,正在异步连接中...]");
log.info("[startMqttClient][MQTT 客户端启动完成]");
} catch (Exception e) {
log.error("[startMqttClient][MQTT 客户端启动失败]", e);
throw new RuntimeException("MQTT 客户端启动失败", e);
}
}
/**
* 同步连接 MQTT Broker
*
* @return 是否连接成功
*/
private boolean connectMqttSync() {
String host = emqxProperties.getMqttHost();
Integer port = emqxProperties.getMqttPort();
log.info("[connectMqttSync][开始连接 MQTT Broker, host: {}, port: {}]", host, port);
// 使用计数器实现同步等待
java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1);
java.util.concurrent.atomic.AtomicBoolean success = new java.util.concurrent.atomic.AtomicBoolean(false);
mqttClient.connect(port, host, connectResult -> {
if (connectResult.succeeded()) {
log.info("[connectMqttSync][MQTT 客户端连接成功, host: {}, port: {}]", host, port);
// 设置处理器
setupMqttHandlers();
// 订阅主题
subscribeToTopics();
success.set(true);
} else {
log.error("[connectMqttSync][连接 MQTT Broker 失败, host: {}, port: {}]",
host, port, connectResult.cause());
// 首次连接失败,启动重连机制
reconnectWithDelay();
}
latch.countDown();
});
try {
// 等待连接结果最多等待10秒
latch.await(10, java.util.concurrent.TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("[connectMqttSync][等待连接结果被中断]", e);
}
return success.get();
}
/**
* 停止 MQTT 客户端
*/
@@ -218,15 +265,6 @@ public class IotMqttUpstreamProtocol {
// 1. 参数校验
String host = emqxProperties.getMqttHost();
Integer port = emqxProperties.getMqttPort();
// TODO @haohao这些参数校验交给 validator
if (StrUtil.isBlank(host)) {
log.error("[connectMqtt][MQTT Host 为空, 无法连接]");
throw new IllegalArgumentException("MQTT Host 不能为空");
}
if (port == null || port <= 0) {
log.error("[connectMqtt][MQTT Port({}) 无效]", port);
throw new IllegalArgumentException("MQTT Port 必须为正整数");
}
if (isReconnect) {
log.info("[connectMqtt][开始重连 MQTT Broker, host: {}, port: {}]", host, port);
@@ -238,32 +276,28 @@ public class IotMqttUpstreamProtocol {
// 2. 异步连接
mqttClient.connect(port, host, connectResult -> {
// TODO @haohaoif return减少括号哈
if (connectResult.succeeded()) {
if (isReconnect) {
log.info("[connectMqtt][MQTT 客户端重连成功, host: {}, port: {}]", host, port);
} else {
log.info("[connectMqtt][MQTT 客户端连接成功, host: {}, port: {}]", host, port);
}
// 设置处理器
setupMqttHandlers();
// 订阅主题
subscribeToTopics();
} else {
if (!connectResult.succeeded()) {
log.error("[connectMqtt][连接 MQTT Broker 失败, host: {}, port: {}, isReconnect: {}]",
host, port, isReconnect, connectResult.cause());
// TODO @haohao体感上是不是首次必须连接成功类似 mysql首次要连接上然后后续可以重连
// 首次连接失败或重连失败时,尝试重连
if (!isReconnect) {
// 首次连接失败时,也要尝试重连
log.warn("[connectMqtt][首次连接失败,将开始重连机制]");
reconnectWithDelay();
} else {
// 重连失败时,继续尝试重连
reconnectWithDelay();
}
reconnectWithDelay();
return;
}
if (isReconnect) {
log.info("[connectMqtt][MQTT 客户端重连成功, host: {}, port: {}]", host, port);
} else {
log.info("[connectMqtt][MQTT 客户端连接成功, host: {}, port: {}]", host, port);
}
// 设置处理器
setupMqttHandlers();
// 订阅主题
subscribeToTopics();
});
}
@@ -283,12 +317,7 @@ public class IotMqttUpstreamProtocol {
* 设置 MQTT 处理器
*/
private void setupMqttHandlers() {
// TODO @haohaomqttClient 一定非空;
if (mqttClient == null) {
log.warn("[setupMqttHandlers][MQTT 客户端为空,跳过处理器设置]");
return;
}
// 由于 mqttClient 在 createMqttClient() 方法中已初始化,此处无需检查
// 设置断开重连监听器
mqttClient.closeHandler(closeEvent -> {
log.warn("[closeHandler][MQTT 连接已断开, 准备重连]");
@@ -301,13 +330,9 @@ public class IotMqttUpstreamProtocol {
});
// 设置消息处理器
// TODO @haohaoupstreamHandler 一定非空;
if (upstreamHandler != null) {
mqttClient.publishHandler(upstreamHandler::handle);
log.debug("[setupMqttHandlers][MQTT 消息处理器设置完成]");
} else {
log.warn("[setupMqttHandlers][上行消息处理器为空,跳过设置]");
}
// upstreamHandler 在 startMqttClient() 方法中已初始化,此处无需检查
mqttClient.publishHandler(upstreamHandler::handle);
log.debug("[setupMqttHandlers][MQTT 消息处理器设置完成]");
}
/**
@@ -327,35 +352,39 @@ public class IotMqttUpstreamProtocol {
int qos = emqxProperties.getMqttQos();
log.info("[subscribeToTopics][开始订阅主题, 共 {} 个, QoS: {}]", topicList.size(), qos);
// TODO @haohao使用 atomicinteger 会更合适;
int[] successCount = { 0 }; // 使用数组以便在 lambda 中修改
int[] failCount = { 0 };
// 使用 AtomicInteger 替代数组,线程安全且更简洁
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger failCount = new AtomicInteger(0);
// 构建主题-QoS 映射,批量订阅
Map<String, Integer> topicQosMap = new HashMap<>();
for (String topic : topicList) {
// TODO @haohaoMqttClient subscribe(Map<String, Integer> topics, 是不是更简洁哈;
mqttClient.subscribe(topic, qos, subscribeResult -> {
if (subscribeResult.succeeded()) {
successCount[0]++;
log.debug("[subscribeToTopics][订阅主题成功, topic: {}, qos: {}]", topic, qos);
// 当所有主题都处理完成时,记录汇总日志
if (successCount[0] + failCount[0] == topicList.size()) {
log.info("[subscribeToTopics][主题订阅完成, 成功: {}, 失败: {}, 总计: {}]",
successCount[0], failCount[0], topicList.size());
}
} else {
failCount[0]++;
log.error("[subscribeToTopics][订阅主题失败, topic: {}, qos: {}, 原因: {}]",
topic, qos, subscribeResult.cause().getMessage(), subscribeResult.cause());
// 当所有主题都处理完成时,记录汇总日志
if (successCount[0] + failCount[0] == topicList.size()) {
log.info("[subscribeToTopics][主题订阅完成, 成功: {}, 失败: {}, 总计: {}]",
successCount[0], failCount[0], topicList.size());
}
}
});
topicQosMap.put(topic, qos);
}
// 批量订阅所有主题
mqttClient.subscribe(topicQosMap, subscribeResult -> {
if (subscribeResult.succeeded()) {
// 批量订阅成功,记录所有主题为成功
int successful = successCount.addAndGet(topicList.size());
log.info("[subscribeToTopics][批量订阅主题成功, 共 {} 个主题, QoS: {}]", successful, qos);
for (String topic : topicList) {
log.debug("[subscribeToTopics][订阅主题成功, topic: {}, qos: {}]", topic, qos);
}
} else {
// 批量订阅失败,记录所有主题为失败
int failed = failCount.addAndGet(topicList.size());
log.error("[subscribeToTopics][批量订阅主题失败, 共 {} 个主题, 原因: {}]",
failed, subscribeResult.cause().getMessage(), subscribeResult.cause());
for (String topic : topicList) {
log.error("[subscribeToTopics][订阅主题失败, topic: {}, qos: {}]", topic, qos);
}
}
// 记录汇总日志
log.info("[subscribeToTopics][主题订阅完成, 成功: {}, 失败: {}, 总计: {}]",
successCount.get(), failCount.get(), topicList.size());
});
}
/**

View File

@@ -2,10 +2,10 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.hutool.json.JSONObject;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
@@ -48,52 +48,49 @@ public class IotMqttDownstreamHandler {
}
// 2.1 根据方法构建主题
String topic = buildTopicByMethod(message.getMethod(), deviceInfo.getProductKey(), deviceInfo.getDeviceName());
String topic = buildTopicByMethod(message, deviceInfo.getProductKey(), deviceInfo.getDeviceName());
if (StrUtil.isBlank(topic)) {
log.warn("[handle][未知的消息方法: {}]", message.getMethod());
return;
}
// 2.2 构建载荷
// TODO @haohao这里是不是 encode 就可以发拉?因为本身就 json 化了。
JSONObject payload = buildDownstreamPayload(message);
byte[] payload = deviceMessageService.encodeDeviceMessage(message, deviceInfo.getProductKey(), deviceInfo.getDeviceName());
// 2.3 发布消息
protocol.publishMessage(topic, payload.toString());
protocol.publishMessage(topic, new String(payload));
}
// TODO @haohao这个是不是也可以计算IotDeviceMessageUtils 的 isReplyMessage这样就直接生成了
/**
* 根据方法构建主题
* 根据消息方法和回复状态构建主题
*
* @param method 消息方法
* @param message 设备消息
* @param productKey 产品标识
* @param deviceName 设备名称
* @return 构建的主题,如果方法不支持返回 null
*/
private String buildTopicByMethod(String method, String productKey, String deviceName) {
IotDeviceMessageMethodEnum methodEnum = IotDeviceMessageMethodEnum.of(method);
private String buildTopicByMethod(IotDeviceMessage message, String productKey, String deviceName) {
// 1. 解析消息方法
IotDeviceMessageMethodEnum methodEnum = IotDeviceMessageMethodEnum.of(message.getMethod());
if (methodEnum == null) {
log.warn("[buildTopicByMethod][未知的消息方法: {}]", message.getMethod());
return null;
}
return switch (methodEnum) {
case PROPERTY_POST -> IotMqttTopicUtils.buildPropertyPostReplyTopic(productKey, deviceName);
case PROPERTY_SET -> IotMqttTopicUtils.buildPropertySetTopic(productKey, deviceName);
default -> null;
};
}
// 2. 判断是否回复消息
boolean isReply = IotDeviceMessageUtils.isReplyMessage(message);
/**
* 构建下行消息载荷
*
* @param message 设备消息
* @return JSON 载荷
*/
private JSONObject buildDownstreamPayload(IotDeviceMessage message) {
// 使用 IotDeviceMessageService 进行消息编码
IotDeviceRespDTO device = deviceService.getDeviceFromCache(message.getDeviceId());
byte[] encodedBytes = deviceMessageService.encodeDeviceMessage(message, device.getProductKey(),
device.getDeviceName());
return new JSONObject(new String(encodedBytes));
// 3. 根据消息方法和回复状态,构建主题
if (methodEnum == IotDeviceMessageMethodEnum.PROPERTY_POST && isReply) {
return IotMqttTopicUtils.buildPropertyPostReplyTopic(productKey, deviceName);
}
if (methodEnum == IotDeviceMessageMethodEnum.PROPERTY_SET && !isReply) {
return IotMqttTopicUtils.buildPropertySetTopic(productKey, deviceName);
}
log.warn("[buildTopicByMethod][暂时不支持的下行消息: method={}, isReply={}]",
message.getMethod(), isReply);
return null;
}
}

View File

@@ -14,8 +14,6 @@ import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import lombok.extern.slf4j.Slf4j;
import static cn.iocoder.yudao.module.iot.gateway.enums.ErrorCodeConstants.DEVICE_AUTH_FAIL;
/**
* IoT 网关 MQTT HTTP 认证处理器
* <p>
@@ -67,7 +65,7 @@ public class IotMqttHttpAuthHandler {
*/
public void handleAuth(RoutingContext context) {
try {
// 参数校验
// 1. 参数校验
JsonObject body = parseRequestBody(context);
if (body == null) {
return;
@@ -78,23 +76,21 @@ public class IotMqttHttpAuthHandler {
log.debug("[handleAuth][设备认证请求: clientId={}, username={}]", clientId, username);
if (StrUtil.hasEmpty(clientId, username, password)) {
log.info("[handleAuth][认证参数不完整: clientId={}, username={}]", clientId, username);
sendAuthResponse(context, RESULT_DENY, false, "认证参数不完整");
sendAuthResponse(context, RESULT_DENY);
return;
}
// 执行设备认证
// 2. 执行认证
boolean authResult = performDeviceAuth(clientId, username, password);
log.info("[handleAuth][设备认证结果: {} -> {}]", username, authResult);
if (authResult) {
// TODO @haohao是不是两条 info直接打认证结果authResult
log.info("[handleAuth][设备认证成功: {}]", username);
sendAuthResponse(context, RESULT_ALLOW, false, null);
sendAuthResponse(context, RESULT_ALLOW);
} else {
log.info("[handleAuth][设备认证失败: {}]", username);
sendAuthResponse(context, RESULT_DENY, false, DEVICE_AUTH_FAIL.getMsg());
sendAuthResponse(context, RESULT_DENY);
}
} catch (Exception e) {
log.error("[handleAuth][设备认证异常]", e);
sendAuthResponse(context, RESULT_IGNORE, false, "认证服务异常");
sendAuthResponse(context, RESULT_IGNORE);
}
}
@@ -104,9 +100,10 @@ public class IotMqttHttpAuthHandler {
* 支持的事件类型client.connected、client.disconnected 等
*/
public void handleEvent(RoutingContext context) {
JsonObject body = null;
try {
// 解析请求体
JsonObject body = parseRequestBody(context);
// 1. 解析请求体
body = parseRequestBody(context);
if (body == null) {
return;
}
@@ -114,7 +111,7 @@ public class IotMqttHttpAuthHandler {
String username = body.getString("username");
log.debug("[handleEvent][收到事件: {} - {}]", event, username);
// 根据事件类型进行分发处理
// 2. 根据事件类型进行分发处理
switch (event) {
case EVENT_CLIENT_CONNECTED:
handleClientConnected(body);
@@ -123,15 +120,13 @@ public class IotMqttHttpAuthHandler {
handleClientDisconnected(body);
break;
default:
log.debug("[handleEvent][忽略事件: {}]", event);
break;
}
// EMQX Webhook 只需要 200 状态码,无需响应体
context.response().setStatusCode(SUCCESS_STATUS_CODE).end();
} catch (Exception e) {
// TODO @haohaobody 可以打印出来
log.error("[handleEvent][事件处理失败]", e);
log.error("[handleEvent][事件处理失败][body={}]", body != null ? body.encode() : "null", e);
// 即使处理失败,也返回 200 避免EMQX重试
context.response().setStatusCode(SUCCESS_STATUS_CODE).end();
}
@@ -163,18 +158,19 @@ public class IotMqttHttpAuthHandler {
* @return 请求体JSON对象解析失败时返回null
*/
private JsonObject parseRequestBody(RoutingContext context) {
String rawBody = null;
try {
rawBody = context.body().asString();
JsonObject body = context.body().asJsonObject();
if (body == null) {
log.info("[parseRequestBody][请求体为空]");
sendAuthResponse(context, RESULT_IGNORE, false, "请求体不能为空");
log.info("[parseRequestBody][请求体为空][rawBody={}]", rawBody);
sendAuthResponse(context, RESULT_IGNORE);
return null;
}
return body;
} catch (Exception e) {
// TODO @haohao最好把 body 打印出来;
log.error("[parseRequestBody][解析请求体失败]", e);
sendAuthResponse(context, RESULT_IGNORE, false, "请求体格式错误");
log.error("[parseRequestBody][解析请求体失败][rawBody={}]", rawBody, e);
sendAuthResponse(context, RESULT_IGNORE);
return null;
}
}
@@ -203,13 +199,10 @@ public class IotMqttHttpAuthHandler {
* 处理设备状态变化
*
* @param username 用户名
* @param online 是否在线
* @param online 是否在线 true 在线 false 离线
*/
private void handleDeviceStateChange(String username, boolean online) {
// 解析设备信息
if (StrUtil.isEmpty(username) || "undefined".equals(username)) {
return;
}
// 1. 解析设备信息
IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(username);
if (deviceInfo == null) {
log.debug("[handleDeviceStateChange][跳过非设备连接: {}]", username);
@@ -217,24 +210,13 @@ public class IotMqttHttpAuthHandler {
}
try {
// TODO @haohaoserverId 获取非空,可以忽略掉;
String serverId = protocol.getServerId();
if (StrUtil.isEmpty(serverId)) {
log.error("[handleDeviceStateChange][获取服务器ID失败]");
return;
}
// 构建设备状态消息
// 2. 构建设备状态消息
IotDeviceMessage message = online ? IotDeviceMessage.buildStateOnline()
: IotDeviceMessage.buildStateOffline();
// 发送消息到消息总线
deviceMessageService.sendDeviceMessage(message,
deviceInfo.getProductKey(), deviceInfo.getDeviceName(), serverId);
// TODO @haohaoonline 不用翻译
log.info("[handleDeviceStateChange][设备状态更新: {}/{} -> {}]",
deviceInfo.getProductKey(), deviceInfo.getDeviceName(),
online ? "在线" : "离线");
// 3. 发送设备状态消息
deviceMessageService.sendDeviceMessage(message,
deviceInfo.getProductKey(), deviceInfo.getDeviceName(), protocol.getServerId());
} catch (Exception e) {
log.error("[handleDeviceStateChange][发送设备状态消息失败: {}]", username, e);
}
@@ -244,16 +226,14 @@ public class IotMqttHttpAuthHandler {
* 发送 EMQX 认证响应
* 根据 EMQX 官方文档要求,必须返回 JSON 格式响应
*
* @param context 路由上下文
* @param result 认证结果allow、deny、ignore
* @param isSuperuser 是否超级用户
* @param message 日志消息仅用于日志记录不返回给EMQX
* @param context 路由上下文
* @param result 认证结果allow、deny、ignore
*/
private void sendAuthResponse(RoutingContext context, String result, boolean isSuperuser, String message) {
private void sendAuthResponse(RoutingContext context, String result) {
// 构建符合 EMQX 官方规范的响应
JsonObject response = new JsonObject()
.put("result", result)
.put("is_superuser", isSuperuser);
.put("is_superuser", false);
// 可以根据业务需求添加客户端属性
// response.put("client_attrs", new JsonObject().put("role", "device"));
@@ -261,7 +241,6 @@ public class IotMqttHttpAuthHandler {
// 可以添加认证过期时间(可选)
// response.put("expire_at", System.currentTimeMillis() / 1000 + 3600);
// 记录详细的响应日志message仅用于日志不返回给EMQX
context.response()
.setStatusCode(SUCCESS_STATUS_CODE)
.putHeader("Content-Type", "application/json; charset=utf-8")

View File

@@ -7,9 +7,6 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.mqtt.messages.MqttPublishMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Assert;
import java.util.Arrays;
/**
* IoT 网关 MQTT 上行消息处理器
@@ -32,38 +29,24 @@ public class IotMqttUpstreamHandler {
* 处理 MQTT 发布消息
*/
public void handle(MqttPublishMessage mqttMessage) {
log.info("[handle][收到 MQTT 消息, topic: {}, payload: {}]", mqttMessage.topicName(), mqttMessage.payload());
String topic = mqttMessage.topicName();
byte[] payload = mqttMessage.payload().getBytes();
try {
// 1. 前置校验
if (StrUtil.isBlank(topic)) {
log.warn("[handle][主题为空, 忽略消息]");
// 1. 解析主题,一次性获取所有信息
String[] topicParts = topic.split("/");
if (topicParts.length < 4 || StrUtil.hasBlank(topicParts[2], topicParts[3])) {
log.warn("[handle][topic({}) 格式不正确,无法解析有效的 productKey 和 deviceName]", topic);
return;
}
// 2.1 识别并验证消息类型
String messageType = getMessageType(topic);
// TODO @haohao可以使用 hutool 的,它的字符串拼接更简单;
Assert.notNull(messageType, String.format("未知的消息类型, topic(%s)", topic));
// 2.2 解析主题,获取 productKey 和 deviceName
// TODO @haohao体感 getMessageType 和下面,都 split是不是一次就 ok 拉1split 掉22、3 位置是 productKey、deviceName34 开始还是 method
String[] topicParts = topic.split("/");
if (topicParts.length < 4) {
log.warn("[handle][topic({}) 格式不正确,无法解析 productKey 和 deviceName]", topic);
return;
}
String productKey = topicParts[2];
String deviceName = topicParts[3];
// TODO @haohao是不是要判断部分为空就不行呀
if (StrUtil.isAllBlank(productKey, deviceName)) {
log.warn("[handle][topic({}) 格式不正确productKey 和 deviceName 部分为空]", topic);
return;
}
// 3. 解码消息
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName);
if (message == null) {
log.warn("[handle][topic({}) payload({}) 消息解码失败", topic, new String(payload));
log.warn("[handle][topic({}) payload({}) 消息解码失败]", topic, new String(payload));
return;
}
@@ -74,22 +57,4 @@ public class IotMqttUpstreamHandler {
}
}
// TODO @haohao是不是 getMethodFromTopic
/**
* 从主题中,获得消息类型
*
* @param topic 主题
* @return 消息类型
*/
private String getMessageType(String topic) {
String[] topicParts = topic.split("/");
// 约定topic 第 4 个部分开始为消息类型
// 例如:/sys/{productKey}/{deviceName}/thing/property/post -> thing/property/post
if (topicParts.length > 4) {
// TODO @haohao是不是 subString 前 3 个,性能更好;
return String.join("/", Arrays.copyOfRange(topicParts, 4, topicParts.length));
}
return topicParts[topicParts.length - 1];
}
}

View File

@@ -32,13 +32,13 @@ yudao:
# ====================================
emqx:
enabled: true
http-auth-port: 8090 # MQTT HTTP 认证服务端口
mqtt-host: 127.0.0.1 # MQTT Broker 地址
mqtt-port: 1883 # MQTT Broker 端口
mqtt-username: admin # MQTT 用户名
mqtt-password: public # MQTT 密码
mqtt-client-id: iot-gateway-mqtt # MQTT 客户端 ID
mqtt-ssl: false # 是否开启 SSL
http-port: 8090 # MQTT HTTP 服务端口
mqtt-host: 127.0.0.1 # MQTT Broker 地址
mqtt-port: 1883 # MQTT Broker 端口
mqtt-username: admin # MQTT 用户名
mqtt-password: public # MQTT 密码
mqtt-client-id: iot-gateway-mqtt # MQTT 客户端 ID
mqtt-ssl: false # 是否开启 SSL
mqtt-topics:
- "/sys/#" # 系统主题
@@ -55,4 +55,4 @@ logging:
cn.iocoder.yudao.module.iot.gateway.protocol.mqtt: DEBUG
cn.iocoder.yudao.module.iot.gateway.protocol.http: DEBUG
# MQTT 客户端日志
io.vertx.mqtt: DEBUG
# io.vertx.mqtt: DEBUG