feat(mix): 添加混剪任务定时恢复与专用线程池
Some checks failed
Build and Deploy / deploy (push) Has been cancelled

新增 ICE 异步提交专用线程池,支持定时兜底恢复僵尸 pending 任务,增强任务提交的可靠性。

- 创建 `iceSubmitExecutor` 线程池(核心4/最大10/有界队列200/CallerRunsPolicy)
- 新增 `processPendingSubmissions` 方法,扫描并恢复6小时内、pending且job_ids为空的僵尸任务
- 新增定时任务 `recoverPendingSubmissions`,每30秒执行一次
- 将异步提交改为使用专用线程池,并增加异常保护
- 优化 `updateTaskError` 方法,增加日志兜底
- 修复前端生成数量选择器宽度不足的问题
This commit is contained in:
2026-06-03 21:55:17 +08:00
parent 8538b3cdb4
commit 248deeea0d
5 changed files with 167 additions and 31 deletions

View File

@@ -45,7 +45,7 @@
<div class="param-item"> <div class="param-item">
<span class="param-label">生成</span> <span class="param-label">生成</span>
<Select v-model="formData.produceCount" @update:model-value="saveProduceCount"> <Select v-model="formData.produceCount" @update:model-value="saveProduceCount">
<SelectTrigger class="w-16 h-7"> <SelectTrigger class="w-24 h-7">
<SelectValue placeholder="选择" /> <SelectValue placeholder="选择" />
</SelectTrigger> </SelectTrigger>
<SelectContent> <SelectContent>

View File

@@ -50,6 +50,13 @@ public class TikAsyncConfig {
new ThreadPoolExecutor.DiscardOldestPolicy()); new ThreadPoolExecutor.DiscardOldestPolicy());
} }
/** ICE 混剪异步提交专用线程池 */
@Bean("iceSubmitExecutor")
public Executor iceSubmitExecutor() {
return createExecutor("ice-submit-", 4, 10, 200, 60,
new ThreadPoolExecutor.CallerRunsPolicy());
}
/** 对标分析任务专用线程池 */ /** 对标分析任务专用线程池 */
@Bean("benchmarkAsyncExecutor") @Bean("benchmarkAsyncExecutor")
public Executor benchmarkAsyncExecutor() { public Executor benchmarkAsyncExecutor() {

View File

@@ -8,7 +8,11 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
/** /**
* 混剪任务状态同步定时任务 * 混剪任务定时调度
* <p>
* 双路径架构:
* 1. createMixTask 即时异步提交(低延迟,最佳尝试)
* 2. 本定时任务兜底恢复高可靠DB 驱动JVM 重启不丢任务)
* *
* @author 芋道源码 * @author 芋道源码
*/ */
@@ -20,7 +24,23 @@ public class MixTaskStatusSyncJob {
private final MixTaskService mixTaskService; private final MixTaskService mixTaskService;
/** /**
* 每30秒检查一次任务状态 * 每30秒:恢复僵尸 pending 任务job_ids 为空的待提交任务)
* <p>
* 这是 DB 驱动的兜底恢复路径。createMixTask 中的 CompletableFuture
* 可能因 JVM 重启、线程池拒绝等原因丢失任务,此调度保证任务不丢。
*/
@Scheduled(fixedDelay = 30_000, initialDelay = 15_000)
public void recoverPendingSubmissions() {
log.debug("开始恢复僵尸 pending 任务");
try {
mixTaskService.processPendingSubmissions();
} catch (Exception e) {
log.error("恢复僵尸 pending 任务失败", e);
}
}
/**
* 每分钟:同步运行中任务的状态(从 ICE 查询 jobId 进度)
*/ */
@Scheduled(cron = MixTaskConstants.CRON_CHECK_STATUS) @Scheduled(cron = MixTaskConstants.CRON_CHECK_STATUS)
public void syncTaskStatus() { public void syncTaskStatus() {

View File

@@ -72,6 +72,14 @@ public interface MixTaskService {
*/ */
void saveTaskResult(Long taskId, List<String> outputUrls); void saveTaskResult(Long taskId, List<String> outputUrls);
/**
* 处理待提交任务(定时兜底恢复)
*
* 扫描 pending 状态且 job_ids 为空的僵尸任务,重新提交到 ICE。
* 只处理创建超过2分钟的任务避免与即时异步提交冲突。
*/
void processPendingSubmissions();
/** /**
* 生成签名URL * 生成签名URL
* *

View File

@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@@ -47,6 +48,9 @@ public class MixTaskServiceImpl implements MixTaskService {
@Resource @Resource
private FileApi fileApi; private FileApi fileApi;
@Resource(name = "iceSubmitExecutor")
private Executor iceSubmitExecutor;
@Override @Override
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public Long createMixTask(MixTaskSaveReqVO createReqVO, Long userId) { public Long createMixTask(MixTaskSaveReqVO createReqVO, Long userId) {
@@ -68,7 +72,7 @@ public class MixTaskServiceImpl implements MixTaskService {
// 2. 保存到数据库 // 2. 保存到数据库
mixTaskMapper.insert(task); mixTaskMapper.insert(task);
// 3. 异步提交到阿里云 ICE使用 TTL 自动传递上下文) // 3. 异步提交到阿里云 ICE使用专用线程池 + TTL 自动传递上下文)
CompletableFuture.runAsync(TtlRunnable.get(() -> { CompletableFuture.runAsync(TtlRunnable.get(() -> {
try { try {
submitToICE(task.getId(), createReqVO, userId); submitToICE(task.getId(), createReqVO, userId);
@@ -76,7 +80,7 @@ public class MixTaskServiceImpl implements MixTaskService {
log.error("[MixTask][提交ICE失败] taskId={}", task.getId(), e); log.error("[MixTask][提交ICE失败] taskId={}", task.getId(), e);
updateTaskError(task.getId(), "提交任务失败: " + e.getMessage()); updateTaskError(task.getId(), "提交任务失败: " + e.getMessage());
} }
})); }), iceSubmitExecutor);
return task.getId(); return task.getId();
} }
@@ -178,25 +182,10 @@ public class MixTaskServiceImpl implements MixTaskService {
updateTask.setOutputUrlList(null); updateTask.setOutputUrlList(null);
mixTaskMapper.updateById(updateTask); mixTaskMapper.updateById(updateTask);
// 3. 重新提交到ICE使用 TTL 自动传递上下文) // 3. 重新提交到ICE使用专用线程池 + TTL 自动传递上下文)
CompletableFuture.runAsync(TtlRunnable.get(() -> { CompletableFuture.runAsync(TtlRunnable.get(() -> {
try { try {
// 从 materialsJson 重建请求对象 List<MixTaskSaveReqVO.MaterialItem> materials = rebuildMaterialsFromTask(existTask);
List<MixTaskSaveReqVO.MaterialItem> materials = null;
if (StrUtil.isNotEmpty(existTask.getMaterialsJson())) {
materials = JsonUtils.parseArray(existTask.getMaterialsJson(), MixTaskSaveReqVO.MaterialItem.class);
} else if (existTask.getVideoUrlList() != null && !existTask.getVideoUrlList().isEmpty()) {
// 兼容旧版本:从 videoUrls 重建默认3秒时长
materials = existTask.getVideoUrlList().stream()
.map(url -> {
MixTaskSaveReqVO.MaterialItem item = new MixTaskSaveReqVO.MaterialItem();
item.setFileUrl(url);
item.setDuration(3); // 默认3秒
return item;
})
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
}
if (materials == null || materials.isEmpty()) { if (materials == null || materials.isEmpty()) {
throw new IllegalArgumentException("无法重建素材列表"); throw new IllegalArgumentException("无法重建素材列表");
} }
@@ -210,7 +199,7 @@ public class MixTaskServiceImpl implements MixTaskService {
log.error("[MixTask][重新提交失败] taskId={}", id, e); log.error("[MixTask][重新提交失败] taskId={}", id, e);
updateTaskError(id, "重新提交失败: " + e.getMessage()); updateTaskError(id, "重新提交失败: " + e.getMessage());
} }
})); }), iceSubmitExecutor);
} }
@Override @Override
@@ -282,6 +271,84 @@ public class MixTaskServiceImpl implements MixTaskService {
} }
} }
/**
* 处理待提交的僵尸任务(定时兜底恢复)
*
* 设计意图:
* - createMixTask 中的 CompletableFuture 是"即时路径",追求低延迟
* - 本方法是"兜底路径"处理即时路径丢失的任务JVM 重启、线程池拒绝等)
* - 只处理创建超过2分钟的任务避免与即时路径的异步提交冲突
*
* 执行策略:
* - 扫描最近6小时内、pending 状态、job_ids 为空的僵尸任务
* - 每次至多处理 20 个,避免单次执行时间过长
* - 使用租户上下文重建请求并同步提交到 ICE
* - 单个任务失败不影响其他任务
*/
@Override
public void processPendingSubmissions() {
// 安全窗口只处理创建超过2分钟的任务避免与即时异步提交冲突
LocalDateTime cutoffTime = LocalDateTime.now().minusMinutes(2);
LocalDateTime startTime = LocalDateTime.now().minusHours(MixTaskConstants.CHECK_HOURS_LIMIT);
int successCount = 0;
int failCount = 0;
// 查询僵尸任务pending 且 job_ids 为空,在时间窗口内)
List<MixTaskDO> zombieTasks = TenantUtils.executeIgnore(() ->
mixTaskMapper.selectList(
new cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX<MixTaskDO>()
.eq(MixTaskDO::getStatus, MixTaskConstants.STATUS_PENDING)
.isNull(MixTaskDO::getJobIds)
.ge(MixTaskDO::getCreateTime, startTime)
.le(MixTaskDO::getCreateTime, cutoffTime)
.orderByAsc(MixTaskDO::getCreateTime)
.last("LIMIT 20")
)
);
if (zombieTasks.isEmpty()) {
return;
}
log.info("[MixTask][僵尸任务恢复] 发现 {} 个待提交的僵尸任务", zombieTasks.size());
for (MixTaskDO task : zombieTasks) {
try {
Long taskId = task.getId();
Long userId = task.getUserId();
Long tenantId = task.getTenantId();
List<MixTaskSaveReqVO.MaterialItem> materials = rebuildMaterialsFromTask(task);
if (materials == null || materials.isEmpty()) {
log.warn("[MixTask][僵尸任务跳过] taskId={}, 无法重建素材列表", taskId);
updateTaskError(taskId, "素材数据缺失,无法恢复提交");
failCount++;
continue;
}
MixTaskSaveReqVO saveReqVO = new MixTaskSaveReqVO();
saveReqVO.setTitle(task.getTitle());
saveReqVO.setMaterials(materials);
saveReqVO.setProduceCount(task.getProduceCount());
// 使用任务原有的租户上下文提交
TenantUtils.execute(tenantId, () -> submitToICE(taskId, saveReqVO, userId));
successCount++;
log.info("[MixTask][僵尸任务恢复成功] taskId={}, tenantId={}, materialCount={}",
taskId, tenantId, materials.size());
} catch (Exception e) {
log.error("[MixTask][僵尸任务恢复失败] taskId={}", task.getId(), e);
updateTaskError(task.getId(), "恢复提交失败: " + e.getMessage());
failCount++;
}
}
log.info("[MixTask][僵尸任务恢复完成] 共处理 {} 个, 成功 {} 个, 失败 {} 个",
successCount + failCount, successCount, failCount);
}
/** /**
* 同步任务状态检查所有jobId综合判断 * 同步任务状态检查所有jobId综合判断
* *
@@ -560,16 +627,25 @@ public class MixTaskServiceImpl implements MixTaskService {
} }
/** /**
* 更新任务错误信息 * 更新任务错误信息(自带异常保护,永不对外抛异常)
*
* 这是错误处理链的最后一道防线。即使 DB 更新失败,也至少记录日志,
* 避免因 updateTaskError 自身失败导致 error_msg 永久为 null。
*/ */
private void updateTaskError(Long taskId, String errorMsg) { private void updateTaskError(Long taskId, String errorMsg) {
MixTaskDO updateTask = new MixTaskDO(); try {
updateTask.setId(taskId); MixTaskDO updateTask = new MixTaskDO();
updateTask.setStatus(MixTaskConstants.STATUS_FAILED); updateTask.setId(taskId);
updateTask.setProgress(MixTaskConstants.PROGRESS_COMPLETED); updateTask.setStatus(MixTaskConstants.STATUS_FAILED);
updateTask.setErrorMsg(errorMsg); updateTask.setProgress(MixTaskConstants.PROGRESS_COMPLETED);
updateTask.setFinishTime(java.time.LocalDateTime.now()); updateTask.setErrorMsg(errorMsg);
mixTaskMapper.updateById(updateTask); updateTask.setFinishTime(LocalDateTime.now());
mixTaskMapper.updateById(updateTask);
log.info("[MixTask][错误已记录] taskId={}, errorMsg={}", taskId, errorMsg);
} catch (Exception e) {
// 最后兜底:至少确保日志中有错误信息,方便运维排查
log.error("[MixTask][严重] 无法更新任务错误信息到数据库taskId={}, errorMsg={}", taskId, errorMsg, e);
}
} }
/** /**
@@ -634,6 +710,31 @@ public class MixTaskServiceImpl implements MixTaskService {
}); });
} }
/**
* 从任务记录重建素材列表(供 retryTask 和 processPendingSubmissions 共用)
* <p>
* 优先从 materialsJson 解析,降级到 videoUrls兼容旧版本默认3秒时长
*
* @param task 任务记录
* @return 素材列表,无法重建时返回 null
*/
private List<MixTaskSaveReqVO.MaterialItem> rebuildMaterialsFromTask(MixTaskDO task) {
if (StrUtil.isNotEmpty(task.getMaterialsJson())) {
return JsonUtils.parseArray(task.getMaterialsJson(), MixTaskSaveReqVO.MaterialItem.class);
}
if (task.getVideoUrlList() != null && !task.getVideoUrlList().isEmpty()) {
return task.getVideoUrlList().stream()
.map(url -> {
MixTaskSaveReqVO.MaterialItem item = new MixTaskSaveReqVO.MaterialItem();
item.setFileUrl(url);
item.setDuration(3);
return item;
})
.collect(Collectors.toList());
}
return null;
}
/** /**
* 校验混剪任务时长 * 校验混剪任务时长
*/ */