This commit is contained in:
2026-03-04 03:54:54 +08:00
parent 8dc3501990
commit 16043dd52e
2 changed files with 93 additions and 50 deletions

View File

@@ -63,9 +63,9 @@ public interface MixTaskService {
void checkTaskStatusBatch();
/**
* 同步任务状态(从阿里云 ICE 查询)
* 同步任务状态(从阿里云 ICE 查询所有jobId综合判断
*/
void syncTaskStatus(Long taskId, String jobId);
void syncTaskStatus(Long taskId, List<String> jobIds);
/**
* 保存任务结果

View File

@@ -271,10 +271,8 @@ public class MixTaskServiceImpl implements MixTaskService {
try {
List<String> jobIds = task.getJobIdList();
if (jobIds != null && !jobIds.isEmpty()) {
// 每个任务可能有多个jobId取第一个进行检查
String jobId = jobIds.get(0);
// 使用任务的租户ID执行状态同步
TenantUtils.execute(task.getTenantId(), () -> syncTaskStatus(task.getId(), jobId));
// 使用任务的租户ID执行状态同步传递所有jobIds
TenantUtils.execute(task.getTenantId(), () -> syncTaskStatus(task.getId(), jobIds));
}
} catch (Exception e) {
log.error("[MixTask][单个任务检查失败] taskId={}", task.getId(), e);
@@ -284,8 +282,17 @@ public class MixTaskServiceImpl implements MixTaskService {
}
}
/**
* 同步任务状态检查所有jobId综合判断
*
* 状态判断逻辑:
* - 全部成功 → 任务成功
* - 全部失败 → 任务失败
* - 部分成功 → 任务成功只保留成功的视频URL
* - 任一运行中/待处理 → 任务继续运行
*/
@Override
public void syncTaskStatus(Long taskId, String jobId) {
public void syncTaskStatus(Long taskId, List<String> jobIds) {
try {
// 检查任务是否超时超过12小时则标记为失败
MixTaskDO task = mixTaskMapper.selectById(taskId);
@@ -301,60 +308,96 @@ public class MixTaskServiceImpl implements MixTaskService {
}
}
// 调用阿里云 ICE API 查询任务状态
String status = iceClient.getMediaProducingJobStatus(jobId);
// 检查所有jobId的状态
List<String> successJobIds = new ArrayList<>();
List<String> failedJobIds = new ArrayList<>();
List<String> runningJobIds = new ArrayList<>();
List<String> pendingJobIds = new ArrayList<>();
List<String> unknownJobIds = new ArrayList<>();
// 根据ICE状态更新任务
if ("Success".equalsIgnoreCase(status) || "success".equalsIgnoreCase(status)) {
// 任务成功完成需要获取实际的输出URL
log.info("[MixTask][ICE任务执行成功] taskId={}, jobId={}", taskId, jobId);
// 重新生成签名URL并更新任务
updateTaskSuccess(taskId, jobId);
} else if ("Failed".equalsIgnoreCase(status) || "failed".equalsIgnoreCase(status) || "Failure".equalsIgnoreCase(status)) {
// 任务失败 - 获取详细错误信息
String errorMsg = "ICE任务执行失败";
for (String jobId : jobIds) {
try {
// 尝试获取更详细的失败信息如果ICE API支持
errorMsg = "ICE任务执行失败状态: " + status;
} catch (Exception ex) {
log.warn("[MixTask][获取详细失败信息失败] taskId={}", taskId, ex);
}
log.error("[MixTask][ICE任务执行失败] taskId={}, jobId={}, iceStatus={}, errorMsg={}",
taskId, jobId, status, errorMsg);
updateTaskError(taskId, errorMsg);
} else if ("Running".equalsIgnoreCase(status) || "running".equalsIgnoreCase(status) || "Processing".equalsIgnoreCase(status)) {
// 任务仍在运行,逐步更新进度
// 1. 获取当前进度
MixTaskDO currentTask = mixTaskMapper.selectById(taskId);
if (currentTask != null) {
int currentProgress = currentTask.getProgress() != null ? currentTask.getProgress() : 0;
// 2. 根据当前进度逐步提升但不超过95%
int newProgress;
if (currentProgress < 50) {
newProgress = 50; // 首次看到Running时更新到50%
} else if (currentProgress < 80) {
newProgress = 80; // 第二次更新到80%
} else if (currentProgress < 95) {
newProgress = 95; // 最后更新到95%
String status = iceClient.getMediaProducingJobStatus(jobId);
if ("Success".equalsIgnoreCase(status)) {
successJobIds.add(jobId);
} else if ("Failed".equalsIgnoreCase(status) || "Failure".equalsIgnoreCase(status)) {
failedJobIds.add(jobId);
} else if ("Running".equalsIgnoreCase(status) || "Processing".equalsIgnoreCase(status)) {
runningJobIds.add(jobId);
} else if ("Pending".equalsIgnoreCase(status)) {
pendingJobIds.add(jobId);
} else {
newProgress = currentProgress; // 已经很高了,保持不变
unknownJobIds.add(jobId);
}
} catch (Exception e) {
log.error("[MixTask][查询jobId状态失败] taskId={}, jobId={}", taskId, jobId, e);
failedJobIds.add(jobId);
}
}
if (newProgress != currentProgress) {
updateTaskStatus(taskId, MixTaskConstants.STATUS_RUNNING, newProgress);
log.info("[MixTask][进度更新] taskId={}, from={}%, to={}%", taskId, currentProgress, newProgress);
int total = jobIds.size();
log.info("[MixTask][状态汇总] taskId={}, total={}, success={}, failed={}, running={}, pending={}, unknown={}",
taskId, total, successJobIds.size(), failedJobIds.size(), runningJobIds.size(), pendingJobIds.size(), unknownJobIds.size());
// 综合判断任务状态
if (!runningJobIds.isEmpty() || !pendingJobIds.isEmpty()) {
// 任一运行中或待处理 → 更新进度,继续等待
int currentProgress = task.getProgress() != null ? task.getProgress() : 0;
int newProgress;
if (currentProgress < 50) {
newProgress = 50;
} else if (currentProgress < 80) {
newProgress = 80;
} else if (currentProgress < 95) {
newProgress = 95;
} else {
newProgress = currentProgress;
}
if (newProgress != currentProgress) {
updateTaskStatus(taskId, MixTaskConstants.STATUS_RUNNING, newProgress);
log.info("[MixTask][进度更新] taskId={}, from={}%, to={}%, 完成数={}/{}",
taskId, currentProgress, newProgress, successJobIds.size(), total);
}
} else if (!successJobIds.isEmpty()) {
// 至少有一个成功 → 任务成功过滤掉失败的URL
List<String> outputUrls = task.getOutputUrlList();
List<String> successUrls = new ArrayList<>();
// 根据成功的jobId索引获取对应的URL
for (String successJobId : successJobIds) {
int index = jobIds.indexOf(successJobId);
if (index >= 0 && index < outputUrls.size()) {
successUrls.add(outputUrls.get(index));
}
}
} else if ("Pending".equalsIgnoreCase(status) || "pending".equalsIgnoreCase(status)) {
// 任务等待中更新进度为30%
updateTaskStatus(taskId, MixTaskConstants.STATUS_RUNNING, 30);
if (successUrls.isEmpty()) {
successUrls = outputUrls; // 兜底使用原始URL列表
}
// 更新任务为成功状态
MixTaskDO updateTask = new MixTaskDO();
updateTask.setId(taskId);
updateTask.setStatus(MixTaskConstants.STATUS_SUCCESS);
updateTask.setProgress(MixTaskConstants.PROGRESS_COMPLETED);
updateTask.setOutputUrlList(successUrls);
updateTask.setFinishTime(LocalDateTime.now());
if (!failedJobIds.isEmpty()) {
updateTask.setErrorMsg("部分视频生成失败,成功 " + successJobIds.size() + "/" + total + "");
}
mixTaskMapper.updateById(updateTask);
log.info("[MixTask][任务完成] taskId={}, 成功数={}/{}, 失败数={}",
taskId, successJobIds.size(), total, failedJobIds.size());
} else {
// 未知状态,记录日志但不更新
log.warn("[MixTask][未知ICE状态] taskId={}, jobId={}, iceStatus={}", taskId, jobId, status);
// 全部失败 → 任务失败
String errorMsg = "全部ICE任务执行失败" + total + "";
log.error("[MixTask][ICE任务全部失败] taskId={}, jobIds={}", taskId, jobIds);
updateTaskError(taskId, errorMsg);
}
} catch (Exception e) {
log.error("[MixTask][状态同步异常] taskId={}, jobId={}", taskId, jobId, e);
log.error("[MixTask][状态同步异常] taskId={}, jobIds={}", taskId, jobIds, e);
updateTaskError(taskId, "查询任务状态失败: " + e.getMessage());
}
}