From 05ac902dc925f78193afe15998867c276977bc3b Mon Sep 17 00:00:00 2001 From: YunaiV Date: Sat, 14 Jun 2025 21:58:26 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E3=80=90IoT=20=E7=89=A9=E8=81=94?= =?UTF-8?q?=E7=BD=91=E3=80=91=E5=A2=9E=E5=8A=A0=20redis=20+=20event-bus=20?= =?UTF-8?q?=E7=9A=84=E5=AE=9E=E7=8E=B0=EF=BC=88=E5=A2=9E=E5=8A=A0=20job=20?= =?UTF-8?q?=E6=B8=85=E7=90=86=E8=83=BD=E5=8A=9B=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../AbstractRedisStreamMessageListener.java | 6 ++ .../rule/IotRuleSceneMessageHandler.java | 31 ++++++++-- .../IotMessageBusAutoConfiguration.java | 56 +++++++++++++++++-- .../core/redis/IotRedisMessageBus.java | 7 +++ 4 files changed, 91 insertions(+), 9 deletions(-) diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/stream/AbstractRedisStreamMessageListener.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/stream/AbstractRedisStreamMessageListener.java index 3e656af3f0..ba1aa96977 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/stream/AbstractRedisStreamMessageListener.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/stream/AbstractRedisStreamMessageListener.java @@ -53,6 +53,12 @@ public abstract class AbstractRedisStreamMessageListener message) { // 消费消息 diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/rule/IotRuleSceneMessageHandler.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/rule/IotRuleSceneMessageHandler.java index d7deccef43..38bc3423b3 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/rule/IotRuleSceneMessageHandler.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/rule/IotRuleSceneMessageHandler.java @@ -1,11 +1,12 @@ package cn.iocoder.yudao.module.iot.mq.consumer.rule; +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.service.rule.IotRuleSceneService; +import jakarta.annotation.PostConstruct; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; -import org.springframework.context.event.EventListener; -import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; // TODO @puhui999:后面重构哈 @@ -16,14 +17,34 @@ import org.springframework.stereotype.Component; */ @Component @Slf4j -public class IotRuleSceneMessageHandler { +public class IotRuleSceneMessageHandler implements IotMessageSubscriber { @Resource private IotRuleSceneService ruleSceneService; - @EventListener - @Async + @Resource + private IotMessageBus messageBus; + + @PostConstruct + public void init() { + messageBus.register(this); + } + + @Override + public String getTopic() { + return IotDeviceMessage.MESSAGE_BUS_DEVICE_MESSAGE_TOPIC; + } + + @Override + public String getGroup() { + return "iot_rule_consumer"; + } + + @Override public void onMessage(IotDeviceMessage message) { + if (true) { + return; + } log.info("[onMessage][消息内容({})]", message); ruleSceneService.executeRuleSceneByDevice(message); } diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/config/IotMessageBusAutoConfiguration.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/config/IotMessageBusAutoConfiguration.java index 4a5aaff57a..67ae67399c 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/config/IotMessageBusAutoConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/config/IotMessageBusAutoConfiguration.java @@ -1,5 +1,10 @@ package cn.iocoder.yudao.module.iot.core.messagebus.config; +import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; +import cn.iocoder.yudao.framework.mq.redis.core.job.RedisPendingMessageResendJob; +import cn.iocoder.yudao.framework.mq.redis.core.job.RedisStreamMessageCleanupJob; +import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessage; +import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.messagebus.core.local.IotLocalMessageBus; import cn.iocoder.yudao.module.iot.core.messagebus.core.redis.IotRedisMessageBus; @@ -8,6 +13,7 @@ import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.redisson.api.RedissonClient; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -18,6 +24,10 @@ import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; +import java.util.List; + +import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertList; + /** * IoT 消息总线自动配置 * @@ -40,7 +50,7 @@ public class IotMessageBusAutoConfiguration { public static class IotLocalMessageBusConfiguration { @Bean - public IotMessageBus iotLocalMessageBus(ApplicationContext applicationContext) { + public IotLocalMessageBus iotLocalMessageBus(ApplicationContext applicationContext) { log.info("[iotLocalMessageBus][创建 IoT Local 消息总线]"); return new IotLocalMessageBus(applicationContext); } @@ -55,8 +65,8 @@ public class IotMessageBusAutoConfiguration { public static class IotRocketMQMessageBusConfiguration { @Bean - public IotMessageBus iotRocketMQMessageBus(RocketMQProperties rocketMQProperties, - RocketMQTemplate rocketMQTemplate) { + public IotRocketMQMessageBus iotRocketMQMessageBus(RocketMQProperties rocketMQProperties, + RocketMQTemplate rocketMQTemplate) { log.info("[iotRocketMQMessageBus][创建 IoT RocketMQ 消息总线]"); return new IotRocketMQMessageBus(rocketMQProperties, rocketMQTemplate); } @@ -65,17 +75,55 @@ public class IotMessageBusAutoConfiguration { // ==================== Redis 实现 ==================== + /** + * 特殊:由于 YudaoRedisMQConsumerAutoConfiguration 关于 Redis stream 的消费是动态注册,所以这里只能拷贝相关的逻辑!!! + * + * @see cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQConsumerAutoConfiguration + */ @Configuration @ConditionalOnProperty(prefix = "yudao.iot.message-bus", name = "type", havingValue = "redis") @ConditionalOnClass(RedisTemplate.class) public static class IotRedisMessageBusConfiguration { @Bean - public IotMessageBus iotRedisMessageBus(StringRedisTemplate redisTemplate) { + public IotRedisMessageBus iotRedisMessageBus(StringRedisTemplate redisTemplate) { log.info("[iotRedisMessageBus][创建 IoT Redis 消息总线]"); return new IotRedisMessageBus(redisTemplate); } + /** + * 创建 Redis Stream 重新消费的任务 + */ + @Bean + public RedisPendingMessageResendJob iotRedisPendingMessageResendJob(IotRedisMessageBus messageBus, + RedisMQTemplate redisTemplate, + RedissonClient redissonClient) { + List> listeners = getListeners(messageBus); + return new RedisPendingMessageResendJob(listeners, redisTemplate, redissonClient); + } + + /** + * 创建 Redis Stream 消息清理任务 + */ + @Bean + public RedisStreamMessageCleanupJob iotRedisStreamMessageCleanupJob(IotRedisMessageBus messageBus, + RedisMQTemplate redisTemplate, + RedissonClient redissonClient) { + List> listeners = getListeners(messageBus); + return new RedisStreamMessageCleanupJob(listeners, redisTemplate, redissonClient); + } + + private List> getListeners(IotRedisMessageBus messageBus) { + return convertList(messageBus.getSubscribers(), subscriber -> + new AbstractRedisStreamMessageListener<>(subscriber.getTopic(), subscriber.getGroup()) { + + @Override + public void onMessage(AbstractRedisStreamMessage message) { + throw new UnsupportedOperationException("不应该调用!!!"); + } + }); + } + } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/redis/IotRedisMessageBus.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/redis/IotRedisMessageBus.java index 5736345fc7..fcaed5a87b 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/redis/IotRedisMessageBus.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/redis/IotRedisMessageBus.java @@ -6,12 +6,15 @@ import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.stream.*; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.stream.StreamMessageListenerContainer; import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; import static cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQConsumerAutoConfiguration.buildConsumerName; import static cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQConsumerAutoConfiguration.checkRedisVersion; @@ -28,6 +31,9 @@ public class IotRedisMessageBus implements IotMessageBus { private final StreamMessageListenerContainer> redisStreamMessageListenerContainer; + @Getter + private final List> subscribers = new ArrayList<>(); + public IotRedisMessageBus(RedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; checkRedisVersion(redisTemplate); @@ -87,6 +93,7 @@ public class IotRedisMessageBus implements IotMessageBus { // ack 消息消费完成 redisTemplate.opsForStream().acknowledge(subscriber.getGroup(), message); }); + this.subscribers.add(subscriber); } }