feat:【IoT 物联网】初步实现“数据流转”功能

This commit is contained in:
YunaiV
2025-06-24 21:21:35 +08:00
parent afda934c1d
commit 956418d31f
31 changed files with 423 additions and 94 deletions

View File

@@ -54,10 +54,13 @@ public interface ErrorCodeConstants {
ErrorCode OTA_UPGRADE_RECORD_DUPLICATE = new ErrorCode(1_050_008_201, "升级记录重复");
ErrorCode OTA_UPGRADE_RECORD_CANNOT_RETRY = new ErrorCode(1_050_008_202, "升级记录不能重试");
// ========== IoT 数据桥梁 1-050-010-000 ==========
ErrorCode DATA_BRIDGE_NOT_EXISTS = new ErrorCode(1_050_010_000, "IoT 数据桥梁不存在");
// ========== IoT 数据流转规则 1-050-010-000 ==========
ErrorCode DATA_RULE_NOT_EXISTS = new ErrorCode(1_050_010_000, "数据流转规则不存在");
// ========== IoT 场景联动 1-050-011-000 ==========
ErrorCode RULE_SCENE_NOT_EXISTS = new ErrorCode(1_050_011_000, "IoT 场景联动不存在");
// ========== IoT 数据流转目的 1-050-011-000 ==========
ErrorCode DATA_BRIDGE_NOT_EXISTS = new ErrorCode(1_050_011_000, "数据桥梁不存在");
// ========== IoT 场景联动 1-050-012-000 ==========
ErrorCode RULE_SCENE_NOT_EXISTS = new ErrorCode(1_050_012_000, "场景联动不存在");
}

View File

@@ -0,0 +1,72 @@
package cn.iocoder.yudao.module.iot.controller.admin.rule;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.rule.IotDataRulePageReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.rule.IotDataRuleRespVO;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.rule.IotDataRuleSaveReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataRuleDO;
import cn.iocoder.yudao.module.iot.service.rule.data.IotDataRuleService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.Resource;
import jakarta.validation.Valid;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success;
@Tag(name = "管理后台 - IoT 数据流转规则")
@RestController
@RequestMapping("/iot/data-rule")
@Validated
public class IotDataRuleController {
@Resource
private IotDataRuleService dataRuleService;
@PostMapping("/create")
@Operation(summary = "创建数据流转规则")
@PreAuthorize("@ss.hasPermission('iot:data-rule:create')")
public CommonResult<Long> createDataRule(@Valid @RequestBody IotDataRuleSaveReqVO createReqVO) {
return success(dataRuleService.createDataRule(createReqVO));
}
@PutMapping("/update")
@Operation(summary = "更新数据流转规则")
@PreAuthorize("@ss.hasPermission('iot:data-rule:update')")
public CommonResult<Boolean> updateDataRule(@Valid @RequestBody IotDataRuleSaveReqVO updateReqVO) {
dataRuleService.updateDataRule(updateReqVO);
return success(true);
}
@DeleteMapping("/delete")
@Operation(summary = "删除数据流转规则")
@Parameter(name = "id", description = "编号", required = true)
@PreAuthorize("@ss.hasPermission('iot:data-rule:delete')")
public CommonResult<Boolean> deleteDataRule(@RequestParam("id") Long id) {
dataRuleService.deleteDataRule(id);
return success(true);
}
@GetMapping("/get")
@Operation(summary = "获得数据流转规则")
@Parameter(name = "id", description = "编号", required = true, example = "1024")
@PreAuthorize("@ss.hasPermission('iot:data-rule:query')")
public CommonResult<IotDataRuleRespVO> getDataRule(@RequestParam("id") Long id) {
IotDataRuleDO dataRule = dataRuleService.getDataRule(id);
return success(BeanUtils.toBean(dataRule, IotDataRuleRespVO.class));
}
@GetMapping("/page")
@Operation(summary = "获得数据流转规则分页")
@PreAuthorize("@ss.hasPermission('iot:data-rule:query')")
public CommonResult<PageResult<IotDataRuleRespVO>> getDataRulePage(@Valid IotDataRulePageReqVO pageReqVO) {
PageResult<IotDataRuleDO> pageResult = dataRuleService.getDataRulePage(pageReqVO);
return success(BeanUtils.toBean(pageResult, IotDataRuleRespVO.class));
}
}

View File

@@ -75,7 +75,7 @@ public class IotDataSinkController {
@GetMapping("/simple-list")
@Operation(summary = "获取数据目的的精简信息列表", description = "主要用于前端的下拉选项")
public CommonResult<List<IotDataSinkRespVO>> getSimpleDataSinkList() {
public CommonResult<List<IotDataSinkRespVO>> getDataSinkSimpleList() {
List<IotDataSinkDO> list = dataSinkService.getDataSinkListByStatus(CommonStatusEnum.ENABLE.getStatus());
return success(convertList(list, sink -> // 只返回 id、name 字段
new IotDataSinkRespVO().setId(sink.getId()).setName(sink.getName())));

View File

@@ -0,0 +1,26 @@
package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.rule;
import cn.iocoder.yudao.framework.common.pojo.PageParam;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import org.springframework.format.annotation.DateTimeFormat;
import java.time.LocalDateTime;
import static cn.iocoder.yudao.framework.common.util.date.DateUtils.FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND;
@Schema(description = "管理后台 - IoT 数据流转规则分页 Request VO")
@Data
public class IotDataRulePageReqVO extends PageParam {
@Schema(description = "场景名称", example = "芋艿")
private String name;
@Schema(description = "场景状态", example = "1")
private Integer status;
@Schema(description = "创建时间")
@DateTimeFormat(pattern = FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND)
private LocalDateTime[] createTime;
}

View File

@@ -0,0 +1,35 @@
package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.rule;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataRuleDO;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;
import java.util.List;
@Schema(description = "管理后台 - IoT 数据流转规则 Response VO")
@Data
public class IotDataRuleRespVO {
@Schema(description = "场景编号", requiredMode = Schema.RequiredMode.REQUIRED, example = "8540")
private Long id;
@Schema(description = "场景名称", requiredMode = Schema.RequiredMode.REQUIRED, example = "芋艿")
private String name;
@Schema(description = "场景描述", example = "你猜")
private String description;
@Schema(description = "场景状态", requiredMode = Schema.RequiredMode.REQUIRED, example = "1")
private Integer status;
@Schema(description = "数据源配置数组", requiredMode = Schema.RequiredMode.REQUIRED)
private List<IotDataRuleDO.SourceConfig> sourceConfigs;
@Schema(description = "数据目的编号数组", requiredMode = Schema.RequiredMode.REQUIRED)
private List<Long> sinkIds;
@Schema(description = "创建时间", requiredMode = Schema.RequiredMode.REQUIRED)
private LocalDateTime createTime;
}

View File

@@ -0,0 +1,40 @@
package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.rule;
import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
import cn.iocoder.yudao.framework.common.validation.InEnum;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataRuleDO;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
import java.util.List;
@Schema(description = "管理后台 - IoT 数据流转规则新增/修改 Request VO")
@Data
public class IotDataRuleSaveReqVO {
@Schema(description = "场景编号", example = "8540")
private Long id;
@Schema(description = "场景名称", requiredMode = Schema.RequiredMode.REQUIRED, example = "芋艿")
@NotEmpty(message = "场景名称不能为空")
private String name;
@Schema(description = "场景描述", example = "你猜")
private String description;
@Schema(description = "场景状态", requiredMode = Schema.RequiredMode.REQUIRED, example = "1")
@NotNull(message = "场景状态不能为空")
@InEnum(CommonStatusEnum.class)
private Integer status;
@Schema(description = "数据源配置数组", requiredMode = Schema.RequiredMode.REQUIRED)
@NotEmpty(message = "数据源配置数组不能为空")
private List<IotDataRuleDO.SourceConfig> sourceConfigs;
@Schema(description = "数据目的编号数组", requiredMode = Schema.RequiredMode.REQUIRED)
@NotEmpty(message = "数据目的编号数组不能为空")
private List<Long> sinkIds;
}

View File

@@ -1,6 +1,6 @@
package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.sink;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataBridgeAbstractConfig;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotAbstractDataSinkConfig;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
@@ -26,7 +26,7 @@ public class IotDataSinkRespVO {
private Integer type;
@Schema(description = "数据目的配置")
private IotDataBridgeAbstractConfig config;
private IotAbstractDataSinkConfig config;
@Schema(description = "创建时间", requiredMode = Schema.RequiredMode.REQUIRED)
private LocalDateTime createTime;

View File

@@ -2,7 +2,7 @@ package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.sink;
import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
import cn.iocoder.yudao.framework.common.validation.InEnum;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataBridgeAbstractConfig;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotAbstractDataSinkConfig;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotEmpty;
@@ -35,6 +35,6 @@ public class IotDataSinkSaveReqVO {
@Schema(description = "数据目的配置")
@NotNull(message = "数据目的配置不能为空")
private IotDataBridgeAbstractConfig config;
private IotAbstractDataSinkConfig config;
}

View File

@@ -1,6 +1,7 @@
package cn.iocoder.yudao.module.iot.dal.dataobject.rule;
import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
import cn.iocoder.yudao.framework.mybatis.core.dataobject.BaseDO;
import cn.iocoder.yudao.framework.mybatis.core.type.LongListTypeHandler;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
@@ -10,6 +11,7 @@ import com.baomidou.mybatisplus.annotation.KeySequence;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.extension.handlers.JacksonTypeHandler;
import jakarta.validation.constraints.NotEmpty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -24,28 +26,28 @@ import java.util.List;
*
* @author 芋道源码
*/
@TableName(value = "iot_data_flow", autoResultMap = true)
@KeySequence("iot_data_flow_seq") // 用于 Oracle、PostgreSQL、Kingbase、DB2、H2 数据库的主键自增。如果是 MySQL 等数据库,可不写。
@TableName(value = "iot_data_rule", autoResultMap = true)
@KeySequence("iot_data_rule_seq") // 用于 Oracle、PostgreSQL、Kingbase、DB2、H2 数据库的主键自增。如果是 MySQL 等数据库,可不写。
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class IotDataRuleDO {
public class IotDataRuleDO extends BaseDO {
/**
* 数据流转编号
* 数据流转规格编号
*/
private Long id;
/**
* 数据流转名称
* 数据流转规格名称
*/
private String name;
/**
* 数据流转描述
* 数据流转规格描述
*/
private String description;
/**
* 数据流转状态
* 数据流转规格状态
*
* 枚举 {@link CommonStatusEnum}
*/
@@ -57,7 +59,7 @@ public class IotDataRuleDO {
@TableField(typeHandler = JacksonTypeHandler.class)
private List<SourceConfig> sourceConfigs;
/**
* 数据目的编号
* 数据目的编号数组
*
* 关联 {@link IotDataSinkDO#getId()}
*/
@@ -77,6 +79,7 @@ public class IotDataRuleDO {
*
* 枚举 {@link IotDeviceMessageMethodEnum} 中的 upstream 上行部分
*/
@NotEmpty(message = "消息方法不能为空")
private String method;
/**
@@ -91,6 +94,7 @@ public class IotDataRuleDO {
* 关联 {@link IotDeviceDO#getId()}
* 特殊:如果为 {@link IotDeviceDO#DEVICE_ID_ALL} 时,则是全部设备
*/
@NotEmpty(message = "设备编号不能为空")
private Long deviceId;
/**

View File

@@ -2,7 +2,7 @@ package cn.iocoder.yudao.module.iot.dal.dataobject.rule;
import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
import cn.iocoder.yudao.framework.mybatis.core.dataobject.BaseDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataBridgeAbstractConfig;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotAbstractDataSinkConfig;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
import com.baomidou.mybatisplus.annotation.KeySequence;
import com.baomidou.mybatisplus.annotation.TableField;
@@ -28,35 +28,35 @@ import lombok.NoArgsConstructor;
public class IotDataSinkDO extends BaseDO {
/**
* 数据目的编号
* 数据流转目的编号
*/
@TableId
private Long id;
/**
* 数据目的名称
* 数据流转目的名称
*/
private String name;
/**
* 数据目的描述
* 数据流转目的描述
*/
private String description;
/**
* 数据目的状态
* 数据流转目的状态
*
* 枚举 {@link CommonStatusEnum}
*/
private Integer status;
/**
* 数据目的类型
* 数据流转目的类型
*
* 枚举 {@link IotDataSinkTypeEnum}
*/
private Integer type;
/**
* 数据目的配置
* 数据流转目的配置
*/
@TableField(typeHandler = JacksonTypeHandler.class)
private IotDataBridgeAbstractConfig config;
private IotAbstractDataSinkConfig config;
}

View File

@@ -22,8 +22,9 @@ import lombok.NoArgsConstructor;
import java.util.List;
import java.util.Map;
// TODO @芋艿:优化注释;
/**
* IoT 场景联动 DO
* IoT 场景联动规则 DO
*
* @author 芋道源码
*/

View File

@@ -16,14 +16,14 @@ import lombok.Data;
@Data
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible = true)
@JsonSubTypes({
@JsonSubTypes.Type(value = IotDataBridgeHttpConfig.class, name = "1"),
@JsonSubTypes.Type(value = IotDataBridgeMqttConfig.class, name = "10"),
@JsonSubTypes.Type(value = IotDataBridgeRedisStreamConfig.class, name = "21"),
@JsonSubTypes.Type(value = IotDataBridgeRocketMQConfig.class, name = "30"),
@JsonSubTypes.Type(value = IotDataBridgeRabbitMQConfig.class, name = "31"),
@JsonSubTypes.Type(value = IotDataBridgeKafkaMQConfig.class, name = "32"),
@JsonSubTypes.Type(value = IotDataSinkHttpConfig.class, name = "1"),
@JsonSubTypes.Type(value = IotDataSinkMqttConfig.class, name = "10"),
@JsonSubTypes.Type(value = IotDataSinkRedisStreamConfig.class, name = "21"),
@JsonSubTypes.Type(value = IotDataSinkRocketMQConfig.class, name = "30"),
@JsonSubTypes.Type(value = IotDataSinkRabbitMQConfig.class, name = "31"),
@JsonSubTypes.Type(value = IotDataSinkKafkaConfig.class, name = "32"),
})
public abstract class IotDataBridgeAbstractConfig {
public abstract class IotAbstractDataSinkConfig {
/**
* 配置类型

View File

@@ -5,12 +5,12 @@ import lombok.Data;
import java.util.Map;
/**
* IoT HTTP 配置 {@link IotDataBridgeAbstractConfig} 实现类
* IoT HTTP 配置 {@link IotAbstractDataSinkConfig} 实现类
*
* @author HUIHUI
*/
@Data
public class IotDataBridgeHttpConfig extends IotDataBridgeAbstractConfig {
public class IotDataSinkHttpConfig extends IotAbstractDataSinkConfig {
/**
* 请求 URL

View File

@@ -3,12 +3,12 @@ package cn.iocoder.yudao.module.iot.dal.dataobject.rule.config;
import lombok.Data;
/**
* IoT Kafka 配置 {@link IotDataBridgeAbstractConfig} 实现类
* IoT Kafka 配置 {@link IotAbstractDataSinkConfig} 实现类
*
* @author HUIHUI
*/
@Data
public class IotDataBridgeKafkaMQConfig extends IotDataBridgeAbstractConfig {
public class IotDataSinkKafkaConfig extends IotAbstractDataSinkConfig {
/**
* Kafka 服务器地址

View File

@@ -3,12 +3,12 @@ package cn.iocoder.yudao.module.iot.dal.dataobject.rule.config;
import lombok.Data;
/**
* IoT MQTT 配置 {@link IotDataBridgeAbstractConfig} 实现类
* IoT MQTT 配置 {@link IotAbstractDataSinkConfig} 实现类
*
* @author HUIHUI
*/
@Data
public class IotDataBridgeMqttConfig extends IotDataBridgeAbstractConfig {
public class IotDataSinkMqttConfig extends IotAbstractDataSinkConfig {
/**
* MQTT 服务器地址

View File

@@ -3,12 +3,12 @@ package cn.iocoder.yudao.module.iot.dal.dataobject.rule.config;
import lombok.Data;
/**
* IoT RabbitMQ 配置 {@link IotDataBridgeAbstractConfig} 实现类
* IoT RabbitMQ 配置 {@link IotAbstractDataSinkConfig} 实现类
*
* @author HUIHUI
*/
@Data
public class IotDataBridgeRabbitMQConfig extends IotDataBridgeAbstractConfig {
public class IotDataSinkRabbitMQConfig extends IotAbstractDataSinkConfig {
/**
* RabbitMQ 服务器地址

View File

@@ -3,12 +3,12 @@ package cn.iocoder.yudao.module.iot.dal.dataobject.rule.config;
import lombok.Data;
/**
* IoT Redis Stream 配置 {@link IotDataBridgeAbstractConfig} 实现类
* IoT Redis Stream 配置 {@link IotAbstractDataSinkConfig} 实现类
*
* @author HUIHUI
*/
@Data
public class IotDataBridgeRedisStreamConfig extends IotDataBridgeAbstractConfig {
public class IotDataSinkRedisStreamConfig extends IotAbstractDataSinkConfig {
/**
* Redis 服务器地址

View File

@@ -3,12 +3,12 @@ package cn.iocoder.yudao.module.iot.dal.dataobject.rule.config;
import lombok.Data;
/**
* IoT RocketMQ 配置 {@link IotDataBridgeAbstractConfig} 实现类
* IoT RocketMQ 配置 {@link IotAbstractDataSinkConfig} 实现类
*
* @author HUIHUI
*/
@Data
public class IotDataBridgeRocketMQConfig extends IotDataBridgeAbstractConfig {
public class IotDataSinkRocketMQConfig extends IotAbstractDataSinkConfig {
/**
* RocketMQ 名称服务器地址

View File

@@ -0,0 +1,26 @@
package cn.iocoder.yudao.module.iot.dal.mysql.rule;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.mybatis.core.mapper.BaseMapperX;
import cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.rule.IotDataRulePageReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataRuleDO;
import org.apache.ibatis.annotations.Mapper;
/**
* IoT 数据流转规则 Mapper
*
* @author 芋道源码
*/
@Mapper
public interface IotDataRuleMapper extends BaseMapperX<IotDataRuleDO> {
default PageResult<IotDataRuleDO> selectPage(IotDataRulePageReqVO reqVO) {
return selectPage(reqVO, new LambdaQueryWrapperX<IotDataRuleDO>()
.likeIfPresent(IotDataRuleDO::getName, reqVO.getName())
.eqIfPresent(IotDataRuleDO::getStatus, reqVO.getStatus())
.betweenIfPresent(IotDataRuleDO::getCreateTime, reqVO.getCreateTime())
.orderByDesc(IotDataRuleDO::getId));
}
}

View File

@@ -0,0 +1,54 @@
package cn.iocoder.yudao.module.iot.service.rule.data;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.rule.IotDataRulePageReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.rule.IotDataRuleSaveReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataRuleDO;
import jakarta.validation.Valid;
/**
* IoT 数据流转规则 Service 接口
*
* @author 芋道源码
*/
public interface IotDataRuleService {
/**
* 创建数据流转规则
*
* @param createReqVO 创建信息
* @return 编号
*/
Long createDataRule(@Valid IotDataRuleSaveReqVO createReqVO);
/**
* 更新数据流转规则
*
* @param updateReqVO 更新信息
*/
void updateDataRule(@Valid IotDataRuleSaveReqVO updateReqVO);
/**
* 删除数据流转规则
*
* @param id 编号
*/
void deleteDataRule(Long id);
/**
* 获得数据流转规则
*
* @param id 编号
* @return 数据流转规则
*/
IotDataRuleDO getDataRule(Long id);
/**
* 获得数据流转规则分页
*
* @param pageReqVO 分页查询
* @return 数据流转规则分页
*/
PageResult<IotDataRuleDO> getDataRulePage(IotDataRulePageReqVO pageReqVO);
}

View File

@@ -0,0 +1,68 @@
package cn.iocoder.yudao.module.iot.service.rule.data;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.rule.IotDataRulePageReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.rule.IotDataRuleSaveReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataRuleDO;
import cn.iocoder.yudao.module.iot.dal.mysql.rule.IotDataRuleMapper;
import jakarta.annotation.Resource;
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DATA_RULE_NOT_EXISTS;
/**
* IoT 数据流转规则 Service 实现类
*
* @author 芋道源码
*/
@Service
@Validated
public class IotDataRuleServiceImpl implements IotDataRuleService {
@Resource
private IotDataRuleMapper dataRuleMapper;
@Override
public Long createDataRule(IotDataRuleSaveReqVO createReqVO) {
IotDataRuleDO dataRule = BeanUtils.toBean(createReqVO, IotDataRuleDO.class);
dataRuleMapper.insert(dataRule);
return dataRule.getId();
}
@Override
public void updateDataRule(IotDataRuleSaveReqVO updateReqVO) {
// 校验存在
validateDataRuleExists(updateReqVO.getId());
// 更新
IotDataRuleDO updateObj = BeanUtils.toBean(updateReqVO, IotDataRuleDO.class);
dataRuleMapper.updateById(updateObj);
}
@Override
public void deleteDataRule(Long id) {
// 校验存在
validateDataRuleExists(id);
// 删除
dataRuleMapper.deleteById(id);
}
private void validateDataRuleExists(Long id) {
if (dataRuleMapper.selectById(id) == null) {
throw exception(DATA_RULE_NOT_EXISTS);
}
}
@Override
public IotDataRuleDO getDataRule(Long id) {
return dataRuleMapper.selectById(id);
}
@Override
public PageResult<IotDataRuleDO> getDataRulePage(IotDataRulePageReqVO pageReqVO) {
return dataRuleMapper.selectPage(pageReqVO);
}
}

View File

@@ -16,7 +16,7 @@ import java.util.List;
public interface IotDataSinkService {
/**
* 创建数据目的
* 创建数据流转目的
*
* @param createReqVO 创建信息
* @return 编号
@@ -24,40 +24,40 @@ public interface IotDataSinkService {
Long createDataSink(@Valid IotDataSinkSaveReqVO createReqVO);
/**
* 更新数据目的
* 更新数据流转目的
*
* @param updateReqVO 更新信息
*/
void updateDataSink(@Valid IotDataSinkSaveReqVO updateReqVO);
/**
* 删除数据目的
* 删除数据流转目的
*
* @param id 编号
*/
void deleteDataSink(Long id);
/**
* 获得数据目的
* 获得数据流转目的
*
* @param id 编号
* @return 数据目的
* @return 数据流转目的
*/
IotDataSinkDO getDataSink(Long id);
/**
* 获得数据目的分页
* 获得数据流转目的分页
*
* @param pageReqVO 分页查询
* @return 数据目的分页
* @return 数据流转目的分页
*/
PageResult<IotDataSinkDO> getDataSinkPage(IotDataSinkPageReqVO pageReqVO);
/**
* 获取数据目的列表
* 获取数据流转目的列表
*
* @param status 状态,如果为空,则不进行筛选
* @return 数据目的列表
* @return 数据流转目的列表
*/
List<IotDataSinkDO> getDataSinkListByStatus(Integer status);

View File

@@ -15,7 +15,7 @@ import org.springframework.stereotype.Component;
import java.util.List;
/**
* IoT 数据桥梁的 {@link IotRuleSceneAction} 实现类
* IoT 数据流转目的的 {@link IotRuleSceneAction} 实现类
*
* @author 芋道源码
*/
@@ -35,15 +35,15 @@ public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction {
if (message == null) {
return;
}
// 1.2 获得数据桥梁
Assert.notNull(config.getDataBridgeId(), "数据桥梁编号不能为空");
// 1.2 获得数据流转目的
Assert.notNull(config.getDataBridgeId(), "数据流转目的编号不能为空");
IotDataSinkDO dataBridge = dataBridgeService.getDataSink(config.getDataBridgeId());
if (dataBridge == null || dataBridge.getConfig() == null) {
log.error("[execute][message({}) config({}) 对应的数据桥梁不存在]", message, config);
log.error("[execute][message({}) config({}) 对应的数据流转目的不存在]", message, config);
return;
}
if (CommonStatusEnum.isDisable(dataBridge.getStatus())) {
log.info("[execute][message({}) config({}) 对应的数据桥梁({}) 状态为禁用]", message, config, dataBridge);
log.info("[execute][message({}) config({}) 对应的数据流转目的({}) 状态为禁用]", message, config, dataBridge);
return;
}

View File

@@ -17,7 +17,7 @@ import java.time.Duration;
// TODO @芋艿websocket
/**
* 带缓存功能的数据桥梁执行器抽象类
* 带缓存功能的数据流转目的执行器抽象类
*
* 该类提供了一个通用的缓存机制,用于管理各类数据桥接的生产者(Producer)实例。
*

View File

@@ -4,38 +4,38 @@ import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataSinkDO;
/**
* IoT 数据桥梁的执行器 execute 接口
* IoT 数据流转目的的执行器 execute 接口
*
* @author HUIHUI
*/
public interface IotDataBridgeExecute<Config> {
/**
* 获取数据桥梁类型
* 获取数据流转目的类型
*
* @return 数据桥梁类型
* @return 数据流转目的类型
*/
Integer getType();
/**
* 执行数据桥梁操作
* 执行数据流转目的操作
*
* @param message 设备消息
* @param dataBridge 数据桥梁
* @param dataBridge 数据流转目的
*/
@SuppressWarnings({"unchecked"})
default void execute(IotDeviceMessage message, IotDataSinkDO dataBridge) throws Exception {
// 1.1 校验数据桥梁类型
// 1.1 校验数据流转目的类型
if (!getType().equals(dataBridge.getType())) {
return;
}
// 1.2 执行对应的数据桥梁发送消息
// 1.2 执行对应的数据流转目的发送消息
execute0(message, (Config) dataBridge.getConfig());
}
/**
* 【真正】执行数据桥梁操作
* 【真正】执行数据流转目的操作
*
* @param message 设备消息
* @param config 桥梁配置

View File

@@ -3,7 +3,7 @@ package cn.iocoder.yudao.module.iot.service.rule.data.action;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.framework.common.util.http.HttpUtils;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataBridgeHttpConfig;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkHttpConfig;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
import jakarta.annotation.Resource;
@@ -23,7 +23,7 @@ import java.util.Map;
*/
@Component
@Slf4j
public class IotHttpDataBridgeExecute implements IotDataBridgeExecute<IotDataBridgeHttpConfig> {
public class IotHttpDataBridgeExecute implements IotDataBridgeExecute<IotDataSinkHttpConfig> {
@Resource
private RestTemplate restTemplate;
@@ -35,7 +35,7 @@ public class IotHttpDataBridgeExecute implements IotDataBridgeExecute<IotDataBri
@Override
@SuppressWarnings({"unchecked", "deprecation"})
public void execute0(IotDeviceMessage message, IotDataBridgeHttpConfig config) {
public void execute0(IotDeviceMessage message, IotDataSinkHttpConfig config) {
String url = null;
HttpMethod method = HttpMethod.valueOf(config.getMethod().toUpperCase());
HttpEntity<String> requestEntity = null;

View File

@@ -1,6 +1,6 @@
package cn.iocoder.yudao.module.iot.service.rule.data.action;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataBridgeKafkaMQConfig;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkKafkaConfig;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
import lombok.extern.slf4j.Slf4j;
@@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class IotKafkaMQDataBridgeExecute extends
AbstractCacheableDataBridgeExecute<IotDataBridgeKafkaMQConfig, KafkaTemplate<String, String>> {
AbstractCacheableDataBridgeExecute<IotDataSinkKafkaConfig, KafkaTemplate<String, String>> {
private static final Duration SEND_TIMEOUT = Duration.ofMillis(10000); // 10 秒超时时间
@@ -35,7 +35,7 @@ public class IotKafkaMQDataBridgeExecute extends
}
@Override
public void execute0(IotDeviceMessage message, IotDataBridgeKafkaMQConfig config) throws Exception {
public void execute0(IotDeviceMessage message, IotDataSinkKafkaConfig config) throws Exception {
// 1. 获取或创建 KafkaTemplate
KafkaTemplate<String, String> kafkaTemplate = getProducer(config);
@@ -46,7 +46,7 @@ public class IotKafkaMQDataBridgeExecute extends
}
@Override
protected KafkaTemplate<String, String> initProducer(IotDataBridgeKafkaMQConfig config) {
protected KafkaTemplate<String, String> initProducer(IotDataSinkKafkaConfig config) {
// 1.1 构建生产者配置
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());

View File

@@ -1,6 +1,6 @@
package cn.iocoder.yudao.module.iot.service.rule.data.action;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataBridgeRabbitMQConfig;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkRabbitMQConfig;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
import com.rabbitmq.client.Channel;
@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets;
@Component
@Slf4j
public class IotRabbitMQDataBridgeExecute extends
AbstractCacheableDataBridgeExecute<IotDataBridgeRabbitMQConfig, Channel> {
AbstractCacheableDataBridgeExecute<IotDataSinkRabbitMQConfig, Channel> {
@Override
@@ -30,7 +30,7 @@ public class IotRabbitMQDataBridgeExecute extends
}
@Override
public void execute0(IotDeviceMessage message, IotDataBridgeRabbitMQConfig config) throws Exception {
public void execute0(IotDeviceMessage message, IotDataSinkRabbitMQConfig config) throws Exception {
// 1. 获取或创建 Channel
Channel channel = getProducer(config);
@@ -47,7 +47,7 @@ public class IotRabbitMQDataBridgeExecute extends
@Override
@SuppressWarnings("resource")
protected Channel initProducer(IotDataBridgeRabbitMQConfig config) throws Exception {
protected Channel initProducer(IotDataSinkRabbitMQConfig config) throws Exception {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(config.getHost());

View File

@@ -1,7 +1,7 @@
package cn.iocoder.yudao.module.iot.service.rule.data.action;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataBridgeRedisStreamConfig;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkRedisStreamConfig;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
import lombok.extern.slf4j.Slf4j;
@@ -25,7 +25,7 @@ import org.springframework.stereotype.Component;
@Component
@Slf4j
public class IotRedisStreamDataBridgeExecute extends
AbstractCacheableDataBridgeExecute<IotDataBridgeRedisStreamConfig, RedisTemplate<String, Object>> {
AbstractCacheableDataBridgeExecute<IotDataSinkRedisStreamConfig, RedisTemplate<String, Object>> {
@Override
public Integer getType() {
@@ -33,7 +33,7 @@ public class IotRedisStreamDataBridgeExecute extends
}
@Override
public void execute0(IotDeviceMessage message, IotDataBridgeRedisStreamConfig config) throws Exception {
public void execute0(IotDeviceMessage message, IotDataSinkRedisStreamConfig config) throws Exception {
// 1. 获取 RedisTemplate
RedisTemplate<String, Object> redisTemplate = getProducer(config);
@@ -45,7 +45,7 @@ public class IotRedisStreamDataBridgeExecute extends
}
@Override
protected RedisTemplate<String, Object> initProducer(IotDataBridgeRedisStreamConfig config) {
protected RedisTemplate<String, Object> initProducer(IotDataSinkRedisStreamConfig config) {
// 1.1 创建 Redisson 配置
Config redissonConfig = new Config();
SingleServerConfig serverConfig = redissonConfig.useSingleServer()

View File

@@ -1,6 +1,6 @@
package cn.iocoder.yudao.module.iot.service.rule.data.action;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataBridgeRocketMQConfig;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkRocketMQConfig;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
import lombok.extern.slf4j.Slf4j;
@@ -21,7 +21,7 @@ import org.springframework.stereotype.Component;
@Component
@Slf4j
public class IotRocketMQDataBridgeExecute extends
AbstractCacheableDataBridgeExecute<IotDataBridgeRocketMQConfig, DefaultMQProducer> {
AbstractCacheableDataBridgeExecute<IotDataSinkRocketMQConfig, DefaultMQProducer> {
@Override
public Integer getType() {
@@ -29,7 +29,7 @@ public class IotRocketMQDataBridgeExecute extends
}
@Override
public void execute0(IotDeviceMessage message, IotDataBridgeRocketMQConfig config) throws Exception {
public void execute0(IotDeviceMessage message, IotDataSinkRocketMQConfig config) throws Exception {
// 1. 获取或创建 Producer
DefaultMQProducer producer = getProducer(config);
@@ -50,7 +50,7 @@ public class IotRocketMQDataBridgeExecute extends
}
@Override
protected DefaultMQProducer initProducer(IotDataBridgeRocketMQConfig config) throws Exception {
protected DefaultMQProducer initProducer(IotDataSinkRocketMQConfig config) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(config.getGroup());
producer.setNamesrvAddr(config.getNameServer());
producer.start();

View File

@@ -2,8 +2,8 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.iocoder.yudao.framework.test.core.ut.BaseMockitoUnitTest;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataSinkDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.*;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataSinkDO;
import cn.iocoder.yudao.module.iot.service.rule.data.action.*;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
@@ -54,7 +54,7 @@ public class IotDataBridgeExecuteTest extends BaseMockitoUnitTest {
IotKafkaMQDataBridgeExecute action = new IotKafkaMQDataBridgeExecute();
// 2. 创建配置
IotDataBridgeKafkaMQConfig config = new IotDataBridgeKafkaMQConfig()
IotDataSinkKafkaConfig config = new IotDataSinkKafkaConfig()
.setBootstrapServers("127.0.0.1:9092")
.setTopic("test-topic")
.setSsl(false)
@@ -71,7 +71,7 @@ public class IotDataBridgeExecuteTest extends BaseMockitoUnitTest {
IotRabbitMQDataBridgeExecute action = new IotRabbitMQDataBridgeExecute();
// 2. 创建配置
IotDataBridgeRabbitMQConfig config = new IotDataBridgeRabbitMQConfig()
IotDataSinkRabbitMQConfig config = new IotDataSinkRabbitMQConfig()
.setHost("localhost")
.setPort(5672)
.setVirtualHost("/")
@@ -91,7 +91,7 @@ public class IotDataBridgeExecuteTest extends BaseMockitoUnitTest {
IotRedisStreamDataBridgeExecute action = new IotRedisStreamDataBridgeExecute();
// 2. 创建配置
IotDataBridgeRedisStreamConfig config = new IotDataBridgeRedisStreamConfig()
IotDataSinkRedisStreamConfig config = new IotDataSinkRedisStreamConfig()
.setHost("127.0.0.1")
.setPort(6379)
.setDatabase(0)
@@ -108,7 +108,7 @@ public class IotDataBridgeExecuteTest extends BaseMockitoUnitTest {
IotRocketMQDataBridgeExecute action = new IotRocketMQDataBridgeExecute();
// 2. 创建配置
IotDataBridgeRocketMQConfig config = new IotDataBridgeRocketMQConfig()
IotDataSinkRocketMQConfig config = new IotDataSinkRocketMQConfig()
.setNameServer("127.0.0.1:9876")
.setGroup("test-group")
.setTopic("test-topic")
@@ -125,7 +125,7 @@ public class IotDataBridgeExecuteTest extends BaseMockitoUnitTest {
.thenReturn(new ResponseEntity<>("Success", HttpStatus.OK));
// 2. 创建配置
IotDataBridgeHttpConfig config = new IotDataBridgeHttpConfig()
IotDataSinkHttpConfig config = new IotDataSinkHttpConfig()
.setUrl("https://doc.iocoder.cn/").setMethod(HttpMethod.GET.name());
// 3. 执行测试
@@ -142,7 +142,7 @@ public class IotDataBridgeExecuteTest extends BaseMockitoUnitTest {
* @param type MQ 类型
* @throws Exception 如果执行过程中发生异常
*/
private void executeAndVerifyCache(IotDataBridgeExecute<?> action, IotDataBridgeAbstractConfig config, String type)
private void executeAndVerifyCache(IotDataBridgeExecute<?> action, IotAbstractDataSinkConfig config, String type)
throws Exception {
log.info("[test{}DataBridge][第一次执行,应该会创建新的 producer]", type);
action.execute(message, new IotDataSinkDO().setType(action.getType()).setConfig(config));