feat:【IoT 物联网】增加 redis + event-bus 的实现

This commit is contained in:
YunaiV
2025-06-14 20:53:29 +08:00
parent 69e25eeaac
commit 19cf311b7e
13 changed files with 160 additions and 48 deletions

View File

@@ -69,9 +69,8 @@ public class YudaoRedisMQConsumerAutoConfiguration {
@ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
public RedisPendingMessageResendJob redisPendingMessageResendJob(List<AbstractRedisStreamMessageListener<?>> listeners,
RedisMQTemplate redisTemplate,
@Value("${spring.application.name}") String groupName,
RedissonClient redissonClient) {
return new RedisPendingMessageResendJob(listeners, redisTemplate, groupName, redissonClient);
return new RedisPendingMessageResendJob(listeners, redisTemplate, redissonClient);
}
/**
@@ -141,14 +140,14 @@ public class YudaoRedisMQConsumerAutoConfiguration {
*
* @return 消费者名字
*/
private static String buildConsumerName() {
public static String buildConsumerName() {
return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID());
}
/**
* 校验 Redis 版本号,是否满足最低的版本号要求!
*/
private static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) {
public static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) {
// 获得 Redis 版本
Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);
String version = MapUtil.getStr(info, "redis_version");

View File

@@ -35,7 +35,6 @@ public class RedisPendingMessageResendJob {
private final List<AbstractRedisStreamMessageListener<?>> listeners;
private final RedisMQTemplate redisTemplate;
private final String groupName;
private final RedissonClient redissonClient;
/**
@@ -64,13 +63,13 @@ public class RedisPendingMessageResendJob {
private void execute() {
StreamOperations<String, Object, Object> ops = redisTemplate.getRedisTemplate().opsForStream();
listeners.forEach(listener -> {
PendingMessagesSummary pendingMessagesSummary = Objects.requireNonNull(ops.pending(listener.getStreamKey(), groupName));
PendingMessagesSummary pendingMessagesSummary = Objects.requireNonNull(ops.pending(listener.getStreamKey(), listener.getGroup()));
// 每个消费者的 pending 队列消息数量
Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();
pendingMessagesPerConsumer.forEach((consumerName, pendingMessageCount) -> {
log.info("[processPendingMessage][消费者({}) 消息数量({})]", consumerName, pendingMessageCount);
// 每个消费者的 pending消息的详情信息
PendingMessages pendingMessages = ops.pending(listener.getStreamKey(), Consumer.from(groupName, consumerName), Range.unbounded(), pendingMessageCount);
PendingMessages pendingMessages = ops.pending(listener.getStreamKey(), Consumer.from(listener.getGroup(), consumerName), Range.unbounded(), pendingMessageCount);
if (pendingMessages.isEmpty()) {
return;
}
@@ -91,7 +90,7 @@ public class RedisPendingMessageResendJob {
.ofObject(records.get(0).getValue()) // 设置内容
.withStreamKey(listener.getStreamKey()));
// ack 消息消费完成
redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, records.get(0));
redisTemplate.getRedisTemplate().opsForStream().acknowledge(listener.getGroup(), records.get(0));
log.info("[processPendingMessage][消息({})重新投递成功]", records.get(0).getId());
});
});

View File

@@ -32,6 +32,11 @@
</dependency>
<!-- 消息中间件相关(可选依赖) -->
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-spring-boot-starter-mq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>

View File

@@ -1,8 +1,9 @@
package cn.iocoder.yudao.module.iot.core.messagebus.config;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.local.LocalIotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.rocketmq.RocketMQIotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.local.IotLocalMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.redis.IotRedisMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.rocketmq.IotRocketMQMessageBus;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
@@ -14,6 +15,8 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
/**
* IoT 消息总线自动配置
@@ -34,12 +37,12 @@ public class IotMessageBusAutoConfiguration {
@Configuration
@ConditionalOnProperty(prefix = "yudao.iot.message-bus", name = "type", havingValue = "local", matchIfMissing = true)
public static class LocalIotMessageBusConfiguration {
public static class IotLocalMessageBusConfiguration {
@Bean
public IotMessageBus localIotMessageBus(ApplicationContext applicationContext) {
log.info("[localIotMessageBus][创建 Local IoT 消息总线]");
return new LocalIotMessageBus(applicationContext);
public IotMessageBus iotLocalMessageBus(ApplicationContext applicationContext) {
log.info("[iotLocalMessageBus][创建 IoT Local 消息总线]");
return new IotLocalMessageBus(applicationContext);
}
}
@@ -49,13 +52,28 @@ public class IotMessageBusAutoConfiguration {
@Configuration
@ConditionalOnProperty(prefix = "yudao.iot.message-bus", name = "type", havingValue = "rocketmq")
@ConditionalOnClass(RocketMQTemplate.class)
public static class RocketMQIotMessageBusConfiguration {
public static class IotRocketMQMessageBusConfiguration {
@Bean
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
public IotMessageBus rocketMQIotMessageBus(RocketMQProperties rocketMQProperties, RocketMQTemplate rocketMQTemplate) {
log.info("[rocketMQIotMessageBus][创建 RocketMQ IoT 消息总线]");
return new RocketMQIotMessageBus(rocketMQProperties, rocketMQTemplate);
public IotMessageBus iotRocketMQMessageBus(RocketMQProperties rocketMQProperties,
RocketMQTemplate rocketMQTemplate) {
log.info("[iotRocketMQMessageBus][创建 IoT RocketMQ 消息总线]");
return new IotRocketMQMessageBus(rocketMQProperties, rocketMQTemplate);
}
}
// ==================== Redis 实现 ====================
@Configuration
@ConditionalOnProperty(prefix = "yudao.iot.message-bus", name = "type", havingValue = "redis")
@ConditionalOnClass(RedisTemplate.class)
public static class IotRedisMessageBusConfiguration {
@Bean
public IotMessageBus iotRedisMessageBus(StringRedisTemplate redisTemplate) {
log.info("[iotRedisMessageBus][创建 IoT Redis 消息总线]");
return new IotRedisMessageBus(redisTemplate);
}
}

View File

@@ -22,7 +22,7 @@ import java.util.Map;
*/
@RequiredArgsConstructor
@Slf4j
public class LocalIotMessageBus implements IotMessageBus {
public class IotLocalMessageBus implements IotMessageBus {
private final ApplicationContext applicationContext;
@@ -34,7 +34,7 @@ public class LocalIotMessageBus implements IotMessageBus {
@Override
public void post(String topic, Object message) {
applicationContext.publishEvent(new LocalIotMessage(topic, message));
applicationContext.publishEvent(new IotLocalMessage(topic, message));
}
@Override
@@ -48,7 +48,7 @@ public class LocalIotMessageBus implements IotMessageBus {
@EventListener
@SuppressWarnings({"unchecked", "rawtypes"})
public void onMessage(LocalIotMessage message) {
public void onMessage(IotLocalMessage message) {
String topic = message.getTopic();
List<IotMessageSubscriber<?>> topicSubscribers = subscribers.get(topic);
if (CollUtil.isEmpty(topicSubscribers)) {

View File

@@ -0,0 +1,92 @@
package cn.iocoder.yudao.module.iot.core.messagebus.core.redis;
import cn.hutool.core.util.TypeUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import java.lang.reflect.Type;
import static cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQConsumerAutoConfiguration.buildConsumerName;
import static cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQConsumerAutoConfiguration.checkRedisVersion;
/**
* Redis 的 {@link IotMessageBus} 实现类
*
* @author 芋道源码
*/
@Slf4j
public class IotRedisMessageBus implements IotMessageBus {
private final RedisTemplate<String, ?> redisTemplate;
private final StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer;
public IotRedisMessageBus(RedisTemplate<String, ?> redisTemplate) {
this.redisTemplate = redisTemplate;
checkRedisVersion(redisTemplate);
// 创建 options 配置
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> containerOptions =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
.batchSize(10) // 一次性最多拉取多少条消息
.targetType(String.class) // 目标类型。统一使用 String通过自己封装的 AbstractStreamMessageListener 去反序列化
.build();
// 创建 container 对象
this.redisStreamMessageListenerContainer =
StreamMessageListenerContainer.create(redisTemplate.getRequiredConnectionFactory(), containerOptions);
}
@PostConstruct
public void init() {
this.redisStreamMessageListenerContainer.start();
}
@PreDestroy
public void destroy() {
this.redisStreamMessageListenerContainer.stop();
}
@Override
public void post(String topic, Object message) {
redisTemplate.opsForStream().add(StreamRecords.newRecord()
.ofObject(JsonUtils.toJsonString(message)) // 设置内容
.withStreamKey(topic)); // 设置 stream key
}
@Override
public void register(IotMessageSubscriber<?> subscriber) {
Type type = TypeUtil.getTypeArgument(subscriber.getClass(), 0);
if (type == null) {
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
}
// 创建 listener 对应的消费者分组
try {
redisTemplate.opsForStream().createGroup(subscriber.getTopic(), subscriber.getGroup());
} catch (Exception ignore) {
}
// 创建 Consumer 对象
String consumerName = buildConsumerName();
Consumer consumer = Consumer.from(subscriber.getGroup(), consumerName);
// 设置 Consumer 消费进度,以最小消费进度为准
StreamOffset<String> streamOffset = StreamOffset.create(subscriber.getTopic(), ReadOffset.lastConsumed());
// 设置 Consumer 监听
StreamMessageListenerContainer.StreamReadRequestBuilder<String> builder = StreamMessageListenerContainer.StreamReadRequest
.builder(streamOffset).consumer(consumer)
.autoAcknowledge(false) // 不自动 ack
.cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false
redisStreamMessageListenerContainer.register(builder.build(), message -> {
// 消费消息
subscriber.onMessage(JsonUtils.parseObject(message.getValue(), type));
// ack 消息消费完成
redisTemplate.opsForStream().acknowledge(subscriber.getGroup(), message);
});
}
}

View File

@@ -4,6 +4,7 @@ import cn.hutool.core.util.TypeUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -15,8 +16,6 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import jakarta.annotation.PreDestroy;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
@@ -28,7 +27,7 @@ import java.util.List;
*/
@RequiredArgsConstructor
@Slf4j
public class RocketMQIotMessageBus implements IotMessageBus {
public class IotRocketMQMessageBus implements IotMessageBus {
private final RocketMQProperties rocketMQProperties;
@@ -39,6 +38,21 @@ public class RocketMQIotMessageBus implements IotMessageBus {
*/
private final List<DefaultMQPushConsumer> topicConsumers = new ArrayList<>();
/**
* 销毁时关闭所有消费者
*/
@PreDestroy
public void destroy() {
for (DefaultMQPushConsumer consumer : topicConsumers) {
try {
consumer.shutdown();
log.info("[destroy][关闭 group({}) 的消费者成功]", consumer.getConsumerGroup());
} catch (Exception e) {
log.error("[destroy]关闭 group({}) 的消费者异常]", consumer.getConsumerGroup(), e);
}
}
}
@Override
public void post(String topic, Object message) {
// TODO @芋艿需要 orderly
@@ -81,19 +95,4 @@ public class RocketMQIotMessageBus implements IotMessageBus {
topicConsumers.add(consumer);
}
/**
* 销毁时关闭所有消费者
*/
@PreDestroy
public void destroy() {
for (DefaultMQPushConsumer consumer : topicConsumers) {
try {
consumer.shutdown();
log.info("[destroy][关闭 group({}) 的消费者成功]", consumer.getConsumerGroup());
} catch (Exception e) {
log.error("[destroy]关闭 group({}) 的消费者异常]", consumer.getConsumerGroup(), e);
}
}
}
}

View File

@@ -17,7 +17,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.*;
/**
* {@link LocalIotMessageBus} 集成测试
* {@link IotLocalMessageBus} 集成测试
*
* @author 芋道源码
*/

View File

@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.junit.jupiter.api.Assertions.*;
/**
* {@link RocketMQIotMessageBus} 集成测试
* {@link IotRocketMQMessageBus} 集成测试
*
* @author 芋道源码
*/

View File

@@ -31,7 +31,7 @@ yudao:
# 针对引入的 EMQX 组件的配置
# ====================================
emqx:
enabled: true
enabled: false
http-port: 8090 # MQTT HTTP 服务端口
mqtt-host: 127.0.0.1 # MQTT Broker 地址
mqtt-port: 1883 # MQTT Broker 端口
@@ -44,7 +44,7 @@ yudao:
# 消息总线配置
message-bus:
type: rocketmq # 本地开发使用 RocketMQ
type: redis # 本地开发使用 RocketMQ
--- #################### 日志相关配置 ####################

View File

@@ -37,14 +37,14 @@ yudao:
# 针对引入的 EMQX 组件的配置
# ====================================
emqx:
enabled: true
enabled: false
mqtt-ssl: false
mqtt-topics:
- "/sys/#" # 系统主题
# 消息总线配置
message-bus:
type: rocketmq # 消息总线的类型
type: redis # 消息总线的类型
--- #################### 日志相关配置 ####################

View File

@@ -313,6 +313,6 @@ yudao:
customer: E77DF18BE109F454A5CD319E44BF5177
iot:
message-bus:
type: rocketmq # 消息总线的类型
type: redis # 消息总线的类型
debug: false