feat:【IoT 物联网】实现“数据流转”功能的执行(80%)
This commit is contained in:
@@ -7,6 +7,7 @@ import lombok.RequiredArgsConstructor;
|
||||
/**
|
||||
* IoT 设备消息标识符枚举
|
||||
*/
|
||||
@Deprecated
|
||||
@Getter
|
||||
@RequiredArgsConstructor
|
||||
public enum IotDeviceMessageIdentifierEnum {
|
||||
|
||||
@@ -40,9 +40,7 @@ public enum IotRuleSceneActionTypeEnum implements ArrayValuable<Integer> {
|
||||
|
||||
@Deprecated
|
||||
ALERT(2), // 告警执行
|
||||
|
||||
@Deprecated
|
||||
DATA_BRIDGE(3); // 桥接执行
|
||||
;
|
||||
|
||||
private final Integer type;
|
||||
|
||||
|
||||
@@ -31,4 +31,8 @@ public interface IotDataRuleMapper extends BaseMapperX<IotDataRuleDO> {
|
||||
.apply(MyBatisUtils.findInSet("sink_ids", sinkId)));
|
||||
}
|
||||
|
||||
default List<IotDataRuleDO> selectListByStatus(Integer status) {
|
||||
return selectList(IotDataRuleDO::getStatus, status);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -60,4 +60,20 @@ public interface RedisKeyConstants {
|
||||
*/
|
||||
String THING_MODEL_LIST = "iot:thing_model_list";
|
||||
|
||||
/**
|
||||
* 数据流转规则的数据缓存,使用 Spring Cache 操作
|
||||
*
|
||||
* KEY 格式:data_rule_list_${deviceId}_${method}_${identifier}
|
||||
* VALUE 数据类型:String 数组(JSON),即 {@link cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataRuleDO} 列表
|
||||
*/
|
||||
String DATA_RULE_LIST = "iot:data_rule_list";
|
||||
|
||||
/**
|
||||
* 数据目的的数据缓存,使用 Spring Cache 操作
|
||||
*
|
||||
* KEY 格式:data_sink_${id}
|
||||
* VALUE 数据类型:String(JSON),即 {@link cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataSinkDO}
|
||||
*/
|
||||
String DATA_SINK = "iot:data_sink";
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,50 @@
|
||||
package cn.iocoder.yudao.module.iot.mq.consumer.rule;
|
||||
|
||||
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.service.rule.data.IotDataRuleService;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
// TODO @puhui999:后面重构哈
|
||||
|
||||
/**
|
||||
* 针对 {@link IotDeviceMessage} 的消费者,处理数据流转
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class IotDataRuleMessageHandler implements IotMessageSubscriber<IotDeviceMessage> {
|
||||
|
||||
@Resource
|
||||
private IotDataRuleService dataRuleService;
|
||||
|
||||
@Resource
|
||||
private IotMessageBus messageBus;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
messageBus.register(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTopic() {
|
||||
return IotDeviceMessage.MESSAGE_BUS_DEVICE_MESSAGE_TOPIC;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getGroup() {
|
||||
return "iot_data_rule_consumer";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(IotDeviceMessage message) {
|
||||
TenantUtils.execute(message.getTenantId(), () -> dataRuleService.executeDataRule(message));
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,13 +1,14 @@
|
||||
package cn.iocoder.yudao.module.iot.service.rule.data;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import cn.iocoder.yudao.framework.common.pojo.PageResult;
|
||||
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.rule.IotDataRulePageReqVO;
|
||||
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.rule.IotDataRuleSaveReqVO;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataRuleDO;
|
||||
import jakarta.validation.Valid;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* IoT 数据流转规则 Service 接口
|
||||
*
|
||||
@@ -59,6 +60,13 @@ public interface IotDataRuleService {
|
||||
* @param sinkId 数据目的编号
|
||||
* @return 是否被使用
|
||||
*/
|
||||
List<IotDataRuleDO> getDataRuleBySinkId(Long sinkId);
|
||||
List<IotDataRuleDO> getDataRuleListBySinkId(Long sinkId);
|
||||
|
||||
/**
|
||||
* 执行数据流转规则
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
void executeDataRule(IotDeviceMessage message);
|
||||
|
||||
}
|
||||
@@ -3,17 +3,28 @@ package cn.iocoder.yudao.module.iot.service.rule.data;
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.util.ObjUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
|
||||
import cn.iocoder.yudao.framework.common.pojo.PageResult;
|
||||
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
|
||||
import cn.iocoder.yudao.framework.common.util.object.ObjectUtils;
|
||||
import cn.iocoder.yudao.framework.common.util.spring.SpringUtils;
|
||||
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.rule.IotDataRulePageReqVO;
|
||||
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.rule.IotDataRuleSaveReqVO;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataRuleDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataSinkDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.mysql.rule.IotDataRuleMapper;
|
||||
import cn.iocoder.yudao.module.iot.dal.redis.RedisKeyConstants;
|
||||
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
|
||||
import cn.iocoder.yudao.module.iot.service.product.IotProductService;
|
||||
import cn.iocoder.yudao.module.iot.service.rule.data.action.IotDataRuleAction;
|
||||
import cn.iocoder.yudao.module.iot.service.thingmodel.IotThingModelService;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.cache.annotation.CacheEvict;
|
||||
import org.springframework.cache.annotation.Cacheable;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
|
||||
@@ -30,6 +41,7 @@ import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DATA_RULE_NOT
|
||||
*/
|
||||
@Service
|
||||
@Validated
|
||||
@Slf4j
|
||||
public class IotDataRuleServiceImpl implements IotDataRuleService {
|
||||
|
||||
@Resource
|
||||
@@ -44,7 +56,11 @@ public class IotDataRuleServiceImpl implements IotDataRuleService {
|
||||
@Resource
|
||||
private IotDataSinkService dataSinkService;
|
||||
|
||||
@Resource
|
||||
private List<IotDataRuleAction> dataRuleActions;
|
||||
|
||||
@Override
|
||||
@CacheEvict(value = RedisKeyConstants.DATA_RULE_LIST, allEntries = true)
|
||||
public Long createDataRule(IotDataRuleSaveReqVO createReqVO) {
|
||||
// 校验数据源配置和数据目的
|
||||
validateDataRuleConfig(createReqVO);
|
||||
@@ -55,6 +71,7 @@ public class IotDataRuleServiceImpl implements IotDataRuleService {
|
||||
}
|
||||
|
||||
@Override
|
||||
@CacheEvict(value = RedisKeyConstants.DATA_RULE_LIST, allEntries = true)
|
||||
public void updateDataRule(IotDataRuleSaveReqVO updateReqVO) {
|
||||
// 校验存在
|
||||
validateDataRuleExists(updateReqVO.getId());
|
||||
@@ -67,6 +84,7 @@ public class IotDataRuleServiceImpl implements IotDataRuleService {
|
||||
}
|
||||
|
||||
@Override
|
||||
@CacheEvict(value = RedisKeyConstants.DATA_RULE_LIST, allEntries = true)
|
||||
public void deleteDataRule(Long id) {
|
||||
// 校验存在
|
||||
validateDataRuleExists(id);
|
||||
@@ -116,15 +134,15 @@ public class IotDataRuleServiceImpl implements IotDataRuleService {
|
||||
* @param sourceConfigs 数据源配置列表
|
||||
*/
|
||||
private void validateThingModelsExist(List<IotDataRuleDO.SourceConfig> sourceConfigs) {
|
||||
Map<Long, Set<String>> productIdToIdentifiers = new HashMap<>();
|
||||
Map<Long, Set<String>> productIdIdentifiers = new HashMap<>();
|
||||
for (IotDataRuleDO.SourceConfig config : sourceConfigs) {
|
||||
if (StrUtil.isEmpty(config.getIdentifier())) {
|
||||
continue;
|
||||
}
|
||||
productIdToIdentifiers.computeIfAbsent(config.getProductId(),
|
||||
productIdIdentifiers.computeIfAbsent(config.getProductId(),
|
||||
productId -> new HashSet<>()).add(config.getIdentifier());
|
||||
}
|
||||
for (Map.Entry<Long, Set<String>> entry : productIdToIdentifiers.entrySet()) {
|
||||
for (Map.Entry<Long, Set<String>> entry : productIdIdentifiers.entrySet()) {
|
||||
thingModelService.validateThingModelsExist(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
@@ -140,8 +158,102 @@ public class IotDataRuleServiceImpl implements IotDataRuleService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<IotDataRuleDO> getDataRuleBySinkId(Long sinkId) {
|
||||
public List<IotDataRuleDO> getDataRuleListBySinkId(Long sinkId) {
|
||||
return dataRuleMapper.selectListBySinkId(sinkId);
|
||||
}
|
||||
|
||||
@Cacheable(value = RedisKeyConstants.DATA_RULE_LIST,
|
||||
key = "#deviceId + '_' + #method + '_' + (#identifier ?: '')")
|
||||
public List<IotDataRuleDO> getDataRuleListByConditionFromCache(Long deviceId, String method, String identifier) {
|
||||
// 1. 查询所有开启的数据流转规则
|
||||
List<IotDataRuleDO> rules = dataRuleMapper.selectListByStatus(CommonStatusEnum.ENABLE.getStatus());
|
||||
// 2. 内存里过滤匹配的规则
|
||||
List<IotDataRuleDO> matchedRules = new ArrayList<>();
|
||||
for (IotDataRuleDO rule : rules) {
|
||||
IotDataRuleDO.SourceConfig found = CollUtil.findOne(rule.getSourceConfigs(),
|
||||
config -> ObjectUtils.equalsAny(config.getDeviceId(), deviceId, IotDeviceDO.DEVICE_ID_ALL)
|
||||
&& (StrUtil.isNotEmpty(config.getMethod()) || ObjUtil.equal(config.getMethod(), method))
|
||||
&& (StrUtil.isEmpty(config.getIdentifier()) || ObjUtil.equal(config.getIdentifier(), identifier)));
|
||||
if (found != null) {
|
||||
matchedRules.add(new IotDataRuleDO().setId(rule.getId()).setSinkIds(rule.getSinkIds()));
|
||||
}
|
||||
}
|
||||
return matchedRules;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void executeDataRule(IotDeviceMessage message) {
|
||||
try {
|
||||
// 1. 获取匹配的数据流转规则
|
||||
Long deviceId = message.getDeviceId();
|
||||
String method = message.getMethod();
|
||||
String identifier = IotDeviceMessageUtils.getIdentifier(message);
|
||||
List<IotDataRuleDO> rules = getSelf().getDataRuleListByConditionFromCache(deviceId, method, identifier);
|
||||
if (CollUtil.isEmpty(rules)) {
|
||||
log.debug("[executeDataRule][设备({}) 方法({}) 标识符({}) 没有匹配的数据流转规则]",
|
||||
deviceId, method, identifier);
|
||||
return;
|
||||
}
|
||||
log.info("[executeDataRule][设备({}) 方法({}) 标识符({}) 匹配到 {} 条数据流转规则]",
|
||||
deviceId, method, identifier, rules.size());
|
||||
|
||||
// 2. 遍历规则,执行数据流转
|
||||
rules.forEach(rule -> executeDataRule(message, rule));
|
||||
} catch (Exception e) {
|
||||
log.error("[executeDataRule][消息({}) 执行数据流转规则异常]", message, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 为指定规则的所有数据目的执行数据流转
|
||||
*
|
||||
* @param message 设备消息
|
||||
* @param rule 数据流转规则
|
||||
*/
|
||||
private void executeDataRule(IotDeviceMessage message, IotDataRuleDO rule) {
|
||||
rule.getSinkIds().forEach(sinkId -> {
|
||||
try {
|
||||
// 获取数据目的配置
|
||||
IotDataSinkDO dataSink = dataSinkService.getDataSinkFromCache(sinkId);
|
||||
if (dataSink == null) {
|
||||
log.error("[executeDataRule][规则({}) 对应的数据目的({}) 不存在]", rule.getId(), sinkId);
|
||||
return;
|
||||
}
|
||||
if (CommonStatusEnum.isDisable(dataSink.getStatus())) {
|
||||
log.info("[executeDataRule][规则({}) 对应的数据目的({}) 状态为禁用]", rule.getId(), sinkId);
|
||||
return;
|
||||
}
|
||||
|
||||
// 执行数据桥接操作
|
||||
executeDataRuleAction(message, dataSink);
|
||||
} catch (Exception e) {
|
||||
log.error("[executeDataRule][规则({}) 数据目的({}) 执行异常]", rule.getId(), sinkId, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行数据流转操作
|
||||
*
|
||||
* @param message 设备消息
|
||||
* @param dataSink 数据目的
|
||||
*/
|
||||
private void executeDataRuleAction(IotDeviceMessage message, IotDataSinkDO dataSink) {
|
||||
dataRuleActions.forEach(action -> {
|
||||
if (ObjUtil.notEqual(action.getType(), dataSink.getType())) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
action.execute(message, dataSink);
|
||||
log.info("[executeDataRuleAction][消息({}) 数据目的({}) 执行成功]", message.getId(), dataSink.getId());
|
||||
} catch (Exception e) {
|
||||
log.error("[executeDataRuleAction][消息({}) 数据目的({}) 执行异常]", message.getId(), dataSink.getId(), e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private IotDataRuleServiceImpl getSelf() {
|
||||
return SpringUtils.getBean(IotDataRuleServiceImpl.class);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -46,6 +46,14 @@ public interface IotDataSinkService {
|
||||
*/
|
||||
IotDataSinkDO getDataSink(Long id);
|
||||
|
||||
/**
|
||||
* 从缓存中获得数据流转目的
|
||||
*
|
||||
* @param id 编号
|
||||
* @return 数据流转目的
|
||||
*/
|
||||
IotDataSinkDO getDataSinkFromCache(Long id);
|
||||
|
||||
/**
|
||||
* 获得数据流转目的分页
|
||||
*
|
||||
|
||||
@@ -7,7 +7,9 @@ import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.sink.IotDataSin
|
||||
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.sink.IotDataSinkSaveReqVO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataSinkDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.mysql.rule.IotDataSinkMapper;
|
||||
import cn.iocoder.yudao.module.iot.dal.redis.RedisKeyConstants;
|
||||
import jakarta.annotation.Resource;
|
||||
import org.springframework.cache.annotation.Cacheable;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
@@ -16,8 +18,8 @@ import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
|
||||
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DATA_SINK_NOT_EXISTS;
|
||||
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DATA_SINK_DELETE_FAIL_USED_BY_RULE;
|
||||
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DATA_SINK_NOT_EXISTS;
|
||||
|
||||
/**
|
||||
* IoT 数据流转目的 Service 实现类
|
||||
@@ -56,7 +58,7 @@ public class IotDataSinkServiceImpl implements IotDataSinkService {
|
||||
// 校验存在
|
||||
validateDataBridgeExists(id);
|
||||
// 校验是否被数据流转规则使用
|
||||
if (CollUtil.isNotEmpty(dataRuleService.getDataRuleBySinkId(id))) {
|
||||
if (CollUtil.isNotEmpty(dataRuleService.getDataRuleListBySinkId(id))) {
|
||||
throw exception(DATA_SINK_DELETE_FAIL_USED_BY_RULE);
|
||||
}
|
||||
// 删除
|
||||
@@ -74,6 +76,12 @@ public class IotDataSinkServiceImpl implements IotDataSinkService {
|
||||
return dataSinkMapper.selectById(id);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Cacheable(value = RedisKeyConstants.DATA_SINK, key = "#id")
|
||||
public IotDataSinkDO getDataSinkFromCache(Long id) {
|
||||
return dataSinkMapper.selectById(id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageResult<IotDataSinkDO> getDataSinkPage(IotDataSinkPageReqVO pageReqVO) {
|
||||
return dataSinkMapper.selectPage(pageReqVO);
|
||||
|
||||
@@ -1,61 +0,0 @@
|
||||
package cn.iocoder.yudao.module.iot.service.rule.data;
|
||||
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataSinkDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotRuleSceneDO;
|
||||
import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneActionTypeEnum;
|
||||
import cn.iocoder.yudao.module.iot.service.rule.data.action.IotDataBridgeExecute;
|
||||
import cn.iocoder.yudao.module.iot.service.rule.scene.action.IotRuleSceneAction;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* IoT 数据流转目的的 {@link IotRuleSceneAction} 实现类
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Deprecated
|
||||
@Component
|
||||
@Slf4j
|
||||
public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction {
|
||||
|
||||
@Resource
|
||||
private IotDataSinkService dataBridgeService;
|
||||
@Resource
|
||||
private List<IotDataBridgeExecute<?>> dataBridgeExecutes;
|
||||
|
||||
@Override
|
||||
public void execute(IotDeviceMessage message, IotRuleSceneDO.ActionConfig config) throws Exception {
|
||||
// 1.1 如果消息为空,直接返回
|
||||
if (message == null) {
|
||||
return;
|
||||
}
|
||||
// 1.2 获得数据流转目的
|
||||
Assert.notNull(config.getDataBridgeId(), "数据流转目的编号不能为空");
|
||||
IotDataSinkDO dataBridge = dataBridgeService.getDataSink(config.getDataBridgeId());
|
||||
if (dataBridge == null || dataBridge.getConfig() == null) {
|
||||
log.error("[execute][message({}) config({}) 对应的数据流转目的不存在]", message, config);
|
||||
return;
|
||||
}
|
||||
if (CommonStatusEnum.isDisable(dataBridge.getStatus())) {
|
||||
log.info("[execute][message({}) config({}) 对应的数据流转目的({}) 状态为禁用]", message, config, dataBridge);
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 执行数据桥接操作
|
||||
for (IotDataBridgeExecute<?> execute : dataBridgeExecutes) {
|
||||
execute.execute(message, dataBridge);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IotRuleSceneActionTypeEnum getType() {
|
||||
return IotRuleSceneActionTypeEnum.DATA_BRIDGE;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,45 +0,0 @@
|
||||
package cn.iocoder.yudao.module.iot.service.rule.data.action;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataSinkDO;
|
||||
|
||||
/**
|
||||
* IoT 数据流转目的的执行器 execute 接口
|
||||
*
|
||||
* @author HUIHUI
|
||||
*/
|
||||
public interface IotDataBridgeExecute<Config> {
|
||||
|
||||
/**
|
||||
* 获取数据流转目的类型
|
||||
*
|
||||
* @return 数据流转目的类型
|
||||
*/
|
||||
Integer getType();
|
||||
|
||||
/**
|
||||
* 执行数据流转目的操作
|
||||
*
|
||||
* @param message 设备消息
|
||||
* @param dataBridge 数据流转目的
|
||||
*/
|
||||
@SuppressWarnings({"unchecked"})
|
||||
default void execute(IotDeviceMessage message, IotDataSinkDO dataBridge) throws Exception {
|
||||
// 1.1 校验数据流转目的类型
|
||||
if (!getType().equals(dataBridge.getType())) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 1.2 执行对应的数据流转目的发送消息
|
||||
execute0(message, (Config) dataBridge.getConfig());
|
||||
}
|
||||
|
||||
/**
|
||||
* 【真正】执行数据流转目的操作
|
||||
*
|
||||
* @param message 设备消息
|
||||
* @param config 桥梁配置
|
||||
*/
|
||||
void execute0(IotDeviceMessage message, Config config) throws Exception;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
package cn.iocoder.yudao.module.iot.service.rule.data.action;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataSinkDO;
|
||||
|
||||
/**
|
||||
* IoT 数据流转目的的执行器 action 接口
|
||||
*
|
||||
* @author HUIHUI
|
||||
*/
|
||||
public interface IotDataRuleAction {
|
||||
|
||||
/**
|
||||
* 获取数据流转目的类型
|
||||
*
|
||||
* @return 数据流转目的类型
|
||||
*/
|
||||
Integer getType();
|
||||
|
||||
/**
|
||||
* 执行数据流转目的操作
|
||||
*
|
||||
* @param message 设备消息
|
||||
* @param dataSink 数据流转目的
|
||||
*/
|
||||
void execute(IotDeviceMessage message, IotDataSinkDO dataSink);
|
||||
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
package cn.iocoder.yudao.module.iot.service.rule.data.action;
|
||||
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.util.ObjUtil;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataSinkDO;
|
||||
@@ -17,14 +18,14 @@ import java.time.Duration;
|
||||
// TODO @芋艿:websocket
|
||||
|
||||
/**
|
||||
* 带缓存功能的数据流转目的执行器抽象类
|
||||
* 可缓存的 {@link IotDataRuleAction} 抽象实现
|
||||
*
|
||||
* 该类提供了一个通用的缓存机制,用于管理各类数据桥接的生产者(Producer)实例。
|
||||
*
|
||||
* 主要特点:
|
||||
* - 基于Guava Cache实现高效的生产者实例缓存管理
|
||||
* - 自动处理生产者的生命周期(创建、获取、关闭)
|
||||
* - 支持30分钟未访问自动过期清理机制
|
||||
* - 支持 30 分钟未访问自动过期清理机制
|
||||
* - 异常处理与日志记录,便于问题排查
|
||||
*
|
||||
* 子类需要实现:
|
||||
@@ -36,7 +37,7 @@ import java.time.Duration;
|
||||
* @author HUIHUI
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class AbstractCacheableDataBridgeExecute<Config, Producer> implements IotDataBridgeExecute<Config> {
|
||||
public abstract class IotDataRuleCacheableAction<Config, Producer> implements IotDataRuleAction {
|
||||
|
||||
/**
|
||||
* Producer 缓存
|
||||
@@ -45,10 +46,6 @@ public abstract class AbstractCacheableDataBridgeExecute<Config, Producer> imple
|
||||
.expireAfterAccess(Duration.ofMinutes(30)) // 30 分钟未访问就提前过期
|
||||
.removalListener((RemovalListener<Config, Producer>) notification -> {
|
||||
Producer producer = notification.getValue();
|
||||
if (producer == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
closeProducer(producer);
|
||||
log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已关闭]", notification.getKey());
|
||||
@@ -100,15 +97,21 @@ public abstract class AbstractCacheableDataBridgeExecute<Config, Producer> imple
|
||||
|
||||
@Override
|
||||
@SuppressWarnings({"unchecked"})
|
||||
public void execute(IotDeviceMessage message, IotDataSinkDO dataBridge) {
|
||||
if (ObjUtil.notEqual(dataBridge.getType(), getType())) {
|
||||
return;
|
||||
}
|
||||
public void execute(IotDeviceMessage message, IotDataSinkDO dataSink) {
|
||||
Assert.isTrue(ObjUtil.equal(dataSink.getType(), getType()), "类型({})不匹配", dataSink.getType());
|
||||
try {
|
||||
execute0(message, (Config) dataBridge.getConfig());
|
||||
execute(message, (Config) dataSink.getConfig());
|
||||
} catch (Exception e) {
|
||||
log.error("[execute][桥梁配置 config({}) 对应的 message({}) 发送异常]", dataBridge.getConfig(), message, e);
|
||||
log.error("[execute][桥梁配置 config({}) 对应的 message({}) 发送异常]", dataSink.getConfig(), message, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行数据流转
|
||||
*
|
||||
* @param message 设备消息
|
||||
* @param config 配置信息
|
||||
*/
|
||||
protected abstract void execute(IotDeviceMessage message, Config config) throws Exception;
|
||||
|
||||
}
|
||||
@@ -1,10 +1,12 @@
|
||||
package cn.iocoder.yudao.module.iot.service.rule.data.action;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.iocoder.yudao.framework.common.util.http.HttpUtils;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkHttpConfig;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataSinkDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkHttpConfig;
|
||||
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -17,13 +19,13 @@ import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Http 的 {@link IotDataBridgeExecute} 实现类
|
||||
* HTTP 的 {@link IotDataRuleAction} 实现类
|
||||
*
|
||||
* @author HUIHUI
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class IotHttpDataBridgeExecute implements IotDataBridgeExecute<IotDataSinkHttpConfig> {
|
||||
public class IotHttpDataSinkAction implements IotDataRuleAction {
|
||||
|
||||
@Resource
|
||||
private RestTemplate restTemplate;
|
||||
@@ -34,8 +36,9 @@ public class IotHttpDataBridgeExecute implements IotDataBridgeExecute<IotDataSin
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings({"unchecked", "deprecation"})
|
||||
public void execute0(IotDeviceMessage message, IotDataSinkHttpConfig config) {
|
||||
public void execute(IotDeviceMessage message, IotDataSinkDO dataSink) {
|
||||
IotDataSinkHttpConfig config = (IotDataSinkHttpConfig) dataSink.getConfig();
|
||||
Assert.notNull(config, "配置({})不能为空", dataSink.getId());
|
||||
String url = null;
|
||||
HttpMethod method = HttpMethod.valueOf(config.getMethod().toUpperCase());
|
||||
HttpEntity<String> requestEntity = null;
|
||||
@@ -17,17 +17,17 @@ import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Kafka 的 {@link IotDataBridgeExecute} 实现类
|
||||
* Kafka 的 {@link IotDataRuleAction} 实现类
|
||||
*
|
||||
* @author HUIHUI
|
||||
*/
|
||||
@ConditionalOnClass(name = "org.springframework.kafka.core.KafkaTemplate")
|
||||
@Component
|
||||
@Slf4j
|
||||
public class IotKafkaMQDataBridgeExecute extends
|
||||
AbstractCacheableDataBridgeExecute<IotDataSinkKafkaConfig, KafkaTemplate<String, String>> {
|
||||
public class IotKafkaDataRuleAction extends
|
||||
IotDataRuleCacheableAction<IotDataSinkKafkaConfig, KafkaTemplate<String, String>> {
|
||||
|
||||
private static final Duration SEND_TIMEOUT = Duration.ofMillis(10000); // 10 秒超时时间
|
||||
private static final Duration SEND_TIMEOUT = Duration.ofSeconds(10);
|
||||
|
||||
@Override
|
||||
public Integer getType() {
|
||||
@@ -35,7 +35,7 @@ public class IotKafkaMQDataBridgeExecute extends
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute0(IotDeviceMessage message, IotDataSinkKafkaConfig config) throws Exception {
|
||||
public void execute(IotDeviceMessage message, IotDataSinkKafkaConfig config) throws Exception {
|
||||
// 1. 获取或创建 KafkaTemplate
|
||||
KafkaTemplate<String, String> kafkaTemplate = getProducer(config);
|
||||
|
||||
@@ -13,16 +13,15 @@ import org.springframework.stereotype.Component;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
* RabbitMQ 的 {@link IotDataBridgeExecute} 实现类
|
||||
* RabbitMQ 的 {@link IotDataRuleAction} 实现类
|
||||
*
|
||||
* @author HUIHUI
|
||||
*/
|
||||
@ConditionalOnClass(name = "com.rabbitmq.client.Channel")
|
||||
@Component
|
||||
@Slf4j
|
||||
public class IotRabbitMQDataBridgeExecute extends
|
||||
AbstractCacheableDataBridgeExecute<IotDataSinkRabbitMQConfig, Channel> {
|
||||
|
||||
public class IotRabbitMQDataRuleAction extends
|
||||
IotDataRuleCacheableAction<IotDataSinkRabbitMQConfig, Channel> {
|
||||
|
||||
@Override
|
||||
public Integer getType() {
|
||||
@@ -30,16 +29,15 @@ public class IotRabbitMQDataBridgeExecute extends
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute0(IotDeviceMessage message, IotDataSinkRabbitMQConfig config) throws Exception {
|
||||
// 1. 获取或创建 Channel
|
||||
public void execute(IotDeviceMessage message, IotDataSinkRabbitMQConfig config) throws Exception {
|
||||
// 1.1 获取或创建 Channel
|
||||
Channel channel = getProducer(config);
|
||||
|
||||
// 2.1 声明交换机、队列和绑定关系
|
||||
// 1.2 声明交换机、队列和绑定关系
|
||||
channel.exchangeDeclare(config.getExchange(), "direct", true);
|
||||
channel.queueDeclare(config.getQueue(), true, false, false, null);
|
||||
channel.queueBind(config.getQueue(), config.getExchange(), config.getRoutingKey());
|
||||
|
||||
// 2.2 发送消息
|
||||
// 2. 发送消息
|
||||
channel.basicPublish(config.getExchange(), config.getRoutingKey(), null,
|
||||
message.toString().getBytes(StandardCharsets.UTF_8));
|
||||
log.info("[executeRabbitMQ][message({}) config({}) 发送成功]", message, config);
|
||||
@@ -55,10 +53,8 @@ public class IotRabbitMQDataBridgeExecute extends
|
||||
factory.setVirtualHost(config.getVirtualHost());
|
||||
factory.setUsername(config.getUsername());
|
||||
factory.setPassword(config.getPassword());
|
||||
|
||||
// 2. 创建连接
|
||||
Connection connection = factory.newConnection();
|
||||
|
||||
// 3. 创建信道
|
||||
return connection.createChannel();
|
||||
}
|
||||
@@ -18,14 +18,14 @@ import org.springframework.data.redis.serializer.RedisSerializer;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* Redis Stream 的 {@link IotDataBridgeExecute} 实现类
|
||||
* Redis Stream 的 {@link IotDataRuleAction} 实现类
|
||||
*
|
||||
* @author HUIHUI
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class IotRedisStreamDataBridgeExecute extends
|
||||
AbstractCacheableDataBridgeExecute<IotDataSinkRedisStreamConfig, RedisTemplate<String, Object>> {
|
||||
public class IotRedisStreamRuleAction extends
|
||||
IotDataRuleCacheableAction<IotDataSinkRedisStreamConfig, RedisTemplate<String, Object>> {
|
||||
|
||||
@Override
|
||||
public Integer getType() {
|
||||
@@ -33,7 +33,7 @@ public class IotRedisStreamDataBridgeExecute extends
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute0(IotDeviceMessage message, IotDataSinkRedisStreamConfig config) throws Exception {
|
||||
public void execute(IotDeviceMessage message, IotDataSinkRedisStreamConfig config) throws Exception {
|
||||
// 1. 获取 RedisTemplate
|
||||
RedisTemplate<String, Object> redisTemplate = getProducer(config);
|
||||
|
||||
@@ -13,15 +13,15 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* RocketMQ 的 {@link IotDataBridgeExecute} 实现类
|
||||
* RocketMQ 的 {@link IotDataRuleAction} 实现类
|
||||
*
|
||||
* @author HUIHUI
|
||||
*/
|
||||
@ConditionalOnClass(name = "org.apache.rocketmq.client.producer.DefaultMQProducer")
|
||||
@Component
|
||||
@Slf4j
|
||||
public class IotRocketMQDataBridgeExecute extends
|
||||
AbstractCacheableDataBridgeExecute<IotDataSinkRocketMQConfig, DefaultMQProducer> {
|
||||
public class IotRocketMQDataRuleAction extends
|
||||
IotDataRuleCacheableAction<IotDataSinkRocketMQConfig, DefaultMQProducer> {
|
||||
|
||||
@Override
|
||||
public Integer getType() {
|
||||
@@ -29,7 +29,7 @@ public class IotRocketMQDataBridgeExecute extends
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute0(IotDeviceMessage message, IotDataSinkRocketMQConfig config) throws Exception {
|
||||
public void execute(IotDeviceMessage message, IotDataSinkRocketMQConfig config) throws Exception {
|
||||
// 1. 获取或创建 Producer
|
||||
DefaultMQProducer producer = getProducer(config);
|
||||
|
||||
@@ -214,11 +214,6 @@ public class IotRuleSceneServiceImpl implements IotRuleSceneService {
|
||||
.build());
|
||||
action01.setDeviceControl(actionDeviceControl01);
|
||||
// ruleScene01.getActions().add(action01); // TODO 芋艿:先不测试了
|
||||
// 数据桥接(http)
|
||||
IotRuleSceneDO.ActionConfig action02 = new IotRuleSceneDO.ActionConfig();
|
||||
action02.setType(IotRuleSceneActionTypeEnum.DATA_BRIDGE.getType());
|
||||
action02.setDataBridgeId(1L);
|
||||
ruleScene01.getActions().add(action02);
|
||||
return ListUtil.toList(ruleScene01);
|
||||
}
|
||||
|
||||
|
||||
@@ -23,7 +23,7 @@ import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* {@link IotDataBridgeExecute} 实现类的单元测试
|
||||
* {@link IotDataRuleAction} 实现类的单元测试
|
||||
*
|
||||
* @author HUIHUI
|
||||
*/
|
||||
@@ -37,7 +37,7 @@ public class IotDataBridgeExecuteTest extends BaseMockitoUnitTest {
|
||||
private RestTemplate restTemplate;
|
||||
|
||||
@InjectMocks
|
||||
private IotHttpDataBridgeExecute httpDataBridgeExecute;
|
||||
private IotHttpDataSinkAction httpDataBridgeExecute;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() {
|
||||
@@ -51,7 +51,7 @@ public class IotDataBridgeExecuteTest extends BaseMockitoUnitTest {
|
||||
@Test
|
||||
public void testKafkaMQDataBridge() throws Exception {
|
||||
// 1. 创建执行器实例
|
||||
IotKafkaMQDataBridgeExecute action = new IotKafkaMQDataBridgeExecute();
|
||||
IotKafkaDataRuleAction action = new IotKafkaDataRuleAction();
|
||||
|
||||
// 2. 创建配置
|
||||
IotDataSinkKafkaConfig config = new IotDataSinkKafkaConfig()
|
||||
@@ -68,7 +68,7 @@ public class IotDataBridgeExecuteTest extends BaseMockitoUnitTest {
|
||||
@Test
|
||||
public void testRabbitMQDataBridge() throws Exception {
|
||||
// 1. 创建执行器实例
|
||||
IotRabbitMQDataBridgeExecute action = new IotRabbitMQDataBridgeExecute();
|
||||
IotRabbitMQDataRuleAction action = new IotRabbitMQDataRuleAction();
|
||||
|
||||
// 2. 创建配置
|
||||
IotDataSinkRabbitMQConfig config = new IotDataSinkRabbitMQConfig()
|
||||
@@ -88,7 +88,7 @@ public class IotDataBridgeExecuteTest extends BaseMockitoUnitTest {
|
||||
@Test
|
||||
public void testRedisStreamDataBridge() throws Exception {
|
||||
// 1. 创建执行器实例
|
||||
IotRedisStreamDataBridgeExecute action = new IotRedisStreamDataBridgeExecute();
|
||||
IotRedisStreamRuleAction action = new IotRedisStreamRuleAction();
|
||||
|
||||
// 2. 创建配置
|
||||
IotDataSinkRedisStreamConfig config = new IotDataSinkRedisStreamConfig()
|
||||
@@ -105,7 +105,7 @@ public class IotDataBridgeExecuteTest extends BaseMockitoUnitTest {
|
||||
@Test
|
||||
public void testRocketMQDataBridge() throws Exception {
|
||||
// 1. 创建执行器实例
|
||||
IotRocketMQDataBridgeExecute action = new IotRocketMQDataBridgeExecute();
|
||||
IotRocketMQDataRuleAction action = new IotRocketMQDataRuleAction();
|
||||
|
||||
// 2. 创建配置
|
||||
IotDataSinkRocketMQConfig config = new IotDataSinkRocketMQConfig()
|
||||
@@ -142,7 +142,7 @@ public class IotDataBridgeExecuteTest extends BaseMockitoUnitTest {
|
||||
* @param type MQ 类型
|
||||
* @throws Exception 如果执行过程中发生异常
|
||||
*/
|
||||
private void executeAndVerifyCache(IotDataBridgeExecute<?> action, IotAbstractDataSinkConfig config, String type)
|
||||
private void executeAndVerifyCache(IotDataRuleAction<?> action, IotAbstractDataSinkConfig config, String type)
|
||||
throws Exception {
|
||||
log.info("[test{}DataBridge][第一次执行,应该会创建新的 producer]", type);
|
||||
action.execute(message, new IotDataSinkDO().setType(action.getType()).setConfig(config));
|
||||
|
||||
Reference in New Issue
Block a user