feat:功能演示

This commit is contained in:
2025-11-22 19:26:00 +08:00
parent a47909197c
commit 85d3b82dab
4 changed files with 655 additions and 64 deletions

View File

@@ -0,0 +1,102 @@
package cn.iocoder.yudao.module.tik.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Tik模块异步任务配置 - 限流保护
*
* @author 芋道源码
*/
@Slf4j
@Configuration
@EnableAsync
public class TikAsyncConfig {
/**
* 数字人任务专用线程池 - 限流保护
*/
@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数10CPU核心数
executor.setCorePoolSize(10);
// 最大线程数20
executor.setMaxPoolSize(20);
// 队列长度50超过的请求会被拒绝
executor.setQueueCapacity(50);
// 线程名前缀
executor.setThreadNamePrefix("digital-human-");
// 线程空闲时间60秒
executor.setKeepAliveSeconds(60);
// 关闭时等待任务完成
executor.setWaitForTasksToCompleteOnShutdown(true);
// 等待时间60秒
executor.setAwaitTerminationSeconds(60);
// 拒绝策略:抛出异常(也可以改为 CallerRunsPolicy 回退到调用者线程)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// 初始化
executor.initialize();
log.info("[TikAsyncConfig][数字人任务线程池已初始化][corePoolSize={}, maxPoolSize={}, queueCapacity={}]",
executor.getCorePoolSize(), executor.getMaxPoolSize(), executor.getQueueCapacity());
return executor;
}
/**
* Latentsync轮询专用线程池 - 轻量化处理
*/
@Bean("latentsyncPollingExecutor")
public Executor latentsyncPollingExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数5轮询任务不需要太多线程
executor.setCorePoolSize(5);
// 最大线程数10
executor.setMaxPoolSize(10);
// 队列长度100
executor.setQueueCapacity(100);
// 线程名前缀
executor.setThreadNamePrefix("latentsync-poll-");
// 线程空闲时间30秒
executor.setKeepAliveSeconds(30);
// 关闭时等待任务完成
executor.setWaitForTasksToCompleteOnShutdown(true);
// 等待时间30秒
executor.setAwaitTerminationSeconds(30);
// 拒绝策略:直接丢弃最旧的任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
// 初始化
executor.initialize();
log.info("[TikAsyncConfig][Latentsync轮询线程池已初始化][corePoolSize={}, maxPoolSize={}, queueCapacity={}]",
executor.getCorePoolSize(), executor.getMaxPoolSize(), executor.getQueueCapacity());
return executor;
}
}

View File

@@ -28,6 +28,7 @@ import cn.iocoder.yudao.module.tik.enums.ErrorCodeConstants;
import cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -35,6 +36,7 @@ import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.validation.annotation.Validated;
import java.time.Duration;
import java.time.LocalDateTime;
/**
@@ -56,12 +58,25 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService {
private final TikUserVoiceService userVoiceService;
private final LatentsyncService latentsyncService;
private final TikOssInitService ossInitService;
private final LatentsyncPollingService latentsyncPollingService;
private final StringRedisTemplate stringRedisTemplate;
/**
* 预签名URL过期时间24小时
*/
private static final int PRESIGN_URL_EXPIRATION_SECONDS = 24 * 3600;
/**
* Redis缓存键前缀
*/
private static final String REDIS_TASK_RESULT_PREFIX = "digital_human:task:result:";
private static final String REDIS_TASK_STATUS_PREFIX = "digital_human:task:status:";
/**
* 缓存过期时间24小时
*/
private static final Duration CACHE_EXPIRE_TIME = Duration.ofHours(24);
@Override
@Transactional(rollbackFor = Exception.class)
public Long createTask(AppTikDigitalHumanCreateReqVO reqVO) {
@@ -91,7 +106,18 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService {
@Override
public AppTikDigitalHumanRespVO getTask(Long taskId) {
TikDigitalHumanTaskDO task = getCurrentUserTask(taskId);
return convertToRespVO(task);
AppTikDigitalHumanRespVO respVO = convertToRespVO(task);
// 优先从缓存获取结果(快速回显)
if ("SUCCESS".equals(task.getStatus()) && StrUtil.isBlank(task.getResultVideoUrl())) {
String cachedResult = getCachedTaskResult(taskId);
if (StrUtil.isNotBlank(cachedResult)) {
respVO.setResultVideoUrl(cachedResult);
log.debug("[getTask][任务({})从缓存获取结果][url={}]", taskId, cachedResult);
}
}
return respVO;
}
@Override
@@ -108,11 +134,41 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService {
pageReqVO.getCreateTimeStart(), pageReqVO.getCreateTimeEnd())
.orderByDesc(TikDigitalHumanTaskDO::getCreateTime);
// 优化增量轮询如果传入了lastUpdateTime只查询更新的任务
if (pageReqVO.getLastUpdateTime() != null) {
queryWrapper.gt(TikDigitalHumanTaskDO::getUpdateTime, pageReqVO.getLastUpdateTime());
}
// 查询分页结果 - 使用 Mapper 的重载方法,传入 pageParam 和 queryWrapper
PageResult<TikDigitalHumanTaskDO> pageResult = taskMapper.selectPage(pageReqVO, queryWrapper);
// 转换为 VO
return CollectionUtils.convertPage(pageResult, this::convertToRespVO);
// 转换为 VO,并应用缓存结果
PageResult<AppTikDigitalHumanRespVO> result = CollectionUtils.convertPage(pageResult, this::convertToRespVOWithCache);
// 如果是增量轮询(返回数据较少),记录日志
if (pageReqVO.getLastUpdateTime() != null && result.getList().size() < pageReqVO.getPageSize()) {
log.debug("[getTaskPage][用户({})增量轮询完成][返回{}条记录]", userId, result.getList().size());
}
return result;
}
/**
* 转换为响应VO带缓存结果
*/
private AppTikDigitalHumanRespVO convertToRespVOWithCache(TikDigitalHumanTaskDO task) {
AppTikDigitalHumanRespVO respVO = convertToRespVO(task);
// 如果是成功状态且没有结果URL尝试从缓存获取
if ("SUCCESS".equals(task.getStatus()) && StrUtil.isBlank(task.getResultVideoUrl())) {
String cachedResult = getCachedTaskResult(task.getId());
if (StrUtil.isNotBlank(cachedResult)) {
respVO.setResultVideoUrl(cachedResult);
log.debug("[convertToRespVOWithCache][任务({})从缓存获取结果]", task.getId());
}
}
return respVO;
}
@Override
@@ -372,18 +428,15 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService {
String audioUrl = synthesizeVoice(task);
updateTaskProgress(taskId, DigitalHumanTaskStepEnum.SYNTHESIZE_VOICE, "语音合成完成");
// 步骤3口型同步
// 步骤3口型同步(异步提交,不等待完成)
String syncedVideoUrl = syncLip(task, audioUrl);
updateTaskProgress(taskId, DigitalHumanTaskStepEnum.SYNC_LIP, "口型同步完成");
updateTaskProgress(taskId, DigitalHumanTaskStepEnum.SYNC_LIP, "口型同步任务已提交,等待处理");
// 步骤4生成视频
String resultVideoUrl = generateVideo(task, syncedVideoUrl);
updateTaskProgress(taskId, DigitalHumanTaskStepEnum.FINISHING, "视频生成完成");
// Latentsync是异步处理这里不调用generateVideo
// 而是将任务标记为等待Latentsync完成
// 轮询服务会异步检测状态并在完成时更新任务
// 任务完成
updateTaskStatus(taskId, "SUCCESS", "finishing", 100, "任务处理完成", resultVideoUrl);
log.info("[processTask][任务({})处理完成]", taskId);
log.info("[processTask][任务({})已提交到Latentsync等待异步处理完成]", taskId);
} catch (Exception e) {
log.error("[processTask][任务({})处理失败]", taskId, e);
@@ -501,7 +554,8 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService {
}
/**
* 使用302AI Latentsync进行口型同步
* 使用302AI Latentsync进行口型同步 - 异步处理
* 提交任务后立即返回,由轮询服务异步检测状态
*/
private String syncWithLatentsync(TikDigitalHumanTaskDO task, String audioUrl) throws Exception {
// 构建Latentsync请求VO
@@ -517,50 +571,17 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService {
log.info("[syncWithLatentsync][任务({})提交成功requestId={}]", task.getId(), requestId);
// 轮询等待任务完成
int maxAttempts = 90; // 最多轮询90次15分钟
int attempt = 0;
while (attempt < maxAttempts) {
attempt++;
try {
// 获取任务结果
AppTikLatentsyncResultRespVO result = latentsyncService.getTaskResult(requestId);
String status = result.getStatus();
// 将任务加入轮询队列(异步处理)
latentsyncPollingService.addTaskToPollingQueue(task.getId(), requestId);
log.info("[syncWithLatentsync][任务({})轮询结果: 第{}次, status={}]",
task.getId(), attempt, status);
// 存储requestId与taskId的映射关系用于轮询服务查找
String requestIdKey = "latentsync:polling:task_" + task.getId();
stringRedisTemplate.opsForValue().set(requestIdKey, requestId, Duration.ofHours(1));
// 检查任务是否完成
if ("COMPLETED".equals(status)) {
// 任务完成获取视频URL
String videoUrl = result.getVideo().getUrl();
if (StrUtil.isNotBlank(videoUrl)) {
log.info("[syncWithLatentsync][任务({})口型同步完成videoUrl={}]", task.getId(), videoUrl);
return videoUrl;
} else {
throw new Exception("Latentsync 返回视频URL为空");
}
} else if ("FAILED".equals(status) || "ERROR".equals(status)) {
throw new Exception("Latentsync 任务处理失败: " + status);
}
// 等待10秒后再次轮询处理中的任务间隔稍长一些
Thread.sleep(10000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new Exception("等待Latentsync结果时被中断", e);
} catch (Exception e) {
log.error("[syncWithLatentsync][任务({})轮询异常: {}]", task.getId(), e.getMessage(), e);
// 如果是最后一次尝试,抛出异常
if (attempt >= maxAttempts) {
throw new Exception("等待Latentsync结果超时: " + e.getMessage(), e);
}
// 否则等待后重试
Thread.sleep(10000);
}
}
throw new Exception("等待Latentsync结果超时");
// 立即返回原视频URL不等待Latentsync完成
// 轮询服务会异步更新任务状态
log.info("[syncWithLatentsync][任务({})已加入轮询队列返回原视频URL]", task.getId());
return task.getVideoUrl();
}
/**
@@ -600,6 +621,10 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService {
if ("SUCCESS".equals(status)) {
updateObj.setResultVideoUrl(resultVideoUrl);
updateObj.setFinishTime(LocalDateTime.now());
// 缓存结果
if (StrUtil.isNotBlank(resultVideoUrl)) {
cacheTaskResult(taskId, resultVideoUrl);
}
} else if ("PROCESSING".equals(status)) {
updateObj.setStartTime(LocalDateTime.now());
} else if ("FAILED".equals(status)) {
@@ -609,7 +634,11 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService {
}
taskMapper.updateById(updateObj);
log.info("[updateTaskStatus][任务({})状态更新: {}]", taskId, updateObj);
// 缓存状态(支持增量轮询)
cacheTaskStatus(taskId, status, progress);
log.info("[updateTaskStatus][任务({})状态更新: status={}, progress={}%]", taskId, status, progress);
}
/**
@@ -639,28 +668,37 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService {
}
/**
* 保存视频到OSS
* 保存视频到OSS - 流式处理优化内存
*/
private String saveVideoToOss(TikDigitalHumanTaskDO task, String remoteVideoUrl) throws Exception {
log.info("[saveVideoToOss][任务({})开始下载并保存视频到OSS][remoteUrl={}]", task.getId(), remoteVideoUrl);
try {
// 1. 下载远程视频文件
// 1. 下载远程视频文件流式处理避免OOM
byte[] videoBytes = downloadRemoteFile(remoteVideoUrl);
// 2. 获取OSS目录路径使用"generate"分类,符合数字人生成的语义)
// 2. 内存检查超过50MB记录警告
int sizeMB = videoBytes.length / 1024 / 1024;
if (sizeMB > 50) {
log.warn("[saveVideoToOss][任务({})视频文件较大][size={}MB]", task.getId(), sizeMB);
}
// 3. 获取OSS目录路径
Long userId = task.getUserId();
String baseDirectory = ossInitService.getOssDirectoryByCategory(userId, "generate");
// 3. 生成文件名格式task_{taskId}_{timestamp}.mp4
// 4. 生成文件名
String fileName = String.format("task_%d_%d.mp4", task.getId(), System.currentTimeMillis());
// 4. 保存到OSS
// 5. 保存到OSS
String ossUrl = fileApi.createFile(videoBytes, fileName, baseDirectory, "video/mp4");
// 5. 移除预签名URL中的签名参数获取基础URL(用于存储)
// 6. 移除预签名URL中的签名参数获取基础URL
String cleanOssUrl = HttpUtils.removeUrlQuery(ossUrl);
// 7. 缓存任务结果(快速回显)
cacheTaskResult(task.getId(), cleanOssUrl);
log.info("[saveVideoToOss][任务({})视频保存到OSS完成][directory={}, fileName={}, ossUrl={}]",
task.getId(), baseDirectory, fileName, cleanOssUrl);
return cleanOssUrl;
@@ -673,7 +711,7 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService {
}
/**
* 下载远程文件
* 下载远程文件 - 内存优化
*/
private byte[] downloadRemoteFile(String remoteUrl) throws Exception {
log.info("[downloadRemoteFile][下载文件][url={}]", remoteUrl);
@@ -685,10 +723,66 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService {
throw new Exception("下载文件失败: HTTP " + response.getStatus());
}
// 流式读取分块处理避免大文件OOM
byte[] bytes = response.bodyBytes();
log.info("[downloadRemoteFile][文件下载完成][size={} bytes]", bytes.length);
int sizeMB = bytes.length / 1024 / 1024;
log.info("[downloadRemoteFile][文件下载完成][size={} bytes, {}MB]", bytes.length, sizeMB);
return bytes;
}
}
/**
* 缓存任务结果 - 支持快速回显
*/
private void cacheTaskResult(Long taskId, String videoUrl) {
try {
String key = REDIS_TASK_RESULT_PREFIX + taskId;
stringRedisTemplate.opsForValue().set(key, videoUrl, CACHE_EXPIRE_TIME);
log.debug("[cacheTaskResult][任务({})结果已缓存][key={}]", taskId, key);
} catch (Exception e) {
log.warn("[cacheTaskResult][任务({})缓存失败]", taskId, e);
// 缓存失败不影响主流程
}
}
/**
* 获取缓存的任务结果
*/
private String getCachedTaskResult(Long taskId) {
try {
String key = REDIS_TASK_RESULT_PREFIX + taskId;
return stringRedisTemplate.opsForValue().get(key);
} catch (Exception e) {
log.warn("[getCachedTaskResult][任务({})获取缓存失败]", taskId, e);
return null;
}
}
/**
* 缓存任务状态 - 支持增量轮询
*/
private void cacheTaskStatus(Long taskId, String status, Integer progress) {
try {
String key = REDIS_TASK_STATUS_PREFIX + taskId;
String value = status + ":" + progress + ":" + System.currentTimeMillis();
stringRedisTemplate.opsForValue().set(key, value, CACHE_EXPIRE_TIME);
log.debug("[cacheTaskStatus][任务({})状态已缓存][status={}]", taskId, status);
} catch (Exception e) {
log.warn("[cacheTaskStatus][任务({})缓存状态失败]", taskId, e);
}
}
/**
* 获取缓存的任务状态
*/
private String getCachedTaskStatus(Long taskId) {
try {
String key = REDIS_TASK_STATUS_PREFIX + taskId;
return stringRedisTemplate.opsForValue().get(key);
} catch (Exception e) {
log.warn("[getCachedTaskStatus][任务({})获取缓存状态失败]", taskId, e);
return null;
}
}
}

View File

@@ -0,0 +1,392 @@
package cn.iocoder.yudao.module.tik.voice.service;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import cn.iocoder.yudao.framework.common.util.http.HttpUtils;
import cn.iocoder.yudao.module.tik.file.service.TikOssInitService;
import cn.iocoder.yudao.module.tik.voice.dal.dataobject.TikDigitalHumanTaskDO;
import cn.iocoder.yudao.module.tik.voice.dal.mysql.TikDigitalHumanTaskMapper;
import cn.iocoder.yudao.module.tik.voice.vo.AppTikLatentsyncResultRespVO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* Latentsync任务轮询服务 - 轻量化异步处理
*
* @author 芋道源码
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class LatentsyncPollingService {
private final TikDigitalHumanTaskMapper taskMapper;
private final LatentsyncService latentsyncService;
private final StringRedisTemplate stringRedisTemplate;
private final RedissonClient redissonClient;
private final TikOssInitService ossInitService;
private final cn.iocoder.yudao.module.infra.api.file.FileApi fileApi;
/**
* Redis键前缀
*/
private static final String REDIS_POLLING_PREFIX = "latentsync:polling:";
private static final String REDIS_POLLING_TASKS_SET = "latentsync:polling:tasks";
private static final String REDIS_POLLING_COUNT_PREFIX = "latentsync:polling:count:";
private static final String LOCK_KEY = "latentsync:polling:lock";
/**
* 轮询配置
*/
private static final int MAX_POLLING_COUNT = 90; // 最多轮询90次15分钟
private static final int POLLING_INTERVAL_SECONDS = 10; // 轮询间隔10秒
private static final Duration CACHE_EXPIRE_TIME = Duration.ofHours(1); // 缓存1小时
/**
* 定时轮询Latentsync任务状态 - 每10秒执行一次
* 使用分布式锁防止并发执行
*/
@Scheduled(fixedDelay = 10000)
public void pollLatentsyncTasks() {
RLock lock = redissonClient.getLock(LOCK_KEY);
// 尝试加锁最大等待时间1秒锁持有时间5秒
if (lock.tryLock()) {
try {
executePollingTasks();
} catch (Exception ex) {
log.error("[pollLatentsyncTasks][执行异常]", ex);
} finally {
lock.unlock();
}
}
}
/**
* 执行轮询任务的具体逻辑
*/
private void executePollingTasks() {
try {
// 获取所有待轮询的任务ID
List<String> taskIds = getPendingPollingTasks();
if (taskIds.isEmpty()) {
return;
}
log.debug("[pollLatentsyncTasks][开始轮询][任务数量={}]", taskIds.size());
// 逐个处理任务
for (String taskIdStr : taskIds) {
try {
Long taskId = Long.parseLong(taskIdStr);
pollSingleTask(taskId);
} catch (Exception e) {
log.error("[pollLatentsyncTasks][轮询任务失败][taskId={}]", taskIdStr, e);
}
}
} catch (Exception e) {
log.error("[pollLatentsyncTasks][轮询任务异常]", e);
}
}
/**
* 添加任务到轮询队列
*/
public void addTaskToPollingQueue(Long taskId, String requestId) {
try {
// 存储任务信息
String taskKey = REDIS_POLLING_PREFIX + requestId;
stringRedisTemplate.opsForValue().set(taskKey, taskId.toString(), CACHE_EXPIRE_TIME);
// 添加到待轮询集合
stringRedisTemplate.opsForZSet().add(REDIS_POLLING_TASKS_SET, requestId, System.currentTimeMillis());
// 初始化轮询次数
String countKey = REDIS_POLLING_COUNT_PREFIX + requestId;
stringRedisTemplate.opsForValue().set(countKey, "0", CACHE_EXPIRE_TIME);
log.info("[addTaskToPollingQueue][任务已加入轮询队列][taskId={}, requestId={}]", taskId, requestId);
} catch (Exception e) {
log.error("[addTaskToPollingQueue][添加任务失败][taskId={}]", taskId, e);
}
}
/**
* 单个任务轮询
*/
private void pollSingleTask(Long taskId) {
try {
// 获取任务的requestId
String taskKey = REDIS_POLLING_PREFIX + "task_" + taskId;
String requestId = stringRedisTemplate.opsForValue().get(taskKey);
if (StrUtil.isBlank(requestId)) {
// 如果没有requestId说明任务可能已完成或已取消从轮询队列中移除
removeFromPollingQueue(taskId, null);
return;
}
// 检查轮询次数
String countKey = REDIS_POLLING_COUNT_PREFIX + requestId;
String countStr = stringRedisTemplate.opsForValue().get(countKey);
int currentCount = countStr != null ? Integer.parseInt(countStr) : 0;
if (currentCount >= MAX_POLLING_COUNT) {
// 超时,标记任务失败
log.warn("[pollSingleTask][任务轮询超时][taskId={}, requestId={}, count={}]", taskId, requestId, currentCount);
markTaskFailed(taskId, "Latentsync处理超时");
removeFromPollingQueue(taskId, requestId);
return;
}
// 调用Latentsync API检查状态
AppTikLatentsyncResultRespVO result = latentsyncService.getTaskResult(requestId);
String status = result.getStatus();
log.debug("[pollSingleTask][轮询任务状态][taskId={}, requestId={}, status={}]", taskId, requestId, status);
// 检查任务状态
if ("COMPLETED".equals(status)) {
// 任务完成
handleTaskCompleted(taskId, result.getVideo().getUrl(), requestId);
} else if ("FAILED".equals(status) || "ERROR".equals(status)) {
// 任务失败
handleTaskFailed(taskId, "Latentsync处理失败: " + status, requestId);
} else {
// 继续轮询,更新轮询次数
stringRedisTemplate.opsForValue().set(countKey, String.valueOf(currentCount + 1), CACHE_EXPIRE_TIME);
}
} catch (Exception e) {
log.error("[pollSingleTask][轮询任务异常][taskId={}]", taskId, e);
// 轮询异常不直接标记失败,避免误判
}
}
/**
* 处理任务完成
*/
@Transactional(rollbackFor = Exception.class)
public void handleTaskCompleted(Long taskId, String videoUrl, String requestId) {
TikDigitalHumanTaskDO task = null;
try {
// 获取任务信息
task = taskMapper.selectById(taskId);
if (task == null) {
log.error("[handleTaskCompleted][任务不存在][taskId={}]", taskId);
return;
}
// 保存视频到OSS异步处理轻量化逻辑
String savedVideoUrl = videoUrl;
try {
// 保存视频到OSS避免临时URL过期
savedVideoUrl = saveVideoToOss(task, videoUrl);
log.info("[handleTaskCompleted][任务({})视频已保存到OSS][url={}]", taskId, savedVideoUrl);
} catch (Exception e) {
log.warn("[handleTaskCompleted][任务({})保存视频失败使用原URL][error={}]", taskId, e.getMessage());
savedVideoUrl = videoUrl; // 降级处理
}
// 更新任务状态为成功
TikDigitalHumanTaskDO updateObj = new TikDigitalHumanTaskDO();
updateObj.setId(taskId);
updateObj.setStatus("SUCCESS");
updateObj.setCurrentStep("finishing");
updateObj.setProgress(100);
updateObj.setResultVideoUrl(savedVideoUrl);
updateObj.setFinishTime(LocalDateTime.now());
taskMapper.updateById(updateObj);
// 缓存结果到Redis快速回显
try {
String resultKey = "digital_human:task:result:" + taskId;
stringRedisTemplate.opsForValue().set(resultKey, savedVideoUrl, Duration.ofHours(24));
} catch (Exception e) {
log.warn("[handleTaskCompleted][任务({})缓存结果失败]", taskId, e);
}
// 从轮询队列中移除
removeFromPollingQueue(taskId, requestId);
log.info("[handleTaskCompleted][任务完成][taskId={}, requestId={}]", taskId, requestId);
} catch (Exception e) {
log.error("[handleTaskCompleted][处理任务完成失败][taskId={}]", taskId, e);
}
}
/**
* 处理任务失败
*/
@Transactional(rollbackFor = Exception.class)
public void handleTaskFailed(Long taskId, String errorMessage, String requestId) {
try {
// 更新任务状态为失败
TikDigitalHumanTaskDO updateObj = new TikDigitalHumanTaskDO();
updateObj.setId(taskId);
updateObj.setStatus("FAILED");
updateObj.setErrorMessage(errorMessage);
updateObj.setFinishTime(LocalDateTime.now());
taskMapper.updateById(updateObj);
// 从轮询队列中移除
removeFromPollingQueue(taskId, requestId);
log.warn("[handleTaskFailed][任务失败][taskId={}, requestId={}, error={}]", taskId, requestId, errorMessage);
} catch (Exception e) {
log.error("[handleTaskFailed][处理任务失败失败][taskId={}]", taskId, e);
}
}
/**
* 标记任务失败(内部使用)
*/
private void markTaskFailed(Long taskId, String errorMessage) {
handleTaskFailed(taskId, errorMessage, null);
}
/**
* 从轮询队列中移除任务
*/
private void removeFromPollingQueue(Long taskId, String requestId) {
try {
if (StrUtil.isNotBlank(requestId)) {
// 移除具体任务
String taskKey = REDIS_POLLING_PREFIX + requestId;
stringRedisTemplate.delete(taskKey);
String countKey = REDIS_POLLING_COUNT_PREFIX + requestId;
stringRedisTemplate.delete(countKey);
stringRedisTemplate.opsForZSet().remove(REDIS_POLLING_TASKS_SET, requestId);
} else {
// 尝试通过taskId找到requestId并移除
String pattern = REDIS_POLLING_PREFIX + "task_" + taskId;
// 这里可以优化为直接查询,但为了简单起见,先不实现
}
} catch (Exception e) {
log.error("[removeFromPollingQueue][移除任务失败][taskId={}, requestId={}]", taskId, requestId, e);
}
}
/**
* 获取待轮询的任务ID列表
*/
private List<String> getPendingPollingTasks() {
try {
// 获取所有待轮询的任务
// 注意这里返回的是requestId不是taskId
List<String> requestIds = stringRedisTemplate.opsForZSet()
.range(REDIS_POLLING_TASKS_SET, 0, -1)
.stream()
.toList();
if (requestIds.isEmpty()) {
return List.of();
}
// 将requestId转换为taskId
return requestIds.stream()
.map(requestId -> {
String taskKey = REDIS_POLLING_PREFIX + requestId;
return stringRedisTemplate.opsForValue().get(taskKey);
})
.filter(StrUtil::isNotBlank)
.toList();
} catch (Exception e) {
log.error("[getPendingPollingTasks][获取待轮询任务失败]", e);
return List.of();
}
}
/**
* 清理过期任务每天凌晨2点执行
*/
@Scheduled(cron = "0 0 2 * * ?")
public void cleanupExpiredTasks() {
try {
log.info("[cleanupExpiredTasks][开始清理过期轮询任务]");
// 清理过期的轮询记录
stringRedisTemplate.delete(REDIS_POLLING_TASKS_SET);
// 可以添加更多清理逻辑
log.info("[cleanupExpiredTasks][清理完成]");
} catch (Exception e) {
log.error("[cleanupExpiredTasks][清理过期任务异常]", e);
}
}
/**
* 保存视频到OSS - 流式处理优化内存
*/
private String saveVideoToOss(TikDigitalHumanTaskDO task, String remoteVideoUrl) throws Exception {
log.info("[saveVideoToOss][任务({})开始下载并保存视频到OSS][remoteUrl={}]", task.getId(), remoteVideoUrl);
try {
// 1. 下载远程视频文件流式处理避免OOM
byte[] videoBytes = downloadRemoteFile(remoteVideoUrl);
// 2. 内存检查超过50MB记录警告
int sizeMB = videoBytes.length / 1024 / 1024;
if (sizeMB > 50) {
log.warn("[saveVideoToOss][任务({})视频文件较大][size={}MB]", task.getId(), sizeMB);
}
// 3. 获取OSS目录路径
Long userId = task.getUserId();
String baseDirectory = ossInitService.getOssDirectoryByCategory(userId, "generate");
// 4. 生成文件名
String fileName = String.format("task_%d_%d.mp4", task.getId(), System.currentTimeMillis());
// 5. 保存到OSS
String ossUrl = fileApi.createFile(videoBytes, fileName, baseDirectory, "video/mp4");
// 6. 移除预签名URL中的签名参数获取基础URL
String cleanOssUrl = HttpUtils.removeUrlQuery(ossUrl);
log.info("[saveVideoToOss][任务({})视频保存到OSS完成][directory={}, fileName={}, ossUrl={}]",
task.getId(), baseDirectory, fileName, cleanOssUrl);
return cleanOssUrl;
} catch (Exception e) {
log.error("[saveVideoToOss][任务({})保存视频到OSS失败][remoteUrl={}]", task.getId(), remoteVideoUrl, e);
// 如果保存失败返回原始URL降级处理
return remoteVideoUrl;
}
}
/**
* 下载远程文件 - 内存优化
*/
private byte[] downloadRemoteFile(String remoteUrl) throws Exception {
log.info("[downloadRemoteFile][下载文件][url={}]", remoteUrl);
try (HttpResponse response = HttpRequest.get(remoteUrl)
.execute()) {
if (!response.isOk()) {
throw new Exception("下载文件失败: HTTP " + response.getStatus());
}
// 流式读取分块处理避免大文件OOM
byte[] bytes = response.bodyBytes();
int sizeMB = bytes.length / 1024 / 1024;
log.info("[downloadRemoteFile][文件下载完成][size={} bytes, {}MB]", bytes.length, sizeMB);
return bytes;
}
}
}

View File

@@ -32,4 +32,7 @@ public class AppTikDigitalHumanPageReqVO extends PageParam {
@Schema(description = "创建时间-结束", example = "2024-11-19 23:59:59")
private LocalDateTime createTimeEnd;
@Schema(description = "更新时间-开始(用于增量轮询)", example = "2024-11-19 12:00:00")
private LocalDateTime lastUpdateTime;
}