diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/ota/IotOtaTaskRecordMapper.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/ota/IotOtaTaskRecordMapper.java index 2245b31a17..1c919da2e9 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/ota/IotOtaTaskRecordMapper.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/ota/IotOtaTaskRecordMapper.java @@ -40,6 +40,13 @@ public interface IotOtaTaskRecordMapper extends BaseMapperX .in(IotOtaTaskRecordDO::getStatus, statuses)); } + default int updateByIdAndStatus(Long id, Integer status, + IotOtaTaskRecordDO updateObj) { + return update(updateObj, new LambdaUpdateWrapper() + .eq(IotOtaTaskRecordDO::getId, id) + .eq(IotOtaTaskRecordDO::getStatus, status)); + } + default int updateByIdAndStatus(Long id, Collection whereStatuses, IotOtaTaskRecordDO updateObj) { return update(updateObj, new LambdaUpdateWrapper() @@ -60,4 +67,8 @@ public interface IotOtaTaskRecordMapper extends BaseMapperX .inIfPresent(IotOtaTaskRecordDO::getStatus, statuses)); } + default List selectListByStatus(Integer status) { + return selectList(IotOtaTaskRecordDO::getStatus, status); + } + } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/job/ota/IotOtaUpgradeJob.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/job/ota/IotOtaUpgradeJob.java new file mode 100644 index 0000000000..46a6fa22a3 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/job/ota/IotOtaUpgradeJob.java @@ -0,0 +1,81 @@ +package cn.iocoder.yudao.module.iot.job.ota; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.framework.quartz.core.handler.JobHandler; +import cn.iocoder.yudao.framework.tenant.core.job.TenantJob; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum; +import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO; +import cn.iocoder.yudao.module.iot.dal.dataobject.ota.IotOtaFirmwareDO; +import cn.iocoder.yudao.module.iot.dal.dataobject.ota.IotOtaTaskRecordDO; +import cn.iocoder.yudao.module.iot.enums.ota.IotOtaTaskRecordStatusEnum; +import cn.iocoder.yudao.module.iot.service.device.IotDeviceService; +import cn.iocoder.yudao.module.iot.service.ota.IotOtaFirmwareService; +import cn.iocoder.yudao.module.iot.service.ota.IotOtaTaskRecordService; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * IoT OTA 升级推送 Job:查询待推送的 OTA 升级记录,并推送给设备 + * + * @author 芋道源码 + */ +@Component +@Slf4j +public class IotOtaUpgradeJob implements JobHandler { + + @Resource + private IotOtaTaskRecordService otaTaskRecordService; + @Resource + private IotOtaFirmwareService otaFirmwareService; + @Resource + private IotDeviceService deviceService; + + @Override + @TenantJob + public String execute(String param) throws Exception { + // 1. 查询待推送的 OTA 升级记录 + List records = otaTaskRecordService.getOtaRecordListByStatus( + IotOtaTaskRecordStatusEnum.PENDING.getStatus()); + if (CollUtil.isEmpty(records)) { + return null; + } + + // 2. 遍历推送记录 + int successCount = 0; + int failureCount = 0; + Map otaFirmwares = new HashMap<>(); + for (IotOtaTaskRecordDO record : records) { + try { + // 2.1 设备如果不在线,直接跳过 + IotDeviceDO device = deviceService.getDeviceFromCache(record.getDeviceId()); + if (device == null || IotDeviceStateEnum.isNotOnline(device.getState())) { + continue; + } + // 2.2 获取 OTA 固件信息 + IotOtaFirmwareDO fireware = otaFirmwares.get(record.getFirmwareId()); + if (fireware == null) { + fireware = otaFirmwareService.getOtaFirmware(record.getFirmwareId()); + otaFirmwares.put(record.getFirmwareId(), fireware); + } + // 2.3 推送 OTA 升级任务 + boolean result = otaTaskRecordService.pushOtaTaskRecord(record, fireware, device); + if (result) { + successCount++; + } else { + failureCount++; + } + } catch (Exception e) { + failureCount++; + log.error("[execute][推送 OTA 升级任务({})发生异常]", record.getId(), e); + } + } + return StrUtil.format("升级任务推送成功:{} 条,送失败:{} 条", successCount, failureCount); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/ota/IotOtaTaskRecordService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/ota/IotOtaTaskRecordService.java index df5a873f7d..3601baf399 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/ota/IotOtaTaskRecordService.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/ota/IotOtaTaskRecordService.java @@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.service.ota; import cn.iocoder.yudao.framework.common.pojo.PageResult; import cn.iocoder.yudao.module.iot.controller.admin.ota.vo.task.record.IotOtaTaskRecordPageReqVO; import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO; +import cn.iocoder.yudao.module.iot.dal.dataobject.ota.IotOtaFirmwareDO; import cn.iocoder.yudao.module.iot.dal.dataobject.ota.IotOtaTaskRecordDO; import jakarta.validation.Valid; @@ -65,6 +66,14 @@ public interface IotOtaTaskRecordService { */ List getOtaTaskRecordListByDeviceIdAndStatus(Set deviceIds, Set statuses); + /** + * 根据记录状态,获取 OTA 升级记录列表 + * + * @param status 升级记录状态 + * @return 升级记录列表 + */ + List getOtaRecordListByStatus(Integer status); + /** * 取消 OTA 升级记录 * @@ -72,4 +81,14 @@ public interface IotOtaTaskRecordService { */ void cancelOtaTaskRecord(Long id); + /** + * 推送 OTA 升级任务记录 + * + * @param record 任务记录 + * @param fireware 固件信息 + * @param device 设备信息 + * @return 是否推送成功 + */ + boolean pushOtaTaskRecord(IotOtaTaskRecordDO record, IotOtaFirmwareDO fireware, IotDeviceDO device); + } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/ota/IotOtaTaskRecordServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/ota/IotOtaTaskRecordServiceImpl.java index 27964a0395..004d435487 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/ota/IotOtaTaskRecordServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/ota/IotOtaTaskRecordServiceImpl.java @@ -2,12 +2,18 @@ package cn.iocoder.yudao.module.iot.service.ota; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.convert.Convert; +import cn.hutool.core.lang.Assert; +import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.framework.common.pojo.PageResult; import cn.iocoder.yudao.module.iot.controller.admin.ota.vo.task.record.IotOtaTaskRecordPageReqVO; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO; +import cn.iocoder.yudao.module.iot.dal.dataobject.ota.IotOtaFirmwareDO; import cn.iocoder.yudao.module.iot.dal.dataobject.ota.IotOtaTaskRecordDO; import cn.iocoder.yudao.module.iot.dal.mysql.ota.IotOtaTaskRecordMapper; import cn.iocoder.yudao.module.iot.enums.ota.IotOtaTaskRecordStatusEnum; +import cn.iocoder.yudao.module.iot.service.device.IotDeviceService; +import cn.iocoder.yudao.module.iot.service.device.message.IotDeviceMessageService; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -33,6 +39,10 @@ public class IotOtaTaskRecordServiceImpl implements IotOtaTaskRecordService { @Resource private IotOtaTaskService otaTaskService; + @Resource + private IotDeviceMessageService deviceMessageService; + @Resource + private IotDeviceService deviceService; @Override public void createOtaTaskRecordList(List devices, Long firmwareId, Long taskId) { @@ -86,7 +96,7 @@ public class IotOtaTaskRecordServiceImpl implements IotOtaTaskRecordService { Collection ids = convertSet(records, IotOtaTaskRecordDO::getId); otaTaskRecordMapper.updateListByIdAndStatus(ids, IotOtaTaskRecordStatusEnum.IN_PROCESS_STATUSES, IotOtaTaskRecordDO.builder().status(IotOtaTaskRecordStatusEnum.CANCELED.getStatus()) - .description(IotOtaTaskRecordDO.DESCRIPTION_CANCEL_BY_RECORD).build()); + .description(IotOtaTaskRecordDO.DESCRIPTION_CANCEL_BY_TASK).build()); } @Override @@ -94,6 +104,11 @@ public class IotOtaTaskRecordServiceImpl implements IotOtaTaskRecordService { return otaTaskRecordMapper.selectListByDeviceIdAndStatus(deviceIds, statuses); } + @Override + public List getOtaRecordListByStatus(Integer status) { + return otaTaskRecordMapper.selectListByStatus(status); + } + @Override public void cancelOtaTaskRecord(Long id) { // 1. 校验记录是否存在 @@ -111,6 +126,30 @@ public class IotOtaTaskRecordServiceImpl implements IotOtaTaskRecordService { checkAndUpdateOtaTaskStatus(record.getTaskId()); } + @Override + public boolean pushOtaTaskRecord(IotOtaTaskRecordDO record, IotOtaFirmwareDO fireware, IotDeviceDO device) { + try { + // 1. 推送 OTA 任务记录 + IotDeviceMessage message = IotDeviceMessage.buildOtaUpgrade( + fireware.getVersion(), fireware.getFileUrl(), fireware.getFileSize(), + fireware.getFileDigestAlgorithm(), fireware.getFileDigestValue()); + deviceMessageService.sendDeviceMessage(message, device); + + // 2. 更新 OTA 升级记录状态为进行中 + int updateCount = otaTaskRecordMapper.updateByIdAndStatus( + record.getId(), IotOtaTaskRecordStatusEnum.PENDING.getStatus(), + IotOtaTaskRecordDO.builder().status(IotOtaTaskRecordStatusEnum.PUSHED.getStatus()) + .description(StrUtil.format("已推送,设备消息编号({})", message.getId())).build()); + Assert.isTrue(updateCount == 1, "更新设备记录({})状态失败", record.getId()); + return true; + } catch (Exception ex) { + log.error("[pushOtaTaskRecord][推送 OTA 任务记录({}) 失败]", record.getId(), ex); + otaTaskRecordMapper.updateById(IotOtaTaskRecordDO.builder().id(record.getId()) + .description(StrUtil.format("推送失败,错误信息({})", ex.getMessage())).build()); + return false; + } + } + private IotOtaTaskRecordDO validateUpgradeRecordExists(Long id) { IotOtaTaskRecordDO upgradeRecord = otaTaskRecordMapper.selectById(id); if (upgradeRecord == null) { diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/ota/IotOtaTaskService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/ota/IotOtaTaskService.java index 7c3ce38e95..ead91e2874 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/ota/IotOtaTaskService.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/ota/IotOtaTaskService.java @@ -46,7 +46,7 @@ public interface IotOtaTaskService { /** * 更新 OTA 任务状态为已结束 - * + * * @param id 任务编号 */ void updateOtaTaskStatusEnd(Long id); diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/enums/IotDeviceMessageMethodEnum.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/enums/IotDeviceMessageMethodEnum.java index fddf155a08..dc0e2524a5 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/enums/IotDeviceMessageMethodEnum.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/enums/IotDeviceMessageMethodEnum.java @@ -42,6 +42,10 @@ public enum IotDeviceMessageMethodEnum implements ArrayValuable { CONFIG_PUSH("thing.config.push", "配置推送", true), + // ========== OTA 固件 ========== + + OTA_UPGRADE("thing.ota.upgrade", "OTA 推送固定信息", false), + OTA_PROGRESS("thing.ota.progress", "OTA 上报升级进度", true), ; public static final String[] ARRAYS = Arrays.stream(values()).map(IotDeviceMessageMethodEnum::getMethod) diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/enums/IotDeviceStateEnum.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/enums/IotDeviceStateEnum.java index 28cc33f7be..d0ff8357e7 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/enums/IotDeviceStateEnum.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/enums/IotDeviceStateEnum.java @@ -39,4 +39,8 @@ public enum IotDeviceStateEnum implements ArrayValuable { return ONLINE.getState().equals(state); } + public static boolean isNotOnline(Integer state) { + return !isOnline(state); + } + } diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/mq/message/IotDeviceMessage.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/mq/message/IotDeviceMessage.java index 01af310081..6821c0d160 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/mq/message/IotDeviceMessage.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/mq/message/IotDeviceMessage.java @@ -140,4 +140,12 @@ public class IotDeviceMessage { MapUtil.of("state", IotDeviceStateEnum.OFFLINE.getState())); } + public static IotDeviceMessage buildOtaUpgrade(String version, String fileUrl, Long fileSize, + String fileDigestAlgorithm, String fileDigestValue) { + return requestOf(IotDeviceMessageMethodEnum.OTA_UPGRADE.getMethod(), MapUtil.builder() + .put("version", version).put("fileUrl", fileUrl).put("fileSize", fileSize) + .put("fileDigestAlgorithm", fileDigestAlgorithm).put("fileDigestValue", fileDigestValue) + .build()); + } + }