perf:【IoT 物联网】优化 IotRedisRuleAction

This commit is contained in:
puhui999
2025-08-15 17:51:26 +08:00
parent 378cf1e997
commit a328dcf172
8 changed files with 295 additions and 127 deletions

View File

@@ -18,7 +18,7 @@ import lombok.Data;
@JsonSubTypes({
@JsonSubTypes.Type(value = IotDataSinkHttpConfig.class, name = "1"),
@JsonSubTypes.Type(value = IotDataSinkMqttConfig.class, name = "10"),
@JsonSubTypes.Type(value = IotDataSinkRedisStreamConfig.class, name = "21"),
@JsonSubTypes.Type(value = IotDataSinkRedisConfig.class, name = "21"),
@JsonSubTypes.Type(value = IotDataSinkRocketMQConfig.class, name = "30"),
@JsonSubTypes.Type(value = IotDataSinkRabbitMQConfig.class, name = "31"),
@JsonSubTypes.Type(value = IotDataSinkKafkaConfig.class, name = "32"),

View File

@@ -0,0 +1,64 @@
package cn.iocoder.yudao.module.iot.dal.dataobject.rule.config;
import cn.iocoder.yudao.framework.common.validation.InEnum;
import cn.iocoder.yudao.module.iot.enums.rule.IotRedisDataStructureEnum;
import lombok.Data;
/**
* IoT Redis 配置 {@link IotAbstractDataSinkConfig} 实现类
*
* @author HUIHUI
*/
@Data
public class IotDataSinkRedisConfig extends IotAbstractDataSinkConfig {
/**
* Redis 服务器地址
*/
private String host;
/**
* 端口
*/
private Integer port;
/**
* 密码
*/
private String password;
/**
* 数据库索引
*/
private Integer database;
/**
* Redis 数据结构类型
* <p>
* 枚举 {@link IotRedisDataStructureEnum}
*/
@InEnum(IotRedisDataStructureEnum.class)
private Integer dataStructure;
/**
* 主题/键名
* <p>
* 对于不同的数据结构:
* - Stream: 流的键名
* - Hash: Hash 的键名
* - List: 列表的键名
* - Set: 集合的键名
* - ZSet: 有序集合的键名
* - String: 字符串的键名
*/
private String topic;
/**
* Hash 字段名(仅当 dataStructure 为 HASH 时使用)
*/
private String hashField;
/**
* ZSet 分数字段(仅当 dataStructure 为 ZSET 时使用)
* 指定消息中哪个字段作为分数,如果不指定则使用当前时间戳
*/
private String scoreField;
}

View File

@@ -1,34 +0,0 @@
package cn.iocoder.yudao.module.iot.dal.dataobject.rule.config;
import lombok.Data;
/**
* IoT Redis Stream 配置 {@link IotAbstractDataSinkConfig} 实现类
*
* @author HUIHUI
*/
@Data
public class IotDataSinkRedisStreamConfig extends IotAbstractDataSinkConfig {
/**
* Redis 服务器地址
*/
private String host;
/**
* 端口
*/
private Integer port;
/**
* 密码
*/
private String password;
/**
* 数据库索引
*/
private Integer database;
/**
* 主题
*/
private String topic;
}

View File

@@ -22,7 +22,7 @@ public enum IotDataSinkTypeEnum implements ArrayValuable<Integer> {
MQTT(10, "MQTT"), // TODO 待实现;
DATABASE(20, "Database"), // TODO @puhui999待实现可以简单点对应的表名是什么字段先固定了。
REDIS_STREAM(21, "Redis Stream"), // TODO @puhui999改成 Redis然后枚举不同的数据结构这样枚举就可以是 Redis 了
REDIS(21, "Redis"),
ROCKETMQ(30, "RocketMQ"),
RABBITMQ(31, "RabbitMQ"),

View File

@@ -0,0 +1,36 @@
package cn.iocoder.yudao.module.iot.enums.rule;
import cn.iocoder.yudao.framework.common.core.ArrayValuable;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.util.Arrays;
/**
* IoT Redis 数据结构类型枚举
*
* @author HUIHUI
*/
@RequiredArgsConstructor
@Getter
public enum IotRedisDataStructureEnum implements ArrayValuable<Integer> {
STREAM(1, "Stream"),
HASH(2, "Hash"),
LIST(3, "List"),
SET(4, "Set"),
ZSET(5, "ZSet"),
STRING(6, "String");
private final Integer type;
private final String name;
public static final Integer[] ARRAYS = Arrays.stream(values()).map(IotRedisDataStructureEnum::getType).toArray(Integer[]::new);
@Override
public Integer[] array() {
return ARRAYS;
}
}

View File

@@ -0,0 +1,182 @@
package cn.iocoder.yudao.module.iot.service.rule.data.action;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkRedisConfig;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
import cn.iocoder.yudao.module.iot.enums.rule.IotRedisDataStructureEnum;
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.redisson.spring.data.connection.RedissonConnectionFactory;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* Redis 的 {@link IotDataRuleAction} 实现类
* 支持多种 Redis 数据结构Stream、Hash、List、Set、ZSet、String
*
* @author HUIHUI
*/
@Component
@Slf4j
public class IotRedisRuleAction extends
IotDataRuleCacheableAction<IotDataSinkRedisConfig, RedisTemplate<String, Object>> {
@Override
public Integer getType() {
return IotDataSinkTypeEnum.REDIS.getType();
}
@Override
public void execute(IotDeviceMessage message, IotDataSinkRedisConfig config) throws Exception {
// 1. 获取 RedisTemplate
RedisTemplate<String, Object> redisTemplate = getProducer(config);
// 2. 根据数据结构类型执行不同的操作
String messageJson = JsonUtils.toJsonString(message);
IotRedisDataStructureEnum dataStructure = getDataStructureByType(config.getDataStructure());
switch (dataStructure) {
case STREAM:
executeStream(redisTemplate, config, messageJson);
break;
case HASH:
executeHash(redisTemplate, config, message, messageJson);
break;
case LIST:
executeList(redisTemplate, config, messageJson);
break;
case SET:
executeSet(redisTemplate, config, messageJson);
break;
case ZSET:
executeZSet(redisTemplate, config, message, messageJson);
break;
case STRING:
executeString(redisTemplate, config, messageJson);
break;
default:
throw new IllegalArgumentException("不支持的 Redis 数据结构类型: " + dataStructure);
}
log.info("[execute][消息发送成功] dataStructure: {}, config: {}", dataStructure.getName(), config);
}
/**
* 执行 Stream 操作
*/
private void executeStream(RedisTemplate<String, Object> redisTemplate, IotDataSinkRedisConfig config, String messageJson) {
ObjectRecord<String, ?> record = StreamRecords.newRecord()
.ofObject(messageJson).withStreamKey(config.getTopic());
redisTemplate.opsForStream().add(record);
}
/**
* 执行 Hash 操作
*/
private void executeHash(RedisTemplate<String, Object> redisTemplate, IotDataSinkRedisConfig config,
IotDeviceMessage message, String messageJson) {
String hashField = StrUtil.isNotBlank(config.getHashField()) ?
config.getHashField() : String.valueOf(message.getDeviceId());
redisTemplate.opsForHash().put(config.getTopic(), hashField, messageJson);
}
/**
* 执行 List 操作
*/
private void executeList(RedisTemplate<String, Object> redisTemplate, IotDataSinkRedisConfig config, String messageJson) {
redisTemplate.opsForList().rightPush(config.getTopic(), messageJson);
}
/**
* 执行 Set 操作
*/
private void executeSet(RedisTemplate<String, Object> redisTemplate, IotDataSinkRedisConfig config, String messageJson) {
redisTemplate.opsForSet().add(config.getTopic(), messageJson);
}
/**
* 执行 ZSet 操作
*/
private void executeZSet(RedisTemplate<String, Object> redisTemplate, IotDataSinkRedisConfig config,
IotDeviceMessage message, String messageJson) {
double score;
if (StrUtil.isNotBlank(config.getScoreField())) {
// 尝试从消息中获取分数字段
try {
Map<String, Object> messageMap = JsonUtils.parseObject(messageJson, Map.class);
Object scoreValue = messageMap.get(config.getScoreField());
score = scoreValue instanceof Number ? ((Number) scoreValue).doubleValue() : System.currentTimeMillis();
} catch (Exception e) {
score = System.currentTimeMillis();
}
} else {
// 使用当前时间戳作为分数
score = System.currentTimeMillis();
}
redisTemplate.opsForZSet().add(config.getTopic(), messageJson, score);
}
/**
* 执行 String 操作
*/
private void executeString(RedisTemplate<String, Object> redisTemplate, IotDataSinkRedisConfig config, String messageJson) {
redisTemplate.opsForValue().set(config.getTopic(), messageJson);
}
@Override
protected RedisTemplate<String, Object> initProducer(IotDataSinkRedisConfig config) {
// 1.1 创建 Redisson 配置
Config redissonConfig = new Config();
SingleServerConfig serverConfig = redissonConfig.useSingleServer()
.setAddress("redis://" + config.getHost() + ":" + config.getPort())
.setDatabase(config.getDatabase());
// 1.2 设置密码(如果有)
if (StrUtil.isNotBlank(config.getPassword())) {
serverConfig.setPassword(config.getPassword());
}
// 2.1 创建 RedisTemplate 并配置
RedissonClient redisson = Redisson.create(redissonConfig);
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(new RedissonConnectionFactory(redisson));
// 2.2 设置序列化器
template.setKeySerializer(RedisSerializer.string());
template.setHashKeySerializer(RedisSerializer.string());
template.setValueSerializer(RedisSerializer.json());
template.setHashValueSerializer(RedisSerializer.json());
template.afterPropertiesSet();
return template;
}
@Override
protected void closeProducer(RedisTemplate<String, Object> producer) throws Exception {
RedisConnectionFactory factory = producer.getConnectionFactory();
if (factory != null) {
((RedissonConnectionFactory) factory).destroy();
}
}
/**
* 根据类型值获取数据结构枚举
*/
private IotRedisDataStructureEnum getDataStructureByType(Integer type) {
for (IotRedisDataStructureEnum dataStructure : IotRedisDataStructureEnum.values()) {
if (dataStructure.getType().equals(type)) {
return dataStructure;
}
}
throw new IllegalArgumentException("不支持的 Redis 数据结构类型: " + type);
}
}

View File

@@ -1,81 +0,0 @@
package cn.iocoder.yudao.module.iot.service.rule.data.action;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkRedisStreamConfig;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.redisson.spring.data.connection.RedissonConnectionFactory;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;
/**
* Redis Stream 的 {@link IotDataRuleAction} 实现类
*
* @author HUIHUI
*/
@Component
@Slf4j
public class IotRedisStreamRuleAction extends
IotDataRuleCacheableAction<IotDataSinkRedisStreamConfig, RedisTemplate<String, Object>> {
@Override
public Integer getType() {
return IotDataSinkTypeEnum.REDIS_STREAM.getType();
}
@Override
public void execute(IotDeviceMessage message, IotDataSinkRedisStreamConfig config) throws Exception {
// 1. 获取 RedisTemplate
RedisTemplate<String, Object> redisTemplate = getProducer(config);
// 2. 创建并发送 Stream 记录
ObjectRecord<String, ?> record = StreamRecords.newRecord()
.ofObject(JsonUtils.toJsonString(message)).withStreamKey(config.getTopic());
String recordId = String.valueOf(redisTemplate.opsForStream().add(record));
log.info("[execute][消息发送成功] messageId: {}, config: {}", recordId, config);
}
@Override
protected RedisTemplate<String, Object> initProducer(IotDataSinkRedisStreamConfig config) {
// 1.1 创建 Redisson 配置
Config redissonConfig = new Config();
SingleServerConfig serverConfig = redissonConfig.useSingleServer()
.setAddress("redis://" + config.getHost() + ":" + config.getPort())
.setDatabase(config.getDatabase());
// 1.2 设置密码(如果有)
if (StrUtil.isNotBlank(config.getPassword())) {
serverConfig.setPassword(config.getPassword());
}
// 2.1 创建 RedisTemplate 并配置
RedissonClient redisson = Redisson.create(redissonConfig);
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(new RedissonConnectionFactory(redisson));
// 2.2 设置序列化器
template.setKeySerializer(RedisSerializer.string());
template.setHashKeySerializer(RedisSerializer.string());
template.setValueSerializer(RedisSerializer.json());
template.setHashValueSerializer(RedisSerializer.json());
template.afterPropertiesSet();
return template;
}
@Override
protected void closeProducer(RedisTemplate<String, Object> producer) throws Exception {
RedisConnectionFactory factory = producer.getConnectionFactory();
if (factory != null) {
((RedissonConnectionFactory) factory).destroy();
}
}
}

View File

@@ -85,20 +85,21 @@ public class IotDataBridgeExecuteTest extends BaseMockitoUnitTest {
}
@Test
public void testRedisStreamDataBridge() throws Exception {
public void testRedisDataBridge() throws Exception {
// 1. 创建执行器实例
IotRedisStreamRuleAction action = new IotRedisStreamRuleAction();
IotRedisRuleAction action = new IotRedisRuleAction();
// 2. 创建配置
IotDataSinkRedisStreamConfig config = new IotDataSinkRedisStreamConfig()
.setHost("127.0.0.1")
.setPort(6379)
.setDatabase(0)
.setPassword("123456")
.setTopic("test-stream");
// 2. 创建配置 - 测试 Stream 数据结构
IotDataSinkRedisConfig config = new IotDataSinkRedisConfig();
config.setHost("127.0.0.1");
config.setPort(6379);
config.setDatabase(0);
config.setPassword("123456");
config.setTopic("test-stream");
config.setDataStructure(1); // Stream 类型
// 3. 执行测试并验证缓存
executeAndVerifyCache(action, config, "RedisStream");
executeAndVerifyCache(action, config, "Redis");
}
@Test