feat: 功能优化
This commit is contained in:
@@ -425,12 +425,6 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService {
|
||||
// 设置当前步骤描述
|
||||
respVO.setCurrentStepDesc(DigitalHumanTaskStepEnum.getDesc(task.getCurrentStep()));
|
||||
|
||||
// 对 resultVideoUrl 进行预签名处理
|
||||
if (StrUtil.isNotBlank(task.getResultVideoUrl())) {
|
||||
String presignedUrl = fileApi.presignGetUrl(task.getResultVideoUrl(), PRESIGN_URL_EXPIRATION_SECONDS);
|
||||
respVO.setResultVideoUrl(presignedUrl);
|
||||
}
|
||||
|
||||
return respVO;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,18 +1,8 @@
|
||||
package cn.iocoder.yudao.module.tik.voice.service;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.http.HttpRequest;
|
||||
import cn.hutool.http.HttpResponse;
|
||||
import cn.iocoder.yudao.framework.common.util.http.HttpUtils;
|
||||
import cn.iocoder.yudao.framework.common.util.number.NumberUtils;
|
||||
import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnore;
|
||||
import cn.iocoder.yudao.module.infra.dal.dataobject.file.FileDO;
|
||||
import cn.iocoder.yudao.module.infra.dal.mysql.file.FileMapper;
|
||||
import cn.iocoder.yudao.module.infra.framework.file.core.client.FileClient;
|
||||
import cn.iocoder.yudao.module.infra.service.file.FileConfigService;
|
||||
import cn.iocoder.yudao.module.tik.file.dal.dataobject.TikUserFileDO;
|
||||
import cn.iocoder.yudao.module.tik.file.dal.mysql.TikUserFileMapper;
|
||||
import cn.iocoder.yudao.module.tik.file.service.TikOssInitService;
|
||||
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
|
||||
import cn.iocoder.yudao.module.tik.voice.dal.dataobject.TikDigitalHumanTaskDO;
|
||||
import cn.iocoder.yudao.module.tik.voice.dal.mysql.TikDigitalHumanTaskMapper;
|
||||
import cn.iocoder.yudao.module.tik.voice.vo.AppTikLatentsyncResultRespVO;
|
||||
@@ -30,7 +20,7 @@ import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Latentsync任务轮询服务 - 轻量化异步处理
|
||||
* 数字人任务轮询服务
|
||||
* 使用@TenantIgnore忽略租户检查,因为轮询服务没有用户上下文
|
||||
*
|
||||
* @author 芋道源码
|
||||
@@ -44,31 +34,22 @@ public class LatentsyncPollingService {
|
||||
private final TikDigitalHumanTaskMapper taskMapper;
|
||||
private final LatentsyncService latentsyncService;
|
||||
private final StringRedisTemplate stringRedisTemplate;
|
||||
private final TikOssInitService ossInitService;
|
||||
private final cn.iocoder.yudao.module.infra.api.file.FileApi fileApi;
|
||||
private final TikUserFileMapper userFileMapper;
|
||||
private final FileMapper fileMapper;
|
||||
private final FileConfigService fileConfigService;
|
||||
private final KlingService klingService;
|
||||
|
||||
/**
|
||||
* Redis键前缀
|
||||
*/
|
||||
// ========== 常量 ==========
|
||||
private static final String REDIS_POLLING_PREFIX = "latentsync:polling:";
|
||||
private static final String REDIS_POLLING_TASKS_SET = "latentsync:polling:tasks";
|
||||
private static final String REDIS_POLLING_COUNT_PREFIX = "latentsync:polling:count:";
|
||||
private static final String REDIS_RESULT_PREFIX = "digital_human:task:result:";
|
||||
|
||||
/**
|
||||
* 轮询配置
|
||||
*/
|
||||
private static final int MAX_POLLING_COUNT = 90; // 最多轮询90次(15分钟)
|
||||
private static final int POLLING_INTERVAL_SECONDS = 10; // 轮询间隔10秒
|
||||
private static final Duration CACHE_EXPIRE_TIME = Duration.ofHours(1); // 缓存1小时
|
||||
private static final Duration CACHE_EXPIRE_TIME = Duration.ofHours(1);
|
||||
private static final Duration RESULT_CACHE_TIME = Duration.ofHours(24);
|
||||
|
||||
// ========== 公开方法 ==========
|
||||
|
||||
/**
|
||||
* 定时轮询Latentsync任务状态 - 每10秒执行一次
|
||||
* 移除了分布式锁,通过查询条件和限制避免并发问题
|
||||
* 注意:此方法现在由 DigitalHumanTaskStatusSyncJob 定时调用,不在服务内部使用 @Scheduled 注解
|
||||
* 定时轮询任务状态入口
|
||||
*/
|
||||
public void pollLatentsyncTasks() {
|
||||
try {
|
||||
@@ -78,53 +59,15 @@ public class LatentsyncPollingService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行轮询任务的具体逻辑
|
||||
*/
|
||||
private void executePollingTasks() {
|
||||
log.info("[executePollingTasks][开始执行轮询任务]");
|
||||
try {
|
||||
// 轮询Latentsync任务
|
||||
List<String> taskIds = getPendingPollingTasks();
|
||||
log.info("[executePollingTasks][获取到Latentsync任务数量={}]", taskIds.size());
|
||||
|
||||
if (!taskIds.isEmpty()) {
|
||||
log.info("[pollLatentsyncTasks][开始轮询Latentsync任务][任务数量={}]", taskIds.size());
|
||||
|
||||
// 逐个处理Latentsync任务
|
||||
for (String taskIdStr : taskIds) {
|
||||
try {
|
||||
Long taskId = Long.parseLong(taskIdStr);
|
||||
pollSingleTask(taskId);
|
||||
} catch (Exception e) {
|
||||
log.error("[pollLatentsyncTasks][轮询Latentsync任务失败][taskId={}]", taskIdStr, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 轮询可灵任务
|
||||
log.info("[executePollingTasks][准备轮询可灵任务]");
|
||||
pollKlingTasks();
|
||||
log.info("[executePollingTasks][可灵任务轮询完成]");
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[pollLatentsyncTasks][轮询任务异常]", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 添加任务到轮询队列
|
||||
*/
|
||||
public void addTaskToPollingQueue(Long taskId, String requestId) {
|
||||
try {
|
||||
// 存储任务信息
|
||||
String taskKey = REDIS_POLLING_PREFIX + requestId;
|
||||
stringRedisTemplate.opsForValue().set(taskKey, taskId.toString(), CACHE_EXPIRE_TIME);
|
||||
|
||||
// 添加到待轮询集合
|
||||
stringRedisTemplate.opsForZSet().add(REDIS_POLLING_TASKS_SET, requestId, System.currentTimeMillis());
|
||||
|
||||
// 初始化轮询次数
|
||||
String countKey = REDIS_POLLING_COUNT_PREFIX + requestId;
|
||||
stringRedisTemplate.opsForValue().set(countKey, "0", CACHE_EXPIRE_TIME);
|
||||
|
||||
@@ -135,184 +78,210 @@ public class LatentsyncPollingService {
|
||||
}
|
||||
|
||||
/**
|
||||
* 单个任务轮询
|
||||
* 清理过期任务(由定时任务调用)
|
||||
*/
|
||||
public void cleanupExpiredTasks() {
|
||||
try {
|
||||
stringRedisTemplate.delete(REDIS_POLLING_TASKS_SET);
|
||||
} catch (Exception e) {
|
||||
log.error("[cleanupExpiredTasks][清理过期任务异常]", e);
|
||||
}
|
||||
}
|
||||
|
||||
// ========== 私有方法 ==========
|
||||
|
||||
/**
|
||||
* 执行轮询任务的具体逻辑
|
||||
*/
|
||||
private void executePollingTasks() {
|
||||
// 轮询 Latentsync 任务
|
||||
List<String> taskIds = getPendingPollingTasks();
|
||||
for (String taskIdStr : taskIds) {
|
||||
try {
|
||||
pollSingleTask(Long.parseLong(taskIdStr));
|
||||
} catch (Exception e) {
|
||||
log.error("[executePollingTasks][轮询任务失败][taskId={}]", taskIdStr, e);
|
||||
}
|
||||
}
|
||||
|
||||
// 轮询 Kling 任务
|
||||
pollKlingTasks();
|
||||
}
|
||||
|
||||
/**
|
||||
* 轮询单个 Latentsync 任务
|
||||
*/
|
||||
private void pollSingleTask(Long taskId) {
|
||||
// 获取任务的requestId和轮询次数(在try块外声明,供catch块使用)
|
||||
String requestId = null;
|
||||
String countKey = null;
|
||||
int currentCount = 0;
|
||||
|
||||
try {
|
||||
// 获取任务的requestId
|
||||
String taskKey = REDIS_POLLING_PREFIX + "task_" + taskId;
|
||||
requestId = stringRedisTemplate.opsForValue().get(taskKey);
|
||||
|
||||
if (StrUtil.isBlank(requestId)) {
|
||||
// 如果没有requestId,说明任务可能已完成或已取消,从轮询队列中移除
|
||||
removeFromPollingQueue(taskId, null);
|
||||
removeFromPollingQueue(requestId);
|
||||
return;
|
||||
}
|
||||
|
||||
// 检查轮询次数
|
||||
countKey = REDIS_POLLING_COUNT_PREFIX + requestId;
|
||||
String countKey = REDIS_POLLING_COUNT_PREFIX + requestId;
|
||||
String countStr = stringRedisTemplate.opsForValue().get(countKey);
|
||||
currentCount = countStr != null ? Integer.parseInt(countStr) : 0;
|
||||
|
||||
if (currentCount >= MAX_POLLING_COUNT) {
|
||||
// 超时,标记任务失败
|
||||
log.warn("[pollSingleTask][任务轮询超时][taskId={}, requestId={}, count={}]", taskId, requestId, currentCount);
|
||||
markTaskFailed(taskId, "Latentsync处理超时");
|
||||
removeFromPollingQueue(taskId, requestId);
|
||||
markTaskFailed(taskId, "处理超时");
|
||||
removeFromPollingQueue(requestId);
|
||||
return;
|
||||
}
|
||||
|
||||
// 调用Latentsync API检查状态
|
||||
// 查询任务状态
|
||||
AppTikLatentsyncResultRespVO result = latentsyncService.getTaskResult(requestId);
|
||||
String status = result.getStatus();
|
||||
|
||||
log.debug("[pollSingleTask][轮询任务状态][taskId={}, requestId={}, status={}]", taskId, requestId, status);
|
||||
|
||||
// 检查任务状态
|
||||
if ("COMPLETED".equals(status)) {
|
||||
// 任务完成
|
||||
handleTaskCompleted(taskId, result.getVideo().getUrl(), requestId);
|
||||
completeTask(taskId, result.getVideo().getUrl(), requestId);
|
||||
} else if ("FAILED".equals(status) || "ERROR".equals(status)) {
|
||||
// 任务失败
|
||||
handleTaskFailed(taskId, "Latentsync处理失败: " + status, requestId);
|
||||
markTaskFailed(taskId, "处理失败: " + status);
|
||||
removeFromPollingQueue(requestId);
|
||||
} else {
|
||||
// 继续轮询,更新轮询次数
|
||||
stringRedisTemplate.opsForValue().set(countKey, String.valueOf(currentCount + 1), CACHE_EXPIRE_TIME);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[pollSingleTask][轮询任务异常][taskId={}]", taskId, e);
|
||||
|
||||
// 轮询异常处理:增加轮询次数,避免无限重试
|
||||
int errorCount = currentCount + 1;
|
||||
if (errorCount >= MAX_POLLING_COUNT) {
|
||||
// 达到最大次数,标记任务失败
|
||||
log.warn("[pollSingleTask][任务轮询异常次数过多,标记失败][taskId={}, count={}]", taskId, errorCount);
|
||||
if (requestId != null && StrUtil.isNotBlank(requestId)) {
|
||||
markTaskFailed(taskId, "Latentsync API调用异常:" + e.getMessage());
|
||||
removeFromPollingQueue(taskId, requestId);
|
||||
}
|
||||
markTaskFailed(taskId, "API调用异常");
|
||||
removeFromPollingQueue(requestId);
|
||||
} else {
|
||||
// 更新轮询次数,继续重试
|
||||
if (countKey != null) {
|
||||
stringRedisTemplate.opsForValue().set(countKey, String.valueOf(errorCount), CACHE_EXPIRE_TIME);
|
||||
log.debug("[pollSingleTask][轮询异常,增加次数后继续重试][taskId={}, count={}]", taskId, errorCount);
|
||||
}
|
||||
String countKey = REDIS_POLLING_COUNT_PREFIX + requestId;
|
||||
stringRedisTemplate.opsForValue().set(countKey, String.valueOf(errorCount), CACHE_EXPIRE_TIME);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理任务完成
|
||||
* 轮询 Kling 任务
|
||||
*/
|
||||
private void pollKlingTasks() {
|
||||
try {
|
||||
List<TikDigitalHumanTaskDO> klingTasks = TenantUtils.executeIgnore(
|
||||
() -> taskMapper.selectPendingKlingTasks()
|
||||
);
|
||||
|
||||
for (TikDigitalHumanTaskDO task : klingTasks) {
|
||||
try {
|
||||
pollKlingSingleTask(task);
|
||||
} catch (Exception e) {
|
||||
log.error("[pollKlingTasks][轮询任务失败][taskId={}]", task.getId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[pollKlingTasks][轮询Kling任务异常]", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 轮询单个 Kling 任务
|
||||
*/
|
||||
private void pollKlingSingleTask(TikDigitalHumanTaskDO task) {
|
||||
String klingTaskId = task.getKlingTaskId();
|
||||
if (StrUtil.isBlank(klingTaskId)) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
KlingLipSyncQueryResponse response = klingService.getLipSyncTask(klingTaskId);
|
||||
String taskStatus = response.getData().getTaskStatus();
|
||||
|
||||
if ("succeed".equalsIgnoreCase(taskStatus)) {
|
||||
List<KlingLipSyncVideoVO> videos = response.getData().getTaskResult().getVideos();
|
||||
if (videos != null && !videos.isEmpty()) {
|
||||
completeTask(task.getId(), videos.get(0).getUrl(), null);
|
||||
}
|
||||
} else if ("failed".equalsIgnoreCase(taskStatus)) {
|
||||
String errorMsg = "可灵任务执行失败: " + response.getData().getTaskStatusMsg();
|
||||
updateTaskStatus(task.getId(), "FAILED", task.getCurrentStep(), task.getProgress(), errorMsg);
|
||||
} else if ("submitted".equalsIgnoreCase(taskStatus) || "processing".equalsIgnoreCase(taskStatus)) {
|
||||
updateTaskStatus(task.getId(), "PROCESSING", "sync_lip", 70, null);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[pollKlingSingleTask][任务({})查询失败]", task.getId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 完成任务(通用方法)
|
||||
*/
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void handleTaskCompleted(Long taskId, String videoUrl, String requestId) {
|
||||
TikDigitalHumanTaskDO task = null;
|
||||
private void completeTask(Long taskId, String videoUrl, String requestId) {
|
||||
try {
|
||||
// 获取任务信息
|
||||
task = taskMapper.selectById(taskId);
|
||||
if (task == null) {
|
||||
log.error("[handleTaskCompleted][任务不存在][taskId={}]", taskId);
|
||||
return;
|
||||
}
|
||||
|
||||
// 保存视频到OSS(异步处理,轻量化逻辑)
|
||||
OssSaveResult saveResult = null;
|
||||
try {
|
||||
// 保存视频到OSS,避免临时URL过期
|
||||
saveResult = saveVideoToOss(task, videoUrl);
|
||||
log.info("[handleTaskCompleted][任务({})视频已保存到OSS][url={}]", taskId, saveResult.getUrl());
|
||||
} catch (Exception e) {
|
||||
log.warn("[handleTaskCompleted][任务({})保存视频失败,使用原URL][error={}]", taskId, e.getMessage());
|
||||
saveResult = new OssSaveResult(videoUrl, 0, null, null); // 降级处理
|
||||
}
|
||||
|
||||
// 更新任务状态为成功
|
||||
TikDigitalHumanTaskDO updateObj = new TikDigitalHumanTaskDO();
|
||||
updateObj.setId(taskId);
|
||||
updateObj.setStatus("SUCCESS");
|
||||
updateObj.setCurrentStep("finishing");
|
||||
updateObj.setProgress(100);
|
||||
updateObj.setResultVideoUrl(saveResult.getUrl());
|
||||
updateObj.setResultVideoUrl(videoUrl);
|
||||
updateObj.setFinishTime(LocalDateTime.now());
|
||||
taskMapper.updateById(updateObj);
|
||||
|
||||
// 缓存结果到Redis(快速回显)
|
||||
try {
|
||||
String resultKey = "digital_human:task:result:" + taskId;
|
||||
stringRedisTemplate.opsForValue().set(resultKey, saveResult.getUrl(), Duration.ofHours(24));
|
||||
} catch (Exception e) {
|
||||
log.warn("[handleTaskCompleted][任务({})缓存结果失败]", taskId, e);
|
||||
TenantUtils.executeIgnore(() -> taskMapper.updateById(updateObj));
|
||||
|
||||
// 缓存结果
|
||||
String resultKey = REDIS_RESULT_PREFIX + taskId;
|
||||
stringRedisTemplate.opsForValue().set(resultKey, videoUrl, RESULT_CACHE_TIME);
|
||||
|
||||
if (requestId != null) {
|
||||
removeFromPollingQueue(requestId);
|
||||
}
|
||||
|
||||
// 保存结果视频到用户文件表(这样用户可以在素材库中查看)
|
||||
saveResultVideoToUserFiles(task, saveResult);
|
||||
|
||||
// 从轮询队列中移除
|
||||
removeFromPollingQueue(taskId, requestId);
|
||||
|
||||
log.info("[handleTaskCompleted][任务完成][taskId={}, requestId={}]", taskId, requestId);
|
||||
log.info("[completeTask][任务完成][taskId={}, videoUrl={}]", taskId, videoUrl);
|
||||
} catch (Exception e) {
|
||||
log.error("[handleTaskCompleted][处理任务完成失败][taskId={}]", taskId, e);
|
||||
log.error("[completeTask][处理任务完成失败][taskId={}]", taskId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理任务失败
|
||||
* 标记任务失败
|
||||
*/
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void handleTaskFailed(Long taskId, String errorMessage, String requestId) {
|
||||
private void markTaskFailed(Long taskId, String errorMessage) {
|
||||
try {
|
||||
// 更新任务状态为失败
|
||||
TikDigitalHumanTaskDO updateObj = new TikDigitalHumanTaskDO();
|
||||
updateObj.setId(taskId);
|
||||
updateObj.setStatus("FAILED");
|
||||
updateObj.setErrorMessage(errorMessage);
|
||||
updateObj.setFinishTime(LocalDateTime.now());
|
||||
taskMapper.updateById(updateObj);
|
||||
|
||||
// 从轮询队列中移除
|
||||
removeFromPollingQueue(taskId, requestId);
|
||||
TenantUtils.executeIgnore(() -> taskMapper.updateById(updateObj));
|
||||
|
||||
log.warn("[handleTaskFailed][任务失败][taskId={}, requestId={}, error={}]", taskId, requestId, errorMessage);
|
||||
log.warn("[markTaskFailed][任务失败][taskId={}, error={}]", taskId, errorMessage);
|
||||
} catch (Exception e) {
|
||||
log.error("[handleTaskFailed][处理任务失败失败][taskId={}]", taskId, e);
|
||||
log.error("[markTaskFailed][标记任务失败失败][taskId={}]", taskId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 标记任务失败(内部使用)
|
||||
* 更新任务状态
|
||||
*/
|
||||
private void markTaskFailed(Long taskId, String errorMessage) {
|
||||
handleTaskFailed(taskId, errorMessage, null);
|
||||
}
|
||||
private void updateTaskStatus(Long taskId, String status, String currentStep, Integer progress, String errorMessage) {
|
||||
TikDigitalHumanTaskDO updateObj = new TikDigitalHumanTaskDO();
|
||||
updateObj.setId(taskId);
|
||||
updateObj.setStatus(status);
|
||||
updateObj.setCurrentStep(currentStep);
|
||||
updateObj.setProgress(progress);
|
||||
|
||||
/**
|
||||
* 从轮询队列中移除任务
|
||||
*/
|
||||
private void removeFromPollingQueue(Long taskId, String requestId) {
|
||||
try {
|
||||
if (StrUtil.isNotBlank(requestId)) {
|
||||
// 移除具体任务
|
||||
String taskKey = REDIS_POLLING_PREFIX + requestId;
|
||||
stringRedisTemplate.delete(taskKey);
|
||||
|
||||
String countKey = REDIS_POLLING_COUNT_PREFIX + requestId;
|
||||
stringRedisTemplate.delete(countKey);
|
||||
|
||||
stringRedisTemplate.opsForZSet().remove(REDIS_POLLING_TASKS_SET, requestId);
|
||||
} else {
|
||||
// 尝试通过taskId找到requestId并移除
|
||||
String pattern = REDIS_POLLING_PREFIX + "task_" + taskId;
|
||||
// 这里可以优化为直接查询,但为了简单起见,先不实现
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[removeFromPollingQueue][移除任务失败][taskId={}, requestId={}]", taskId, requestId, e);
|
||||
if ("SUCCESS".equals(status)) {
|
||||
updateObj.setFinishTime(LocalDateTime.now());
|
||||
} else if ("PROCESSING".equals(status)) {
|
||||
updateObj.setStartTime(LocalDateTime.now());
|
||||
} else if ("FAILED".equals(status)) {
|
||||
updateObj.setErrorMessage(errorMessage);
|
||||
updateObj.setFinishTime(LocalDateTime.now());
|
||||
}
|
||||
|
||||
TenantUtils.executeIgnore(() -> taskMapper.updateById(updateObj));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -320,8 +289,6 @@ public class LatentsyncPollingService {
|
||||
*/
|
||||
private List<String> getPendingPollingTasks() {
|
||||
try {
|
||||
// 获取所有待轮询的任务
|
||||
// 注意:这里返回的是requestId,不是taskId
|
||||
List<String> requestIds = stringRedisTemplate.opsForZSet()
|
||||
.range(REDIS_POLLING_TASKS_SET, 0, -1)
|
||||
.stream()
|
||||
@@ -331,12 +298,8 @@ public class LatentsyncPollingService {
|
||||
return List.of();
|
||||
}
|
||||
|
||||
// 将requestId转换为taskId
|
||||
return requestIds.stream()
|
||||
.map(requestId -> {
|
||||
String taskKey = REDIS_POLLING_PREFIX + requestId;
|
||||
return stringRedisTemplate.opsForValue().get(taskKey);
|
||||
})
|
||||
.map(requestId -> stringRedisTemplate.opsForValue().get(REDIS_POLLING_PREFIX + requestId))
|
||||
.filter(StrUtil::isNotBlank)
|
||||
.toList();
|
||||
} catch (Exception e) {
|
||||
@@ -346,269 +309,17 @@ public class LatentsyncPollingService {
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理过期任务(每天凌晨2点执行)
|
||||
* 注意:此方法现在由外部调度器调用,不在服务内部使用 @Scheduled 注解
|
||||
* 从轮询队列中移除任务
|
||||
*/
|
||||
public void cleanupExpiredTasks() {
|
||||
private void removeFromPollingQueue(String requestId) {
|
||||
try {
|
||||
log.info("[cleanupExpiredTasks][开始清理过期轮询任务]");
|
||||
|
||||
// 清理过期的轮询记录
|
||||
stringRedisTemplate.delete(REDIS_POLLING_TASKS_SET);
|
||||
|
||||
// 可以添加更多清理逻辑
|
||||
|
||||
log.info("[cleanupExpiredTasks][清理完成]");
|
||||
} catch (Exception e) {
|
||||
log.error("[cleanupExpiredTasks][清理过期任务异常]", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存远程视频URL
|
||||
* 简化版:直接保存Kling返回的URL,不再下载上传到OSS
|
||||
*/
|
||||
private OssSaveResult saveVideoToOss(TikDigitalHumanTaskDO task, String remoteVideoUrl) throws Exception {
|
||||
log.info("[saveVideoToOss][任务({})直接保存Kling URL][url={}]", task.getId(), remoteVideoUrl);
|
||||
|
||||
// 直接返回Kling URL,不上传到OSS
|
||||
return new OssSaveResult(remoteVideoUrl, 0, null, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* OSS保存结果
|
||||
*/
|
||||
private static class OssSaveResult {
|
||||
private final String url;
|
||||
private final int fileSize;
|
||||
private final String filePath;
|
||||
private final Long infraFileId;
|
||||
|
||||
public OssSaveResult(String url, int fileSize, String filePath, Long infraFileId) {
|
||||
this.url = url;
|
||||
this.fileSize = fileSize;
|
||||
this.filePath = filePath;
|
||||
this.infraFileId = infraFileId;
|
||||
}
|
||||
|
||||
public String getUrl() {
|
||||
return url;
|
||||
}
|
||||
|
||||
public int getFileSize() {
|
||||
return fileSize;
|
||||
}
|
||||
|
||||
public String getFilePath() {
|
||||
return filePath;
|
||||
}
|
||||
|
||||
public Long getInfraFileId() {
|
||||
return infraFileId;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存结果视频到用户文件表
|
||||
* 如果 OSS 保存失败(infraFileId 为 null),直接保存外部 URL
|
||||
*/
|
||||
private void saveResultVideoToUserFiles(TikDigitalHumanTaskDO task, OssSaveResult saveResult) {
|
||||
try {
|
||||
Long userId = task.getUserId();
|
||||
|
||||
if (userId == null) {
|
||||
log.warn("[saveResultVideoToUserFiles][任务({})userId为空,无法保存]", task.getId());
|
||||
return;
|
||||
}
|
||||
|
||||
// 创建用户文件记录(支持外部 URL,fileId 可为空)
|
||||
TikUserFileDO userFile = new TikUserFileDO();
|
||||
userFile.setUserId(userId);
|
||||
userFile.setFileId(saveResult.getInfraFileId()); // OSS保存失败时为null,表示外部URL
|
||||
userFile.setFileName(String.format("数字人视频_%d.mp4", task.getId()));
|
||||
userFile.setFileType("video/mp4");
|
||||
userFile.setFileCategory("generate");
|
||||
userFile.setFileUrl(saveResult.getUrl());
|
||||
userFile.setFilePath(saveResult.getFilePath());
|
||||
userFile.setFileSize(saveResult.getInfraFileId() != null ? (long) saveResult.getFileSize() : null);
|
||||
|
||||
userFileMapper.insert(userFile);
|
||||
|
||||
if (saveResult.getInfraFileId() != null) {
|
||||
log.info("[saveResultVideoToUserFiles][任务({})已保存到OSS][userFileId={}, infraFileId={}]",
|
||||
task.getId(), userFile.getId(), saveResult.getInfraFileId());
|
||||
} else {
|
||||
log.info("[saveResultVideoToUserFiles][任务({})已保存外部URL][userFileId={}, url={}]",
|
||||
task.getId(), userFile.getId(), saveResult.getUrl());
|
||||
if (StrUtil.isNotBlank(requestId)) {
|
||||
stringRedisTemplate.delete(REDIS_POLLING_PREFIX + requestId);
|
||||
stringRedisTemplate.delete(REDIS_POLLING_COUNT_PREFIX + requestId);
|
||||
stringRedisTemplate.opsForZSet().remove(REDIS_POLLING_TASKS_SET, requestId);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[saveResultVideoToUserFiles][任务({})保存失败]", task.getId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 轮询可灵任务状态
|
||||
*/
|
||||
private void pollKlingTasks() {
|
||||
try {
|
||||
log.info("[pollKlingTasks][开始查询待轮询的可灵任务]");
|
||||
|
||||
// 显式忽略租户限制查询待轮询的可灵任务
|
||||
List<TikDigitalHumanTaskDO> klingTasks = cn.iocoder.yudao.framework.tenant.core.util.TenantUtils.executeIgnore(
|
||||
() -> taskMapper.selectPendingKlingTasks()
|
||||
);
|
||||
|
||||
if (klingTasks.isEmpty()) {
|
||||
log.info("[pollKlingTasks][没有待轮询的可灵任务]");
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("[pollKlingTasks][开始轮询可灵任务][任务数量={}]", klingTasks.size());
|
||||
|
||||
// 逐个处理可灵任务
|
||||
for (TikDigitalHumanTaskDO task : klingTasks) {
|
||||
try {
|
||||
pollKlingSingleTask(task);
|
||||
} catch (Exception e) {
|
||||
log.error("[pollKlingTasks][轮询可灵任务失败][taskId={}]", task.getId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[pollKlingTasks][轮询可灵任务异常]", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 轮询单个可灵任务
|
||||
*/
|
||||
private void pollKlingSingleTask(TikDigitalHumanTaskDO task) {
|
||||
String klingTaskId = task.getKlingTaskId();
|
||||
if (StrUtil.isBlank(klingTaskId)) {
|
||||
log.warn("[pollKlingSingleTask][任务({})缺少klingTaskId]", task.getId());
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
// 查询可灵任务状态
|
||||
KlingLipSyncQueryResponse response = klingService.getLipSyncTask(klingTaskId);
|
||||
String taskStatus = response.getData().getTaskStatus();
|
||||
String taskStatusMsg = response.getData().getTaskStatusMsg();
|
||||
|
||||
log.info("[pollKlingSingleTask][任务({})状态更新][klingTaskId={}, status={}, msg={}]",
|
||||
task.getId(), klingTaskId, taskStatus, taskStatusMsg);
|
||||
|
||||
// 根据状态更新任务
|
||||
if ("succeed".equalsIgnoreCase(taskStatus)) {
|
||||
handleKlingTaskSucceed(task, response);
|
||||
} else if ("failed".equalsIgnoreCase(taskStatus)) {
|
||||
// 任务失败
|
||||
String errorMsg = "可灵任务执行失败: " + (StrUtil.isNotBlank(taskStatusMsg) ? taskStatusMsg : "未知错误");
|
||||
updateTaskStatus(task.getId(), "FAILED", task.getCurrentStep(), task.getProgress(), errorMsg, null, errorMsg);
|
||||
log.error("[pollKlingSingleTask][任务({})失败][error={}]", task.getId(), errorMsg);
|
||||
|
||||
} else if ("submitted".equalsIgnoreCase(taskStatus) || "processing".equalsIgnoreCase(taskStatus)) {
|
||||
// 任务还在处理中,更新进度
|
||||
updateTaskStatus(task.getId(), "PROCESSING", "sync_lip", 70, "口型同步处理中", null);
|
||||
log.info("[pollKlingSingleTask][任务({})处理中][klingTaskId={}, status={}]", task.getId(), klingTaskId, taskStatus);
|
||||
|
||||
} else {
|
||||
log.warn("[pollKlingSingleTask][任务({})未知状态][status={}]", task.getId(), taskStatus);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[pollKlingSingleTask][任务({})查询失败]", task.getId(), e);
|
||||
// 不更新任务状态,避免误判
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新任务状态
|
||||
*/
|
||||
private void updateTaskStatus(Long taskId, String status, String currentStep, Integer progress,
|
||||
String message, String resultVideoUrl) {
|
||||
updateTaskStatus(taskId, status, currentStep, progress, message, resultVideoUrl, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新任务状态(带错误详情)
|
||||
*/
|
||||
private void updateTaskStatus(Long taskId, String status, String currentStep, Integer progress,
|
||||
String message, String resultVideoUrl, String errorDetail) {
|
||||
TikDigitalHumanTaskDO updateObj = new TikDigitalHumanTaskDO();
|
||||
updateObj.setId(taskId);
|
||||
updateObj.setStatus(status);
|
||||
updateObj.setCurrentStep(currentStep);
|
||||
updateObj.setProgress(progress);
|
||||
|
||||
if ("SUCCESS".equals(status)) {
|
||||
updateObj.setResultVideoUrl(resultVideoUrl);
|
||||
updateObj.setFinishTime(LocalDateTime.now());
|
||||
} else if ("PROCESSING".equals(status)) {
|
||||
updateObj.setStartTime(LocalDateTime.now());
|
||||
} else if ("FAILED".equals(status)) {
|
||||
updateObj.setErrorMessage(message);
|
||||
updateObj.setErrorDetail(errorDetail);
|
||||
updateObj.setFinishTime(LocalDateTime.now());
|
||||
}
|
||||
|
||||
// 显式忽略租户限制执行更新操作
|
||||
cn.iocoder.yudao.framework.tenant.core.util.TenantUtils.executeIgnore(
|
||||
() -> taskMapper.updateById(updateObj)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理可灵任务成功
|
||||
*/
|
||||
private void handleKlingTaskSucceed(TikDigitalHumanTaskDO task, KlingLipSyncQueryResponse response) {
|
||||
List<KlingLipSyncVideoVO> videos = response.getData().getTaskResult().getVideos();
|
||||
if (videos == null || videos.isEmpty()) {
|
||||
log.warn("[pollKlingSingleTask][任务({})成功但无视频结果]", task.getId());
|
||||
return;
|
||||
}
|
||||
|
||||
String videoUrl = videos.get(0).getUrl();
|
||||
OssSaveResult saveResult;
|
||||
|
||||
try {
|
||||
saveResult = saveVideoToOss(task, videoUrl);
|
||||
log.info("[pollKlingSingleTask][任务({})视频已保存到OSS][url={}]", task.getId(), saveResult.getUrl());
|
||||
} catch (Exception e) {
|
||||
log.warn("[pollKlingSingleTask][任务({})保存视频失败,使用原URL][error={}]", task.getId(), e.getMessage());
|
||||
saveResult = new OssSaveResult(videoUrl, 0, null, null);
|
||||
}
|
||||
|
||||
updateTaskSuccess(task.getId(), saveResult.getUrl());
|
||||
cacheTaskResult(task.getId(), saveResult.getUrl());
|
||||
saveResultVideoToUserFiles(task, saveResult);
|
||||
|
||||
log.info("[pollKlingSingleTask][任务({})完成][videoUrl={}]", task.getId(), saveResult.getUrl());
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新任务为成功状态
|
||||
*/
|
||||
private void updateTaskSuccess(Long taskId, String videoUrl) {
|
||||
TikDigitalHumanTaskDO updateObj = new TikDigitalHumanTaskDO();
|
||||
updateObj.setId(taskId);
|
||||
updateObj.setStatus("SUCCESS");
|
||||
updateObj.setCurrentStep("finishing");
|
||||
updateObj.setProgress(100);
|
||||
updateObj.setResultVideoUrl(videoUrl);
|
||||
updateObj.setFinishTime(LocalDateTime.now());
|
||||
taskMapper.updateById(updateObj);
|
||||
}
|
||||
|
||||
/**
|
||||
* 缓存任务结果到 Redis
|
||||
*/
|
||||
private void cacheTaskResult(Long taskId, String url) {
|
||||
try {
|
||||
String resultKey = "digital_human:task:result:" + taskId;
|
||||
stringRedisTemplate.opsForValue().set(resultKey, url, Duration.ofHours(24));
|
||||
} catch (Exception e) {
|
||||
log.warn("[cacheTaskResult][任务({})缓存结果失败]", taskId, e);
|
||||
log.error("[removeFromPollingQueue][移除任务失败][requestId={}]", requestId, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user