feat:【IoT 物联网】设备消息查询时,增加 pair 查询

This commit is contained in:
YunaiV
2025-06-19 23:33:16 +08:00
parent e9c5464aac
commit f5c2ee2ae5
16 changed files with 214 additions and 25 deletions

View File

@@ -88,4 +88,14 @@ Authorization: Bearer {{token}}
"fileUrl": "http://example.com/firmware.bin",
"information": "{\"desc\":\"升级到最新版本\"}"
}
}
}
### 查询设备消息对分页 - 基础查询设备编号25
GET {{baseUrl}}/iot/device/message/pair-page?deviceId=25&pageNo=1&pageSize=10
Authorization: Bearer {{token}}
tenant-id: {{adminTenantId}}
### 查询设备消息对分页 - 按标识符过滤identifier=eat
GET {{baseUrl}}/iot/device/message/pair-page?deviceId=25&identifier=eat&pageNo=1&pageSize=10
Authorization: Bearer {{token}}
tenant-id: {{adminTenantId}}

View File

@@ -1,14 +1,19 @@
package cn.iocoder.yudao.module.iot.controller.admin.device;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.message.IotDeviceMessageRespVO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.message.IotDeviceMessagePageReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.message.IotDeviceMessageRespPairVO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.message.IotDeviceMessageRespVO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.message.IotDeviceMessageSendReqVO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceMessageDO;
import cn.iocoder.yudao.module.iot.dal.tdengine.IotDeviceMessageMapper;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.service.device.message.IotDeviceMessageService;
import cn.iocoder.yudao.module.iot.service.thingmodel.IotThingModelService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.Resource;
@@ -17,7 +22,12 @@ import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Map;
import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertList;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertMap;
@Tag(name = "管理后台 - IoT 设备消息")
@RestController
@@ -27,19 +37,54 @@ public class IotDeviceMessageController {
@Resource
private IotDeviceMessageService deviceMessageService;
@Resource
private IotDeviceService deviceService;
@Resource
private IotThingModelService thingModelService;
@Resource
private IotDeviceMessageMapper deviceMessageMapper;
@GetMapping("/page")
@Operation(summary = "获得设备消息分页")
@PreAuthorize("@ss.hasPermission('iot:device:message-query')")
public CommonResult<PageResult<IotDeviceMessageRespVO>> getDeviceLogPage(@Valid IotDeviceMessagePageReqVO pageReqVO) {
public CommonResult<PageResult<IotDeviceMessageRespVO>> getDeviceMessagePage(
@Valid IotDeviceMessagePageReqVO pageReqVO) {
PageResult<IotDeviceMessageDO> pageResult = deviceMessageService.getDeviceMessagePage(pageReqVO);
return success(BeanUtils.toBean(pageResult, IotDeviceMessageRespVO.class));
}
@GetMapping("/pair-page")
@Operation(summary = "获得设备消息对分页")
@PreAuthorize("@ss.hasPermission('iot:device:message-query')")
public CommonResult<PageResult<IotDeviceMessageRespPairVO>> getDeviceMessagePairPage(
@Valid IotDeviceMessagePageReqVO pageReqVO) {
// 1.1 先按照条件,查询 request 的消息(非 reply
pageReqVO.setReply(false);
PageResult<IotDeviceMessageDO> requestMessagePageResult = deviceMessageService.getDeviceMessagePage(pageReqVO);
if (CollUtil.isEmpty(requestMessagePageResult.getList())) {
return success(PageResult.empty());
}
// 1.2 接着按照 requestIds批量查询 reply 消息
List<String> requestIds = convertList(requestMessagePageResult.getList(), IotDeviceMessageDO::getRequestId);
List<IotDeviceMessageDO> replyMessageList = deviceMessageService.getDeviceMessageListByRequestIdsAndReply(
pageReqVO.getDeviceId(), requestIds, true);
Map<String, IotDeviceMessageDO> replyMessages = convertMap(replyMessageList, IotDeviceMessageDO::getRequestId);
// 2. 组装结果
List<IotDeviceMessageRespPairVO> pairMessages = convertList(requestMessagePageResult.getList(),
requestMessage -> {
IotDeviceMessageDO replyMessage = replyMessages.get(requestMessage.getRequestId());
return new IotDeviceMessageRespPairVO()
.setRequest(BeanUtils.toBean(requestMessage, IotDeviceMessageRespVO.class))
.setReply(BeanUtils.toBean(replyMessage, IotDeviceMessageRespVO.class));
});
return success(new PageResult<>(pairMessages, requestMessagePageResult.getTotal()));
}
@PostMapping("/send")
@Operation(summary = "发送消息", description = "可用于设备模拟")
@PreAuthorize("@ss.hasPermission('iot:device:message-end')")
public CommonResult<Boolean> upstreamDevice(@Valid @RequestBody IotDeviceMessageSendReqVO sendReqVO) {
public CommonResult<Boolean> sendDeviceMessage(@Valid @RequestBody IotDeviceMessageSendReqVO sendReqVO) {
deviceMessageService.sendDeviceMessage(BeanUtils.toBean(sendReqVO, IotDeviceMessage.class));
return success(true);
}

View File

@@ -5,7 +5,13 @@ import cn.iocoder.yudao.framework.common.validation.InEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;
import lombok.Data;
import org.springframework.format.annotation.DateTimeFormat;
import java.time.LocalDateTime;
import static cn.iocoder.yudao.framework.common.util.date.DateUtils.FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND;
@Schema(description = "管理后台 - IoT 设备消息分页查询 Request VO")
@Data
@@ -22,4 +28,15 @@ public class IotDeviceMessagePageReqVO extends PageParam {
@Schema(description = "是否上行", example = "true")
private Boolean upstream;
@Schema(description = "是否回复", example = "true")
private Boolean reply;
@Schema(description = "标识符", example = "temperature")
private String identifier;
@Schema(description = "时间范围", requiredMode = Schema.RequiredMode.NOT_REQUIRED)
@DateTimeFormat(pattern = FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND)
@Size(min = 2, max = 2, message = "请选择时间范围")
private LocalDateTime[] times;
}

View File

@@ -0,0 +1,16 @@
package cn.iocoder.yudao.module.iot.controller.admin.device.vo.message;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
@Schema(description = "管理后台 - IoT 设备消息对 Response VO")
@Data
public class IotDeviceMessageRespPairVO {
@Schema(description = "请求消息", requiredMode = Schema.RequiredMode.REQUIRED)
private IotDeviceMessageRespVO request;
@Schema(description = "响应消息")
private IotDeviceMessageRespVO reply; // 通过 requestId 配对
}

View File

@@ -30,6 +30,9 @@ public class IotDeviceMessageRespVO {
@Schema(description = "是否回复消息", example = "false", examples = "true")
private Boolean reply;
@Schema(description = "标识符", example = "temperature")
private String identifier;
// ========== codec编解码字段 ==========
@Schema(description = "请求编号", example = "req_123")

View File

@@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.iot.dal.dataobject.device;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.thingmodel.IotThingModelDO;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -63,6 +64,13 @@ public class IotDeviceMessageDO {
* 计算并存储的目的:方便计算多少条请求、多少条回复
*/
private Boolean reply;
/**
* 标识符
*
* 例如说:{@link IotThingModelDO#getIdentifier()}
* 目前,只有事件上报、服务调用才有!!!
*/
private String identifier;
// ========== codec编解码字段 ==========

View File

@@ -8,7 +8,7 @@ import cn.iocoder.yudao.module.iot.controller.admin.thingmodel.vo.IotThingModelP
import cn.iocoder.yudao.module.iot.dal.dataobject.thingmodel.IotThingModelDO;
import org.apache.ibatis.annotations.Mapper;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
/**
@@ -50,6 +50,12 @@ public interface IotThingModelMapper extends BaseMapperX<IotThingModelDO> {
return selectList(IotThingModelDO::getProductId, productId);
}
default List<IotThingModelDO> selectListByProductIdAndIdentifiers(Long productId, Collection<String> identifiers) {
return selectList(new LambdaQueryWrapperX<IotThingModelDO>()
.eq(IotThingModelDO::getProductId, productId)
.in(IotThingModelDO::getIdentifier, identifiers));
}
default List<IotThingModelDO> selectListByProductIdAndType(Long productId, Integer type) {
return selectList(IotThingModelDO::getProductId, productId,
IotThingModelDO::getType, type);
@@ -69,16 +75,4 @@ public interface IotThingModelMapper extends BaseMapperX<IotThingModelDO> {
IotThingModelDO::getName, name);
}
// TODO @super用不到删除下
/**
* 统计物模型数量
*
* @param createTime 创建时间,如果为空,则统计所有物模型数量
* @return 物模型数量
*/
default Long selectCountByCreateTime(LocalDateTime createTime) {
return selectCount(new LambdaQueryWrapperX<IotThingModelDO>()
.geIfPresent(IotThingModelDO::getCreateTime, createTime));
}
}

View File

@@ -8,6 +8,7 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -57,6 +58,18 @@ public interface IotDeviceMessageMapper {
*/
Long selectCountByCreateTime(@Param("createTime") Long createTime);
/**
* 按照 requestIds 批量查询消息
*
* @param deviceId 设备编号
* @param requestIds 请求编号集合
* @param reply 是否回复消息
* @return 消息列表
*/
List<IotDeviceMessageDO> selectListByRequestIdsAndReply(@Param("deviceId") Long deviceId,
@Param("requestIds") Collection<String> requestIds,
@Param("reply") Boolean reply);
/**
* 按照时间范围(小时),统计设备的消息数量
*/

View File

@@ -32,7 +32,7 @@ public interface IotDevicePropertyMapper {
List<TDengineTableField> oldFields,
List<TDengineTableField> newFields) {
oldFields.removeIf(field -> StrUtil.equalsAny(field.getField(),
TDengineTableField.FIELD_TS, "report_time"));
TDengineTableField.FIELD_TS, "report_time", "device_id"));
List<TDengineTableField> addFields = newFields.stream().filter( // 新增的字段
newField -> oldFields.stream().noneMatch(oldField -> oldField.getField().equals(newField.getField())))
.collect(Collectors.toList());

View File

@@ -7,6 +7,8 @@ import cn.iocoder.yudao.module.iot.controller.admin.statistics.vo.IotStatisticsD
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceMessageDO;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import javax.annotation.Nullable;
import java.time.LocalDateTime;
@@ -63,6 +65,19 @@ public interface IotDeviceMessageService {
*/
PageResult<IotDeviceMessageDO> getDeviceMessagePage(IotDeviceMessagePageReqVO pageReqVO);
/**
* 获得指定 requestId 的设备消息列表
*
* @param deviceId 设备编号
* @param requestIds requestId 列表
* @param reply 是否回复
* @return 设备消息列表
*/
List<IotDeviceMessageDO> getDeviceMessageListByRequestIdsAndReply(
@NotNull(message = "设备编号不能为空") Long deviceId,
@NotEmpty(message = "请求编号不能为空") List<String> requestIds,
Boolean reply);
/**
* 获得设备消息数量
*

View File

@@ -76,7 +76,8 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
void createDeviceLogAsync(IotDeviceMessage message) {
IotDeviceMessageDO messageDO = BeanUtils.toBean(message, IotDeviceMessageDO.class)
.setUpstream(IotDeviceMessageUtils.isUpstreamMessage(message))
.setReply(IotDeviceMessageUtils.isReplyMessage(message));
.setReply(IotDeviceMessageUtils.isReplyMessage(message))
.setIdentifier(IotDeviceMessageUtils.getIdentifier(message));
if (message.getParams() != null) {
messageDO.setParams(JsonUtils.toJsonString(messageDO.getParams()));
}
@@ -212,6 +213,13 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
}
}
@Override
public List<IotDeviceMessageDO> getDeviceMessageListByRequestIdsAndReply(Long deviceId,
List<String> requestIds,
Boolean reply) {
return deviceMessageMapper.selectListByRequestIdsAndReply(deviceId, requestIds, reply);
}
@Override
public Long getDeviceMessageCount(LocalDateTime createTime) {
return deviceMessageMapper.selectCountByCreateTime(createTime != null ? LocalDateTimeUtil.toEpochMilli(createTime) : null);

View File

@@ -7,6 +7,7 @@ import cn.iocoder.yudao.module.iot.controller.admin.thingmodel.vo.IotThingModelS
import cn.iocoder.yudao.module.iot.dal.dataobject.thingmodel.IotThingModelDO;
import jakarta.validation.Valid;
import java.util.Collection;
import java.util.List;
/**
@@ -54,6 +55,15 @@ public interface IotThingModelService {
*/
List<IotThingModelDO> getThingModelListByProductId(Long productId);
/**
* 获得产品物模型列表
*
* @param productId 产品编号
* @param identifiers 功能标识列表
* @return 产品物模型列表
*/
List<IotThingModelDO> getThingModelListByProductIdAndIdentifiers(Long productId, Collection<String> identifiers);
/**
* 获得产品物模型列表
*

View File

@@ -131,6 +131,11 @@ public class IotThingModelServiceImpl implements IotThingModelService {
return thingModelMapper.selectListByProductId(productId);
}
@Override
public List<IotThingModelDO> getThingModelListByProductIdAndIdentifiers(Long productId, Collection<String> identifiers) {
return thingModelMapper.selectListByProductIdAndIdentifiers(productId, identifiers);
}
@Override
public List<IotThingModelDO> getThingModelListByProductIdAndType(Long productId, Integer type) {
return thingModelMapper.selectListByProductIdAndType(productId, type);

View File

@@ -13,6 +13,7 @@
server_id NCHAR(50),
upstream BOOL,
reply BOOL,
identifier NCHAR(100),
request_id NCHAR(50),
method NCHAR(100),
params NCHAR(2048),
@@ -31,22 +32,22 @@
<insert id="insert">
INSERT INTO device_message_${deviceId} (
ts, id, report_time, tenant_id, server_id,
upstream, reply, request_id, method, params,
data, code, msg
upstream, reply, identifier, request_id, method,
params, data, code, msg
)
USING device_message
TAGS (#{deviceId})
VALUES (
NOW, #{id}, #{reportTime}, #{tenantId}, #{serverId},
#{upstream}, #{reply}, #{requestId}, #{method}, #{params},
#{data}, #{code}, #{msg}
#{upstream}, #{reply}, #{identifier}, #{requestId}, #{method},
#{params}, #{data}, #{code}, #{msg}
)
</insert>
<select id="selectPage" resultType="cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceMessageDO">
SELECT ts, id, report_time, tenant_id, server_id,
upstream, reply, request_id, method, params,
data, code, msg
upstream, reply, identifier, request_id, method,
params, data, code, msg
FROM device_message_${reqVO.deviceId}
<where>
<if test="reqVO.method != null and reqVO.method != ''">
@@ -55,10 +56,29 @@
<if test="reqVO.upstream != null">
AND upstream = #{reqVO.upstream}
</if>
<if test="reqVO.reply != null">
AND reply = #{reqVO.reply}
</if>
<if test="reqVO.identifier != null and reqVO.identifier != ''">
AND identifier = #{reqVO.identifier}
</if>
</where>
ORDER BY ts DESC
</select>
<select id="selectListByRequestIdsAndReply" resultType="cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceMessageDO">
SELECT ts, id, report_time, tenant_id, server_id,
upstream, reply, identifier, request_id, method,
params, data, code, msg
FROM device_message_${deviceId}
WHERE reply = #{reply}
AND request_id IN
<foreach collection="requestIds" item="requestId" open="(" close=")" separator=",">
#{requestId}
</foreach>
ORDER BY ts DESC
</select>
<select id="selectCountByCreateTime" resultType="Long">
SELECT COUNT(*)
FROM device_message

View File

@@ -32,6 +32,10 @@ public enum IotDeviceMessageMethodEnum implements ArrayValuable<String> {
EVENT_POST("thing.event.post", "事件上报", true),
// ========== 设备服务调用 ==========
SERVICE_INVOKE("thing.service.invoke", "服务调用", false),
;
public static final String[] ARRAYS = Arrays.stream(values()).map(IotDeviceMessageMethodEnum::getMethod)

View File

@@ -1,11 +1,15 @@
package cn.iocoder.yudao.module.iot.core.util;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.system.SystemUtil;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import java.util.Map;
/**
* IoT 设备【消息】的工具类
*
@@ -43,6 +47,23 @@ public class IotDeviceMessageUtils {
return message.getCode() != null;
}
/**
* 提取消息中的标识符
*
* @param message 消息
* @return 标识符
*/
@SuppressWarnings("unchecked")
public static String getIdentifier(IotDeviceMessage message) {
if (StrUtil.equalsAny(message.getMethod(), IotDeviceMessageMethodEnum.EVENT_POST.getMethod(),
message.getMethod(), IotDeviceMessageMethodEnum.SERVICE_INVOKE.getMethod())
&& message.getParams() != null) {
Map<String, Object> params = (Map<String, Object>) message.getParams();
return MapUtil.getStr(params, "identifier");
}
return null;
}
// ========== Topic 相关 ==========
public static String buildMessageBusGatewayDeviceMessageTopic(String serverId) {