From ea1f0cb462520cc0a3bd1633e02672a6c0ebac97 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Thu, 26 Jun 2025 09:58:34 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E3=80=90IoT=20=E7=89=A9=E8=81=94?= =?UTF-8?q?=E7=BD=91=E3=80=91=E5=AE=9E=E7=8E=B0=E2=80=9C=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E6=B5=81=E8=BD=AC=E2=80=9D=E5=8A=9F=E8=83=BD=E7=9A=84=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=EF=BC=8880%=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../IotDeviceMessageIdentifierEnum.java | 1 + .../rule/IotRuleSceneActionTypeEnum.java | 4 +- .../iot/dal/mysql/rule/IotDataRuleMapper.java | 4 + .../iot/dal/redis/RedisKeyConstants.java | 16 +++ .../rule/IotDataRuleMessageHandler.java | 50 ++++++++ .../service/rule/data/IotDataRuleService.java | 14 +- .../rule/data/IotDataRuleServiceImpl.java | 120 +++++++++++++++++- .../service/rule/data/IotDataSinkService.java | 8 ++ .../rule/data/IotDataSinkServiceImpl.java | 12 +- .../data/IotRuleSceneDataBridgeAction.java | 61 --------- .../data/action/IotDataBridgeExecute.java | 45 ------- .../rule/data/action/IotDataRuleAction.java | 28 ++++ ...e.java => IotDataRuleCacheableAction.java} | 29 +++-- ...xecute.java => IotHttpDataSinkAction.java} | 13 +- ...ecute.java => IotKafkaDataRuleAction.java} | 10 +- ...te.java => IotRabbitMQDataRuleAction.java} | 18 +-- ...ute.java => IotRedisStreamRuleAction.java} | 8 +- ...te.java => IotRocketMQDataRuleAction.java} | 8 +- .../rule/scene/IotRuleSceneServiceImpl.java | 5 - .../databridge/IotDataBridgeExecuteTest.java | 14 +- 20 files changed, 296 insertions(+), 172 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/rule/IotDataRuleMessageHandler.java delete mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotRuleSceneDataBridgeAction.java delete mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotDataBridgeExecute.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotDataRuleAction.java rename yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/{AbstractCacheableDataBridgeExecute.java => IotDataRuleCacheableAction.java} (83%) rename yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/{IotHttpDataBridgeExecute.java => IotHttpDataSinkAction.java} (88%) rename yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/{IotKafkaMQDataBridgeExecute.java => IotKafkaDataRuleAction.java} (87%) rename yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/{IotRabbitMQDataBridgeExecute.java => IotRabbitMQDataRuleAction.java} (84%) rename yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/{IotRedisStreamDataBridgeExecute.java => IotRedisStreamRuleAction.java} (90%) rename yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/{IotRocketMQDataBridgeExecute.java => IotRocketMQDataRuleAction.java} (88%) diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/device/IotDeviceMessageIdentifierEnum.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/device/IotDeviceMessageIdentifierEnum.java index a06b43ce96..e9dbe2f658 100644 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/device/IotDeviceMessageIdentifierEnum.java +++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/device/IotDeviceMessageIdentifierEnum.java @@ -7,6 +7,7 @@ import lombok.RequiredArgsConstructor; /** * IoT 设备消息标识符枚举 */ +@Deprecated @Getter @RequiredArgsConstructor public enum IotDeviceMessageIdentifierEnum { diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotRuleSceneActionTypeEnum.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotRuleSceneActionTypeEnum.java index 6e6843b093..5251852312 100644 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotRuleSceneActionTypeEnum.java +++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotRuleSceneActionTypeEnum.java @@ -40,9 +40,7 @@ public enum IotRuleSceneActionTypeEnum implements ArrayValuable { @Deprecated ALERT(2), // 告警执行 - - @Deprecated - DATA_BRIDGE(3); // 桥接执行 + ; private final Integer type; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/rule/IotDataRuleMapper.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/rule/IotDataRuleMapper.java index d59023290a..7c0c17d3bc 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/rule/IotDataRuleMapper.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/rule/IotDataRuleMapper.java @@ -31,4 +31,8 @@ public interface IotDataRuleMapper extends BaseMapperX { .apply(MyBatisUtils.findInSet("sink_ids", sinkId))); } + default List selectListByStatus(Integer status) { + return selectList(IotDataRuleDO::getStatus, status); + } + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/RedisKeyConstants.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/RedisKeyConstants.java index 5c4b7429f0..1187677e54 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/RedisKeyConstants.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/RedisKeyConstants.java @@ -60,4 +60,20 @@ public interface RedisKeyConstants { */ String THING_MODEL_LIST = "iot:thing_model_list"; + /** + * 数据流转规则的数据缓存,使用 Spring Cache 操作 + * + * KEY 格式:data_rule_list_${deviceId}_${method}_${identifier} + * VALUE 数据类型:String 数组(JSON),即 {@link cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataRuleDO} 列表 + */ + String DATA_RULE_LIST = "iot:data_rule_list"; + + /** + * 数据目的的数据缓存,使用 Spring Cache 操作 + * + * KEY 格式:data_sink_${id} + * VALUE 数据类型:String(JSON),即 {@link cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataSinkDO} + */ + String DATA_SINK = "iot:data_sink"; + } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/rule/IotDataRuleMessageHandler.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/rule/IotDataRuleMessageHandler.java new file mode 100644 index 0000000000..c2b82262c7 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/rule/IotDataRuleMessageHandler.java @@ -0,0 +1,50 @@ +package cn.iocoder.yudao.module.iot.mq.consumer.rule; + +import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils; +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.service.rule.data.IotDataRuleService; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +// TODO @puhui999:后面重构哈 + +/** + * 针对 {@link IotDeviceMessage} 的消费者,处理数据流转 + * + * @author 芋道源码 + */ +@Component +@Slf4j +public class IotDataRuleMessageHandler implements IotMessageSubscriber { + + @Resource + private IotDataRuleService dataRuleService; + + @Resource + private IotMessageBus messageBus; + + @PostConstruct + public void init() { + messageBus.register(this); + } + + @Override + public String getTopic() { + return IotDeviceMessage.MESSAGE_BUS_DEVICE_MESSAGE_TOPIC; + } + + @Override + public String getGroup() { + return "iot_data_rule_consumer"; + } + + @Override + public void onMessage(IotDeviceMessage message) { + TenantUtils.execute(message.getTenantId(), () -> dataRuleService.executeDataRule(message)); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotDataRuleService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotDataRuleService.java index 42fdf3099b..1e0a813305 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotDataRuleService.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotDataRuleService.java @@ -1,13 +1,14 @@ package cn.iocoder.yudao.module.iot.service.rule.data; -import java.util.List; - import cn.iocoder.yudao.framework.common.pojo.PageResult; import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.rule.IotDataRulePageReqVO; import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.rule.IotDataRuleSaveReqVO; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataRuleDO; import jakarta.validation.Valid; +import java.util.List; + /** * IoT 数据流转规则 Service 接口 * @@ -59,6 +60,13 @@ public interface IotDataRuleService { * @param sinkId 数据目的编号 * @return 是否被使用 */ - List getDataRuleBySinkId(Long sinkId); + List getDataRuleListBySinkId(Long sinkId); + + /** + * 执行数据流转规则 + * + * @param message 消息 + */ + void executeDataRule(IotDeviceMessage message); } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotDataRuleServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotDataRuleServiceImpl.java index 65c7a393ea..d7370c0a64 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotDataRuleServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotDataRuleServiceImpl.java @@ -3,17 +3,28 @@ package cn.iocoder.yudao.module.iot.service.rule.data; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.ObjUtil; import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum; import cn.iocoder.yudao.framework.common.pojo.PageResult; import cn.iocoder.yudao.framework.common.util.object.BeanUtils; +import cn.iocoder.yudao.framework.common.util.object.ObjectUtils; +import cn.iocoder.yudao.framework.common.util.spring.SpringUtils; import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.rule.IotDataRulePageReqVO; import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.rule.IotDataRuleSaveReqVO; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataRuleDO; +import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataSinkDO; import cn.iocoder.yudao.module.iot.dal.mysql.rule.IotDataRuleMapper; +import cn.iocoder.yudao.module.iot.dal.redis.RedisKeyConstants; import cn.iocoder.yudao.module.iot.service.device.IotDeviceService; import cn.iocoder.yudao.module.iot.service.product.IotProductService; +import cn.iocoder.yudao.module.iot.service.rule.data.action.IotDataRuleAction; import cn.iocoder.yudao.module.iot.service.thingmodel.IotThingModelService; import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.cache.annotation.CacheEvict; +import org.springframework.cache.annotation.Cacheable; import org.springframework.stereotype.Service; import org.springframework.validation.annotation.Validated; @@ -30,6 +41,7 @@ import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DATA_RULE_NOT */ @Service @Validated +@Slf4j public class IotDataRuleServiceImpl implements IotDataRuleService { @Resource @@ -44,7 +56,11 @@ public class IotDataRuleServiceImpl implements IotDataRuleService { @Resource private IotDataSinkService dataSinkService; + @Resource + private List dataRuleActions; + @Override + @CacheEvict(value = RedisKeyConstants.DATA_RULE_LIST, allEntries = true) public Long createDataRule(IotDataRuleSaveReqVO createReqVO) { // 校验数据源配置和数据目的 validateDataRuleConfig(createReqVO); @@ -55,6 +71,7 @@ public class IotDataRuleServiceImpl implements IotDataRuleService { } @Override + @CacheEvict(value = RedisKeyConstants.DATA_RULE_LIST, allEntries = true) public void updateDataRule(IotDataRuleSaveReqVO updateReqVO) { // 校验存在 validateDataRuleExists(updateReqVO.getId()); @@ -67,6 +84,7 @@ public class IotDataRuleServiceImpl implements IotDataRuleService { } @Override + @CacheEvict(value = RedisKeyConstants.DATA_RULE_LIST, allEntries = true) public void deleteDataRule(Long id) { // 校验存在 validateDataRuleExists(id); @@ -116,15 +134,15 @@ public class IotDataRuleServiceImpl implements IotDataRuleService { * @param sourceConfigs 数据源配置列表 */ private void validateThingModelsExist(List sourceConfigs) { - Map> productIdToIdentifiers = new HashMap<>(); + Map> productIdIdentifiers = new HashMap<>(); for (IotDataRuleDO.SourceConfig config : sourceConfigs) { if (StrUtil.isEmpty(config.getIdentifier())) { continue; } - productIdToIdentifiers.computeIfAbsent(config.getProductId(), + productIdIdentifiers.computeIfAbsent(config.getProductId(), productId -> new HashSet<>()).add(config.getIdentifier()); } - for (Map.Entry> entry : productIdToIdentifiers.entrySet()) { + for (Map.Entry> entry : productIdIdentifiers.entrySet()) { thingModelService.validateThingModelsExist(entry.getKey(), entry.getValue()); } } @@ -140,8 +158,102 @@ public class IotDataRuleServiceImpl implements IotDataRuleService { } @Override - public List getDataRuleBySinkId(Long sinkId) { + public List getDataRuleListBySinkId(Long sinkId) { return dataRuleMapper.selectListBySinkId(sinkId); } + @Cacheable(value = RedisKeyConstants.DATA_RULE_LIST, + key = "#deviceId + '_' + #method + '_' + (#identifier ?: '')") + public List getDataRuleListByConditionFromCache(Long deviceId, String method, String identifier) { + // 1. 查询所有开启的数据流转规则 + List rules = dataRuleMapper.selectListByStatus(CommonStatusEnum.ENABLE.getStatus()); + // 2. 内存里过滤匹配的规则 + List matchedRules = new ArrayList<>(); + for (IotDataRuleDO rule : rules) { + IotDataRuleDO.SourceConfig found = CollUtil.findOne(rule.getSourceConfigs(), + config -> ObjectUtils.equalsAny(config.getDeviceId(), deviceId, IotDeviceDO.DEVICE_ID_ALL) + && (StrUtil.isNotEmpty(config.getMethod()) || ObjUtil.equal(config.getMethod(), method)) + && (StrUtil.isEmpty(config.getIdentifier()) || ObjUtil.equal(config.getIdentifier(), identifier))); + if (found != null) { + matchedRules.add(new IotDataRuleDO().setId(rule.getId()).setSinkIds(rule.getSinkIds())); + } + } + return matchedRules; + } + + @Override + public void executeDataRule(IotDeviceMessage message) { + try { + // 1. 获取匹配的数据流转规则 + Long deviceId = message.getDeviceId(); + String method = message.getMethod(); + String identifier = IotDeviceMessageUtils.getIdentifier(message); + List rules = getSelf().getDataRuleListByConditionFromCache(deviceId, method, identifier); + if (CollUtil.isEmpty(rules)) { + log.debug("[executeDataRule][设备({}) 方法({}) 标识符({}) 没有匹配的数据流转规则]", + deviceId, method, identifier); + return; + } + log.info("[executeDataRule][设备({}) 方法({}) 标识符({}) 匹配到 {} 条数据流转规则]", + deviceId, method, identifier, rules.size()); + + // 2. 遍历规则,执行数据流转 + rules.forEach(rule -> executeDataRule(message, rule)); + } catch (Exception e) { + log.error("[executeDataRule][消息({}) 执行数据流转规则异常]", message, e); + } + } + + /** + * 为指定规则的所有数据目的执行数据流转 + * + * @param message 设备消息 + * @param rule 数据流转规则 + */ + private void executeDataRule(IotDeviceMessage message, IotDataRuleDO rule) { + rule.getSinkIds().forEach(sinkId -> { + try { + // 获取数据目的配置 + IotDataSinkDO dataSink = dataSinkService.getDataSinkFromCache(sinkId); + if (dataSink == null) { + log.error("[executeDataRule][规则({}) 对应的数据目的({}) 不存在]", rule.getId(), sinkId); + return; + } + if (CommonStatusEnum.isDisable(dataSink.getStatus())) { + log.info("[executeDataRule][规则({}) 对应的数据目的({}) 状态为禁用]", rule.getId(), sinkId); + return; + } + + // 执行数据桥接操作 + executeDataRuleAction(message, dataSink); + } catch (Exception e) { + log.error("[executeDataRule][规则({}) 数据目的({}) 执行异常]", rule.getId(), sinkId, e); + } + }); + } + + /** + * 执行数据流转操作 + * + * @param message 设备消息 + * @param dataSink 数据目的 + */ + private void executeDataRuleAction(IotDeviceMessage message, IotDataSinkDO dataSink) { + dataRuleActions.forEach(action -> { + if (ObjUtil.notEqual(action.getType(), dataSink.getType())) { + return; + } + try { + action.execute(message, dataSink); + log.info("[executeDataRuleAction][消息({}) 数据目的({}) 执行成功]", message.getId(), dataSink.getId()); + } catch (Exception e) { + log.error("[executeDataRuleAction][消息({}) 数据目的({}) 执行异常]", message.getId(), dataSink.getId(), e); + } + }); + } + + private IotDataRuleServiceImpl getSelf() { + return SpringUtils.getBean(IotDataRuleServiceImpl.class); + } + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotDataSinkService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotDataSinkService.java index 307163a8ec..d0e2a5282e 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotDataSinkService.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotDataSinkService.java @@ -46,6 +46,14 @@ public interface IotDataSinkService { */ IotDataSinkDO getDataSink(Long id); + /** + * 从缓存中获得数据流转目的 + * + * @param id 编号 + * @return 数据流转目的 + */ + IotDataSinkDO getDataSinkFromCache(Long id); + /** * 获得数据流转目的分页 * diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotDataSinkServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotDataSinkServiceImpl.java index 2b964c9952..9977afba22 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotDataSinkServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotDataSinkServiceImpl.java @@ -7,7 +7,9 @@ import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.sink.IotDataSin import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.sink.IotDataSinkSaveReqVO; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataSinkDO; import cn.iocoder.yudao.module.iot.dal.mysql.rule.IotDataSinkMapper; +import cn.iocoder.yudao.module.iot.dal.redis.RedisKeyConstants; import jakarta.annotation.Resource; +import org.springframework.cache.annotation.Cacheable; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.springframework.validation.annotation.Validated; @@ -16,8 +18,8 @@ import java.util.Collection; import java.util.List; import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception; -import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DATA_SINK_NOT_EXISTS; import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DATA_SINK_DELETE_FAIL_USED_BY_RULE; +import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DATA_SINK_NOT_EXISTS; /** * IoT 数据流转目的 Service 实现类 @@ -56,7 +58,7 @@ public class IotDataSinkServiceImpl implements IotDataSinkService { // 校验存在 validateDataBridgeExists(id); // 校验是否被数据流转规则使用 - if (CollUtil.isNotEmpty(dataRuleService.getDataRuleBySinkId(id))) { + if (CollUtil.isNotEmpty(dataRuleService.getDataRuleListBySinkId(id))) { throw exception(DATA_SINK_DELETE_FAIL_USED_BY_RULE); } // 删除 @@ -74,6 +76,12 @@ public class IotDataSinkServiceImpl implements IotDataSinkService { return dataSinkMapper.selectById(id); } + @Override + @Cacheable(value = RedisKeyConstants.DATA_SINK, key = "#id") + public IotDataSinkDO getDataSinkFromCache(Long id) { + return dataSinkMapper.selectById(id); + } + @Override public PageResult getDataSinkPage(IotDataSinkPageReqVO pageReqVO) { return dataSinkMapper.selectPage(pageReqVO); diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotRuleSceneDataBridgeAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotRuleSceneDataBridgeAction.java deleted file mode 100644 index 08b76be5df..0000000000 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotRuleSceneDataBridgeAction.java +++ /dev/null @@ -1,61 +0,0 @@ -package cn.iocoder.yudao.module.iot.service.rule.data; - -import cn.hutool.core.lang.Assert; -import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum; -import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataSinkDO; -import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotRuleSceneDO; -import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneActionTypeEnum; -import cn.iocoder.yudao.module.iot.service.rule.data.action.IotDataBridgeExecute; -import cn.iocoder.yudao.module.iot.service.rule.scene.action.IotRuleSceneAction; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -import java.util.List; - -/** - * IoT 数据流转目的的 {@link IotRuleSceneAction} 实现类 - * - * @author 芋道源码 - */ -@Deprecated -@Component -@Slf4j -public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction { - - @Resource - private IotDataSinkService dataBridgeService; - @Resource - private List> dataBridgeExecutes; - - @Override - public void execute(IotDeviceMessage message, IotRuleSceneDO.ActionConfig config) throws Exception { - // 1.1 如果消息为空,直接返回 - if (message == null) { - return; - } - // 1.2 获得数据流转目的 - Assert.notNull(config.getDataBridgeId(), "数据流转目的编号不能为空"); - IotDataSinkDO dataBridge = dataBridgeService.getDataSink(config.getDataBridgeId()); - if (dataBridge == null || dataBridge.getConfig() == null) { - log.error("[execute][message({}) config({}) 对应的数据流转目的不存在]", message, config); - return; - } - if (CommonStatusEnum.isDisable(dataBridge.getStatus())) { - log.info("[execute][message({}) config({}) 对应的数据流转目的({}) 状态为禁用]", message, config, dataBridge); - return; - } - - // 2. 执行数据桥接操作 - for (IotDataBridgeExecute execute : dataBridgeExecutes) { - execute.execute(message, dataBridge); - } - } - - @Override - public IotRuleSceneActionTypeEnum getType() { - return IotRuleSceneActionTypeEnum.DATA_BRIDGE; - } - -} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotDataBridgeExecute.java deleted file mode 100644 index 48e7f47cc3..0000000000 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotDataBridgeExecute.java +++ /dev/null @@ -1,45 +0,0 @@ -package cn.iocoder.yudao.module.iot.service.rule.data.action; - -import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataSinkDO; - -/** - * IoT 数据流转目的的执行器 execute 接口 - * - * @author HUIHUI - */ -public interface IotDataBridgeExecute { - - /** - * 获取数据流转目的类型 - * - * @return 数据流转目的类型 - */ - Integer getType(); - - /** - * 执行数据流转目的操作 - * - * @param message 设备消息 - * @param dataBridge 数据流转目的 - */ - @SuppressWarnings({"unchecked"}) - default void execute(IotDeviceMessage message, IotDataSinkDO dataBridge) throws Exception { - // 1.1 校验数据流转目的类型 - if (!getType().equals(dataBridge.getType())) { - return; - } - - // 1.2 执行对应的数据流转目的发送消息 - execute0(message, (Config) dataBridge.getConfig()); - } - - /** - * 【真正】执行数据流转目的操作 - * - * @param message 设备消息 - * @param config 桥梁配置 - */ - void execute0(IotDeviceMessage message, Config config) throws Exception; - -} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotDataRuleAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotDataRuleAction.java new file mode 100644 index 0000000000..8e6458ba86 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotDataRuleAction.java @@ -0,0 +1,28 @@ +package cn.iocoder.yudao.module.iot.service.rule.data.action; + +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataSinkDO; + +/** + * IoT 数据流转目的的执行器 action 接口 + * + * @author HUIHUI + */ +public interface IotDataRuleAction { + + /** + * 获取数据流转目的类型 + * + * @return 数据流转目的类型 + */ + Integer getType(); + + /** + * 执行数据流转目的操作 + * + * @param message 设备消息 + * @param dataSink 数据流转目的 + */ + void execute(IotDeviceMessage message, IotDataSinkDO dataSink); + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/AbstractCacheableDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotDataRuleCacheableAction.java similarity index 83% rename from yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/AbstractCacheableDataBridgeExecute.java rename to yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotDataRuleCacheableAction.java index 584285e53c..4319469082 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/AbstractCacheableDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotDataRuleCacheableAction.java @@ -1,5 +1,6 @@ package cn.iocoder.yudao.module.iot.service.rule.data.action; +import cn.hutool.core.lang.Assert; import cn.hutool.core.util.ObjUtil; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataSinkDO; @@ -17,14 +18,14 @@ import java.time.Duration; // TODO @芋艿:websocket /** - * 带缓存功能的数据流转目的执行器抽象类 + * 可缓存的 {@link IotDataRuleAction} 抽象实现 * * 该类提供了一个通用的缓存机制,用于管理各类数据桥接的生产者(Producer)实例。 * * 主要特点: * - 基于Guava Cache实现高效的生产者实例缓存管理 * - 自动处理生产者的生命周期(创建、获取、关闭) - * - 支持30分钟未访问自动过期清理机制 + * - 支持 30 分钟未访问自动过期清理机制 * - 异常处理与日志记录,便于问题排查 * * 子类需要实现: @@ -36,7 +37,7 @@ import java.time.Duration; * @author HUIHUI */ @Slf4j -public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridgeExecute { +public abstract class IotDataRuleCacheableAction implements IotDataRuleAction { /** * Producer 缓存 @@ -45,10 +46,6 @@ public abstract class AbstractCacheableDataBridgeExecute imple .expireAfterAccess(Duration.ofMinutes(30)) // 30 分钟未访问就提前过期 .removalListener((RemovalListener) notification -> { Producer producer = notification.getValue(); - if (producer == null) { - return; - } - try { closeProducer(producer); log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已关闭]", notification.getKey()); @@ -100,15 +97,21 @@ public abstract class AbstractCacheableDataBridgeExecute imple @Override @SuppressWarnings({"unchecked"}) - public void execute(IotDeviceMessage message, IotDataSinkDO dataBridge) { - if (ObjUtil.notEqual(dataBridge.getType(), getType())) { - return; - } + public void execute(IotDeviceMessage message, IotDataSinkDO dataSink) { + Assert.isTrue(ObjUtil.equal(dataSink.getType(), getType()), "类型({})不匹配", dataSink.getType()); try { - execute0(message, (Config) dataBridge.getConfig()); + execute(message, (Config) dataSink.getConfig()); } catch (Exception e) { - log.error("[execute][桥梁配置 config({}) 对应的 message({}) 发送异常]", dataBridge.getConfig(), message, e); + log.error("[execute][桥梁配置 config({}) 对应的 message({}) 发送异常]", dataSink.getConfig(), message, e); } } + /** + * 执行数据流转 + * + * @param message 设备消息 + * @param config 配置信息 + */ + protected abstract void execute(IotDeviceMessage message, Config config) throws Exception; + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotHttpDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotHttpDataSinkAction.java similarity index 88% rename from yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotHttpDataBridgeExecute.java rename to yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotHttpDataSinkAction.java index da02677aae..c23e346dbf 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotHttpDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotHttpDataSinkAction.java @@ -1,10 +1,12 @@ package cn.iocoder.yudao.module.iot.service.rule.data.action; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.lang.Assert; import cn.iocoder.yudao.framework.common.util.http.HttpUtils; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; -import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkHttpConfig; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataSinkDO; +import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkHttpConfig; import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; @@ -17,13 +19,13 @@ import java.util.HashMap; import java.util.Map; /** - * Http 的 {@link IotDataBridgeExecute} 实现类 + * HTTP 的 {@link IotDataRuleAction} 实现类 * * @author HUIHUI */ @Component @Slf4j -public class IotHttpDataBridgeExecute implements IotDataBridgeExecute { +public class IotHttpDataSinkAction implements IotDataRuleAction { @Resource private RestTemplate restTemplate; @@ -34,8 +36,9 @@ public class IotHttpDataBridgeExecute implements IotDataBridgeExecute requestEntity = null; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotKafkaMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotKafkaDataRuleAction.java similarity index 87% rename from yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotKafkaMQDataBridgeExecute.java rename to yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotKafkaDataRuleAction.java index ec7eade63f..5bbbe07b4b 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotKafkaMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotKafkaDataRuleAction.java @@ -17,17 +17,17 @@ import java.util.Map; import java.util.concurrent.TimeUnit; /** - * Kafka 的 {@link IotDataBridgeExecute} 实现类 + * Kafka 的 {@link IotDataRuleAction} 实现类 * * @author HUIHUI */ @ConditionalOnClass(name = "org.springframework.kafka.core.KafkaTemplate") @Component @Slf4j -public class IotKafkaMQDataBridgeExecute extends - AbstractCacheableDataBridgeExecute> { +public class IotKafkaDataRuleAction extends + IotDataRuleCacheableAction> { - private static final Duration SEND_TIMEOUT = Duration.ofMillis(10000); // 10 秒超时时间 + private static final Duration SEND_TIMEOUT = Duration.ofSeconds(10); @Override public Integer getType() { @@ -35,7 +35,7 @@ public class IotKafkaMQDataBridgeExecute extends } @Override - public void execute0(IotDeviceMessage message, IotDataSinkKafkaConfig config) throws Exception { + public void execute(IotDeviceMessage message, IotDataSinkKafkaConfig config) throws Exception { // 1. 获取或创建 KafkaTemplate KafkaTemplate kafkaTemplate = getProducer(config); diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotRabbitMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotRabbitMQDataRuleAction.java similarity index 84% rename from yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotRabbitMQDataBridgeExecute.java rename to yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotRabbitMQDataRuleAction.java index d8abaa7602..89d3500c6a 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotRabbitMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotRabbitMQDataRuleAction.java @@ -13,16 +13,15 @@ import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; /** - * RabbitMQ 的 {@link IotDataBridgeExecute} 实现类 + * RabbitMQ 的 {@link IotDataRuleAction} 实现类 * * @author HUIHUI */ @ConditionalOnClass(name = "com.rabbitmq.client.Channel") @Component @Slf4j -public class IotRabbitMQDataBridgeExecute extends - AbstractCacheableDataBridgeExecute { - +public class IotRabbitMQDataRuleAction extends + IotDataRuleCacheableAction { @Override public Integer getType() { @@ -30,16 +29,15 @@ public class IotRabbitMQDataBridgeExecute extends } @Override - public void execute0(IotDeviceMessage message, IotDataSinkRabbitMQConfig config) throws Exception { - // 1. 获取或创建 Channel + public void execute(IotDeviceMessage message, IotDataSinkRabbitMQConfig config) throws Exception { + // 1.1 获取或创建 Channel Channel channel = getProducer(config); - - // 2.1 声明交换机、队列和绑定关系 + // 1.2 声明交换机、队列和绑定关系 channel.exchangeDeclare(config.getExchange(), "direct", true); channel.queueDeclare(config.getQueue(), true, false, false, null); channel.queueBind(config.getQueue(), config.getExchange(), config.getRoutingKey()); - // 2.2 发送消息 + // 2. 发送消息 channel.basicPublish(config.getExchange(), config.getRoutingKey(), null, message.toString().getBytes(StandardCharsets.UTF_8)); log.info("[executeRabbitMQ][message({}) config({}) 发送成功]", message, config); @@ -55,10 +53,8 @@ public class IotRabbitMQDataBridgeExecute extends factory.setVirtualHost(config.getVirtualHost()); factory.setUsername(config.getUsername()); factory.setPassword(config.getPassword()); - // 2. 创建连接 Connection connection = factory.newConnection(); - // 3. 创建信道 return connection.createChannel(); } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotRedisStreamDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotRedisStreamRuleAction.java similarity index 90% rename from yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotRedisStreamDataBridgeExecute.java rename to yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotRedisStreamRuleAction.java index be3370461a..9870c7d464 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotRedisStreamDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotRedisStreamRuleAction.java @@ -18,14 +18,14 @@ import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.stereotype.Component; /** - * Redis Stream 的 {@link IotDataBridgeExecute} 实现类 + * Redis Stream 的 {@link IotDataRuleAction} 实现类 * * @author HUIHUI */ @Component @Slf4j -public class IotRedisStreamDataBridgeExecute extends - AbstractCacheableDataBridgeExecute> { +public class IotRedisStreamRuleAction extends + IotDataRuleCacheableAction> { @Override public Integer getType() { @@ -33,7 +33,7 @@ public class IotRedisStreamDataBridgeExecute extends } @Override - public void execute0(IotDeviceMessage message, IotDataSinkRedisStreamConfig config) throws Exception { + public void execute(IotDeviceMessage message, IotDataSinkRedisStreamConfig config) throws Exception { // 1. 获取 RedisTemplate RedisTemplate redisTemplate = getProducer(config); diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotRocketMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotRocketMQDataRuleAction.java similarity index 88% rename from yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotRocketMQDataBridgeExecute.java rename to yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotRocketMQDataRuleAction.java index 6a8d66842b..1a212ec5ea 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotRocketMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotRocketMQDataRuleAction.java @@ -13,15 +13,15 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.stereotype.Component; /** - * RocketMQ 的 {@link IotDataBridgeExecute} 实现类 + * RocketMQ 的 {@link IotDataRuleAction} 实现类 * * @author HUIHUI */ @ConditionalOnClass(name = "org.apache.rocketmq.client.producer.DefaultMQProducer") @Component @Slf4j -public class IotRocketMQDataBridgeExecute extends - AbstractCacheableDataBridgeExecute { +public class IotRocketMQDataRuleAction extends + IotDataRuleCacheableAction { @Override public Integer getType() { @@ -29,7 +29,7 @@ public class IotRocketMQDataBridgeExecute extends } @Override - public void execute0(IotDeviceMessage message, IotDataSinkRocketMQConfig config) throws Exception { + public void execute(IotDeviceMessage message, IotDataSinkRocketMQConfig config) throws Exception { // 1. 获取或创建 Producer DefaultMQProducer producer = getProducer(config); diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/IotRuleSceneServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/IotRuleSceneServiceImpl.java index b422e0d509..fc77180fde 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/IotRuleSceneServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/IotRuleSceneServiceImpl.java @@ -214,11 +214,6 @@ public class IotRuleSceneServiceImpl implements IotRuleSceneService { .build()); action01.setDeviceControl(actionDeviceControl01); // ruleScene01.getActions().add(action01); // TODO 芋艿:先不测试了 - // 数据桥接(http) - IotRuleSceneDO.ActionConfig action02 = new IotRuleSceneDO.ActionConfig(); - action02.setType(IotRuleSceneActionTypeEnum.DATA_BRIDGE.getType()); - action02.setDataBridgeId(1L); - ruleScene01.getActions().add(action02); return ListUtil.toList(ruleScene01); } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecuteTest.java b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecuteTest.java index 52599c455f..d8cf9ee66e 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecuteTest.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecuteTest.java @@ -23,7 +23,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.when; /** - * {@link IotDataBridgeExecute} 实现类的单元测试 + * {@link IotDataRuleAction} 实现类的单元测试 * * @author HUIHUI */ @@ -37,7 +37,7 @@ public class IotDataBridgeExecuteTest extends BaseMockitoUnitTest { private RestTemplate restTemplate; @InjectMocks - private IotHttpDataBridgeExecute httpDataBridgeExecute; + private IotHttpDataSinkAction httpDataBridgeExecute; @BeforeEach public void setUp() { @@ -51,7 +51,7 @@ public class IotDataBridgeExecuteTest extends BaseMockitoUnitTest { @Test public void testKafkaMQDataBridge() throws Exception { // 1. 创建执行器实例 - IotKafkaMQDataBridgeExecute action = new IotKafkaMQDataBridgeExecute(); + IotKafkaDataRuleAction action = new IotKafkaDataRuleAction(); // 2. 创建配置 IotDataSinkKafkaConfig config = new IotDataSinkKafkaConfig() @@ -68,7 +68,7 @@ public class IotDataBridgeExecuteTest extends BaseMockitoUnitTest { @Test public void testRabbitMQDataBridge() throws Exception { // 1. 创建执行器实例 - IotRabbitMQDataBridgeExecute action = new IotRabbitMQDataBridgeExecute(); + IotRabbitMQDataRuleAction action = new IotRabbitMQDataRuleAction(); // 2. 创建配置 IotDataSinkRabbitMQConfig config = new IotDataSinkRabbitMQConfig() @@ -88,7 +88,7 @@ public class IotDataBridgeExecuteTest extends BaseMockitoUnitTest { @Test public void testRedisStreamDataBridge() throws Exception { // 1. 创建执行器实例 - IotRedisStreamDataBridgeExecute action = new IotRedisStreamDataBridgeExecute(); + IotRedisStreamRuleAction action = new IotRedisStreamRuleAction(); // 2. 创建配置 IotDataSinkRedisStreamConfig config = new IotDataSinkRedisStreamConfig() @@ -105,7 +105,7 @@ public class IotDataBridgeExecuteTest extends BaseMockitoUnitTest { @Test public void testRocketMQDataBridge() throws Exception { // 1. 创建执行器实例 - IotRocketMQDataBridgeExecute action = new IotRocketMQDataBridgeExecute(); + IotRocketMQDataRuleAction action = new IotRocketMQDataRuleAction(); // 2. 创建配置 IotDataSinkRocketMQConfig config = new IotDataSinkRocketMQConfig() @@ -142,7 +142,7 @@ public class IotDataBridgeExecuteTest extends BaseMockitoUnitTest { * @param type MQ 类型 * @throws Exception 如果执行过程中发生异常 */ - private void executeAndVerifyCache(IotDataBridgeExecute action, IotAbstractDataSinkConfig config, String type) + private void executeAndVerifyCache(IotDataRuleAction action, IotAbstractDataSinkConfig config, String type) throws Exception { log.info("[test{}DataBridge][第一次执行,应该会创建新的 producer]", type); action.execute(message, new IotDataSinkDO().setType(action.getType()).setConfig(config));