diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/config/IotAbstractDataSinkConfig.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/config/IotAbstractDataSinkConfig.java
index 4d08d43410..68a8fd699b 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/config/IotAbstractDataSinkConfig.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/config/IotAbstractDataSinkConfig.java
@@ -18,7 +18,7 @@ import lombok.Data;
@JsonSubTypes({
@JsonSubTypes.Type(value = IotDataSinkHttpConfig.class, name = "1"),
@JsonSubTypes.Type(value = IotDataSinkMqttConfig.class, name = "10"),
- @JsonSubTypes.Type(value = IotDataSinkRedisStreamConfig.class, name = "21"),
+ @JsonSubTypes.Type(value = IotDataSinkRedisConfig.class, name = "21"),
@JsonSubTypes.Type(value = IotDataSinkRocketMQConfig.class, name = "30"),
@JsonSubTypes.Type(value = IotDataSinkRabbitMQConfig.class, name = "31"),
@JsonSubTypes.Type(value = IotDataSinkKafkaConfig.class, name = "32"),
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/config/IotDataSinkRedisConfig.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/config/IotDataSinkRedisConfig.java
new file mode 100644
index 0000000000..07460ac368
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/config/IotDataSinkRedisConfig.java
@@ -0,0 +1,64 @@
+package cn.iocoder.yudao.module.iot.dal.dataobject.rule.config;
+
+import cn.iocoder.yudao.framework.common.validation.InEnum;
+import cn.iocoder.yudao.module.iot.enums.rule.IotRedisDataStructureEnum;
+import lombok.Data;
+
+/**
+ * IoT Redis 配置 {@link IotAbstractDataSinkConfig} 实现类
+ *
+ * @author HUIHUI
+ */
+@Data
+public class IotDataSinkRedisConfig extends IotAbstractDataSinkConfig {
+
+ /**
+ * Redis 服务器地址
+ */
+ private String host;
+ /**
+ * 端口
+ */
+ private Integer port;
+ /**
+ * 密码
+ */
+ private String password;
+ /**
+ * 数据库索引
+ */
+ private Integer database;
+
+ /**
+ * Redis 数据结构类型
+ *
+ * 枚举 {@link IotRedisDataStructureEnum}
+ */
+ @InEnum(IotRedisDataStructureEnum.class)
+ private Integer dataStructure;
+
+ /**
+ * 主题/键名
+ *
+ * 对于不同的数据结构:
+ * - Stream: 流的键名
+ * - Hash: Hash 的键名
+ * - List: 列表的键名
+ * - Set: 集合的键名
+ * - ZSet: 有序集合的键名
+ * - String: 字符串的键名
+ */
+ private String topic;
+
+ /**
+ * Hash 字段名(仅当 dataStructure 为 HASH 时使用)
+ */
+ private String hashField;
+
+ /**
+ * ZSet 分数字段(仅当 dataStructure 为 ZSET 时使用)
+ * 指定消息中哪个字段作为分数,如果不指定则使用当前时间戳
+ */
+ private String scoreField;
+
+}
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/config/IotDataSinkRedisStreamConfig.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/config/IotDataSinkRedisStreamConfig.java
deleted file mode 100644
index 4df0ad7c38..0000000000
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/config/IotDataSinkRedisStreamConfig.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package cn.iocoder.yudao.module.iot.dal.dataobject.rule.config;
-
-import lombok.Data;
-
-/**
- * IoT Redis Stream 配置 {@link IotAbstractDataSinkConfig} 实现类
- *
- * @author HUIHUI
- */
-@Data
-public class IotDataSinkRedisStreamConfig extends IotAbstractDataSinkConfig {
-
- /**
- * Redis 服务器地址
- */
- private String host;
- /**
- * 端口
- */
- private Integer port;
- /**
- * 密码
- */
- private String password;
- /**
- * 数据库索引
- */
- private Integer database;
-
- /**
- * 主题
- */
- private String topic;
-}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataSinkTypeEnum.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataSinkTypeEnum.java
index 33b3558775..45a557db61 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataSinkTypeEnum.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataSinkTypeEnum.java
@@ -22,7 +22,7 @@ public enum IotDataSinkTypeEnum implements ArrayValuable {
MQTT(10, "MQTT"), // TODO 待实现;
DATABASE(20, "Database"), // TODO @puhui999:待实现;可以简单点,对应的表名是什么,字段先固定了。
- REDIS_STREAM(21, "Redis Stream"), // TODO @puhui999:改成 Redis;然后枚举不同的数据结构?这样,枚举就可以是 Redis 了
+ REDIS(21, "Redis"),
ROCKETMQ(30, "RocketMQ"),
RABBITMQ(31, "RabbitMQ"),
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotRedisDataStructureEnum.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotRedisDataStructureEnum.java
new file mode 100644
index 0000000000..4195b08439
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotRedisDataStructureEnum.java
@@ -0,0 +1,36 @@
+package cn.iocoder.yudao.module.iot.enums.rule;
+
+import cn.iocoder.yudao.framework.common.core.ArrayValuable;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+import java.util.Arrays;
+
+/**
+ * IoT Redis 数据结构类型枚举
+ *
+ * @author HUIHUI
+ */
+@RequiredArgsConstructor
+@Getter
+public enum IotRedisDataStructureEnum implements ArrayValuable {
+
+ STREAM(1, "Stream"),
+ HASH(2, "Hash"),
+ LIST(3, "List"),
+ SET(4, "Set"),
+ ZSET(5, "ZSet"),
+ STRING(6, "String");
+
+ private final Integer type;
+
+ private final String name;
+
+ public static final Integer[] ARRAYS = Arrays.stream(values()).map(IotRedisDataStructureEnum::getType).toArray(Integer[]::new);
+
+ @Override
+ public Integer[] array() {
+ return ARRAYS;
+ }
+
+}
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotRedisRuleAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotRedisRuleAction.java
new file mode 100644
index 0000000000..51abffee3b
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotRedisRuleAction.java
@@ -0,0 +1,182 @@
+package cn.iocoder.yudao.module.iot.service.rule.data.action;
+
+import cn.hutool.core.util.StrUtil;
+import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
+import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
+import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkRedisConfig;
+import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
+import cn.iocoder.yudao.module.iot.enums.rule.IotRedisDataStructureEnum;
+import lombok.extern.slf4j.Slf4j;
+import org.redisson.Redisson;
+import org.redisson.api.RedissonClient;
+import org.redisson.config.Config;
+import org.redisson.config.SingleServerConfig;
+import org.redisson.spring.data.connection.RedissonConnectionFactory;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.connection.stream.ObjectRecord;
+import org.springframework.data.redis.connection.stream.StreamRecords;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.serializer.RedisSerializer;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+
+/**
+ * Redis 的 {@link IotDataRuleAction} 实现类
+ * 支持多种 Redis 数据结构:Stream、Hash、List、Set、ZSet、String
+ *
+ * @author HUIHUI
+ */
+@Component
+@Slf4j
+public class IotRedisRuleAction extends
+ IotDataRuleCacheableAction> {
+
+ @Override
+ public Integer getType() {
+ return IotDataSinkTypeEnum.REDIS.getType();
+ }
+
+ @Override
+ public void execute(IotDeviceMessage message, IotDataSinkRedisConfig config) throws Exception {
+ // 1. 获取 RedisTemplate
+ RedisTemplate redisTemplate = getProducer(config);
+
+ // 2. 根据数据结构类型执行不同的操作
+ String messageJson = JsonUtils.toJsonString(message);
+ IotRedisDataStructureEnum dataStructure = getDataStructureByType(config.getDataStructure());
+
+ switch (dataStructure) {
+ case STREAM:
+ executeStream(redisTemplate, config, messageJson);
+ break;
+ case HASH:
+ executeHash(redisTemplate, config, message, messageJson);
+ break;
+ case LIST:
+ executeList(redisTemplate, config, messageJson);
+ break;
+ case SET:
+ executeSet(redisTemplate, config, messageJson);
+ break;
+ case ZSET:
+ executeZSet(redisTemplate, config, message, messageJson);
+ break;
+ case STRING:
+ executeString(redisTemplate, config, messageJson);
+ break;
+ default:
+ throw new IllegalArgumentException("不支持的 Redis 数据结构类型: " + dataStructure);
+ }
+
+ log.info("[execute][消息发送成功] dataStructure: {}, config: {}", dataStructure.getName(), config);
+ }
+
+ /**
+ * 执行 Stream 操作
+ */
+ private void executeStream(RedisTemplate redisTemplate, IotDataSinkRedisConfig config, String messageJson) {
+ ObjectRecord record = StreamRecords.newRecord()
+ .ofObject(messageJson).withStreamKey(config.getTopic());
+ redisTemplate.opsForStream().add(record);
+ }
+
+ /**
+ * 执行 Hash 操作
+ */
+ private void executeHash(RedisTemplate redisTemplate, IotDataSinkRedisConfig config,
+ IotDeviceMessage message, String messageJson) {
+ String hashField = StrUtil.isNotBlank(config.getHashField()) ?
+ config.getHashField() : String.valueOf(message.getDeviceId());
+ redisTemplate.opsForHash().put(config.getTopic(), hashField, messageJson);
+ }
+
+ /**
+ * 执行 List 操作
+ */
+ private void executeList(RedisTemplate redisTemplate, IotDataSinkRedisConfig config, String messageJson) {
+ redisTemplate.opsForList().rightPush(config.getTopic(), messageJson);
+ }
+
+ /**
+ * 执行 Set 操作
+ */
+ private void executeSet(RedisTemplate redisTemplate, IotDataSinkRedisConfig config, String messageJson) {
+ redisTemplate.opsForSet().add(config.getTopic(), messageJson);
+ }
+
+ /**
+ * 执行 ZSet 操作
+ */
+ private void executeZSet(RedisTemplate redisTemplate, IotDataSinkRedisConfig config,
+ IotDeviceMessage message, String messageJson) {
+ double score;
+ if (StrUtil.isNotBlank(config.getScoreField())) {
+ // 尝试从消息中获取分数字段
+ try {
+ Map messageMap = JsonUtils.parseObject(messageJson, Map.class);
+ Object scoreValue = messageMap.get(config.getScoreField());
+ score = scoreValue instanceof Number ? ((Number) scoreValue).doubleValue() : System.currentTimeMillis();
+ } catch (Exception e) {
+ score = System.currentTimeMillis();
+ }
+ } else {
+ // 使用当前时间戳作为分数
+ score = System.currentTimeMillis();
+ }
+ redisTemplate.opsForZSet().add(config.getTopic(), messageJson, score);
+ }
+
+ /**
+ * 执行 String 操作
+ */
+ private void executeString(RedisTemplate redisTemplate, IotDataSinkRedisConfig config, String messageJson) {
+ redisTemplate.opsForValue().set(config.getTopic(), messageJson);
+ }
+
+ @Override
+ protected RedisTemplate initProducer(IotDataSinkRedisConfig config) {
+ // 1.1 创建 Redisson 配置
+ Config redissonConfig = new Config();
+ SingleServerConfig serverConfig = redissonConfig.useSingleServer()
+ .setAddress("redis://" + config.getHost() + ":" + config.getPort())
+ .setDatabase(config.getDatabase());
+ // 1.2 设置密码(如果有)
+ if (StrUtil.isNotBlank(config.getPassword())) {
+ serverConfig.setPassword(config.getPassword());
+ }
+
+ // 2.1 创建 RedisTemplate 并配置
+ RedissonClient redisson = Redisson.create(redissonConfig);
+ RedisTemplate template = new RedisTemplate<>();
+ template.setConnectionFactory(new RedissonConnectionFactory(redisson));
+ // 2.2 设置序列化器
+ template.setKeySerializer(RedisSerializer.string());
+ template.setHashKeySerializer(RedisSerializer.string());
+ template.setValueSerializer(RedisSerializer.json());
+ template.setHashValueSerializer(RedisSerializer.json());
+ template.afterPropertiesSet();
+ return template;
+ }
+
+ @Override
+ protected void closeProducer(RedisTemplate producer) throws Exception {
+ RedisConnectionFactory factory = producer.getConnectionFactory();
+ if (factory != null) {
+ ((RedissonConnectionFactory) factory).destroy();
+ }
+ }
+
+ /**
+ * 根据类型值获取数据结构枚举
+ */
+ private IotRedisDataStructureEnum getDataStructureByType(Integer type) {
+ for (IotRedisDataStructureEnum dataStructure : IotRedisDataStructureEnum.values()) {
+ if (dataStructure.getType().equals(type)) {
+ return dataStructure;
+ }
+ }
+ throw new IllegalArgumentException("不支持的 Redis 数据结构类型: " + type);
+ }
+
+}
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotRedisStreamRuleAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotRedisStreamRuleAction.java
deleted file mode 100644
index d3bb81c8e9..0000000000
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotRedisStreamRuleAction.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package cn.iocoder.yudao.module.iot.service.rule.data.action;
-
-import cn.hutool.core.util.StrUtil;
-import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
-import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkRedisStreamConfig;
-import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
-import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
-import lombok.extern.slf4j.Slf4j;
-import org.redisson.Redisson;
-import org.redisson.api.RedissonClient;
-import org.redisson.config.Config;
-import org.redisson.config.SingleServerConfig;
-import org.redisson.spring.data.connection.RedissonConnectionFactory;
-import org.springframework.data.redis.connection.RedisConnectionFactory;
-import org.springframework.data.redis.connection.stream.ObjectRecord;
-import org.springframework.data.redis.connection.stream.StreamRecords;
-import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.data.redis.serializer.RedisSerializer;
-import org.springframework.stereotype.Component;
-
-/**
- * Redis Stream 的 {@link IotDataRuleAction} 实现类
- *
- * @author HUIHUI
- */
-@Component
-@Slf4j
-public class IotRedisStreamRuleAction extends
- IotDataRuleCacheableAction> {
-
- @Override
- public Integer getType() {
- return IotDataSinkTypeEnum.REDIS_STREAM.getType();
- }
-
- @Override
- public void execute(IotDeviceMessage message, IotDataSinkRedisStreamConfig config) throws Exception {
- // 1. 获取 RedisTemplate
- RedisTemplate redisTemplate = getProducer(config);
-
- // 2. 创建并发送 Stream 记录
- ObjectRecord record = StreamRecords.newRecord()
- .ofObject(JsonUtils.toJsonString(message)).withStreamKey(config.getTopic());
- String recordId = String.valueOf(redisTemplate.opsForStream().add(record));
- log.info("[execute][消息发送成功] messageId: {}, config: {}", recordId, config);
- }
-
- @Override
- protected RedisTemplate initProducer(IotDataSinkRedisStreamConfig config) {
- // 1.1 创建 Redisson 配置
- Config redissonConfig = new Config();
- SingleServerConfig serverConfig = redissonConfig.useSingleServer()
- .setAddress("redis://" + config.getHost() + ":" + config.getPort())
- .setDatabase(config.getDatabase());
- // 1.2 设置密码(如果有)
- if (StrUtil.isNotBlank(config.getPassword())) {
- serverConfig.setPassword(config.getPassword());
- }
-
- // 2.1 创建 RedisTemplate 并配置
- RedissonClient redisson = Redisson.create(redissonConfig);
- RedisTemplate template = new RedisTemplate<>();
- template.setConnectionFactory(new RedissonConnectionFactory(redisson));
- // 2.2 设置序列化器
- template.setKeySerializer(RedisSerializer.string());
- template.setHashKeySerializer(RedisSerializer.string());
- template.setValueSerializer(RedisSerializer.json());
- template.setHashValueSerializer(RedisSerializer.json());
- template.afterPropertiesSet();
- return template;
- }
-
- @Override
- protected void closeProducer(RedisTemplate producer) throws Exception {
- RedisConnectionFactory factory = producer.getConnectionFactory();
- if (factory != null) {
- ((RedissonConnectionFactory) factory).destroy();
- }
- }
-
-}
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecuteTest.java b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecuteTest.java
index 5394008022..055ccb01b2 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecuteTest.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecuteTest.java
@@ -85,20 +85,21 @@ public class IotDataBridgeExecuteTest extends BaseMockitoUnitTest {
}
@Test
- public void testRedisStreamDataBridge() throws Exception {
+ public void testRedisDataBridge() throws Exception {
// 1. 创建执行器实例
- IotRedisStreamRuleAction action = new IotRedisStreamRuleAction();
+ IotRedisRuleAction action = new IotRedisRuleAction();
- // 2. 创建配置
- IotDataSinkRedisStreamConfig config = new IotDataSinkRedisStreamConfig()
- .setHost("127.0.0.1")
- .setPort(6379)
- .setDatabase(0)
- .setPassword("123456")
- .setTopic("test-stream");
+ // 2. 创建配置 - 测试 Stream 数据结构
+ IotDataSinkRedisConfig config = new IotDataSinkRedisConfig();
+ config.setHost("127.0.0.1");
+ config.setPort(6379);
+ config.setDatabase(0);
+ config.setPassword("123456");
+ config.setTopic("test-stream");
+ config.setDataStructure(1); // Stream 类型
// 3. 执行测试并验证缓存
- executeAndVerifyCache(action, config, "RedisStream");
+ executeAndVerifyCache(action, config, "Redis");
}
@Test