feat:【IoT 物联网】优化 MQTT 协议实现

This commit is contained in:
haohao
2025-08-14 19:40:20 +08:00
parent 14336002f3
commit 50ac2ca5f6
4 changed files with 104 additions and 82 deletions

View File

@@ -129,12 +129,11 @@ public class IotGatewayConfiguration {
@Bean
public IotMqttUpstreamProtocol iotMqttUpstreamProtocol(IotGatewayProperties gatewayProperties,
IotDeviceService deviceService,
IotDeviceMessageService messageService,
IotMqttConnectionManager connectionManager,
Vertx mqttVertx) {
return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getMqtt(),
deviceService, messageService, connectionManager, mqttVertx);
return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getMqtt(), messageService,
connectionManager, mqttVertx);
}
@Bean

View File

@@ -4,7 +4,6 @@ import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
import io.vertx.mqtt.MqttServer;
@@ -24,8 +23,6 @@ public class IotMqttUpstreamProtocol {
private final IotGatewayProperties.MqttProperties mqttProperties;
private final IotDeviceService deviceService;
private final IotDeviceMessageService messageService;
private final IotMqttConnectionManager connectionManager;
@@ -38,12 +35,10 @@ public class IotMqttUpstreamProtocol {
private MqttServer mqttServer;
public IotMqttUpstreamProtocol(IotGatewayProperties.MqttProperties mqttProperties,
IotDeviceService deviceService,
IotDeviceMessageService messageService,
IotMqttConnectionManager connectionManager,
Vertx vertx) {
this.mqttProperties = mqttProperties;
this.deviceService = deviceService;
this.messageService = messageService;
this.connectionManager = connectionManager;
this.vertx = vertx;
@@ -54,22 +49,22 @@ public class IotMqttUpstreamProtocol {
@PostConstruct
public void start() {
// 创建服务器选项
MqttServerOptions options = new MqttServerOptions();
options.setPort(mqttProperties.getPort());
options.setMaxMessageSize(mqttProperties.getMaxMessageSize());
options.setTimeoutOnConnect(mqttProperties.getConnectTimeoutSeconds());
MqttServerOptions options = new MqttServerOptions()
.setPort(mqttProperties.getPort())
.setMaxMessageSize(mqttProperties.getMaxMessageSize())
.setTimeoutOnConnect(mqttProperties.getConnectTimeoutSeconds());
// 配置 SSL如果启用
if (Boolean.TRUE.equals(mqttProperties.getSslEnabled())) {
options.setSsl(true).setKeyCertOptions(mqttProperties.getSslOptions().getKeyCertOptions())
options.setSsl(true)
.setKeyCertOptions(mqttProperties.getSslOptions().getKeyCertOptions())
.setTrustOptions(mqttProperties.getSslOptions().getTrustOptions());
}
// 创建服务器并设置连接处理器
mqttServer = MqttServer.create(vertx, options);
mqttServer.endpointHandler(endpoint -> {
IotMqttUpstreamHandler handler = new IotMqttUpstreamHandler(this, messageService, deviceService,
connectionManager);
IotMqttUpstreamHandler handler = new IotMqttUpstreamHandler(this, messageService, connectionManager);
handler.handle(endpoint);
});

View File

@@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager;
import cn.hutool.core.util.StrUtil;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.mqtt.MqttEndpoint;
import lombok.Data;
@@ -23,6 +24,11 @@ import java.util.concurrent.ConcurrentHashMap;
@Component
public class IotMqttConnectionManager {
/**
* 未知地址常量(当获取端点地址失败时使用)
*/
private static final String UNKNOWN_ADDRESS = "unknown";
/**
* 连接信息映射MqttEndpoint -> 连接信息
*/
@@ -35,21 +41,32 @@ public class IotMqttConnectionManager {
/**
* 安全获取 endpoint 地址
* <p>
* 优先从缓存获取地址,缓存为空时再尝试实时获取
*
* @param endpoint MQTT 连接端点
* @return 地址字符串,如果获取失败返回 "unknown"
* @return 地址字符串,获取失败返回 "unknown"
*/
private String getEndpointAddress(MqttEndpoint endpoint) {
try {
if (endpoint != null) {
return endpoint.remoteAddress().toString();
}
} catch (Exception e) {
// 忽略异常,返回默认值
// TODO @haohao这个比较稳定会出现哇
public String getEndpointAddress(MqttEndpoint endpoint) {
String realTimeAddress = UNKNOWN_ADDRESS;
if (endpoint == null) {
return realTimeAddress;
}
// TODO @haohao这个要枚举下么
return "unknown";
// 1. 优先从缓存获取(避免连接关闭时的异常)
ConnectionInfo connectionInfo = connectionMap.get(endpoint);
if (connectionInfo != null && StrUtil.isNotBlank(connectionInfo.getRemoteAddress())) {
return connectionInfo.getRemoteAddress();
}
// 2. 缓存为空时尝试实时获取
try {
realTimeAddress = endpoint.remoteAddress().toString();
} catch (Exception ignored) {
// 连接已关闭,忽略异常
}
return realTimeAddress;
}
/**
@@ -87,8 +104,9 @@ public class IotMqttConnectionManager {
if (connectionInfo != null) {
Long deviceId = connectionInfo.getDeviceId();
deviceEndpointMap.remove(deviceId);
log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]",
deviceId, getEndpointAddress(endpoint));
log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]", deviceId,
getEndpointAddress(endpoint));
}
}
@@ -195,6 +213,11 @@ public class IotMqttConnectionManager {
*/
private boolean authenticated;
/**
* 连接地址
*/
private String remoteAddress;
}
}

View File

@@ -12,7 +12,6 @@ import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
@@ -40,8 +39,6 @@ public class IotMqttUpstreamHandler {
public IotMqttUpstreamHandler(IotMqttUpstreamProtocol protocol,
IotDeviceMessageService deviceMessageService,
// TODO @haohao用不到的 deviceService 可以删除哈;
IotDeviceService deviceService,
IotMqttConnectionManager connectionManager) {
this.deviceMessageService = deviceMessageService;
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
@@ -60,7 +57,7 @@ public class IotMqttUpstreamHandler {
String password = endpoint.auth() != null ? endpoint.auth().getPassword() : null;
log.debug("[handle][设备连接请求,客户端 ID: {},用户名: {},地址: {}]",
clientId, username, getEndpointAddress(endpoint));
clientId, username, connectionManager.getEndpointAddress(endpoint));
// 1. 先进行认证
if (!authenticateDevice(clientId, username, password, endpoint)) {
@@ -71,32 +68,46 @@ public class IotMqttUpstreamHandler {
log.info("[handle][设备认证成功,建立连接,客户端 ID: {},用户名: {}]", clientId, username);
// TODO @haohao这里是不是少了序号哈
// 设置异常和关闭处理器
// 2. 设置异常和关闭处理器
endpoint.exceptionHandler(ex -> {
log.warn("[handle][连接异常,客户端 ID: {},地址: {}]", clientId, getEndpointAddress(endpoint));
log.warn("[handle][连接异常,客户端 ID: {},地址: {}]", clientId, connectionManager.getEndpointAddress(endpoint));
cleanupConnection(endpoint);
});
endpoint.closeHandler(v -> {
log.debug("[handle][连接关闭,客户端 ID: {},地址: {}]", clientId, getEndpointAddress(endpoint));
cleanupConnection(endpoint);
});
// 设置消息处理器
// 3. 设置消息处理器
endpoint.publishHandler(message -> {
try {
processMessage(clientId, message.topicName(), message.payload().getBytes(), endpoint);
processMessage(clientId, message.topicName(), message.payload().getBytes());
// 根据 QoS 级别发送相应的确认消息
if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
// QoS 1: 发送 PUBACK 确认
endpoint.publishAcknowledge(message.messageId());
} else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
// QoS 2: 发送 PUBREC 确认
endpoint.publishReceived(message.messageId());
}
// QoS 0 无需确认
} catch (Exception e) {
log.error("[handle][消息解码失败,断开连接,客户端 ID: {},地址: {},错误: {}]",
clientId, getEndpointAddress(endpoint), e.getMessage());
clientId, connectionManager.getEndpointAddress(endpoint), e.getMessage());
cleanupConnection(endpoint);
endpoint.close();
}
});
// 设置订阅处理器
// 4. 设置订阅处理器
endpoint.subscribeHandler(subscribe -> {
log.debug("[handle][设备订阅,客户端 ID: {},主题: {}]", clientId, subscribe.topicSubscriptions());
// 提取主题名称列表用于日志显示
List<String> topicNames = subscribe.topicSubscriptions().stream()
.map(MqttTopicSubscription::topicName)
.collect(java.util.stream.Collectors.toList());
log.debug("[handle][设备订阅,客户端 ID: {},主题: {}]", clientId, topicNames);
// 提取 QoS 列表
List<MqttQoS> grantedQoSLevels = subscribe.topicSubscriptions().stream()
.map(MqttTopicSubscription::qualityOfService)
@@ -104,19 +115,22 @@ public class IotMqttUpstreamHandler {
endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQoSLevels);
});
// 设置取消订阅处理器
// 5. 设置取消订阅处理器
endpoint.unsubscribeHandler(unsubscribe -> {
log.debug("[handle][设备取消订阅,客户端 ID: {},主题: {}]", clientId, unsubscribe.topics());
endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
});
// 设置断开连接处理器
// 6. 设置 QoS 2消息的 PUBREL 处理器
endpoint.publishReleaseHandler(endpoint::publishComplete);
// 7. 设置断开连接处理器
endpoint.disconnectHandler(v -> {
log.debug("[handle][设备断开连接,客户端 ID: {}]", clientId);
cleanupConnection(endpoint);
});
// 接受连接
// 8. 接受连接
endpoint.accept(false);
}
@@ -126,10 +140,8 @@ public class IotMqttUpstreamHandler {
* @param clientId 客户端 ID
* @param topic 主题
* @param payload 消息内容
* @param endpoint MQTT 连接端点
* @throws Exception 消息解码失败时抛出异常
*/
private void processMessage(String clientId, String topic, byte[] payload, MqttEndpoint endpoint) throws Exception {
private void processMessage(String clientId, String topic, byte[] payload) {
// 1. 基础检查
if (payload == null || payload.length == 0) {
return;
@@ -146,14 +158,22 @@ public class IotMqttUpstreamHandler {
String deviceName = topicParts[3];
// 3. 解码消息(使用从 topic 解析的 productKey 和 deviceName
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName);
if (message == null) {
log.warn("[processMessage][消息解码失败,客户端 ID: {},主题: {}]", clientId, topic);
return;
}
try {
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName);
if (message == null) {
log.warn("[processMessage][消息解码失败,客户端 ID: {},主题: {}]", clientId, topic);
return;
}
// 4. 处理业务消息(认证已在连接时完成)
handleBusinessRequest(clientId, message, productKey, deviceName, endpoint);
log.info("[processMessage][收到设备消息,设备: {}.{}, 方法: {}]",
productKey, deviceName, message.getMethod());
// 4. 处理业务消息(认证已在连接时完成)
handleBusinessRequest(message, productKey, deviceName);
} catch (Exception e) {
log.error("[processMessage][消息处理异常,客户端 ID: {},主题: {},错误: {}]",
clientId, topic, e.getMessage(), e);
}
}
/**
@@ -194,9 +214,10 @@ public class IotMqttUpstreamHandler {
return false;
}
IotDeviceGetReqDTO getReqDTO = new IotDeviceGetReqDTO();
getReqDTO.setProductKey(deviceInfo.getProductKey());
getReqDTO.setDeviceName(deviceInfo.getDeviceName());
IotDeviceGetReqDTO getReqDTO = new IotDeviceGetReqDTO()
.setProductKey(deviceInfo.getProductKey())
.setDeviceName(deviceInfo.getDeviceName());
CommonResult<IotDeviceRespDTO> deviceResult = deviceApi.getDevice(getReqDTO);
if (!deviceResult.isSuccess() || deviceResult.getData() == null) {
log.warn("[authenticateDevice][获取设备信息失败,客户端 ID: {},用户名: {},错误: {}]",
@@ -221,8 +242,7 @@ public class IotMqttUpstreamHandler {
/**
* 处理业务请求
*/
private void handleBusinessRequest(String clientId, IotDeviceMessage message, String productKey, String deviceName,
MqttEndpoint endpoint) {
private void handleBusinessRequest(IotDeviceMessage message, String productKey, String deviceName) {
// 发送消息到消息总线
message.setServerId(serverId);
deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId);
@@ -233,12 +253,14 @@ public class IotMqttUpstreamHandler {
*/
private void registerConnection(MqttEndpoint endpoint, IotDeviceRespDTO device,
String clientId) {
IotMqttConnectionManager.ConnectionInfo connectionInfo = new IotMqttConnectionManager.ConnectionInfo();
connectionInfo.setDeviceId(device.getId());
connectionInfo.setProductKey(device.getProductKey());
connectionInfo.setDeviceName(device.getDeviceName());
connectionInfo.setClientId(clientId);
connectionInfo.setAuthenticated(true);
IotMqttConnectionManager.ConnectionInfo connectionInfo = new IotMqttConnectionManager.ConnectionInfo()
.setDeviceId(device.getId())
.setProductKey(device.getProductKey())
.setDeviceName(device.getDeviceName())
.setClientId(clientId)
.setAuthenticated(true)
.setRemoteAddress(connectionManager.getEndpointAddress(endpoint));
connectionManager.registerConnection(endpoint, device.getId(), connectionInfo);
}
@@ -257,23 +279,6 @@ public class IotMqttUpstreamHandler {
}
}
/**
* 安全获取 endpoint 地址
*
* @param endpoint MQTT 连接端点
* @return 地址字符串,如果获取失败则返回 "unknown"
*/
private String getEndpointAddress(MqttEndpoint endpoint) {
try {
if (endpoint != null) {
return endpoint.remoteAddress().toString();
}
} catch (Exception e) {
// 忽略异常,返回默认值
}
return "unknown";
}
/**
* 清理连接
*/