feat: 功能

This commit is contained in:
2026-02-04 01:18:16 +08:00
parent f8e40c039d
commit 0e1b6fe643
19 changed files with 1472 additions and 1008 deletions

View File

@@ -445,12 +445,12 @@ public class TikUserFileServiceImpl implements TikUserFileService {
}
@Override
public String getVideoPlayUrl(Long infraFileId) {
public String getVideoPlayUrl(Long fileId) {
Long userId = SecurityFrameworkUtils.getLoginUserId();
// 查询文件(根据 infraFileId 字段查询)
// 查询文件(根据 fileId 字段查询)
TikUserFileDO file = userFileMapper.selectOne(new LambdaQueryWrapperX<TikUserFileDO>()
.eq(TikUserFileDO::getFileId, infraFileId)
.eq(TikUserFileDO::getFileId, fileId)
.eq(TikUserFileDO::getUserId, userId));
if (file == null) {
@@ -473,12 +473,12 @@ public class TikUserFileServiceImpl implements TikUserFileService {
}
@Override
public String getAudioPlayUrl(Long infraFileId) {
public String getAudioPlayUrl(Long fileId) {
Long userId = SecurityFrameworkUtils.getLoginUserId();
// 查询文件(根据 infraFileId 字段查询)
// 查询文件(根据 fileId 字段查询)
TikUserFileDO file = userFileMapper.selectOne(new LambdaQueryWrapperX<TikUserFileDO>()
.eq(TikUserFileDO::getFileId, infraFileId)
.eq(TikUserFileDO::getFileId, fileId)
.eq(TikUserFileDO::getUserId, userId));
if (file == null) {

View File

@@ -14,9 +14,12 @@ import java.time.LocalDateTime;
@Data
public class AppTikUserFileRespVO {
@Schema(description = "文件编号", requiredMode = Schema.RequiredMode.REQUIRED, example = "1")
@Schema(description = "文件编号(主键)", requiredMode = Schema.RequiredMode.REQUIRED, example = "1")
private Long id;
@Schema(description = "素材文件编号(关联 infra_file.id用于获取播放URL", example = "100")
private Long fileId;
@Schema(description = "文件名称", requiredMode = Schema.RequiredMode.REQUIRED, example = "test.mp4")
private String fileName;

View File

@@ -52,34 +52,12 @@ public class KlingClient {
validateRequest(request);
Map<String, Object> payload = buildPayload(request);
try {
String body = objectMapper.writeValueAsString(payload);
String url = properties.getBaseUrl() + "/klingai/v1/videos/identify-face";
String url = properties.getBaseUrl() + "/klingai/v1/videos/identify-face";
Request httpRequest = buildPostRequest(url, payload);
Request httpRequest = new Request.Builder()
.url(url)
.addHeader("Authorization", "Bearer " + properties.getApiKey())
.addHeader("Content-Type", "application/json")
.post(RequestBody.create(body.getBytes(StandardCharsets.UTF_8), JSON))
.build();
try {
KlingIdentifyFaceResponse response = executeRequest(httpRequest, "identify-face", KlingIdentifyFaceResponse.class);
// 验证sessionId
if (StrUtil.isBlank(response.getData() == null ? null : response.getData().getSessionId())) {
throw exception0(LATENTSYNC_SUBMIT_FAILED.getCode(), "可灵返回 sessionId 为空");
}
return response;
} catch (ServiceException ex) {
throw ex;
} catch (Exception ex) {
log.error("[Kling][identify-face exception]", ex);
throw exception(LATENTSYNC_SUBMIT_FAILED);
}
} catch (Exception ex) {
log.error("[Kling][build request exception]", ex);
throw exception(LATENTSYNC_SUBMIT_FAILED);
}
KlingIdentifyFaceResponse response = executeRequest(httpRequest, "identify-face", KlingIdentifyFaceResponse.class);
validateSessionId(response.getData() != null ? response.getData().getSessionId() : null, "sessionId");
return response;
}
/**
@@ -89,35 +67,13 @@ public class KlingClient {
validateEnabled();
validateLipSyncRequest(request);
try {
String body = objectMapper.writeValueAsString(request);
log.info("[Kling][create-lip-sync请求体] {}", body);
String url = properties.getBaseUrl() + "/klingai/v1/videos/advanced-lip-sync";
String url = properties.getBaseUrl() + "/klingai/v1/videos/advanced-lip-sync";
Request httpRequest = buildPostRequest(url, request);
log.info("[Kling][create-lip-sync请求体] {}", request);
Request httpRequest = new Request.Builder()
.url(url)
.addHeader("Authorization", "Bearer " + properties.getApiKey())
.addHeader("Content-Type", "application/json")
.post(RequestBody.create(body.getBytes(StandardCharsets.UTF_8), JSON))
.build();
try {
KlingLipSyncCreateResponse response = executeRequest(httpRequest, "create-lip-sync", KlingLipSyncCreateResponse.class);
// 验证taskId
if (StrUtil.isBlank(response.getData() == null ? null : response.getData().getTaskId())) {
throw exception0(LATENTSYNC_SUBMIT_FAILED.getCode(), "可灵返回 taskId 为空");
}
return response;
} catch (ServiceException ex) {
throw ex;
} catch (Exception ex) {
log.error("[Kling][create-lip-sync exception]", ex);
throw exception(LATENTSYNC_SUBMIT_FAILED);
}
} catch (Exception ex) {
log.error("[Kling][build request exception]", ex);
throw exception(LATENTSYNC_SUBMIT_FAILED);
}
KlingLipSyncCreateResponse response = executeRequest(httpRequest, "create-lip-sync", KlingLipSyncCreateResponse.class);
validateSessionId(response.getData() != null ? response.getData().getTaskId() : null, "taskId");
return response;
}
/**
@@ -129,29 +85,10 @@ public class KlingClient {
throw exception0(LATENTSYNC_SUBMIT_FAILED.getCode(), "任务ID不能为空");
}
try {
String url = properties.getBaseUrl() + "/klingai/v1/videos/advanced-lip-sync/" + taskId;
String url = properties.getBaseUrl() + "/klingai/v1/videos/advanced-lip-sync/" + taskId;
Request httpRequest = buildGetRequest(url);
Request httpRequest = new Request.Builder()
.url(url)
.addHeader("Authorization", "Bearer " + properties.getApiKey())
.addHeader("Content-Type", "application/json")
.get()
.build();
try {
KlingLipSyncQueryResponse response = executeRequest(httpRequest, "get-lip-sync", KlingLipSyncQueryResponse.class);
return response;
} catch (ServiceException ex) {
throw ex;
} catch (Exception ex) {
log.error("[Kling][get-lip-sync exception]", ex);
throw exception(LATENTSYNC_SUBMIT_FAILED);
}
} catch (Exception ex) {
log.error("[Kling][build request exception]", ex);
throw exception(LATENTSYNC_SUBMIT_FAILED);
}
return executeRequest(httpRequest, "get-lip-sync", KlingLipSyncQueryResponse.class);
}
private void validateEnabled() {
@@ -272,10 +209,39 @@ public class KlingClient {
throw buildException(responseBody);
}
}
// 解析响应
T result = objectMapper.readValue(responseBody, responseClass);
// ✅ 检查业务错误码(可灵 API 可能返回 HTTP 200 但 code !== 0
try {
JsonNode root = objectMapper.readTree(responseBody);
if (root.has("code")) {
int code = root.get("code").asInt();
if (code != 0) {
String message = root.has("message") ? root.get("message").asText() :
root.has("detail") ? root.get("detail").asText() : "未知错误";
String requestId = root.has("request_id") ? root.get("request_id").asText() : "unknown";
log.error("[Kling][{} business error] code={}, message={}, request_id={}", operation, code, message, requestId);
throw exception0(LATENTSYNC_SUBMIT_FAILED.getCode(),
String.format("[%s] %s (code: %d, request_id: %s)", operation, message, code, requestId));
}
}
} catch (ServiceException ex) {
throw ex;
} catch (Exception ex) {
log.warn("[Kling][{} check business code failed, continuing]", operation, ex);
}
log.info("[Kling][{} success][responseBody={}]", operation, responseBody);
return objectMapper.readValue(responseBody, responseClass);
return result;
} catch (Exception ex) {
if (ex instanceof ServiceException) {
throw (ServiceException) ex;
}
log.error("[Kling][{} exception]", operation, ex);
throw exception(LATENTSYNC_SUBMIT_FAILED);
}
@@ -319,4 +285,43 @@ public class KlingClient {
return exception0(LATENTSYNC_SUBMIT_FAILED.getCode(), body);
}
}
/**
* 构建 POST 请求
*/
private Request buildPostRequest(String url, Object payload) {
try {
String body = objectMapper.writeValueAsString(payload);
return new Request.Builder()
.url(url)
.addHeader("Authorization", "Bearer " + properties.getApiKey())
.addHeader("Content-Type", "application/json")
.post(RequestBody.create(body.getBytes(StandardCharsets.UTF_8), JSON))
.build();
} catch (Exception ex) {
log.error("[Kling][build POST request exception]", ex);
throw exception(LATENTSYNC_SUBMIT_FAILED);
}
}
/**
* 构建 GET 请求
*/
private Request buildGetRequest(String url) {
return new Request.Builder()
.url(url)
.addHeader("Authorization", "Bearer " + properties.getApiKey())
.addHeader("Content-Type", "application/json")
.get()
.build();
}
/**
* 验证响应中的 sessionId 不为空
*/
private void validateSessionId(String sessionId, String fieldName) {
if (StrUtil.isBlank(sessionId)) {
throw exception0(LATENTSYNC_SUBMIT_FAILED.getCode(), "可灵返回 " + fieldName + " 为空");
}
}
}

View File

@@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.tik.voice.dal.dataobject;
import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
import cn.iocoder.yudao.framework.tenant.core.db.TenantBaseDO;
import com.baomidou.mybatisplus.annotation.KeySequence;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.*;
@@ -163,6 +164,7 @@ public class TikDigitalHumanTaskDO extends TenantBaseDO {
/**
* 可灵口型同步任务ID从advanced-lip-sync接口获取
*/
@TableField("kling_task_id")
private String klingTaskId;
}

View File

@@ -4,9 +4,11 @@ import cn.iocoder.yudao.framework.common.pojo.PageParam;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.mybatis.core.mapper.BaseMapperX;
import cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX;
import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnore;
import cn.iocoder.yudao.module.tik.voice.dal.dataobject.TikDigitalHumanTaskDO;
import cn.iocoder.yudao.module.tik.voice.vo.AppTikDigitalHumanPageReqVO;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
import java.util.List;
@@ -62,4 +64,20 @@ public interface TikDigitalHumanTaskMapper extends BaseMapperX<TikDigitalHumanTa
.last("LIMIT " + limit));
}
/**
* 查询待轮询的可灵任务状态为PROCESSING且有klingTaskId最近6小时内
* 使用条件构造器,配合服务类 @TenantIgnore 注解忽略租户限制
*/
@TenantIgnore
default List<TikDigitalHumanTaskDO> selectPendingKlingTasks() {
return selectList(new LambdaQueryWrapperX<TikDigitalHumanTaskDO>()
.eq(TikDigitalHumanTaskDO::getStatus, "PROCESSING")
.eq(TikDigitalHumanTaskDO::getAiProvider, "kling")
.isNotNull(TikDigitalHumanTaskDO::getKlingTaskId)
.ne(TikDigitalHumanTaskDO::getKlingTaskId, "")
.apply("create_time >= DATE_SUB(NOW(), INTERVAL 6 HOUR)")
.orderByDesc(TikDigitalHumanTaskDO::getCreateTime)
.last("LIMIT 50"));
}
}

View File

@@ -23,11 +23,11 @@ public class DigitalHumanTaskStatusSyncJob {
*/
@Scheduled(fixedDelay = 10000)
public void syncTaskStatus() {
log.debug("开始同步数字人任务状态");
log.info("[DigitalHumanTaskStatusSyncJob][开始同步数字人任务状态]");
try {
latentsyncPollingService.pollLatentsyncTasks();
} catch (Exception e) {
log.error("同步数字人任务状态失败", e);
log.error("[DigitalHumanTaskStatusSyncJob][同步数字人任务状态失败]", e);
}
}

View File

@@ -82,11 +82,14 @@ public class LatentsyncPollingService {
* 执行轮询任务的具体逻辑
*/
private void executePollingTasks() {
log.info("[executePollingTasks][开始执行轮询任务]");
try {
// 轮询Latentsync任务
List<String> taskIds = getPendingPollingTasks();
log.info("[executePollingTasks][获取到Latentsync任务数量={}]", taskIds.size());
if (!taskIds.isEmpty()) {
log.debug("[pollLatentsyncTasks][开始轮询Latentsync任务][任务数量={}]", taskIds.size());
log.info("[pollLatentsyncTasks][开始轮询Latentsync任务][任务数量={}]", taskIds.size());
// 逐个处理Latentsync任务
for (String taskIdStr : taskIds) {
@@ -100,7 +103,9 @@ public class LatentsyncPollingService {
}
// 轮询可灵任务
log.info("[executePollingTasks][准备轮询可灵任务]");
pollKlingTasks();
log.info("[executePollingTasks][可灵任务轮询完成]");
} catch (Exception e) {
log.error("[pollLatentsyncTasks][轮询任务异常]", e);
@@ -536,28 +541,19 @@ public class LatentsyncPollingService {
*/
private void pollKlingTasks() {
try {
// 参考混剪任务实现:添加时间和数量限制,避免并发问题
// 1. 时间范围限制只检查最近6小时内的任务避免检查历史任务
// 2. 数量限制每次最多检查50个任务避免单次查询过多
LocalDateTime startTime = LocalDateTime.now().minusHours(6);
log.info("[pollKlingTasks][开始查询待轮询的可灵任务]");
// 查询待轮询的可灵任务状态为PROCESSING且有klingTaskId限制时间和数量
List<TikDigitalHumanTaskDO> klingTasks = taskMapper.selectList(
new cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX<TikDigitalHumanTaskDO>()
.eq(TikDigitalHumanTaskDO::getStatus, "PROCESSING")
.eq(TikDigitalHumanTaskDO::getAiProvider, "kling")
.isNotNull(TikDigitalHumanTaskDO::getKlingTaskId)
.ne(TikDigitalHumanTaskDO::getKlingTaskId, "")
.ge(TikDigitalHumanTaskDO::getCreateTime, startTime) // 只检查最近6小时
.orderByDesc(TikDigitalHumanTaskDO::getCreateTime)
.last("LIMIT 50") // 限制数量,避免并发
// 显式忽略租户限制查询待轮询的可灵任务
List<TikDigitalHumanTaskDO> klingTasks = cn.iocoder.yudao.framework.tenant.core.util.TenantUtils.executeIgnore(
() -> taskMapper.selectPendingKlingTasks()
);
if (klingTasks.isEmpty()) {
log.info("[pollKlingTasks][没有待轮询的可灵任务]");
return;
}
log.debug("[pollKlingTasks][开始轮询可灵任务][任务数量={}]", klingTasks.size());
log.info("[pollKlingTasks][开始轮询可灵任务][任务数量={}]", klingTasks.size());
// 逐个处理可灵任务
for (TikDigitalHumanTaskDO task : klingTasks) {
@@ -589,53 +585,12 @@ public class LatentsyncPollingService {
String taskStatus = response.getData().getTaskStatus();
String taskStatusMsg = response.getData().getTaskStatusMsg();
log.debug("[pollKlingSingleTask][任务({})状态更新][klingTaskId={}, status={}]",
task.getId(), klingTaskId, taskStatus);
log.info("[pollKlingSingleTask][任务({})状态更新][klingTaskId={}, status={}, msg={}]",
task.getId(), klingTaskId, taskStatus, taskStatusMsg);
// 根据状态更新任务
if ("succeed".equalsIgnoreCase(taskStatus)) {
// 任务成功完成
List<KlingLipSyncVideoVO> videos = response.getData().getTaskResult().getVideos();
if (videos != null && !videos.isEmpty()) {
String videoUrl = videos.get(0).getUrl();
// 保存视频到OSS异步处理轻量化逻辑
OssSaveResult saveResult = null;
try {
// 保存视频到OSS避免临时URL过期
saveResult = saveVideoToOss(task, videoUrl);
log.info("[pollKlingSingleTask][任务({})视频已保存到OSS][url={}]", task.getId(), saveResult.getUrl());
} catch (Exception e) {
log.warn("[pollKlingSingleTask][任务({})保存视频失败使用原URL][error={}]", task.getId(), e.getMessage());
saveResult = new OssSaveResult(videoUrl, 0, null, null); // 降级处理
}
// 更新任务状态为成功
TikDigitalHumanTaskDO updateObj = new TikDigitalHumanTaskDO();
updateObj.setId(task.getId());
updateObj.setStatus("SUCCESS");
updateObj.setCurrentStep("finishing");
updateObj.setProgress(100);
updateObj.setResultVideoUrl(saveResult.getUrl());
updateObj.setFinishTime(LocalDateTime.now());
taskMapper.updateById(updateObj);
// 缓存结果到Redis快速回显
try {
String resultKey = "digital_human:task:result:" + task.getId();
stringRedisTemplate.opsForValue().set(resultKey, saveResult.getUrl(), Duration.ofHours(24));
} catch (Exception e) {
log.warn("[pollKlingSingleTask][任务({})缓存结果失败]", task.getId(), e);
}
// 保存结果视频到用户文件表
saveResultVideoToUserFiles(task, saveResult);
log.info("[pollKlingSingleTask][任务({})完成][videoUrl={}]", task.getId(), saveResult.getUrl());
} else {
log.warn("[pollKlingSingleTask][任务({})成功但无视频结果]", task.getId());
}
handleKlingTaskSucceed(task, response);
} else if ("failed".equalsIgnoreCase(taskStatus)) {
// 任务失败
String errorMsg = "可灵任务执行失败: " + (StrUtil.isNotBlank(taskStatusMsg) ? taskStatusMsg : "未知错误");
@@ -645,7 +600,7 @@ public class LatentsyncPollingService {
} else if ("submitted".equalsIgnoreCase(taskStatus) || "processing".equalsIgnoreCase(taskStatus)) {
// 任务还在处理中,更新进度
updateTaskStatus(task.getId(), "PROCESSING", "sync_lip", 70, "口型同步处理中", null);
log.debug("[pollKlingSingleTask][任务({})处理中]", task.getId());
log.info("[pollKlingSingleTask][任务({})处理中][klingTaskId={}, status={}]", task.getId(), klingTaskId, taskStatus);
} else {
log.warn("[pollKlingSingleTask][任务({})未知状态][status={}]", task.getId(), taskStatus);
@@ -687,7 +642,64 @@ public class LatentsyncPollingService {
updateObj.setFinishTime(LocalDateTime.now());
}
// 显式忽略租户限制执行更新操作
cn.iocoder.yudao.framework.tenant.core.util.TenantUtils.executeIgnore(
() -> taskMapper.updateById(updateObj)
);
}
/**
* 处理可灵任务成功
*/
private void handleKlingTaskSucceed(TikDigitalHumanTaskDO task, KlingLipSyncQueryResponse response) {
List<KlingLipSyncVideoVO> videos = response.getData().getTaskResult().getVideos();
if (videos == null || videos.isEmpty()) {
log.warn("[pollKlingSingleTask][任务({})成功但无视频结果]", task.getId());
return;
}
String videoUrl = videos.get(0).getUrl();
OssSaveResult saveResult;
try {
saveResult = saveVideoToOss(task, videoUrl);
log.info("[pollKlingSingleTask][任务({})视频已保存到OSS][url={}]", task.getId(), saveResult.getUrl());
} catch (Exception e) {
log.warn("[pollKlingSingleTask][任务({})保存视频失败使用原URL][error={}]", task.getId(), e.getMessage());
saveResult = new OssSaveResult(videoUrl, 0, null, null);
}
updateTaskSuccess(task.getId(), saveResult.getUrl());
cacheTaskResult(task.getId(), saveResult.getUrl());
saveResultVideoToUserFiles(task, saveResult);
log.info("[pollKlingSingleTask][任务({})完成][videoUrl={}]", task.getId(), saveResult.getUrl());
}
/**
* 更新任务为成功状态
*/
private void updateTaskSuccess(Long taskId, String videoUrl) {
TikDigitalHumanTaskDO updateObj = new TikDigitalHumanTaskDO();
updateObj.setId(taskId);
updateObj.setStatus("SUCCESS");
updateObj.setCurrentStep("finishing");
updateObj.setProgress(100);
updateObj.setResultVideoUrl(videoUrl);
updateObj.setFinishTime(LocalDateTime.now());
taskMapper.updateById(updateObj);
}
/**
* 缓存任务结果到 Redis
*/
private void cacheTaskResult(Long taskId, String url) {
try {
String resultKey = "digital_human:task:result:" + taskId;
stringRedisTemplate.opsForValue().set(resultKey, url, Duration.ofHours(24));
} catch (Exception e) {
log.warn("[cacheTaskResult][任务({})缓存结果失败]", taskId, e);
}
}
}