From 19cf311b7e476076333ab8981afd093445e5213c Mon Sep 17 00:00:00 2001 From: YunaiV Date: Sat, 14 Jun 2025 20:53:29 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E3=80=90IoT=20=E7=89=A9=E8=81=94?= =?UTF-8?q?=E7=BD=91=E3=80=91=E5=A2=9E=E5=8A=A0=20redis=20+=20event-bus=20?= =?UTF-8?q?=E7=9A=84=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...YudaoRedisMQConsumerAutoConfiguration.java | 7 +- .../job/RedisPendingMessageResendJob.java | 7 +- .../yudao-module-iot-core/pom.xml | 5 + .../IotMessageBusAutoConfiguration.java | 40 +++++--- ...alIotMessage.java => IotLocalMessage.java} | 2 +- ...essageBus.java => IotLocalMessageBus.java} | 6 +- .../core/redis/IotRedisMessageBus.java | 92 +++++++++++++++++++ ...ageBus.java => IotRocketMQMessageBus.java} | 35 ++++--- .../LocalIotMessageBusIntegrationTest.java | 2 +- .../rocketmq/RocketMQIotMessageBusTest.java | 2 +- .../src/main/resources/application-local.yaml | 4 +- .../src/main/resources/application.yaml | 4 +- .../src/main/resources/application.yaml | 2 +- 13 files changed, 160 insertions(+), 48 deletions(-) rename yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/local/{LocalIotMessage.java => IotLocalMessage.java} (86%) rename yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/local/{LocalIotMessageBus.java => IotLocalMessageBus.java} (92%) create mode 100644 yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/redis/IotRedisMessageBus.java rename yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/rocketmq/{RocketMQIotMessageBus.java => IotRocketMQMessageBus.java} (98%) diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQConsumerAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQConsumerAutoConfiguration.java index c9ab3e5415..b80244456d 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQConsumerAutoConfiguration.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQConsumerAutoConfiguration.java @@ -69,9 +69,8 @@ public class YudaoRedisMQConsumerAutoConfiguration { @ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听 public RedisPendingMessageResendJob redisPendingMessageResendJob(List> listeners, RedisMQTemplate redisTemplate, - @Value("${spring.application.name}") String groupName, RedissonClient redissonClient) { - return new RedisPendingMessageResendJob(listeners, redisTemplate, groupName, redissonClient); + return new RedisPendingMessageResendJob(listeners, redisTemplate, redissonClient); } /** @@ -141,14 +140,14 @@ public class YudaoRedisMQConsumerAutoConfiguration { * * @return 消费者名字 */ - private static String buildConsumerName() { + public static String buildConsumerName() { return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID()); } /** * 校验 Redis 版本号,是否满足最低的版本号要求! */ - private static void checkRedisVersion(RedisTemplate redisTemplate) { + public static void checkRedisVersion(RedisTemplate redisTemplate) { // 获得 Redis 版本 Properties info = redisTemplate.execute((RedisCallback) RedisServerCommands::info); String version = MapUtil.getStr(info, "redis_version"); diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisPendingMessageResendJob.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisPendingMessageResendJob.java index cb4e3991f1..bb16be0eeb 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisPendingMessageResendJob.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisPendingMessageResendJob.java @@ -35,7 +35,6 @@ public class RedisPendingMessageResendJob { private final List> listeners; private final RedisMQTemplate redisTemplate; - private final String groupName; private final RedissonClient redissonClient; /** @@ -64,13 +63,13 @@ public class RedisPendingMessageResendJob { private void execute() { StreamOperations ops = redisTemplate.getRedisTemplate().opsForStream(); listeners.forEach(listener -> { - PendingMessagesSummary pendingMessagesSummary = Objects.requireNonNull(ops.pending(listener.getStreamKey(), groupName)); + PendingMessagesSummary pendingMessagesSummary = Objects.requireNonNull(ops.pending(listener.getStreamKey(), listener.getGroup())); // 每个消费者的 pending 队列消息数量 Map pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer(); pendingMessagesPerConsumer.forEach((consumerName, pendingMessageCount) -> { log.info("[processPendingMessage][消费者({}) 消息数量({})]", consumerName, pendingMessageCount); // 每个消费者的 pending消息的详情信息 - PendingMessages pendingMessages = ops.pending(listener.getStreamKey(), Consumer.from(groupName, consumerName), Range.unbounded(), pendingMessageCount); + PendingMessages pendingMessages = ops.pending(listener.getStreamKey(), Consumer.from(listener.getGroup(), consumerName), Range.unbounded(), pendingMessageCount); if (pendingMessages.isEmpty()) { return; } @@ -91,7 +90,7 @@ public class RedisPendingMessageResendJob { .ofObject(records.get(0).getValue()) // 设置内容 .withStreamKey(listener.getStreamKey())); // ack 消息消费完成 - redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, records.get(0)); + redisTemplate.getRedisTemplate().opsForStream().acknowledge(listener.getGroup(), records.get(0)); log.info("[processPendingMessage][消息({})重新投递成功]", records.get(0).getId()); }); }); diff --git a/yudao-module-iot/yudao-module-iot-core/pom.xml b/yudao-module-iot/yudao-module-iot-core/pom.xml index 3f4bb1f126..30ebc2de0c 100644 --- a/yudao-module-iot/yudao-module-iot-core/pom.xml +++ b/yudao-module-iot/yudao-module-iot-core/pom.xml @@ -32,6 +32,11 @@ + + cn.iocoder.boot + yudao-spring-boot-starter-mq + + org.springframework.data spring-data-redis diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/config/IotMessageBusAutoConfiguration.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/config/IotMessageBusAutoConfiguration.java index 6505058e77..4a5aaff57a 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/config/IotMessageBusAutoConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/config/IotMessageBusAutoConfiguration.java @@ -1,8 +1,9 @@ package cn.iocoder.yudao.module.iot.core.messagebus.config; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; -import cn.iocoder.yudao.module.iot.core.messagebus.core.local.LocalIotMessageBus; -import cn.iocoder.yudao.module.iot.core.messagebus.core.rocketmq.RocketMQIotMessageBus; +import cn.iocoder.yudao.module.iot.core.messagebus.core.local.IotLocalMessageBus; +import cn.iocoder.yudao.module.iot.core.messagebus.core.redis.IotRedisMessageBus; +import cn.iocoder.yudao.module.iot.core.messagebus.core.rocketmq.IotRocketMQMessageBus; import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; @@ -14,6 +15,8 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.StringRedisTemplate; /** * IoT 消息总线自动配置 @@ -34,12 +37,12 @@ public class IotMessageBusAutoConfiguration { @Configuration @ConditionalOnProperty(prefix = "yudao.iot.message-bus", name = "type", havingValue = "local", matchIfMissing = true) - public static class LocalIotMessageBusConfiguration { + public static class IotLocalMessageBusConfiguration { @Bean - public IotMessageBus localIotMessageBus(ApplicationContext applicationContext) { - log.info("[localIotMessageBus][创建 Local IoT 消息总线]"); - return new LocalIotMessageBus(applicationContext); + public IotMessageBus iotLocalMessageBus(ApplicationContext applicationContext) { + log.info("[iotLocalMessageBus][创建 IoT Local 消息总线]"); + return new IotLocalMessageBus(applicationContext); } } @@ -49,13 +52,28 @@ public class IotMessageBusAutoConfiguration { @Configuration @ConditionalOnProperty(prefix = "yudao.iot.message-bus", name = "type", havingValue = "rocketmq") @ConditionalOnClass(RocketMQTemplate.class) - public static class RocketMQIotMessageBusConfiguration { + public static class IotRocketMQMessageBusConfiguration { @Bean - @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") - public IotMessageBus rocketMQIotMessageBus(RocketMQProperties rocketMQProperties, RocketMQTemplate rocketMQTemplate) { - log.info("[rocketMQIotMessageBus][创建 RocketMQ IoT 消息总线]"); - return new RocketMQIotMessageBus(rocketMQProperties, rocketMQTemplate); + public IotMessageBus iotRocketMQMessageBus(RocketMQProperties rocketMQProperties, + RocketMQTemplate rocketMQTemplate) { + log.info("[iotRocketMQMessageBus][创建 IoT RocketMQ 消息总线]"); + return new IotRocketMQMessageBus(rocketMQProperties, rocketMQTemplate); + } + + } + + // ==================== Redis 实现 ==================== + + @Configuration + @ConditionalOnProperty(prefix = "yudao.iot.message-bus", name = "type", havingValue = "redis") + @ConditionalOnClass(RedisTemplate.class) + public static class IotRedisMessageBusConfiguration { + + @Bean + public IotMessageBus iotRedisMessageBus(StringRedisTemplate redisTemplate) { + log.info("[iotRedisMessageBus][创建 IoT Redis 消息总线]"); + return new IotRedisMessageBus(redisTemplate); } } diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/local/LocalIotMessage.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/local/IotLocalMessage.java similarity index 86% rename from yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/local/LocalIotMessage.java rename to yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/local/IotLocalMessage.java index c8c727792a..5a9841a754 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/local/LocalIotMessage.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/local/IotLocalMessage.java @@ -5,7 +5,7 @@ import lombok.Data; @Data @AllArgsConstructor -public class LocalIotMessage { +public class IotLocalMessage { private String topic; diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/local/LocalIotMessageBus.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/local/IotLocalMessageBus.java similarity index 92% rename from yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/local/LocalIotMessageBus.java rename to yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/local/IotLocalMessageBus.java index 76bd6a493e..1fc608bc50 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/local/LocalIotMessageBus.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/local/IotLocalMessageBus.java @@ -22,7 +22,7 @@ import java.util.Map; */ @RequiredArgsConstructor @Slf4j -public class LocalIotMessageBus implements IotMessageBus { +public class IotLocalMessageBus implements IotMessageBus { private final ApplicationContext applicationContext; @@ -34,7 +34,7 @@ public class LocalIotMessageBus implements IotMessageBus { @Override public void post(String topic, Object message) { - applicationContext.publishEvent(new LocalIotMessage(topic, message)); + applicationContext.publishEvent(new IotLocalMessage(topic, message)); } @Override @@ -48,7 +48,7 @@ public class LocalIotMessageBus implements IotMessageBus { @EventListener @SuppressWarnings({"unchecked", "rawtypes"}) - public void onMessage(LocalIotMessage message) { + public void onMessage(IotLocalMessage message) { String topic = message.getTopic(); List> topicSubscribers = subscribers.get(topic); if (CollUtil.isEmpty(topicSubscribers)) { diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/redis/IotRedisMessageBus.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/redis/IotRedisMessageBus.java new file mode 100644 index 0000000000..5736345fc7 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/redis/IotRedisMessageBus.java @@ -0,0 +1,92 @@ +package cn.iocoder.yudao.module.iot.core.messagebus.core.redis; + +import cn.hutool.core.util.TypeUtil; +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.connection.stream.*; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.stream.StreamMessageListenerContainer; + +import java.lang.reflect.Type; + +import static cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQConsumerAutoConfiguration.buildConsumerName; +import static cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQConsumerAutoConfiguration.checkRedisVersion; + +/** + * Redis 的 {@link IotMessageBus} 实现类 + * + * @author 芋道源码 + */ +@Slf4j +public class IotRedisMessageBus implements IotMessageBus { + + private final RedisTemplate redisTemplate; + + private final StreamMessageListenerContainer> redisStreamMessageListenerContainer; + + public IotRedisMessageBus(RedisTemplate redisTemplate) { + this.redisTemplate = redisTemplate; + checkRedisVersion(redisTemplate); + // 创建 options 配置 + StreamMessageListenerContainer.StreamMessageListenerContainerOptions> containerOptions = + StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() + .batchSize(10) // 一次性最多拉取多少条消息 + .targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化 + .build(); + // 创建 container 对象 + this.redisStreamMessageListenerContainer = + StreamMessageListenerContainer.create(redisTemplate.getRequiredConnectionFactory(), containerOptions); + } + + @PostConstruct + public void init() { + this.redisStreamMessageListenerContainer.start(); + } + + @PreDestroy + public void destroy() { + this.redisStreamMessageListenerContainer.stop(); + } + + @Override + public void post(String topic, Object message) { + redisTemplate.opsForStream().add(StreamRecords.newRecord() + .ofObject(JsonUtils.toJsonString(message)) // 设置内容 + .withStreamKey(topic)); // 设置 stream key + } + + @Override + public void register(IotMessageSubscriber subscriber) { + Type type = TypeUtil.getTypeArgument(subscriber.getClass(), 0); + if (type == null) { + throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName())); + } + + // 创建 listener 对应的消费者分组 + try { + redisTemplate.opsForStream().createGroup(subscriber.getTopic(), subscriber.getGroup()); + } catch (Exception ignore) { + } + // 创建 Consumer 对象 + String consumerName = buildConsumerName(); + Consumer consumer = Consumer.from(subscriber.getGroup(), consumerName); + // 设置 Consumer 消费进度,以最小消费进度为准 + StreamOffset streamOffset = StreamOffset.create(subscriber.getTopic(), ReadOffset.lastConsumed()); + // 设置 Consumer 监听 + StreamMessageListenerContainer.StreamReadRequestBuilder builder = StreamMessageListenerContainer.StreamReadRequest + .builder(streamOffset).consumer(consumer) + .autoAcknowledge(false) // 不自动 ack + .cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false + redisStreamMessageListenerContainer.register(builder.build(), message -> { + // 消费消息 + subscriber.onMessage(JsonUtils.parseObject(message.getValue(), type)); + // ack 消息消费完成 + redisTemplate.opsForStream().acknowledge(subscriber.getGroup(), message); + }); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/rocketmq/RocketMQIotMessageBus.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/rocketmq/IotRocketMQMessageBus.java similarity index 98% rename from yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/rocketmq/RocketMQIotMessageBus.java rename to yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/rocketmq/IotRocketMQMessageBus.java index 5d6d72af1c..48218b2519 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/rocketmq/RocketMQIotMessageBus.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/rocketmq/IotRocketMQMessageBus.java @@ -4,6 +4,7 @@ import cn.hutool.core.util.TypeUtil; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber; +import jakarta.annotation.PreDestroy; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -15,8 +16,6 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; import org.apache.rocketmq.spring.core.RocketMQTemplate; -import jakarta.annotation.PreDestroy; - import java.lang.reflect.Type; import java.util.ArrayList; import java.util.List; @@ -28,7 +27,7 @@ import java.util.List; */ @RequiredArgsConstructor @Slf4j -public class RocketMQIotMessageBus implements IotMessageBus { +public class IotRocketMQMessageBus implements IotMessageBus { private final RocketMQProperties rocketMQProperties; @@ -39,6 +38,21 @@ public class RocketMQIotMessageBus implements IotMessageBus { */ private final List topicConsumers = new ArrayList<>(); + /** + * 销毁时关闭所有消费者 + */ + @PreDestroy + public void destroy() { + for (DefaultMQPushConsumer consumer : topicConsumers) { + try { + consumer.shutdown(); + log.info("[destroy][关闭 group({}) 的消费者成功]", consumer.getConsumerGroup()); + } catch (Exception e) { + log.error("[destroy]关闭 group({}) 的消费者异常]", consumer.getConsumerGroup(), e); + } + } + } + @Override public void post(String topic, Object message) { // TODO @芋艿:需要 orderly! @@ -81,19 +95,4 @@ public class RocketMQIotMessageBus implements IotMessageBus { topicConsumers.add(consumer); } - /** - * 销毁时关闭所有消费者 - */ - @PreDestroy - public void destroy() { - for (DefaultMQPushConsumer consumer : topicConsumers) { - try { - consumer.shutdown(); - log.info("[destroy][关闭 group({}) 的消费者成功]", consumer.getConsumerGroup()); - } catch (Exception e) { - log.error("[destroy]关闭 group({}) 的消费者异常]", consumer.getConsumerGroup(), e); - } - } - } - } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-core/src/test/java/cn/iocoder/yudao/module/iot/core/messagebus/core/local/LocalIotMessageBusIntegrationTest.java b/yudao-module-iot/yudao-module-iot-core/src/test/java/cn/iocoder/yudao/module/iot/core/messagebus/core/local/LocalIotMessageBusIntegrationTest.java index 341ad891c2..b282bc89ea 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/test/java/cn/iocoder/yudao/module/iot/core/messagebus/core/local/LocalIotMessageBusIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-core/src/test/java/cn/iocoder/yudao/module/iot/core/messagebus/core/local/LocalIotMessageBusIntegrationTest.java @@ -17,7 +17,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.junit.jupiter.api.Assertions.*; /** - * {@link LocalIotMessageBus} 集成测试 + * {@link IotLocalMessageBus} 集成测试 * * @author 芋道源码 */ diff --git a/yudao-module-iot/yudao-module-iot-core/src/test/java/cn/iocoder/yudao/module/iot/core/messagebus/core/rocketmq/RocketMQIotMessageBusTest.java b/yudao-module-iot/yudao-module-iot-core/src/test/java/cn/iocoder/yudao/module/iot/core/messagebus/core/rocketmq/RocketMQIotMessageBusTest.java index 01b97ce780..b7270f2fe0 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/test/java/cn/iocoder/yudao/module/iot/core/messagebus/core/rocketmq/RocketMQIotMessageBusTest.java +++ b/yudao-module-iot/yudao-module-iot-core/src/test/java/cn/iocoder/yudao/module/iot/core/messagebus/core/rocketmq/RocketMQIotMessageBusTest.java @@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.junit.jupiter.api.Assertions.*; /** - * {@link RocketMQIotMessageBus} 集成测试 + * {@link IotRocketMQMessageBus} 集成测试 * * @author 芋道源码 */ diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application-local.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application-local.yaml index 384799eebf..ab3eda8155 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application-local.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application-local.yaml @@ -31,7 +31,7 @@ yudao: # 针对引入的 EMQX 组件的配置 # ==================================== emqx: - enabled: true + enabled: false http-port: 8090 # MQTT HTTP 服务端口 mqtt-host: 127.0.0.1 # MQTT Broker 地址 mqtt-port: 1883 # MQTT Broker 端口 @@ -44,7 +44,7 @@ yudao: # 消息总线配置 message-bus: - type: rocketmq # 本地开发使用 RocketMQ + type: redis # 本地开发使用 RocketMQ --- #################### 日志相关配置 #################### diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml index ae42202493..b12b2f73d7 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml @@ -37,14 +37,14 @@ yudao: # 针对引入的 EMQX 组件的配置 # ==================================== emqx: - enabled: true + enabled: false mqtt-ssl: false mqtt-topics: - "/sys/#" # 系统主题 # 消息总线配置 message-bus: - type: rocketmq # 消息总线的类型 + type: redis # 消息总线的类型 --- #################### 日志相关配置 #################### diff --git a/yudao-server/src/main/resources/application.yaml b/yudao-server/src/main/resources/application.yaml index 1fcb2df444..025fbbee8b 100644 --- a/yudao-server/src/main/resources/application.yaml +++ b/yudao-server/src/main/resources/application.yaml @@ -313,6 +313,6 @@ yudao: customer: E77DF18BE109F454A5CD319E44BF5177 iot: message-bus: - type: rocketmq # 消息总线的类型 + type: redis # 消息总线的类型 debug: false \ No newline at end of file