From 7138cab3c0a172c1b8f64d8f848a3d01e053d59c Mon Sep 17 00:00:00 2001 From: haohao <1036606149@qq.com> Date: Sat, 5 Jul 2025 23:44:00 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E3=80=90IoT=20=E7=89=A9=E8=81=94?= =?UTF-8?q?=E7=BD=91=E3=80=91=E6=9B=B4=E6=96=B0=20IotGatewayProperties=20?= =?UTF-8?q?=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gateway/config/IotGatewayProperties.java | 99 +++++++++++++++++-- .../emqx/IotEmqxAuthEventProtocol.java | 1 + .../emqx/IotEmqxUpstreamProtocol.java | 50 +++++++++- .../protocol/tcp/IotTcpUpstreamProtocol.java | 2 +- .../src/main/resources/application.yaml | 21 +++- 5 files changed, 160 insertions(+), 13 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java index ad7e69b911..13635e72ad 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java @@ -123,16 +123,11 @@ public class IotGatewayProperties { */ @NotNull(message = "是否开启不能为空") private Boolean enabled; - // TODO @haohao:加个默认值? + /** - * 服务端口 + * 服务端口(默认:8093) */ - private Integer serverPort; - // TODO @haohao:应该不用?一般都监听 0.0.0.0 哈; - /** - * 服务主机 - */ - private String serverHost; + private Integer serverPort = 8093; } @@ -211,6 +206,94 @@ public class IotGatewayProperties { */ private Long reconnectDelayMs = 5000L; + /** + * 是否启用 Clean Session (清理会话) + * true: 每次连接都是新会话,Broker 不保留离线消息和订阅关系。 + * 对于网关这类“永远在线”且会主动重新订阅的应用,建议为 true。 + */ + private Boolean cleanSession = true; + + /** + * 心跳间隔(秒) + * 用于保持连接活性,及时发现网络中断。 + */ + private Integer keepAliveIntervalSeconds = 60; + + /** + * 最大未确认消息队列大小 + * 限制已发送但未收到 Broker 确认的 QoS 1/2 消息数量,用于流量控制。 + */ + private Integer maxInflightQueue = 10000; + + /** + * 是否信任所有 SSL 证书 + * 警告:此配置会绕过证书验证,仅建议在开发和测试环境中使用! + * 在生产环境中,应设置为 false,并配置正确的信任库。 + */ + private Boolean trustAll = false; + + /** + * 遗嘱消息配置 (用于网关异常下线时通知其他系统) + */ + private final Will will = new Will(); + + /** + * 高级 SSL/TLS 配置 (用于生产环境) + */ + private final Ssl sslOptions = new Ssl(); + + /** + * 遗嘱消息 (Last Will and Testament) + */ + @Data + public static class Will { + /** + * 是否启用遗嘱消息 + */ + private boolean enabled = false; + /** + * 遗嘱消息主题 + */ + private String topic; + /** + * 遗嘱消息内容 + */ + private String payload; + /** + * 遗嘱消息 QoS 等级 + */ + private Integer qos = 1; + /** + * 遗嘱消息是否作为保留消息发布 + */ + private boolean retain = true; + } + + /** + * 高级 SSL/TLS 配置 + */ + @Data + public static class Ssl { + /** + * 密钥库(KeyStore)路径,例如:classpath:certs/client.jks + * 包含客户端自己的证书和私钥,用于向服务端证明身份(双向认证)。 + */ + private String keyStorePath; + /** + * 密钥库密码 + */ + private String keyStorePassword; + /** + * 信任库(TrustStore)路径,例如:classpath:certs/trust.jks + * 包含服务端信任的 CA 证书,用于验证服务端的身份,防止中间人攻击。 + */ + private String trustStorePath; + /** + * 信任库密码 + */ + private String trustStorePassword; + } + } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxAuthEventProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxAuthEventProtocol.java index a44d9fb9df..ce10cf76d9 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxAuthEventProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxAuthEventProtocol.java @@ -71,6 +71,7 @@ public class IotEmqxAuthEventProtocol { router.post(IotMqttTopicUtils.MQTT_AUTH_PATH).handler(handler::handleAuth); router.post(IotMqttTopicUtils.MQTT_EVENT_PATH).handler(handler::handleEvent); // TODO @haohao:/mqtt/acl 需要处理么? + // TODO @芋艿:已在 EMQX 处理,如果是“设备直连”模式需要处理 // 3. 启动 HTTP 服务器 try { diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java index dee9cc083d..48ea281712 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java @@ -1,11 +1,14 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.emqx; +import cn.hutool.core.lang.Assert; +import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.router.IotEmqxUpstreamHandler; import io.netty.handler.codec.mqtt.MqttQoS; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; +import io.vertx.core.net.JksOptions; import io.vertx.mqtt.MqttClient; import io.vertx.mqtt.MqttClientOptions; import jakarta.annotation.PostConstruct; @@ -127,7 +130,6 @@ public class IotEmqxUpstreamProtocol { // 1. 连接 MQTT Broker CountDownLatch latch = new CountDownLatch(1); AtomicBoolean success = new AtomicBoolean(false); - // TODO @haohao:要不要加 MqttClientOptions 参数?1)setCleanSession true;2)setMaxInflightQueue 10000;3)setKeepAliveInterval 60;4)setSsl/setTrustAll mqttClient.connect(port, host, connectResult -> { if (connectResult.succeeded()) { log.info("[connectMqttSync][MQTT 客户端连接成功, host: {}, port: {}]", host, port); @@ -252,11 +254,53 @@ public class IotEmqxUpstreamProtocol { * 创建 MQTT 客户端 */ private void createMqttClient() { - MqttClientOptions options = new MqttClientOptions() + // 1. 创建基础配置 + MqttClientOptions options = (MqttClientOptions) new MqttClientOptions() .setClientId(emqxProperties.getMqttClientId()) .setUsername(emqxProperties.getMqttUsername()) .setPassword(emqxProperties.getMqttPassword()) - .setSsl(emqxProperties.getMqttSsl()); + .setSsl(emqxProperties.getMqttSsl()) + .setCleanSession(emqxProperties.getCleanSession()) + .setKeepAliveInterval(emqxProperties.getKeepAliveIntervalSeconds()) + .setMaxInflightQueue(emqxProperties.getMaxInflightQueue()) + .setConnectTimeout(emqxProperties.getConnectTimeoutSeconds() * 1000) // Vert.x 需要毫秒 + .setTrustAll(emqxProperties.getTrustAll()); + + // 2. 配置遗嘱消息 + IotGatewayProperties.EmqxProperties.Will will = emqxProperties.getWill(); + if (will.isEnabled()) { + Assert.notBlank(will.getTopic(), "遗嘱消息主题(will.topic)不能为空"); + Assert.notNull(will.getPayload(), "遗嘱消息内容(will.payload)不能为空"); + options.setWillFlag(true) + .setWillTopic(will.getTopic()) + .setWillMessageBytes(Buffer.buffer(will.getPayload())) + .setWillQoS(will.getQos()) + .setWillRetain(will.isRetain()); + } + + // 3. 配置高级 SSL/TLS (仅在启用 SSL 且不信任所有证书时生效) + if (Boolean.TRUE.equals(emqxProperties.getMqttSsl()) && !Boolean.TRUE.equals(emqxProperties.getTrustAll())) { + IotGatewayProperties.EmqxProperties.Ssl sslOptions = emqxProperties.getSslOptions(); + // 配置信任库 (用于验证服务端证书) + if (StrUtil.isNotBlank(sslOptions.getTrustStorePath())) { + options.setTrustStoreOptions(new JksOptions() + .setPath(sslOptions.getTrustStorePath()) + .setPassword(sslOptions.getTrustStorePassword())); + } + // 配置密钥库 (用于客户端双向认证) + if (StrUtil.isNotBlank(sslOptions.getKeyStorePath())) { + options.setKeyStoreOptions(new JksOptions() + .setPath(sslOptions.getKeyStorePath()) + .setPassword(sslOptions.getKeyStorePassword())); + } + } + + // 4. 安全警告日志 + if (Boolean.TRUE.equals(emqxProperties.getTrustAll())) { + log.warn("[createMqttClient][安全警告:当前配置信任所有 SSL 证书(trustAll=true),这在生产环境中存在严重安全风险!]"); + } + + // 5. 创建客户端实例 this.mqttClient = MqttClient.create(vertx, options); } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpUpstreamProtocol.java index f6bee94b5a..838e2461ef 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpUpstreamProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpUpstreamProtocol.java @@ -54,7 +54,7 @@ public class IotTcpUpstreamProtocol { }); // 3. 启动 TCP 服务器 - netServer.listen(tcpProperties.getServerPort(), tcpProperties.getServerHost()) + netServer.listen(tcpProperties.getServerPort(), "0.0.0.0") .onSuccess(server -> log.info("[start][IoT 网关 TCP 服务启动成功,端口:{}]", server.actualPort())) .onFailure(e -> log.error("[start][IoT 网关 TCP 服务启动失败]", e)); } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml index e845365996..f50edd0eeb 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml @@ -55,13 +55,32 @@ yudao: mqtt-ssl: false # 是否开启 SSL mqtt-topics: - "/sys/#" # 系统主题 + clean-session: true # 是否启用 Clean Session (默认: true) + keep-alive-interval-seconds: 60 # 心跳间隔,单位秒 (默认: 60) + max-inflight-queue: 10000 # 最大飞行消息队列,单位:条 + connect-timeout-seconds: 10 # 连接超时,单位:秒 + # 是否信任所有 SSL 证书 (默认: false)。警告:生产环境必须为 false! + # 仅在开发环境或内网测试时,如果使用了自签名证书,可以临时设置为 true + trust-all: true # 在 dev 环境可以设为 true + # 遗嘱消息配置 (用于网关异常下线时通知其他系统) + will: + enabled: true # 生产环境强烈建议开启 + topic: "gateway/status/${yudao.iot.gateway.emqx.mqtt-client-id}" # 遗嘱消息主题 + payload: "offline" # 遗嘱消息负载 + qos: 1 # 遗嘱消息 QoS + retain: true # 遗嘱消息是否保留 + # 高级 SSL/TLS 配置 (当 trust-all: false 且 mqtt-ssl: true 时生效) + ssl-options: + key-store-path: "classpath:certs/client.jks" # 客户端证书库路径 + key-store-password: "your-keystore-password" # 客户端证书库密码 + trust-store-path: "classpath:certs/trust.jks" # 信任的 CA 证书库路径 + trust-store-password: "your-truststore-password" # 信任的 CA 证书库密码 # ==================================== # 针对引入的 TCP 组件的配置 # ==================================== tcp: enabled: true server-port: 8093 - server-host: 0.0.0.0 --- #################### 日志相关配置 ####################