diff --git a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/config/TikAsyncConfig.java b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/config/TikAsyncConfig.java
index 50d83f4401..74030dbeaf 100644
--- a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/config/TikAsyncConfig.java
+++ b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/config/TikAsyncConfig.java
@@ -50,6 +50,13 @@ public class TikAsyncConfig {
new ThreadPoolExecutor.DiscardOldestPolicy());
}
+ /** ICE 混剪异步提交专用线程池 */
+ @Bean("iceSubmitExecutor")
+ public Executor iceSubmitExecutor() {
+ return createExecutor("ice-submit-", 4, 10, 200, 60,
+ new ThreadPoolExecutor.CallerRunsPolicy());
+ }
+
/** 对标分析任务专用线程池 */
@Bean("benchmarkAsyncExecutor")
public Executor benchmarkAsyncExecutor() {
diff --git a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/mix/job/MixTaskStatusSyncJob.java b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/mix/job/MixTaskStatusSyncJob.java
index 171c8dacd8..df9f10ed6b 100644
--- a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/mix/job/MixTaskStatusSyncJob.java
+++ b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/mix/job/MixTaskStatusSyncJob.java
@@ -8,7 +8,11 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
- * 混剪任务状态同步定时任务
+ * 混剪任务定时调度
+ *
+ * 双路径架构:
+ * 1. createMixTask 即时异步提交(低延迟,最佳尝试)
+ * 2. 本定时任务兜底恢复(高可靠,DB 驱动,JVM 重启不丢任务)
*
* @author 芋道源码
*/
@@ -20,7 +24,23 @@ public class MixTaskStatusSyncJob {
private final MixTaskService mixTaskService;
/**
- * 每30秒检查一次任务状态
+ * 每30秒:恢复僵尸 pending 任务(job_ids 为空的待提交任务)
+ *
+ * 这是 DB 驱动的兜底恢复路径。createMixTask 中的 CompletableFuture
+ * 可能因 JVM 重启、线程池拒绝等原因丢失任务,此调度保证任务不丢。
+ */
+ @Scheduled(fixedDelay = 30_000, initialDelay = 15_000)
+ public void recoverPendingSubmissions() {
+ log.debug("开始恢复僵尸 pending 任务");
+ try {
+ mixTaskService.processPendingSubmissions();
+ } catch (Exception e) {
+ log.error("恢复僵尸 pending 任务失败", e);
+ }
+ }
+
+ /**
+ * 每分钟:同步运行中任务的状态(从 ICE 查询 jobId 进度)
*/
@Scheduled(cron = MixTaskConstants.CRON_CHECK_STATUS)
public void syncTaskStatus() {
diff --git a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/mix/service/MixTaskService.java b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/mix/service/MixTaskService.java
index 5306676275..1b785868a1 100644
--- a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/mix/service/MixTaskService.java
+++ b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/mix/service/MixTaskService.java
@@ -72,6 +72,14 @@ public interface MixTaskService {
*/
void saveTaskResult(Long taskId, List outputUrls);
+ /**
+ * 处理待提交任务(定时兜底恢复)
+ *
+ * 扫描 pending 状态且 job_ids 为空的僵尸任务,重新提交到 ICE。
+ * 只处理创建超过2分钟的任务,避免与即时异步提交冲突。
+ */
+ void processPendingSubmissions();
+
/**
* 生成签名URL
*
diff --git a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/mix/service/MixTaskServiceImpl.java b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/mix/service/MixTaskServiceImpl.java
index 2263f43732..69d2fe08c1 100644
--- a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/mix/service/MixTaskServiceImpl.java
+++ b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/mix/service/MixTaskServiceImpl.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -47,6 +48,9 @@ public class MixTaskServiceImpl implements MixTaskService {
@Resource
private FileApi fileApi;
+ @Resource(name = "iceSubmitExecutor")
+ private Executor iceSubmitExecutor;
+
@Override
@Transactional(rollbackFor = Exception.class)
public Long createMixTask(MixTaskSaveReqVO createReqVO, Long userId) {
@@ -68,7 +72,7 @@ public class MixTaskServiceImpl implements MixTaskService {
// 2. 保存到数据库
mixTaskMapper.insert(task);
- // 3. 异步提交到阿里云 ICE(使用 TTL 自动传递上下文)
+ // 3. 异步提交到阿里云 ICE(使用专用线程池 + TTL 自动传递上下文)
CompletableFuture.runAsync(TtlRunnable.get(() -> {
try {
submitToICE(task.getId(), createReqVO, userId);
@@ -76,7 +80,7 @@ public class MixTaskServiceImpl implements MixTaskService {
log.error("[MixTask][提交ICE失败] taskId={}", task.getId(), e);
updateTaskError(task.getId(), "提交任务失败: " + e.getMessage());
}
- }));
+ }), iceSubmitExecutor);
return task.getId();
}
@@ -178,25 +182,10 @@ public class MixTaskServiceImpl implements MixTaskService {
updateTask.setOutputUrlList(null);
mixTaskMapper.updateById(updateTask);
- // 3. 重新提交到ICE(使用 TTL 自动传递上下文)
+ // 3. 重新提交到ICE(使用专用线程池 + TTL 自动传递上下文)
CompletableFuture.runAsync(TtlRunnable.get(() -> {
try {
- // 从 materialsJson 重建请求对象
- List materials = null;
- if (StrUtil.isNotEmpty(existTask.getMaterialsJson())) {
- materials = JsonUtils.parseArray(existTask.getMaterialsJson(), MixTaskSaveReqVO.MaterialItem.class);
- } else if (existTask.getVideoUrlList() != null && !existTask.getVideoUrlList().isEmpty()) {
- // 兼容旧版本:从 videoUrls 重建(默认3秒时长)
- materials = existTask.getVideoUrlList().stream()
- .map(url -> {
- MixTaskSaveReqVO.MaterialItem item = new MixTaskSaveReqVO.MaterialItem();
- item.setFileUrl(url);
- item.setDuration(3); // 默认3秒
- return item;
- })
- .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
- }
-
+ List materials = rebuildMaterialsFromTask(existTask);
if (materials == null || materials.isEmpty()) {
throw new IllegalArgumentException("无法重建素材列表");
}
@@ -210,7 +199,7 @@ public class MixTaskServiceImpl implements MixTaskService {
log.error("[MixTask][重新提交失败] taskId={}", id, e);
updateTaskError(id, "重新提交失败: " + e.getMessage());
}
- }));
+ }), iceSubmitExecutor);
}
@Override
@@ -282,6 +271,84 @@ public class MixTaskServiceImpl implements MixTaskService {
}
}
+ /**
+ * 处理待提交的僵尸任务(定时兜底恢复)
+ *
+ * 设计意图:
+ * - createMixTask 中的 CompletableFuture 是"即时路径",追求低延迟
+ * - 本方法是"兜底路径",处理即时路径丢失的任务(JVM 重启、线程池拒绝等)
+ * - 只处理创建超过2分钟的任务,避免与即时路径的异步提交冲突
+ *
+ * 执行策略:
+ * - 扫描最近6小时内、pending 状态、job_ids 为空的僵尸任务
+ * - 每次至多处理 20 个,避免单次执行时间过长
+ * - 使用租户上下文重建请求并同步提交到 ICE
+ * - 单个任务失败不影响其他任务
+ */
+ @Override
+ public void processPendingSubmissions() {
+ // 安全窗口:只处理创建超过2分钟的任务,避免与即时异步提交冲突
+ LocalDateTime cutoffTime = LocalDateTime.now().minusMinutes(2);
+ LocalDateTime startTime = LocalDateTime.now().minusHours(MixTaskConstants.CHECK_HOURS_LIMIT);
+ int successCount = 0;
+ int failCount = 0;
+
+ // 查询僵尸任务(pending 且 job_ids 为空,在时间窗口内)
+ List zombieTasks = TenantUtils.executeIgnore(() ->
+ mixTaskMapper.selectList(
+ new cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX()
+ .eq(MixTaskDO::getStatus, MixTaskConstants.STATUS_PENDING)
+ .isNull(MixTaskDO::getJobIds)
+ .ge(MixTaskDO::getCreateTime, startTime)
+ .le(MixTaskDO::getCreateTime, cutoffTime)
+ .orderByAsc(MixTaskDO::getCreateTime)
+ .last("LIMIT 20")
+ )
+ );
+
+ if (zombieTasks.isEmpty()) {
+ return;
+ }
+
+ log.info("[MixTask][僵尸任务恢复] 发现 {} 个待提交的僵尸任务", zombieTasks.size());
+
+ for (MixTaskDO task : zombieTasks) {
+ try {
+ Long taskId = task.getId();
+ Long userId = task.getUserId();
+ Long tenantId = task.getTenantId();
+
+ List materials = rebuildMaterialsFromTask(task);
+ if (materials == null || materials.isEmpty()) {
+ log.warn("[MixTask][僵尸任务跳过] taskId={}, 无法重建素材列表", taskId);
+ updateTaskError(taskId, "素材数据缺失,无法恢复提交");
+ failCount++;
+ continue;
+ }
+
+ MixTaskSaveReqVO saveReqVO = new MixTaskSaveReqVO();
+ saveReqVO.setTitle(task.getTitle());
+ saveReqVO.setMaterials(materials);
+ saveReqVO.setProduceCount(task.getProduceCount());
+
+ // 使用任务原有的租户上下文提交
+ TenantUtils.execute(tenantId, () -> submitToICE(taskId, saveReqVO, userId));
+
+ successCount++;
+ log.info("[MixTask][僵尸任务恢复成功] taskId={}, tenantId={}, materialCount={}",
+ taskId, tenantId, materials.size());
+
+ } catch (Exception e) {
+ log.error("[MixTask][僵尸任务恢复失败] taskId={}", task.getId(), e);
+ updateTaskError(task.getId(), "恢复提交失败: " + e.getMessage());
+ failCount++;
+ }
+ }
+
+ log.info("[MixTask][僵尸任务恢复完成] 共处理 {} 个, 成功 {} 个, 失败 {} 个",
+ successCount + failCount, successCount, failCount);
+ }
+
/**
* 同步任务状态(检查所有jobId,综合判断)
*
@@ -560,16 +627,25 @@ public class MixTaskServiceImpl implements MixTaskService {
}
/**
- * 更新任务错误信息
+ * 更新任务错误信息(自带异常保护,永不对外抛异常)
+ *
+ * 这是错误处理链的最后一道防线。即使 DB 更新失败,也至少记录日志,
+ * 避免因 updateTaskError 自身失败导致 error_msg 永久为 null。
*/
private void updateTaskError(Long taskId, String errorMsg) {
- MixTaskDO updateTask = new MixTaskDO();
- updateTask.setId(taskId);
- updateTask.setStatus(MixTaskConstants.STATUS_FAILED);
- updateTask.setProgress(MixTaskConstants.PROGRESS_COMPLETED);
- updateTask.setErrorMsg(errorMsg);
- updateTask.setFinishTime(java.time.LocalDateTime.now());
- mixTaskMapper.updateById(updateTask);
+ try {
+ MixTaskDO updateTask = new MixTaskDO();
+ updateTask.setId(taskId);
+ updateTask.setStatus(MixTaskConstants.STATUS_FAILED);
+ updateTask.setProgress(MixTaskConstants.PROGRESS_COMPLETED);
+ updateTask.setErrorMsg(errorMsg);
+ updateTask.setFinishTime(LocalDateTime.now());
+ mixTaskMapper.updateById(updateTask);
+ log.info("[MixTask][错误已记录] taskId={}, errorMsg={}", taskId, errorMsg);
+ } catch (Exception e) {
+ // 最后兜底:至少确保日志中有错误信息,方便运维排查
+ log.error("[MixTask][严重] 无法更新任务错误信息到数据库!taskId={}, errorMsg={}", taskId, errorMsg, e);
+ }
}
/**
@@ -634,6 +710,31 @@ public class MixTaskServiceImpl implements MixTaskService {
});
}
+ /**
+ * 从任务记录重建素材列表(供 retryTask 和 processPendingSubmissions 共用)
+ *
+ * 优先从 materialsJson 解析,降级到 videoUrls(兼容旧版本,默认3秒时长)
+ *
+ * @param task 任务记录
+ * @return 素材列表,无法重建时返回 null
+ */
+ private List rebuildMaterialsFromTask(MixTaskDO task) {
+ if (StrUtil.isNotEmpty(task.getMaterialsJson())) {
+ return JsonUtils.parseArray(task.getMaterialsJson(), MixTaskSaveReqVO.MaterialItem.class);
+ }
+ if (task.getVideoUrlList() != null && !task.getVideoUrlList().isEmpty()) {
+ return task.getVideoUrlList().stream()
+ .map(url -> {
+ MixTaskSaveReqVO.MaterialItem item = new MixTaskSaveReqVO.MaterialItem();
+ item.setFileUrl(url);
+ item.setDuration(3);
+ return item;
+ })
+ .collect(Collectors.toList());
+ }
+ return null;
+ }
+
/**
* 校验混剪任务时长
*/