feat: 功能优化

This commit is contained in:
2025-11-22 21:45:15 +08:00
parent 6b582ba6de
commit c35a2219c9
4 changed files with 167 additions and 136 deletions

View File

@@ -70,7 +70,6 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService {
* Redis缓存键前缀
*/
private static final String REDIS_TASK_RESULT_PREFIX = "digital_human:task:result:";
private static final String REDIS_TASK_STATUS_PREFIX = "digital_human:task:status:";
/**
* 缓存过期时间24小时
@@ -429,7 +428,7 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService {
updateTaskProgress(taskId, DigitalHumanTaskStepEnum.SYNTHESIZE_VOICE, "语音合成完成");
// 步骤3口型同步异步提交不等待完成
String syncedVideoUrl = syncLip(task, audioUrl);
syncLip(task, audioUrl);
updateTaskProgress(taskId, DigitalHumanTaskStepEnum.SYNC_LIP, "口型同步任务已提交,等待处理");
// Latentsync是异步处理这里不调用generateVideo
@@ -584,21 +583,6 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService {
return task.getVideoUrl();
}
/**
* 生成视频
*/
private String generateVideo(TikDigitalHumanTaskDO task, String syncedVideoUrl) throws Exception {
log.info("[generateVideo][任务({})开始生成视频]", task.getId());
// TODO: 这里可以添加视频后处理逻辑,比如添加字幕、特效等
// 保存同步后的视频到OSS
String resultVideoUrl = saveVideoToOss(task, syncedVideoUrl);
log.info("[generateVideo][任务({})视频生成完成]", task.getId());
return resultVideoUrl;
}
/**
* 更新任务状态
*/
@@ -635,9 +619,6 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService {
taskMapper.updateById(updateObj);
// 缓存状态(支持增量轮询)
cacheTaskStatus(taskId, status, progress);
log.info("[updateTaskStatus][任务({})状态更新: status={}, progress={}%]", taskId, status, progress);
}
@@ -667,70 +648,6 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService {
updateTaskStatus(taskId, "PROCESSING", step.getStep(), step.getProgress(), message, null);
}
/**
* 保存视频到OSS - 流式处理优化内存
*/
private String saveVideoToOss(TikDigitalHumanTaskDO task, String remoteVideoUrl) throws Exception {
log.info("[saveVideoToOss][任务({})开始下载并保存视频到OSS][remoteUrl={}]", task.getId(), remoteVideoUrl);
try {
// 1. 下载远程视频文件流式处理避免OOM
byte[] videoBytes = downloadRemoteFile(remoteVideoUrl);
// 2. 内存检查超过50MB记录警告
int sizeMB = videoBytes.length / 1024 / 1024;
if (sizeMB > 50) {
log.warn("[saveVideoToOss][任务({})视频文件较大][size={}MB]", task.getId(), sizeMB);
}
// 3. 获取OSS目录路径
Long userId = task.getUserId();
String baseDirectory = ossInitService.getOssDirectoryByCategory(userId, "generate");
// 4. 生成文件名
String fileName = String.format("task_%d_%d.mp4", task.getId(), System.currentTimeMillis());
// 5. 保存到OSS
String ossUrl = fileApi.createFile(videoBytes, fileName, baseDirectory, "video/mp4");
// 6. 移除预签名URL中的签名参数获取基础URL
String cleanOssUrl = HttpUtils.removeUrlQuery(ossUrl);
// 7. 缓存任务结果(快速回显)
cacheTaskResult(task.getId(), cleanOssUrl);
log.info("[saveVideoToOss][任务({})视频保存到OSS完成][directory={}, fileName={}, ossUrl={}]",
task.getId(), baseDirectory, fileName, cleanOssUrl);
return cleanOssUrl;
} catch (Exception e) {
log.error("[saveVideoToOss][任务({})保存视频到OSS失败][remoteUrl={}]", task.getId(), remoteVideoUrl, e);
// 如果保存失败返回原始URL降级处理
return remoteVideoUrl;
}
}
/**
* 下载远程文件 - 内存优化
*/
private byte[] downloadRemoteFile(String remoteUrl) throws Exception {
log.info("[downloadRemoteFile][下载文件][url={}]", remoteUrl);
try (HttpResponse response = HttpRequest.get(remoteUrl)
.execute()) {
if (!response.isOk()) {
throw new Exception("下载文件失败: HTTP " + response.getStatus());
}
// 流式读取分块处理避免大文件OOM
byte[] bytes = response.bodyBytes();
int sizeMB = bytes.length / 1024 / 1024;
log.info("[downloadRemoteFile][文件下载完成][size={} bytes, {}MB]", bytes.length, sizeMB);
return bytes;
}
}
/**
* 缓存任务结果 - 支持快速回显
*/
@@ -758,31 +675,4 @@ public class DigitalHumanTaskServiceImpl implements DigitalHumanTaskService {
}
}
/**
* 缓存任务状态 - 支持增量轮询
*/
private void cacheTaskStatus(Long taskId, String status, Integer progress) {
try {
String key = REDIS_TASK_STATUS_PREFIX + taskId;
String value = status + ":" + progress + ":" + System.currentTimeMillis();
stringRedisTemplate.opsForValue().set(key, value, CACHE_EXPIRE_TIME);
log.debug("[cacheTaskStatus][任务({})状态已缓存][status={}]", taskId, status);
} catch (Exception e) {
log.warn("[cacheTaskStatus][任务({})缓存状态失败]", taskId, e);
}
}
/**
* 获取缓存的任务状态
*/
private String getCachedTaskStatus(Long taskId) {
try {
String key = REDIS_TASK_STATUS_PREFIX + taskId;
return stringRedisTemplate.opsForValue().get(key);
} catch (Exception e) {
log.warn("[getCachedTaskStatus][任务({})获取缓存状态失败]", taskId, e);
return null;
}
}
}

View File

@@ -4,6 +4,8 @@ import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import cn.iocoder.yudao.framework.common.util.http.HttpUtils;
import cn.iocoder.yudao.framework.common.util.number.NumberUtils;
import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnore;
import cn.iocoder.yudao.module.tik.file.service.TikOssInitService;
import cn.iocoder.yudao.module.tik.voice.dal.dataobject.TikDigitalHumanTaskDO;
import cn.iocoder.yudao.module.tik.voice.dal.mysql.TikDigitalHumanTaskMapper;
@@ -24,12 +26,14 @@ import java.util.concurrent.TimeUnit;
/**
* Latentsync任务轮询服务 - 轻量化异步处理
* 使用@TenantIgnore忽略租户检查因为轮询服务没有用户上下文
*
* @author 芋道源码
*/
@Slf4j
@Service
@RequiredArgsConstructor
@TenantIgnore
public class LatentsyncPollingService {
private final TikDigitalHumanTaskMapper taskMapper;
@@ -126,10 +130,15 @@ public class LatentsyncPollingService {
* 单个任务轮询
*/
private void pollSingleTask(Long taskId) {
// 获取任务的requestId和轮询次数在try块外声明供catch块使用
String requestId = null;
String countKey = null;
int currentCount = 0;
try {
// 获取任务的requestId
String taskKey = REDIS_POLLING_PREFIX + "task_" + taskId;
String requestId = stringRedisTemplate.opsForValue().get(taskKey);
requestId = stringRedisTemplate.opsForValue().get(taskKey);
if (StrUtil.isBlank(requestId)) {
// 如果没有requestId说明任务可能已完成或已取消从轮询队列中移除
@@ -138,9 +147,9 @@ public class LatentsyncPollingService {
}
// 检查轮询次数
String countKey = REDIS_POLLING_COUNT_PREFIX + requestId;
countKey = REDIS_POLLING_COUNT_PREFIX + requestId;
String countStr = stringRedisTemplate.opsForValue().get(countKey);
int currentCount = countStr != null ? Integer.parseInt(countStr) : 0;
currentCount = countStr != null ? Integer.parseInt(countStr) : 0;
if (currentCount >= MAX_POLLING_COUNT) {
// 超时,标记任务失败
@@ -169,7 +178,23 @@ public class LatentsyncPollingService {
}
} catch (Exception e) {
log.error("[pollSingleTask][轮询任务异常][taskId={}]", taskId, e);
// 轮询异常不直接标记失败,避免误判
// 轮询异常处理:增加轮询次数,避免无限重试
int errorCount = currentCount + 1;
if (errorCount >= MAX_POLLING_COUNT) {
// 达到最大次数,标记任务失败
log.warn("[pollSingleTask][任务轮询异常次数过多,标记失败][taskId={}, count={}]", taskId, errorCount);
if (requestId != null && StrUtil.isNotBlank(requestId)) {
markTaskFailed(taskId, "Latentsync API调用异常" + e.getMessage());
removeFromPollingQueue(taskId, requestId);
}
} else {
// 更新轮询次数,继续重试
if (countKey != null) {
stringRedisTemplate.opsForValue().set(countKey, String.valueOf(errorCount), CACHE_EXPIRE_TIME);
log.debug("[pollSingleTask][轮询异常,增加次数后继续重试][taskId={}, count={}]", taskId, errorCount);
}
}
}
}