feat:【IoT 物联网】更新 IotGatewayProperties 配置
This commit is contained in:
@@ -123,16 +123,11 @@ public class IotGatewayProperties {
|
|||||||
*/
|
*/
|
||||||
@NotNull(message = "是否开启不能为空")
|
@NotNull(message = "是否开启不能为空")
|
||||||
private Boolean enabled;
|
private Boolean enabled;
|
||||||
// TODO @haohao:加个默认值?
|
|
||||||
/**
|
/**
|
||||||
* 服务端口
|
* 服务端口(默认:8093)
|
||||||
*/
|
*/
|
||||||
private Integer serverPort;
|
private Integer serverPort = 8093;
|
||||||
// TODO @haohao:应该不用?一般都监听 0.0.0.0 哈;
|
|
||||||
/**
|
|
||||||
* 服务主机
|
|
||||||
*/
|
|
||||||
private String serverHost;
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -211,6 +206,94 @@ public class IotGatewayProperties {
|
|||||||
*/
|
*/
|
||||||
private Long reconnectDelayMs = 5000L;
|
private Long reconnectDelayMs = 5000L;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 是否启用 Clean Session (清理会话)
|
||||||
|
* true: 每次连接都是新会话,Broker 不保留离线消息和订阅关系。
|
||||||
|
* 对于网关这类“永远在线”且会主动重新订阅的应用,建议为 true。
|
||||||
|
*/
|
||||||
|
private Boolean cleanSession = true;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 心跳间隔(秒)
|
||||||
|
* 用于保持连接活性,及时发现网络中断。
|
||||||
|
*/
|
||||||
|
private Integer keepAliveIntervalSeconds = 60;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 最大未确认消息队列大小
|
||||||
|
* 限制已发送但未收到 Broker 确认的 QoS 1/2 消息数量,用于流量控制。
|
||||||
|
*/
|
||||||
|
private Integer maxInflightQueue = 10000;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 是否信任所有 SSL 证书
|
||||||
|
* 警告:此配置会绕过证书验证,仅建议在开发和测试环境中使用!
|
||||||
|
* 在生产环境中,应设置为 false,并配置正确的信任库。
|
||||||
|
*/
|
||||||
|
private Boolean trustAll = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 遗嘱消息配置 (用于网关异常下线时通知其他系统)
|
||||||
|
*/
|
||||||
|
private final Will will = new Will();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 高级 SSL/TLS 配置 (用于生产环境)
|
||||||
|
*/
|
||||||
|
private final Ssl sslOptions = new Ssl();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 遗嘱消息 (Last Will and Testament)
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public static class Will {
|
||||||
|
/**
|
||||||
|
* 是否启用遗嘱消息
|
||||||
|
*/
|
||||||
|
private boolean enabled = false;
|
||||||
|
/**
|
||||||
|
* 遗嘱消息主题
|
||||||
|
*/
|
||||||
|
private String topic;
|
||||||
|
/**
|
||||||
|
* 遗嘱消息内容
|
||||||
|
*/
|
||||||
|
private String payload;
|
||||||
|
/**
|
||||||
|
* 遗嘱消息 QoS 等级
|
||||||
|
*/
|
||||||
|
private Integer qos = 1;
|
||||||
|
/**
|
||||||
|
* 遗嘱消息是否作为保留消息发布
|
||||||
|
*/
|
||||||
|
private boolean retain = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 高级 SSL/TLS 配置
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public static class Ssl {
|
||||||
|
/**
|
||||||
|
* 密钥库(KeyStore)路径,例如:classpath:certs/client.jks
|
||||||
|
* 包含客户端自己的证书和私钥,用于向服务端证明身份(双向认证)。
|
||||||
|
*/
|
||||||
|
private String keyStorePath;
|
||||||
|
/**
|
||||||
|
* 密钥库密码
|
||||||
|
*/
|
||||||
|
private String keyStorePassword;
|
||||||
|
/**
|
||||||
|
* 信任库(TrustStore)路径,例如:classpath:certs/trust.jks
|
||||||
|
* 包含服务端信任的 CA 证书,用于验证服务端的身份,防止中间人攻击。
|
||||||
|
*/
|
||||||
|
private String trustStorePath;
|
||||||
|
/**
|
||||||
|
* 信任库密码
|
||||||
|
*/
|
||||||
|
private String trustStorePassword;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -71,6 +71,7 @@ public class IotEmqxAuthEventProtocol {
|
|||||||
router.post(IotMqttTopicUtils.MQTT_AUTH_PATH).handler(handler::handleAuth);
|
router.post(IotMqttTopicUtils.MQTT_AUTH_PATH).handler(handler::handleAuth);
|
||||||
router.post(IotMqttTopicUtils.MQTT_EVENT_PATH).handler(handler::handleEvent);
|
router.post(IotMqttTopicUtils.MQTT_EVENT_PATH).handler(handler::handleEvent);
|
||||||
// TODO @haohao:/mqtt/acl 需要处理么?
|
// TODO @haohao:/mqtt/acl 需要处理么?
|
||||||
|
// TODO @芋艿:已在 EMQX 处理,如果是“设备直连”模式需要处理
|
||||||
|
|
||||||
// 3. 启动 HTTP 服务器
|
// 3. 启动 HTTP 服务器
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -1,11 +1,14 @@
|
|||||||
package cn.iocoder.yudao.module.iot.gateway.protocol.emqx;
|
package cn.iocoder.yudao.module.iot.gateway.protocol.emqx;
|
||||||
|
|
||||||
|
import cn.hutool.core.lang.Assert;
|
||||||
|
import cn.hutool.core.util.StrUtil;
|
||||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
|
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
|
||||||
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
|
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
|
||||||
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.router.IotEmqxUpstreamHandler;
|
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.router.IotEmqxUpstreamHandler;
|
||||||
import io.netty.handler.codec.mqtt.MqttQoS;
|
import io.netty.handler.codec.mqtt.MqttQoS;
|
||||||
import io.vertx.core.Vertx;
|
import io.vertx.core.Vertx;
|
||||||
import io.vertx.core.buffer.Buffer;
|
import io.vertx.core.buffer.Buffer;
|
||||||
|
import io.vertx.core.net.JksOptions;
|
||||||
import io.vertx.mqtt.MqttClient;
|
import io.vertx.mqtt.MqttClient;
|
||||||
import io.vertx.mqtt.MqttClientOptions;
|
import io.vertx.mqtt.MqttClientOptions;
|
||||||
import jakarta.annotation.PostConstruct;
|
import jakarta.annotation.PostConstruct;
|
||||||
@@ -127,7 +130,6 @@ public class IotEmqxUpstreamProtocol {
|
|||||||
// 1. 连接 MQTT Broker
|
// 1. 连接 MQTT Broker
|
||||||
CountDownLatch latch = new CountDownLatch(1);
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
AtomicBoolean success = new AtomicBoolean(false);
|
AtomicBoolean success = new AtomicBoolean(false);
|
||||||
// TODO @haohao:要不要加 MqttClientOptions 参数?1)setCleanSession true;2)setMaxInflightQueue 10000;3)setKeepAliveInterval 60;4)setSsl/setTrustAll
|
|
||||||
mqttClient.connect(port, host, connectResult -> {
|
mqttClient.connect(port, host, connectResult -> {
|
||||||
if (connectResult.succeeded()) {
|
if (connectResult.succeeded()) {
|
||||||
log.info("[connectMqttSync][MQTT 客户端连接成功, host: {}, port: {}]", host, port);
|
log.info("[connectMqttSync][MQTT 客户端连接成功, host: {}, port: {}]", host, port);
|
||||||
@@ -252,11 +254,53 @@ public class IotEmqxUpstreamProtocol {
|
|||||||
* 创建 MQTT 客户端
|
* 创建 MQTT 客户端
|
||||||
*/
|
*/
|
||||||
private void createMqttClient() {
|
private void createMqttClient() {
|
||||||
MqttClientOptions options = new MqttClientOptions()
|
// 1. 创建基础配置
|
||||||
|
MqttClientOptions options = (MqttClientOptions) new MqttClientOptions()
|
||||||
.setClientId(emqxProperties.getMqttClientId())
|
.setClientId(emqxProperties.getMqttClientId())
|
||||||
.setUsername(emqxProperties.getMqttUsername())
|
.setUsername(emqxProperties.getMqttUsername())
|
||||||
.setPassword(emqxProperties.getMqttPassword())
|
.setPassword(emqxProperties.getMqttPassword())
|
||||||
.setSsl(emqxProperties.getMqttSsl());
|
.setSsl(emqxProperties.getMqttSsl())
|
||||||
|
.setCleanSession(emqxProperties.getCleanSession())
|
||||||
|
.setKeepAliveInterval(emqxProperties.getKeepAliveIntervalSeconds())
|
||||||
|
.setMaxInflightQueue(emqxProperties.getMaxInflightQueue())
|
||||||
|
.setConnectTimeout(emqxProperties.getConnectTimeoutSeconds() * 1000) // Vert.x 需要毫秒
|
||||||
|
.setTrustAll(emqxProperties.getTrustAll());
|
||||||
|
|
||||||
|
// 2. 配置遗嘱消息
|
||||||
|
IotGatewayProperties.EmqxProperties.Will will = emqxProperties.getWill();
|
||||||
|
if (will.isEnabled()) {
|
||||||
|
Assert.notBlank(will.getTopic(), "遗嘱消息主题(will.topic)不能为空");
|
||||||
|
Assert.notNull(will.getPayload(), "遗嘱消息内容(will.payload)不能为空");
|
||||||
|
options.setWillFlag(true)
|
||||||
|
.setWillTopic(will.getTopic())
|
||||||
|
.setWillMessageBytes(Buffer.buffer(will.getPayload()))
|
||||||
|
.setWillQoS(will.getQos())
|
||||||
|
.setWillRetain(will.isRetain());
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. 配置高级 SSL/TLS (仅在启用 SSL 且不信任所有证书时生效)
|
||||||
|
if (Boolean.TRUE.equals(emqxProperties.getMqttSsl()) && !Boolean.TRUE.equals(emqxProperties.getTrustAll())) {
|
||||||
|
IotGatewayProperties.EmqxProperties.Ssl sslOptions = emqxProperties.getSslOptions();
|
||||||
|
// 配置信任库 (用于验证服务端证书)
|
||||||
|
if (StrUtil.isNotBlank(sslOptions.getTrustStorePath())) {
|
||||||
|
options.setTrustStoreOptions(new JksOptions()
|
||||||
|
.setPath(sslOptions.getTrustStorePath())
|
||||||
|
.setPassword(sslOptions.getTrustStorePassword()));
|
||||||
|
}
|
||||||
|
// 配置密钥库 (用于客户端双向认证)
|
||||||
|
if (StrUtil.isNotBlank(sslOptions.getKeyStorePath())) {
|
||||||
|
options.setKeyStoreOptions(new JksOptions()
|
||||||
|
.setPath(sslOptions.getKeyStorePath())
|
||||||
|
.setPassword(sslOptions.getKeyStorePassword()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. 安全警告日志
|
||||||
|
if (Boolean.TRUE.equals(emqxProperties.getTrustAll())) {
|
||||||
|
log.warn("[createMqttClient][安全警告:当前配置信任所有 SSL 证书(trustAll=true),这在生产环境中存在严重安全风险!]");
|
||||||
|
}
|
||||||
|
|
||||||
|
// 5. 创建客户端实例
|
||||||
this.mqttClient = MqttClient.create(vertx, options);
|
this.mqttClient = MqttClient.create(vertx, options);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -54,7 +54,7 @@ public class IotTcpUpstreamProtocol {
|
|||||||
});
|
});
|
||||||
|
|
||||||
// 3. 启动 TCP 服务器
|
// 3. 启动 TCP 服务器
|
||||||
netServer.listen(tcpProperties.getServerPort(), tcpProperties.getServerHost())
|
netServer.listen(tcpProperties.getServerPort(), "0.0.0.0")
|
||||||
.onSuccess(server -> log.info("[start][IoT 网关 TCP 服务启动成功,端口:{}]", server.actualPort()))
|
.onSuccess(server -> log.info("[start][IoT 网关 TCP 服务启动成功,端口:{}]", server.actualPort()))
|
||||||
.onFailure(e -> log.error("[start][IoT 网关 TCP 服务启动失败]", e));
|
.onFailure(e -> log.error("[start][IoT 网关 TCP 服务启动失败]", e));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -55,13 +55,32 @@ yudao:
|
|||||||
mqtt-ssl: false # 是否开启 SSL
|
mqtt-ssl: false # 是否开启 SSL
|
||||||
mqtt-topics:
|
mqtt-topics:
|
||||||
- "/sys/#" # 系统主题
|
- "/sys/#" # 系统主题
|
||||||
|
clean-session: true # 是否启用 Clean Session (默认: true)
|
||||||
|
keep-alive-interval-seconds: 60 # 心跳间隔,单位秒 (默认: 60)
|
||||||
|
max-inflight-queue: 10000 # 最大飞行消息队列,单位:条
|
||||||
|
connect-timeout-seconds: 10 # 连接超时,单位:秒
|
||||||
|
# 是否信任所有 SSL 证书 (默认: false)。警告:生产环境必须为 false!
|
||||||
|
# 仅在开发环境或内网测试时,如果使用了自签名证书,可以临时设置为 true
|
||||||
|
trust-all: true # 在 dev 环境可以设为 true
|
||||||
|
# 遗嘱消息配置 (用于网关异常下线时通知其他系统)
|
||||||
|
will:
|
||||||
|
enabled: true # 生产环境强烈建议开启
|
||||||
|
topic: "gateway/status/${yudao.iot.gateway.emqx.mqtt-client-id}" # 遗嘱消息主题
|
||||||
|
payload: "offline" # 遗嘱消息负载
|
||||||
|
qos: 1 # 遗嘱消息 QoS
|
||||||
|
retain: true # 遗嘱消息是否保留
|
||||||
|
# 高级 SSL/TLS 配置 (当 trust-all: false 且 mqtt-ssl: true 时生效)
|
||||||
|
ssl-options:
|
||||||
|
key-store-path: "classpath:certs/client.jks" # 客户端证书库路径
|
||||||
|
key-store-password: "your-keystore-password" # 客户端证书库密码
|
||||||
|
trust-store-path: "classpath:certs/trust.jks" # 信任的 CA 证书库路径
|
||||||
|
trust-store-password: "your-truststore-password" # 信任的 CA 证书库密码
|
||||||
# ====================================
|
# ====================================
|
||||||
# 针对引入的 TCP 组件的配置
|
# 针对引入的 TCP 组件的配置
|
||||||
# ====================================
|
# ====================================
|
||||||
tcp:
|
tcp:
|
||||||
enabled: true
|
enabled: true
|
||||||
server-port: 8093
|
server-port: 8093
|
||||||
server-host: 0.0.0.0
|
|
||||||
|
|
||||||
--- #################### 日志相关配置 ####################
|
--- #################### 日志相关配置 ####################
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user