feat:【IoT 物联网】实现 OTA 升级推送任务 IotOtaUpgradeJob
This commit is contained in:
@@ -40,6 +40,13 @@ public interface IotOtaTaskRecordMapper extends BaseMapperX<IotOtaTaskRecordDO>
|
||||
.in(IotOtaTaskRecordDO::getStatus, statuses));
|
||||
}
|
||||
|
||||
default int updateByIdAndStatus(Long id, Integer status,
|
||||
IotOtaTaskRecordDO updateObj) {
|
||||
return update(updateObj, new LambdaUpdateWrapper<IotOtaTaskRecordDO>()
|
||||
.eq(IotOtaTaskRecordDO::getId, id)
|
||||
.eq(IotOtaTaskRecordDO::getStatus, status));
|
||||
}
|
||||
|
||||
default int updateByIdAndStatus(Long id, Collection<Integer> whereStatuses,
|
||||
IotOtaTaskRecordDO updateObj) {
|
||||
return update(updateObj, new LambdaUpdateWrapper<IotOtaTaskRecordDO>()
|
||||
@@ -60,4 +67,8 @@ public interface IotOtaTaskRecordMapper extends BaseMapperX<IotOtaTaskRecordDO>
|
||||
.inIfPresent(IotOtaTaskRecordDO::getStatus, statuses));
|
||||
}
|
||||
|
||||
default List<IotOtaTaskRecordDO> selectListByStatus(Integer status) {
|
||||
return selectList(IotOtaTaskRecordDO::getStatus, status);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,81 @@
|
||||
package cn.iocoder.yudao.module.iot.job.ota;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.iocoder.yudao.framework.quartz.core.handler.JobHandler;
|
||||
import cn.iocoder.yudao.framework.tenant.core.job.TenantJob;
|
||||
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.ota.IotOtaFirmwareDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.ota.IotOtaTaskRecordDO;
|
||||
import cn.iocoder.yudao.module.iot.enums.ota.IotOtaTaskRecordStatusEnum;
|
||||
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
|
||||
import cn.iocoder.yudao.module.iot.service.ota.IotOtaFirmwareService;
|
||||
import cn.iocoder.yudao.module.iot.service.ota.IotOtaTaskRecordService;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* IoT OTA 升级推送 Job:查询待推送的 OTA 升级记录,并推送给设备
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class IotOtaUpgradeJob implements JobHandler {
|
||||
|
||||
@Resource
|
||||
private IotOtaTaskRecordService otaTaskRecordService;
|
||||
@Resource
|
||||
private IotOtaFirmwareService otaFirmwareService;
|
||||
@Resource
|
||||
private IotDeviceService deviceService;
|
||||
|
||||
@Override
|
||||
@TenantJob
|
||||
public String execute(String param) throws Exception {
|
||||
// 1. 查询待推送的 OTA 升级记录
|
||||
List<IotOtaTaskRecordDO> records = otaTaskRecordService.getOtaRecordListByStatus(
|
||||
IotOtaTaskRecordStatusEnum.PENDING.getStatus());
|
||||
if (CollUtil.isEmpty(records)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// 2. 遍历推送记录
|
||||
int successCount = 0;
|
||||
int failureCount = 0;
|
||||
Map<Long, IotOtaFirmwareDO> otaFirmwares = new HashMap<>();
|
||||
for (IotOtaTaskRecordDO record : records) {
|
||||
try {
|
||||
// 2.1 设备如果不在线,直接跳过
|
||||
IotDeviceDO device = deviceService.getDeviceFromCache(record.getDeviceId());
|
||||
if (device == null || IotDeviceStateEnum.isNotOnline(device.getState())) {
|
||||
continue;
|
||||
}
|
||||
// 2.2 获取 OTA 固件信息
|
||||
IotOtaFirmwareDO fireware = otaFirmwares.get(record.getFirmwareId());
|
||||
if (fireware == null) {
|
||||
fireware = otaFirmwareService.getOtaFirmware(record.getFirmwareId());
|
||||
otaFirmwares.put(record.getFirmwareId(), fireware);
|
||||
}
|
||||
// 2.3 推送 OTA 升级任务
|
||||
boolean result = otaTaskRecordService.pushOtaTaskRecord(record, fireware, device);
|
||||
if (result) {
|
||||
successCount++;
|
||||
} else {
|
||||
failureCount++;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
failureCount++;
|
||||
log.error("[execute][推送 OTA 升级任务({})发生异常]", record.getId(), e);
|
||||
}
|
||||
}
|
||||
return StrUtil.format("升级任务推送成功:{} 条,送失败:{} 条", successCount, failureCount);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.service.ota;
|
||||
import cn.iocoder.yudao.framework.common.pojo.PageResult;
|
||||
import cn.iocoder.yudao.module.iot.controller.admin.ota.vo.task.record.IotOtaTaskRecordPageReqVO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.ota.IotOtaFirmwareDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.ota.IotOtaTaskRecordDO;
|
||||
import jakarta.validation.Valid;
|
||||
|
||||
@@ -65,6 +66,14 @@ public interface IotOtaTaskRecordService {
|
||||
*/
|
||||
List<IotOtaTaskRecordDO> getOtaTaskRecordListByDeviceIdAndStatus(Set<Long> deviceIds, Set<Integer> statuses);
|
||||
|
||||
/**
|
||||
* 根据记录状态,获取 OTA 升级记录列表
|
||||
*
|
||||
* @param status 升级记录状态
|
||||
* @return 升级记录列表
|
||||
*/
|
||||
List<IotOtaTaskRecordDO> getOtaRecordListByStatus(Integer status);
|
||||
|
||||
/**
|
||||
* 取消 OTA 升级记录
|
||||
*
|
||||
@@ -72,4 +81,14 @@ public interface IotOtaTaskRecordService {
|
||||
*/
|
||||
void cancelOtaTaskRecord(Long id);
|
||||
|
||||
/**
|
||||
* 推送 OTA 升级任务记录
|
||||
*
|
||||
* @param record 任务记录
|
||||
* @param fireware 固件信息
|
||||
* @param device 设备信息
|
||||
* @return 是否推送成功
|
||||
*/
|
||||
boolean pushOtaTaskRecord(IotOtaTaskRecordDO record, IotOtaFirmwareDO fireware, IotDeviceDO device);
|
||||
|
||||
}
|
||||
|
||||
@@ -2,12 +2,18 @@ package cn.iocoder.yudao.module.iot.service.ota;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.convert.Convert;
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.iocoder.yudao.framework.common.pojo.PageResult;
|
||||
import cn.iocoder.yudao.module.iot.controller.admin.ota.vo.task.record.IotOtaTaskRecordPageReqVO;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.ota.IotOtaFirmwareDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.ota.IotOtaTaskRecordDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.mysql.ota.IotOtaTaskRecordMapper;
|
||||
import cn.iocoder.yudao.module.iot.enums.ota.IotOtaTaskRecordStatusEnum;
|
||||
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
|
||||
import cn.iocoder.yudao.module.iot.service.device.message.IotDeviceMessageService;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
@@ -33,6 +39,10 @@ public class IotOtaTaskRecordServiceImpl implements IotOtaTaskRecordService {
|
||||
|
||||
@Resource
|
||||
private IotOtaTaskService otaTaskService;
|
||||
@Resource
|
||||
private IotDeviceMessageService deviceMessageService;
|
||||
@Resource
|
||||
private IotDeviceService deviceService;
|
||||
|
||||
@Override
|
||||
public void createOtaTaskRecordList(List<IotDeviceDO> devices, Long firmwareId, Long taskId) {
|
||||
@@ -86,7 +96,7 @@ public class IotOtaTaskRecordServiceImpl implements IotOtaTaskRecordService {
|
||||
Collection<Long> ids = convertSet(records, IotOtaTaskRecordDO::getId);
|
||||
otaTaskRecordMapper.updateListByIdAndStatus(ids, IotOtaTaskRecordStatusEnum.IN_PROCESS_STATUSES,
|
||||
IotOtaTaskRecordDO.builder().status(IotOtaTaskRecordStatusEnum.CANCELED.getStatus())
|
||||
.description(IotOtaTaskRecordDO.DESCRIPTION_CANCEL_BY_RECORD).build());
|
||||
.description(IotOtaTaskRecordDO.DESCRIPTION_CANCEL_BY_TASK).build());
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -94,6 +104,11 @@ public class IotOtaTaskRecordServiceImpl implements IotOtaTaskRecordService {
|
||||
return otaTaskRecordMapper.selectListByDeviceIdAndStatus(deviceIds, statuses);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<IotOtaTaskRecordDO> getOtaRecordListByStatus(Integer status) {
|
||||
return otaTaskRecordMapper.selectListByStatus(status);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancelOtaTaskRecord(Long id) {
|
||||
// 1. 校验记录是否存在
|
||||
@@ -111,6 +126,30 @@ public class IotOtaTaskRecordServiceImpl implements IotOtaTaskRecordService {
|
||||
checkAndUpdateOtaTaskStatus(record.getTaskId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean pushOtaTaskRecord(IotOtaTaskRecordDO record, IotOtaFirmwareDO fireware, IotDeviceDO device) {
|
||||
try {
|
||||
// 1. 推送 OTA 任务记录
|
||||
IotDeviceMessage message = IotDeviceMessage.buildOtaUpgrade(
|
||||
fireware.getVersion(), fireware.getFileUrl(), fireware.getFileSize(),
|
||||
fireware.getFileDigestAlgorithm(), fireware.getFileDigestValue());
|
||||
deviceMessageService.sendDeviceMessage(message, device);
|
||||
|
||||
// 2. 更新 OTA 升级记录状态为进行中
|
||||
int updateCount = otaTaskRecordMapper.updateByIdAndStatus(
|
||||
record.getId(), IotOtaTaskRecordStatusEnum.PENDING.getStatus(),
|
||||
IotOtaTaskRecordDO.builder().status(IotOtaTaskRecordStatusEnum.PUSHED.getStatus())
|
||||
.description(StrUtil.format("已推送,设备消息编号({})", message.getId())).build());
|
||||
Assert.isTrue(updateCount == 1, "更新设备记录({})状态失败", record.getId());
|
||||
return true;
|
||||
} catch (Exception ex) {
|
||||
log.error("[pushOtaTaskRecord][推送 OTA 任务记录({}) 失败]", record.getId(), ex);
|
||||
otaTaskRecordMapper.updateById(IotOtaTaskRecordDO.builder().id(record.getId())
|
||||
.description(StrUtil.format("推送失败,错误信息({})", ex.getMessage())).build());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private IotOtaTaskRecordDO validateUpgradeRecordExists(Long id) {
|
||||
IotOtaTaskRecordDO upgradeRecord = otaTaskRecordMapper.selectById(id);
|
||||
if (upgradeRecord == null) {
|
||||
|
||||
@@ -46,7 +46,7 @@ public interface IotOtaTaskService {
|
||||
|
||||
/**
|
||||
* 更新 OTA 任务状态为已结束
|
||||
*
|
||||
*
|
||||
* @param id 任务编号
|
||||
*/
|
||||
void updateOtaTaskStatusEnd(Long id);
|
||||
|
||||
@@ -42,6 +42,10 @@ public enum IotDeviceMessageMethodEnum implements ArrayValuable<String> {
|
||||
|
||||
CONFIG_PUSH("thing.config.push", "配置推送", true),
|
||||
|
||||
// ========== OTA 固件 ==========
|
||||
|
||||
OTA_UPGRADE("thing.ota.upgrade", "OTA 推送固定信息", false),
|
||||
OTA_PROGRESS("thing.ota.progress", "OTA 上报升级进度", true),
|
||||
;
|
||||
|
||||
public static final String[] ARRAYS = Arrays.stream(values()).map(IotDeviceMessageMethodEnum::getMethod)
|
||||
|
||||
@@ -39,4 +39,8 @@ public enum IotDeviceStateEnum implements ArrayValuable<Integer> {
|
||||
return ONLINE.getState().equals(state);
|
||||
}
|
||||
|
||||
public static boolean isNotOnline(Integer state) {
|
||||
return !isOnline(state);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -140,4 +140,12 @@ public class IotDeviceMessage {
|
||||
MapUtil.of("state", IotDeviceStateEnum.OFFLINE.getState()));
|
||||
}
|
||||
|
||||
public static IotDeviceMessage buildOtaUpgrade(String version, String fileUrl, Long fileSize,
|
||||
String fileDigestAlgorithm, String fileDigestValue) {
|
||||
return requestOf(IotDeviceMessageMethodEnum.OTA_UPGRADE.getMethod(), MapUtil.builder()
|
||||
.put("version", version).put("fileUrl", fileUrl).put("fileSize", fileSize)
|
||||
.put("fileDigestAlgorithm", fileDigestAlgorithm).put("fileDigestValue", fileDigestValue)
|
||||
.build());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user