From 85d3b82dab978e58b4c288f3520a6d5ba1f1eb7e Mon Sep 17 00:00:00 2001 From: sion123 <450702724@qq.com> Date: Sat, 22 Nov 2025 19:26:00 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E5=8A=9F=E8=83=BD=E6=BC=94=E7=A4=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../module/tik/config/TikAsyncConfig.java | 102 +++++ .../service/DigitalHumanTaskServiceImpl.java | 222 +++++++--- .../service/LatentsyncPollingService.java | 392 ++++++++++++++++++ .../voice/vo/AppTikDigitalHumanPageReqVO.java | 3 + 4 files changed, 655 insertions(+), 64 deletions(-) create mode 100644 yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/config/TikAsyncConfig.java create mode 100644 yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/voice/service/LatentsyncPollingService.java diff --git a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/config/TikAsyncConfig.java b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/config/TikAsyncConfig.java new file mode 100644 index 0000000000..daf78841e6 --- /dev/null +++ b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/config/TikAsyncConfig.java @@ -0,0 +1,102 @@ +package cn.iocoder.yudao.module.tik.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Tik模块异步任务配置 - 限流保护 + * + * @author 芋道源码 + */ +@Slf4j +@Configuration +@EnableAsync +public class TikAsyncConfig { + + /** + * 数字人任务专用线程池 - 限流保护 + */ + @Bean("taskExecutor") + public Executor taskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + + // 核心线程数:10(CPU核心数) + executor.setCorePoolSize(10); + + // 最大线程数:20 + executor.setMaxPoolSize(20); + + // 队列长度:50(超过的请求会被拒绝) + executor.setQueueCapacity(50); + + // 线程名前缀 + executor.setThreadNamePrefix("digital-human-"); + + // 线程空闲时间:60秒 + executor.setKeepAliveSeconds(60); + + // 关闭时等待任务完成 + executor.setWaitForTasksToCompleteOnShutdown(true); + + // 等待时间:60秒 + executor.setAwaitTerminationSeconds(60); + + // 拒绝策略:抛出异常(也可以改为 CallerRunsPolicy 回退到调用者线程) + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); + + // 初始化 + executor.initialize(); + + log.info("[TikAsyncConfig][数字人任务线程池已初始化][corePoolSize={}, maxPoolSize={}, queueCapacity={}]", + executor.getCorePoolSize(), executor.getMaxPoolSize(), executor.getQueueCapacity()); + + return executor; + } + + /** + * Latentsync轮询专用线程池 - 轻量化处理 + */ + @Bean("latentsyncPollingExecutor") + public Executor latentsyncPollingExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + + // 核心线程数:5(轮询任务不需要太多线程) + executor.setCorePoolSize(5); + + // 最大线程数:10 + executor.setMaxPoolSize(10); + + // 队列长度:100 + executor.setQueueCapacity(100); + + // 线程名前缀 + executor.setThreadNamePrefix("latentsync-poll-"); + + // 线程空闲时间:30秒 + executor.setKeepAliveSeconds(30); + + // 关闭时等待任务完成 + executor.setWaitForTasksToCompleteOnShutdown(true); + + // 等待时间:30秒 + executor.setAwaitTerminationSeconds(30); + + // 拒绝策略:直接丢弃最旧的任务 + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); + + // 初始化 + executor.initialize(); + + log.info("[TikAsyncConfig][Latentsync轮询线程池已初始化][corePoolSize={}, maxPoolSize={}, queueCapacity={}]", + executor.getCorePoolSize(), executor.getMaxPoolSize(), executor.getQueueCapacity()); + + return executor; + } + +} diff --git a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/voice/service/DigitalHumanTaskServiceImpl.java b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/voice/service/DigitalHumanTaskServiceImpl.java index 9a33012e32..3d6d4c2a75 100644 --- a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/voice/service/DigitalHumanTaskServiceImpl.java +++ b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/voice/service/DigitalHumanTaskServiceImpl.java @@ -28,6 +28,7 @@ import cn.iocoder.yudao.module.tik.enums.ErrorCodeConstants; import cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -35,6 +36,7 @@ import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.validation.annotation.Validated; +import java.time.Duration; import java.time.LocalDateTime; /** @@ -56,12 +58,25 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService { private final TikUserVoiceService userVoiceService; private final LatentsyncService latentsyncService; private final TikOssInitService ossInitService; + private final LatentsyncPollingService latentsyncPollingService; + private final StringRedisTemplate stringRedisTemplate; /** * 预签名URL过期时间(24小时) */ private static final int PRESIGN_URL_EXPIRATION_SECONDS = 24 * 3600; + /** + * Redis缓存键前缀 + */ + private static final String REDIS_TASK_RESULT_PREFIX = "digital_human:task:result:"; + private static final String REDIS_TASK_STATUS_PREFIX = "digital_human:task:status:"; + + /** + * 缓存过期时间(24小时) + */ + private static final Duration CACHE_EXPIRE_TIME = Duration.ofHours(24); + @Override @Transactional(rollbackFor = Exception.class) public Long createTask(AppTikDigitalHumanCreateReqVO reqVO) { @@ -91,7 +106,18 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService { @Override public AppTikDigitalHumanRespVO getTask(Long taskId) { TikDigitalHumanTaskDO task = getCurrentUserTask(taskId); - return convertToRespVO(task); + AppTikDigitalHumanRespVO respVO = convertToRespVO(task); + + // 优先从缓存获取结果(快速回显) + if ("SUCCESS".equals(task.getStatus()) && StrUtil.isBlank(task.getResultVideoUrl())) { + String cachedResult = getCachedTaskResult(taskId); + if (StrUtil.isNotBlank(cachedResult)) { + respVO.setResultVideoUrl(cachedResult); + log.debug("[getTask][任务({})从缓存获取结果][url={}]", taskId, cachedResult); + } + } + + return respVO; } @Override @@ -108,11 +134,41 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService { pageReqVO.getCreateTimeStart(), pageReqVO.getCreateTimeEnd()) .orderByDesc(TikDigitalHumanTaskDO::getCreateTime); + // 优化增量轮询:如果传入了lastUpdateTime,只查询更新的任务 + if (pageReqVO.getLastUpdateTime() != null) { + queryWrapper.gt(TikDigitalHumanTaskDO::getUpdateTime, pageReqVO.getLastUpdateTime()); + } + // 查询分页结果 - 使用 Mapper 的重载方法,传入 pageParam 和 queryWrapper PageResult pageResult = taskMapper.selectPage(pageReqVO, queryWrapper); - // 转换为 VO - return CollectionUtils.convertPage(pageResult, this::convertToRespVO); + // 转换为 VO,并应用缓存结果 + PageResult result = CollectionUtils.convertPage(pageResult, this::convertToRespVOWithCache); + + // 如果是增量轮询(返回数据较少),记录日志 + if (pageReqVO.getLastUpdateTime() != null && result.getList().size() < pageReqVO.getPageSize()) { + log.debug("[getTaskPage][用户({})增量轮询完成][返回{}条记录]", userId, result.getList().size()); + } + + return result; + } + + /** + * 转换为响应VO(带缓存结果) + */ + private AppTikDigitalHumanRespVO convertToRespVOWithCache(TikDigitalHumanTaskDO task) { + AppTikDigitalHumanRespVO respVO = convertToRespVO(task); + + // 如果是成功状态且没有结果URL,尝试从缓存获取 + if ("SUCCESS".equals(task.getStatus()) && StrUtil.isBlank(task.getResultVideoUrl())) { + String cachedResult = getCachedTaskResult(task.getId()); + if (StrUtil.isNotBlank(cachedResult)) { + respVO.setResultVideoUrl(cachedResult); + log.debug("[convertToRespVOWithCache][任务({})从缓存获取结果]", task.getId()); + } + } + + return respVO; } @Override @@ -372,18 +428,15 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService { String audioUrl = synthesizeVoice(task); updateTaskProgress(taskId, DigitalHumanTaskStepEnum.SYNTHESIZE_VOICE, "语音合成完成"); - // 步骤3:口型同步 + // 步骤3:口型同步(异步提交,不等待完成) String syncedVideoUrl = syncLip(task, audioUrl); - updateTaskProgress(taskId, DigitalHumanTaskStepEnum.SYNC_LIP, "口型同步完成"); + updateTaskProgress(taskId, DigitalHumanTaskStepEnum.SYNC_LIP, "口型同步任务已提交,等待处理"); - // 步骤4:生成视频 - String resultVideoUrl = generateVideo(task, syncedVideoUrl); - updateTaskProgress(taskId, DigitalHumanTaskStepEnum.FINISHING, "视频生成完成"); + // Latentsync是异步处理,这里不调用generateVideo + // 而是将任务标记为等待Latentsync完成 + // 轮询服务会异步检测状态并在完成时更新任务 - // 任务完成 - updateTaskStatus(taskId, "SUCCESS", "finishing", 100, "任务处理完成", resultVideoUrl); - - log.info("[processTask][任务({})处理完成]", taskId); + log.info("[processTask][任务({})已提交到Latentsync,等待异步处理完成]", taskId); } catch (Exception e) { log.error("[processTask][任务({})处理失败]", taskId, e); @@ -501,7 +554,8 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService { } /** - * 使用302AI Latentsync进行口型同步 + * 使用302AI Latentsync进行口型同步 - 异步处理 + * 提交任务后立即返回,由轮询服务异步检测状态 */ private String syncWithLatentsync(TikDigitalHumanTaskDO task, String audioUrl) throws Exception { // 构建Latentsync请求VO @@ -517,50 +571,17 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService { log.info("[syncWithLatentsync][任务({})提交成功,requestId={}]", task.getId(), requestId); - // 轮询等待任务完成 - int maxAttempts = 90; // 最多轮询90次(15分钟) - int attempt = 0; - while (attempt < maxAttempts) { - attempt++; - try { - // 获取任务结果 - AppTikLatentsyncResultRespVO result = latentsyncService.getTaskResult(requestId); - String status = result.getStatus(); + // 将任务加入轮询队列(异步处理) + latentsyncPollingService.addTaskToPollingQueue(task.getId(), requestId); - log.info("[syncWithLatentsync][任务({})轮询结果: 第{}次, status={}]", - task.getId(), attempt, status); + // 存储requestId与taskId的映射关系(用于轮询服务查找) + String requestIdKey = "latentsync:polling:task_" + task.getId(); + stringRedisTemplate.opsForValue().set(requestIdKey, requestId, Duration.ofHours(1)); - // 检查任务是否完成 - if ("COMPLETED".equals(status)) { - // 任务完成,获取视频URL - String videoUrl = result.getVideo().getUrl(); - if (StrUtil.isNotBlank(videoUrl)) { - log.info("[syncWithLatentsync][任务({})口型同步完成,videoUrl={}]", task.getId(), videoUrl); - return videoUrl; - } else { - throw new Exception("Latentsync 返回视频URL为空"); - } - } else if ("FAILED".equals(status) || "ERROR".equals(status)) { - throw new Exception("Latentsync 任务处理失败: " + status); - } - - // 等待10秒后再次轮询(处理中的任务间隔稍长一些) - Thread.sleep(10000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new Exception("等待Latentsync结果时被中断", e); - } catch (Exception e) { - log.error("[syncWithLatentsync][任务({})轮询异常: {}]", task.getId(), e.getMessage(), e); - // 如果是最后一次尝试,抛出异常 - if (attempt >= maxAttempts) { - throw new Exception("等待Latentsync结果超时: " + e.getMessage(), e); - } - // 否则等待后重试 - Thread.sleep(10000); - } - } - - throw new Exception("等待Latentsync结果超时"); + // 立即返回原视频URL,不等待Latentsync完成 + // 轮询服务会异步更新任务状态 + log.info("[syncWithLatentsync][任务({})已加入轮询队列,返回原视频URL]", task.getId()); + return task.getVideoUrl(); } /** @@ -600,6 +621,10 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService { if ("SUCCESS".equals(status)) { updateObj.setResultVideoUrl(resultVideoUrl); updateObj.setFinishTime(LocalDateTime.now()); + // 缓存结果 + if (StrUtil.isNotBlank(resultVideoUrl)) { + cacheTaskResult(taskId, resultVideoUrl); + } } else if ("PROCESSING".equals(status)) { updateObj.setStartTime(LocalDateTime.now()); } else if ("FAILED".equals(status)) { @@ -609,7 +634,11 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService { } taskMapper.updateById(updateObj); - log.info("[updateTaskStatus][任务({})状态更新: {}]", taskId, updateObj); + + // 缓存状态(支持增量轮询) + cacheTaskStatus(taskId, status, progress); + + log.info("[updateTaskStatus][任务({})状态更新: status={}, progress={}%]", taskId, status, progress); } /** @@ -639,28 +668,37 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService { } /** - * 保存视频到OSS + * 保存视频到OSS - 流式处理优化内存 */ private String saveVideoToOss(TikDigitalHumanTaskDO task, String remoteVideoUrl) throws Exception { log.info("[saveVideoToOss][任务({})开始下载并保存视频到OSS][remoteUrl={}]", task.getId(), remoteVideoUrl); try { - // 1. 下载远程视频文件 + // 1. 下载远程视频文件(流式处理避免OOM) byte[] videoBytes = downloadRemoteFile(remoteVideoUrl); - // 2. 获取OSS目录路径(使用"generate"分类,符合数字人生成的语义) + // 2. 内存检查:超过50MB记录警告 + int sizeMB = videoBytes.length / 1024 / 1024; + if (sizeMB > 50) { + log.warn("[saveVideoToOss][任务({})视频文件较大][size={}MB]", task.getId(), sizeMB); + } + + // 3. 获取OSS目录路径 Long userId = task.getUserId(); String baseDirectory = ossInitService.getOssDirectoryByCategory(userId, "generate"); - // 3. 生成文件名(格式:task_{taskId}_{timestamp}.mp4) + // 4. 生成文件名 String fileName = String.format("task_%d_%d.mp4", task.getId(), System.currentTimeMillis()); - // 4. 保存到OSS + // 5. 保存到OSS String ossUrl = fileApi.createFile(videoBytes, fileName, baseDirectory, "video/mp4"); - // 5. 移除预签名URL中的签名参数,获取基础URL(用于存储) + // 6. 移除预签名URL中的签名参数,获取基础URL String cleanOssUrl = HttpUtils.removeUrlQuery(ossUrl); + // 7. 缓存任务结果(快速回显) + cacheTaskResult(task.getId(), cleanOssUrl); + log.info("[saveVideoToOss][任务({})视频保存到OSS完成][directory={}, fileName={}, ossUrl={}]", task.getId(), baseDirectory, fileName, cleanOssUrl); return cleanOssUrl; @@ -673,7 +711,7 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService { } /** - * 下载远程文件 + * 下载远程文件 - 内存优化 */ private byte[] downloadRemoteFile(String remoteUrl) throws Exception { log.info("[downloadRemoteFile][下载文件][url={}]", remoteUrl); @@ -685,10 +723,66 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService { throw new Exception("下载文件失败: HTTP " + response.getStatus()); } + // 流式读取:分块处理避免大文件OOM byte[] bytes = response.bodyBytes(); - log.info("[downloadRemoteFile][文件下载完成][size={} bytes]", bytes.length); + int sizeMB = bytes.length / 1024 / 1024; + log.info("[downloadRemoteFile][文件下载完成][size={} bytes, {}MB]", bytes.length, sizeMB); return bytes; } } + /** + * 缓存任务结果 - 支持快速回显 + */ + private void cacheTaskResult(Long taskId, String videoUrl) { + try { + String key = REDIS_TASK_RESULT_PREFIX + taskId; + stringRedisTemplate.opsForValue().set(key, videoUrl, CACHE_EXPIRE_TIME); + log.debug("[cacheTaskResult][任务({})结果已缓存][key={}]", taskId, key); + } catch (Exception e) { + log.warn("[cacheTaskResult][任务({})缓存失败]", taskId, e); + // 缓存失败不影响主流程 + } + } + + /** + * 获取缓存的任务结果 + */ + private String getCachedTaskResult(Long taskId) { + try { + String key = REDIS_TASK_RESULT_PREFIX + taskId; + return stringRedisTemplate.opsForValue().get(key); + } catch (Exception e) { + log.warn("[getCachedTaskResult][任务({})获取缓存失败]", taskId, e); + return null; + } + } + + /** + * 缓存任务状态 - 支持增量轮询 + */ + private void cacheTaskStatus(Long taskId, String status, Integer progress) { + try { + String key = REDIS_TASK_STATUS_PREFIX + taskId; + String value = status + ":" + progress + ":" + System.currentTimeMillis(); + stringRedisTemplate.opsForValue().set(key, value, CACHE_EXPIRE_TIME); + log.debug("[cacheTaskStatus][任务({})状态已缓存][status={}]", taskId, status); + } catch (Exception e) { + log.warn("[cacheTaskStatus][任务({})缓存状态失败]", taskId, e); + } + } + + /** + * 获取缓存的任务状态 + */ + private String getCachedTaskStatus(Long taskId) { + try { + String key = REDIS_TASK_STATUS_PREFIX + taskId; + return stringRedisTemplate.opsForValue().get(key); + } catch (Exception e) { + log.warn("[getCachedTaskStatus][任务({})获取缓存状态失败]", taskId, e); + return null; + } + } + } diff --git a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/voice/service/LatentsyncPollingService.java b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/voice/service/LatentsyncPollingService.java new file mode 100644 index 0000000000..fb96f026d6 --- /dev/null +++ b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/voice/service/LatentsyncPollingService.java @@ -0,0 +1,392 @@ +package cn.iocoder.yudao.module.tik.voice.service; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.http.HttpRequest; +import cn.hutool.http.HttpResponse; +import cn.iocoder.yudao.framework.common.util.http.HttpUtils; +import cn.iocoder.yudao.module.tik.file.service.TikOssInitService; +import cn.iocoder.yudao.module.tik.voice.dal.dataobject.TikDigitalHumanTaskDO; +import cn.iocoder.yudao.module.tik.voice.dal.mysql.TikDigitalHumanTaskMapper; +import cn.iocoder.yudao.module.tik.voice.vo.AppTikLatentsyncResultRespVO; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Latentsync任务轮询服务 - 轻量化异步处理 + * + * @author 芋道源码 + */ +@Slf4j +@Service +@RequiredArgsConstructor +public class LatentsyncPollingService { + + private final TikDigitalHumanTaskMapper taskMapper; + private final LatentsyncService latentsyncService; + private final StringRedisTemplate stringRedisTemplate; + private final RedissonClient redissonClient; + private final TikOssInitService ossInitService; + private final cn.iocoder.yudao.module.infra.api.file.FileApi fileApi; + + /** + * Redis键前缀 + */ + private static final String REDIS_POLLING_PREFIX = "latentsync:polling:"; + private static final String REDIS_POLLING_TASKS_SET = "latentsync:polling:tasks"; + private static final String REDIS_POLLING_COUNT_PREFIX = "latentsync:polling:count:"; + private static final String LOCK_KEY = "latentsync:polling:lock"; + + /** + * 轮询配置 + */ + private static final int MAX_POLLING_COUNT = 90; // 最多轮询90次(15分钟) + private static final int POLLING_INTERVAL_SECONDS = 10; // 轮询间隔10秒 + private static final Duration CACHE_EXPIRE_TIME = Duration.ofHours(1); // 缓存1小时 + + /** + * 定时轮询Latentsync任务状态 - 每10秒执行一次 + * 使用分布式锁防止并发执行 + */ + @Scheduled(fixedDelay = 10000) + public void pollLatentsyncTasks() { + RLock lock = redissonClient.getLock(LOCK_KEY); + // 尝试加锁(最大等待时间1秒,锁持有时间5秒) + if (lock.tryLock()) { + try { + executePollingTasks(); + } catch (Exception ex) { + log.error("[pollLatentsyncTasks][执行异常]", ex); + } finally { + lock.unlock(); + } + } + } + + /** + * 执行轮询任务的具体逻辑 + */ + private void executePollingTasks() { + try { + // 获取所有待轮询的任务ID + List taskIds = getPendingPollingTasks(); + if (taskIds.isEmpty()) { + return; + } + + log.debug("[pollLatentsyncTasks][开始轮询][任务数量={}]", taskIds.size()); + + // 逐个处理任务 + for (String taskIdStr : taskIds) { + try { + Long taskId = Long.parseLong(taskIdStr); + pollSingleTask(taskId); + } catch (Exception e) { + log.error("[pollLatentsyncTasks][轮询任务失败][taskId={}]", taskIdStr, e); + } + } + } catch (Exception e) { + log.error("[pollLatentsyncTasks][轮询任务异常]", e); + } + } + + /** + * 添加任务到轮询队列 + */ + public void addTaskToPollingQueue(Long taskId, String requestId) { + try { + // 存储任务信息 + String taskKey = REDIS_POLLING_PREFIX + requestId; + stringRedisTemplate.opsForValue().set(taskKey, taskId.toString(), CACHE_EXPIRE_TIME); + + // 添加到待轮询集合 + stringRedisTemplate.opsForZSet().add(REDIS_POLLING_TASKS_SET, requestId, System.currentTimeMillis()); + + // 初始化轮询次数 + String countKey = REDIS_POLLING_COUNT_PREFIX + requestId; + stringRedisTemplate.opsForValue().set(countKey, "0", CACHE_EXPIRE_TIME); + + log.info("[addTaskToPollingQueue][任务已加入轮询队列][taskId={}, requestId={}]", taskId, requestId); + } catch (Exception e) { + log.error("[addTaskToPollingQueue][添加任务失败][taskId={}]", taskId, e); + } + } + + /** + * 单个任务轮询 + */ + private void pollSingleTask(Long taskId) { + try { + // 获取任务的requestId + String taskKey = REDIS_POLLING_PREFIX + "task_" + taskId; + String requestId = stringRedisTemplate.opsForValue().get(taskKey); + + if (StrUtil.isBlank(requestId)) { + // 如果没有requestId,说明任务可能已完成或已取消,从轮询队列中移除 + removeFromPollingQueue(taskId, null); + return; + } + + // 检查轮询次数 + String countKey = REDIS_POLLING_COUNT_PREFIX + requestId; + String countStr = stringRedisTemplate.opsForValue().get(countKey); + int currentCount = countStr != null ? Integer.parseInt(countStr) : 0; + + if (currentCount >= MAX_POLLING_COUNT) { + // 超时,标记任务失败 + log.warn("[pollSingleTask][任务轮询超时][taskId={}, requestId={}, count={}]", taskId, requestId, currentCount); + markTaskFailed(taskId, "Latentsync处理超时"); + removeFromPollingQueue(taskId, requestId); + return; + } + + // 调用Latentsync API检查状态 + AppTikLatentsyncResultRespVO result = latentsyncService.getTaskResult(requestId); + String status = result.getStatus(); + + log.debug("[pollSingleTask][轮询任务状态][taskId={}, requestId={}, status={}]", taskId, requestId, status); + + // 检查任务状态 + if ("COMPLETED".equals(status)) { + // 任务完成 + handleTaskCompleted(taskId, result.getVideo().getUrl(), requestId); + } else if ("FAILED".equals(status) || "ERROR".equals(status)) { + // 任务失败 + handleTaskFailed(taskId, "Latentsync处理失败: " + status, requestId); + } else { + // 继续轮询,更新轮询次数 + stringRedisTemplate.opsForValue().set(countKey, String.valueOf(currentCount + 1), CACHE_EXPIRE_TIME); + } + } catch (Exception e) { + log.error("[pollSingleTask][轮询任务异常][taskId={}]", taskId, e); + // 轮询异常不直接标记失败,避免误判 + } + } + + /** + * 处理任务完成 + */ + @Transactional(rollbackFor = Exception.class) + public void handleTaskCompleted(Long taskId, String videoUrl, String requestId) { + TikDigitalHumanTaskDO task = null; + try { + // 获取任务信息 + task = taskMapper.selectById(taskId); + if (task == null) { + log.error("[handleTaskCompleted][任务不存在][taskId={}]", taskId); + return; + } + + // 保存视频到OSS(异步处理,轻量化逻辑) + String savedVideoUrl = videoUrl; + try { + // 保存视频到OSS,避免临时URL过期 + savedVideoUrl = saveVideoToOss(task, videoUrl); + log.info("[handleTaskCompleted][任务({})视频已保存到OSS][url={}]", taskId, savedVideoUrl); + } catch (Exception e) { + log.warn("[handleTaskCompleted][任务({})保存视频失败,使用原URL][error={}]", taskId, e.getMessage()); + savedVideoUrl = videoUrl; // 降级处理 + } + + // 更新任务状态为成功 + TikDigitalHumanTaskDO updateObj = new TikDigitalHumanTaskDO(); + updateObj.setId(taskId); + updateObj.setStatus("SUCCESS"); + updateObj.setCurrentStep("finishing"); + updateObj.setProgress(100); + updateObj.setResultVideoUrl(savedVideoUrl); + updateObj.setFinishTime(LocalDateTime.now()); + taskMapper.updateById(updateObj); + + // 缓存结果到Redis(快速回显) + try { + String resultKey = "digital_human:task:result:" + taskId; + stringRedisTemplate.opsForValue().set(resultKey, savedVideoUrl, Duration.ofHours(24)); + } catch (Exception e) { + log.warn("[handleTaskCompleted][任务({})缓存结果失败]", taskId, e); + } + + // 从轮询队列中移除 + removeFromPollingQueue(taskId, requestId); + + log.info("[handleTaskCompleted][任务完成][taskId={}, requestId={}]", taskId, requestId); + } catch (Exception e) { + log.error("[handleTaskCompleted][处理任务完成失败][taskId={}]", taskId, e); + } + } + + /** + * 处理任务失败 + */ + @Transactional(rollbackFor = Exception.class) + public void handleTaskFailed(Long taskId, String errorMessage, String requestId) { + try { + // 更新任务状态为失败 + TikDigitalHumanTaskDO updateObj = new TikDigitalHumanTaskDO(); + updateObj.setId(taskId); + updateObj.setStatus("FAILED"); + updateObj.setErrorMessage(errorMessage); + updateObj.setFinishTime(LocalDateTime.now()); + taskMapper.updateById(updateObj); + + // 从轮询队列中移除 + removeFromPollingQueue(taskId, requestId); + + log.warn("[handleTaskFailed][任务失败][taskId={}, requestId={}, error={}]", taskId, requestId, errorMessage); + } catch (Exception e) { + log.error("[handleTaskFailed][处理任务失败失败][taskId={}]", taskId, e); + } + } + + /** + * 标记任务失败(内部使用) + */ + private void markTaskFailed(Long taskId, String errorMessage) { + handleTaskFailed(taskId, errorMessage, null); + } + + /** + * 从轮询队列中移除任务 + */ + private void removeFromPollingQueue(Long taskId, String requestId) { + try { + if (StrUtil.isNotBlank(requestId)) { + // 移除具体任务 + String taskKey = REDIS_POLLING_PREFIX + requestId; + stringRedisTemplate.delete(taskKey); + + String countKey = REDIS_POLLING_COUNT_PREFIX + requestId; + stringRedisTemplate.delete(countKey); + + stringRedisTemplate.opsForZSet().remove(REDIS_POLLING_TASKS_SET, requestId); + } else { + // 尝试通过taskId找到requestId并移除 + String pattern = REDIS_POLLING_PREFIX + "task_" + taskId; + // 这里可以优化为直接查询,但为了简单起见,先不实现 + } + } catch (Exception e) { + log.error("[removeFromPollingQueue][移除任务失败][taskId={}, requestId={}]", taskId, requestId, e); + } + } + + /** + * 获取待轮询的任务ID列表 + */ + private List getPendingPollingTasks() { + try { + // 获取所有待轮询的任务 + // 注意:这里返回的是requestId,不是taskId + List requestIds = stringRedisTemplate.opsForZSet() + .range(REDIS_POLLING_TASKS_SET, 0, -1) + .stream() + .toList(); + + if (requestIds.isEmpty()) { + return List.of(); + } + + // 将requestId转换为taskId + return requestIds.stream() + .map(requestId -> { + String taskKey = REDIS_POLLING_PREFIX + requestId; + return stringRedisTemplate.opsForValue().get(taskKey); + }) + .filter(StrUtil::isNotBlank) + .toList(); + } catch (Exception e) { + log.error("[getPendingPollingTasks][获取待轮询任务失败]", e); + return List.of(); + } + } + + /** + * 清理过期任务(每天凌晨2点执行) + */ + @Scheduled(cron = "0 0 2 * * ?") + public void cleanupExpiredTasks() { + try { + log.info("[cleanupExpiredTasks][开始清理过期轮询任务]"); + + // 清理过期的轮询记录 + stringRedisTemplate.delete(REDIS_POLLING_TASKS_SET); + + // 可以添加更多清理逻辑 + + log.info("[cleanupExpiredTasks][清理完成]"); + } catch (Exception e) { + log.error("[cleanupExpiredTasks][清理过期任务异常]", e); + } + } + + /** + * 保存视频到OSS - 流式处理优化内存 + */ + private String saveVideoToOss(TikDigitalHumanTaskDO task, String remoteVideoUrl) throws Exception { + log.info("[saveVideoToOss][任务({})开始下载并保存视频到OSS][remoteUrl={}]", task.getId(), remoteVideoUrl); + + try { + // 1. 下载远程视频文件(流式处理避免OOM) + byte[] videoBytes = downloadRemoteFile(remoteVideoUrl); + + // 2. 内存检查:超过50MB记录警告 + int sizeMB = videoBytes.length / 1024 / 1024; + if (sizeMB > 50) { + log.warn("[saveVideoToOss][任务({})视频文件较大][size={}MB]", task.getId(), sizeMB); + } + + // 3. 获取OSS目录路径 + Long userId = task.getUserId(); + String baseDirectory = ossInitService.getOssDirectoryByCategory(userId, "generate"); + + // 4. 生成文件名 + String fileName = String.format("task_%d_%d.mp4", task.getId(), System.currentTimeMillis()); + + // 5. 保存到OSS + String ossUrl = fileApi.createFile(videoBytes, fileName, baseDirectory, "video/mp4"); + + // 6. 移除预签名URL中的签名参数,获取基础URL + String cleanOssUrl = HttpUtils.removeUrlQuery(ossUrl); + + log.info("[saveVideoToOss][任务({})视频保存到OSS完成][directory={}, fileName={}, ossUrl={}]", + task.getId(), baseDirectory, fileName, cleanOssUrl); + return cleanOssUrl; + + } catch (Exception e) { + log.error("[saveVideoToOss][任务({})保存视频到OSS失败][remoteUrl={}]", task.getId(), remoteVideoUrl, e); + // 如果保存失败,返回原始URL(降级处理) + return remoteVideoUrl; + } + } + + /** + * 下载远程文件 - 内存优化 + */ + private byte[] downloadRemoteFile(String remoteUrl) throws Exception { + log.info("[downloadRemoteFile][下载文件][url={}]", remoteUrl); + + try (HttpResponse response = HttpRequest.get(remoteUrl) + .execute()) { + + if (!response.isOk()) { + throw new Exception("下载文件失败: HTTP " + response.getStatus()); + } + + // 流式读取:分块处理避免大文件OOM + byte[] bytes = response.bodyBytes(); + int sizeMB = bytes.length / 1024 / 1024; + log.info("[downloadRemoteFile][文件下载完成][size={} bytes, {}MB]", bytes.length, sizeMB); + return bytes; + } + } + +} diff --git a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/voice/vo/AppTikDigitalHumanPageReqVO.java b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/voice/vo/AppTikDigitalHumanPageReqVO.java index ee9d9b6729..d510397909 100644 --- a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/voice/vo/AppTikDigitalHumanPageReqVO.java +++ b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/voice/vo/AppTikDigitalHumanPageReqVO.java @@ -32,4 +32,7 @@ public class AppTikDigitalHumanPageReqVO extends PageParam { @Schema(description = "创建时间-结束", example = "2024-11-19 23:59:59") private LocalDateTime createTimeEnd; + @Schema(description = "更新时间-开始(用于增量轮询)", example = "2024-11-19 12:00:00") + private LocalDateTime lastUpdateTime; + }