diff --git a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/mix/service/MixTaskService.java b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/mix/service/MixTaskService.java index 914775cbf4..5306676275 100644 --- a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/mix/service/MixTaskService.java +++ b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/mix/service/MixTaskService.java @@ -63,9 +63,9 @@ public interface MixTaskService { void checkTaskStatusBatch(); /** - * 同步任务状态(从阿里云 ICE 查询) + * 同步任务状态(从阿里云 ICE 查询所有jobId,综合判断) */ - void syncTaskStatus(Long taskId, String jobId); + void syncTaskStatus(Long taskId, List jobIds); /** * 保存任务结果 diff --git a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/mix/service/MixTaskServiceImpl.java b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/mix/service/MixTaskServiceImpl.java index 87f164e4c1..a63a095ff1 100644 --- a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/mix/service/MixTaskServiceImpl.java +++ b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/mix/service/MixTaskServiceImpl.java @@ -271,10 +271,8 @@ public class MixTaskServiceImpl implements MixTaskService { try { List jobIds = task.getJobIdList(); if (jobIds != null && !jobIds.isEmpty()) { - // 每个任务可能有多个jobId,取第一个进行检查 - String jobId = jobIds.get(0); - // 使用任务的租户ID执行状态同步 - TenantUtils.execute(task.getTenantId(), () -> syncTaskStatus(task.getId(), jobId)); + // 使用任务的租户ID执行状态同步(传递所有jobIds) + TenantUtils.execute(task.getTenantId(), () -> syncTaskStatus(task.getId(), jobIds)); } } catch (Exception e) { log.error("[MixTask][单个任务检查失败] taskId={}", task.getId(), e); @@ -284,8 +282,17 @@ public class MixTaskServiceImpl implements MixTaskService { } } + /** + * 同步任务状态(检查所有jobId,综合判断) + * + * 状态判断逻辑: + * - 全部成功 → 任务成功 + * - 全部失败 → 任务失败 + * - 部分成功 → 任务成功(只保留成功的视频URL) + * - 任一运行中/待处理 → 任务继续运行 + */ @Override - public void syncTaskStatus(Long taskId, String jobId) { + public void syncTaskStatus(Long taskId, List jobIds) { try { // 检查任务是否超时(超过12小时则标记为失败) MixTaskDO task = mixTaskMapper.selectById(taskId); @@ -301,60 +308,96 @@ public class MixTaskServiceImpl implements MixTaskService { } } - // 调用阿里云 ICE API 查询任务状态 - String status = iceClient.getMediaProducingJobStatus(jobId); + // 检查所有jobId的状态 + List successJobIds = new ArrayList<>(); + List failedJobIds = new ArrayList<>(); + List runningJobIds = new ArrayList<>(); + List pendingJobIds = new ArrayList<>(); + List unknownJobIds = new ArrayList<>(); - // 根据ICE状态更新任务 - if ("Success".equalsIgnoreCase(status) || "success".equalsIgnoreCase(status)) { - // 任务成功完成,需要获取实际的输出URL - log.info("[MixTask][ICE任务执行成功] taskId={}, jobId={}", taskId, jobId); - // 重新生成签名URL并更新任务 - updateTaskSuccess(taskId, jobId); - } else if ("Failed".equalsIgnoreCase(status) || "failed".equalsIgnoreCase(status) || "Failure".equalsIgnoreCase(status)) { - // 任务失败 - 获取详细错误信息 - String errorMsg = "ICE任务执行失败"; + for (String jobId : jobIds) { try { - // 尝试获取更详细的失败信息(如果ICE API支持) - errorMsg = "ICE任务执行失败,状态: " + status; - } catch (Exception ex) { - log.warn("[MixTask][获取详细失败信息失败] taskId={}", taskId, ex); - } - log.error("[MixTask][ICE任务执行失败] taskId={}, jobId={}, iceStatus={}, errorMsg={}", - taskId, jobId, status, errorMsg); - updateTaskError(taskId, errorMsg); - } else if ("Running".equalsIgnoreCase(status) || "running".equalsIgnoreCase(status) || "Processing".equalsIgnoreCase(status)) { - // 任务仍在运行,逐步更新进度 - // 1. 获取当前进度 - MixTaskDO currentTask = mixTaskMapper.selectById(taskId); - if (currentTask != null) { - int currentProgress = currentTask.getProgress() != null ? currentTask.getProgress() : 0; - // 2. 根据当前进度逐步提升,但不超过95% - int newProgress; - if (currentProgress < 50) { - newProgress = 50; // 首次看到Running时更新到50% - } else if (currentProgress < 80) { - newProgress = 80; // 第二次更新到80% - } else if (currentProgress < 95) { - newProgress = 95; // 最后更新到95% + String status = iceClient.getMediaProducingJobStatus(jobId); + if ("Success".equalsIgnoreCase(status)) { + successJobIds.add(jobId); + } else if ("Failed".equalsIgnoreCase(status) || "Failure".equalsIgnoreCase(status)) { + failedJobIds.add(jobId); + } else if ("Running".equalsIgnoreCase(status) || "Processing".equalsIgnoreCase(status)) { + runningJobIds.add(jobId); + } else if ("Pending".equalsIgnoreCase(status)) { + pendingJobIds.add(jobId); } else { - newProgress = currentProgress; // 已经很高了,保持不变 + unknownJobIds.add(jobId); } + } catch (Exception e) { + log.error("[MixTask][查询jobId状态失败] taskId={}, jobId={}", taskId, jobId, e); + failedJobIds.add(jobId); + } + } - if (newProgress != currentProgress) { - updateTaskStatus(taskId, MixTaskConstants.STATUS_RUNNING, newProgress); - log.info("[MixTask][进度更新] taskId={}, from={}%, to={}%", taskId, currentProgress, newProgress); + int total = jobIds.size(); + log.info("[MixTask][状态汇总] taskId={}, total={}, success={}, failed={}, running={}, pending={}, unknown={}", + taskId, total, successJobIds.size(), failedJobIds.size(), runningJobIds.size(), pendingJobIds.size(), unknownJobIds.size()); + + // 综合判断任务状态 + if (!runningJobIds.isEmpty() || !pendingJobIds.isEmpty()) { + // 任一运行中或待处理 → 更新进度,继续等待 + int currentProgress = task.getProgress() != null ? task.getProgress() : 0; + int newProgress; + if (currentProgress < 50) { + newProgress = 50; + } else if (currentProgress < 80) { + newProgress = 80; + } else if (currentProgress < 95) { + newProgress = 95; + } else { + newProgress = currentProgress; + } + if (newProgress != currentProgress) { + updateTaskStatus(taskId, MixTaskConstants.STATUS_RUNNING, newProgress); + log.info("[MixTask][进度更新] taskId={}, from={}%, to={}%, 完成数={}/{}", + taskId, currentProgress, newProgress, successJobIds.size(), total); + } + } else if (!successJobIds.isEmpty()) { + // 至少有一个成功 → 任务成功(过滤掉失败的URL) + List outputUrls = task.getOutputUrlList(); + List successUrls = new ArrayList<>(); + + // 根据成功的jobId索引获取对应的URL + for (String successJobId : successJobIds) { + int index = jobIds.indexOf(successJobId); + if (index >= 0 && index < outputUrls.size()) { + successUrls.add(outputUrls.get(index)); } } - } else if ("Pending".equalsIgnoreCase(status) || "pending".equalsIgnoreCase(status)) { - // 任务等待中,更新进度为30% - updateTaskStatus(taskId, MixTaskConstants.STATUS_RUNNING, 30); + + if (successUrls.isEmpty()) { + successUrls = outputUrls; // 兜底:使用原始URL列表 + } + + // 更新任务为成功状态 + MixTaskDO updateTask = new MixTaskDO(); + updateTask.setId(taskId); + updateTask.setStatus(MixTaskConstants.STATUS_SUCCESS); + updateTask.setProgress(MixTaskConstants.PROGRESS_COMPLETED); + updateTask.setOutputUrlList(successUrls); + updateTask.setFinishTime(LocalDateTime.now()); + if (!failedJobIds.isEmpty()) { + updateTask.setErrorMsg("部分视频生成失败,成功 " + successJobIds.size() + "/" + total + " 个"); + } + mixTaskMapper.updateById(updateTask); + + log.info("[MixTask][任务完成] taskId={}, 成功数={}/{}, 失败数={}", + taskId, successJobIds.size(), total, failedJobIds.size()); } else { - // 未知状态,记录日志但不更新 - log.warn("[MixTask][未知ICE状态] taskId={}, jobId={}, iceStatus={}", taskId, jobId, status); + // 全部失败 → 任务失败 + String errorMsg = "全部ICE任务执行失败,共 " + total + " 个"; + log.error("[MixTask][ICE任务全部失败] taskId={}, jobIds={}", taskId, jobIds); + updateTaskError(taskId, errorMsg); } } catch (Exception e) { - log.error("[MixTask][状态同步异常] taskId={}, jobId={}", taskId, jobId, e); + log.error("[MixTask][状态同步异常] taskId={}, jobIds={}", taskId, jobIds, e); updateTaskError(taskId, "查询任务状态失败: " + e.getMessage()); } }