优化
This commit is contained in:
@@ -10,6 +10,7 @@ import cn.iocoder.yudao.module.tik.file.dal.dataobject.TikUserOssInitDO;
|
||||
import cn.iocoder.yudao.module.tik.file.dal.mysql.TikUserOssInitMapper;
|
||||
import cn.iocoder.yudao.module.tik.file.vo.app.AppTikUserOssInitRespVO;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.dao.DuplicateKeyException;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
@@ -45,35 +46,50 @@ public class TikOssInitServiceImpl implements TikOssInitService {
|
||||
return BeanUtils.toBean(existing, AppTikUserOssInitRespVO.class);
|
||||
}
|
||||
|
||||
// 获取用户信息(获取手机号)
|
||||
MemberUserRespDTO user = memberUserApi.getUser(userId);
|
||||
if (user == null || StrUtil.isBlank(user.getMobile())) {
|
||||
throw exception(OSS_INIT_FAILED, "用户手机号不存在");
|
||||
}
|
||||
|
||||
// 计算手机号MD5和OSS路径
|
||||
// 获取用户信息(优先使用手机号MD5,否则使用userId)
|
||||
Long tenantId = TenantContextHolder.getRequiredTenantId();
|
||||
String mobileMd5 = DigestUtil.md5Hex(user.getMobile());
|
||||
OssPathInfo pathInfo = buildOssPaths(mobileMd5, tenantId);
|
||||
String pathIdentifier = getPathIdentifier(userId, tenantId);
|
||||
OssPathInfo pathInfo = buildOssPaths(pathIdentifier, tenantId);
|
||||
|
||||
// 创建OSS初始化记录
|
||||
TikUserOssInitDO ossInit = createOssInitDO(userId, pathIdentifier, pathInfo);
|
||||
|
||||
// 创建或更新OSS初始化记录
|
||||
// 注意:OSS中目录是虚拟的,不需要显式创建,直接上传文件时包含路径即可自动创建
|
||||
TikUserOssInitDO ossInit;
|
||||
if (existing != null) {
|
||||
// 更新现有记录(补充缺失的字段或重新初始化)
|
||||
ossInit = existing;
|
||||
updateOssInitFields(ossInit, mobileMd5, pathInfo);
|
||||
// 更新现有记录
|
||||
ossInit.setId(existing.getId());
|
||||
ossInitMapper.updateById(ossInit);
|
||||
} else {
|
||||
// 创建新记录
|
||||
ossInit = createOssInitDO(userId, mobileMd5, pathInfo);
|
||||
ossInitMapper.insert(ossInit);
|
||||
// 尝试插入,如果并发冲突则更新
|
||||
try {
|
||||
ossInitMapper.insert(ossInit);
|
||||
} catch (DuplicateKeyException e) {
|
||||
log.info("[initOssDirectory][用户({})并发插入冲突,改为更新]", userId);
|
||||
existing = ossInitMapper.selectByUserId(userId);
|
||||
if (existing != null) {
|
||||
ossInit.setId(existing.getId());
|
||||
ossInitMapper.updateById(ossInit);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.info("[initOssDirectory][用户({})OSS初始化成功,根路径({})]", userId, pathInfo.ossRootPath);
|
||||
return BeanUtils.toBean(ossInit, AppTikUserOssInitRespVO.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取路径标识符
|
||||
* 优先使用手机号MD5,否则使用userId
|
||||
*/
|
||||
private String getPathIdentifier(Long userId, Long tenantId) {
|
||||
MemberUserRespDTO user = memberUserApi.getUser(userId);
|
||||
if (user != null && StrUtil.isNotBlank(user.getMobile())) {
|
||||
return DigestUtil.md5Hex(user.getMobile());
|
||||
}
|
||||
// 无手机号时使用userId作为标识
|
||||
log.info("[getPathIdentifier][用户({})无手机号,使用userId作为路径标识]", userId);
|
||||
return "u" + userId;
|
||||
}
|
||||
|
||||
/**
|
||||
* OSS路径信息
|
||||
*/
|
||||
@@ -88,9 +104,12 @@ public class TikOssInitServiceImpl implements TikOssInitService {
|
||||
|
||||
/**
|
||||
* 构建OSS路径信息
|
||||
*
|
||||
* @param pathIdentifier 路径标识符(手机号MD5或u{userId}格式)
|
||||
* @param tenantId 租户ID
|
||||
*/
|
||||
private OssPathInfo buildOssPaths(String mobileMd5, Long tenantId) {
|
||||
String ossRootPath = mobileMd5 + "/" + tenantId;
|
||||
private OssPathInfo buildOssPaths(String pathIdentifier, Long tenantId) {
|
||||
String ossRootPath = pathIdentifier + "/" + tenantId;
|
||||
return new OssPathInfo(
|
||||
ossRootPath,
|
||||
ossRootPath + "/video",
|
||||
@@ -103,25 +122,15 @@ public class TikOssInitServiceImpl implements TikOssInitService {
|
||||
|
||||
/**
|
||||
* 创建OSS初始化DO对象
|
||||
*
|
||||
* @param userId 用户ID
|
||||
* @param pathIdentifier 路径标识符(手机号MD5或u{userId}格式)
|
||||
* @param pathInfo 路径信息
|
||||
*/
|
||||
private TikUserOssInitDO createOssInitDO(Long userId, String mobileMd5, OssPathInfo pathInfo) {
|
||||
private TikUserOssInitDO createOssInitDO(Long userId, String pathIdentifier, OssPathInfo pathInfo) {
|
||||
return new TikUserOssInitDO()
|
||||
.setUserId(userId)
|
||||
.setMobileMd5(mobileMd5)
|
||||
.setOssRootPath(pathInfo.ossRootPath)
|
||||
.setVideoPath(pathInfo.videoPath)
|
||||
.setGeneratePath(pathInfo.generatePath)
|
||||
.setAudioPath(pathInfo.audioPath)
|
||||
.setMixPath(pathInfo.mixPath)
|
||||
.setVoicePath(pathInfo.voicePath)
|
||||
.setInitStatus(1);
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新OSS初始化DO对象的字段
|
||||
*/
|
||||
private void updateOssInitFields(TikUserOssInitDO ossInit, String mobileMd5, OssPathInfo pathInfo) {
|
||||
ossInit.setMobileMd5(mobileMd5)
|
||||
.setMobileMd5(pathIdentifier)
|
||||
.setOssRootPath(pathInfo.ossRootPath)
|
||||
.setVideoPath(pathInfo.videoPath)
|
||||
.setGeneratePath(pathInfo.generatePath)
|
||||
@@ -139,27 +148,27 @@ public class TikOssInitServiceImpl implements TikOssInitService {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getOssRootPath(Long userId) {
|
||||
/**
|
||||
* 获取已初始化的OSS记录,未初始化则抛出异常
|
||||
*/
|
||||
private TikUserOssInitDO getRequiredOssInit(Long userId) {
|
||||
TikUserOssInitDO ossInit = ossInitMapper.selectByUserId(userId);
|
||||
if (ossInit == null || ossInit.getInitStatus() == 0) {
|
||||
throw exception(OSS_INIT_FAILED);
|
||||
}
|
||||
return ossInit.getOssRootPath();
|
||||
return ossInit;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getOssRootPath(Long userId) {
|
||||
return getRequiredOssInit(userId).getOssRootPath();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getOssDirectoryByCategory(Long userId, String fileCategory) {
|
||||
// 确保OSS已初始化
|
||||
ensureOssInitialized(userId);
|
||||
TikUserOssInitDO ossInit = getRequiredOssInit(userId);
|
||||
|
||||
// 获取OSS初始化记录
|
||||
TikUserOssInitDO ossInit = ossInitMapper.selectByUserId(userId);
|
||||
if (ossInit == null || ossInit.getInitStatus() == 0) {
|
||||
throw exception(OSS_INIT_FAILED);
|
||||
}
|
||||
|
||||
// 根据分类返回对应基础目录路径
|
||||
return switch (fileCategory) {
|
||||
case "video" -> ossInit.getVideoPath();
|
||||
case "generate" -> ossInit.getGeneratePath();
|
||||
@@ -173,7 +182,7 @@ public class TikOssInitServiceImpl implements TikOssInitService {
|
||||
|
||||
/**
|
||||
* 基于分类和分组获取OSS目录路径
|
||||
* 新路径格式:/user-files/{category}/{date}/{groupName}/
|
||||
* 路径格式:{mobileMd5}/user-files/{category}/{date}/{groupName}
|
||||
*
|
||||
* @param userId 用户编号
|
||||
* @param category 分类:MIX 或 DIGITAL_HUMAN
|
||||
@@ -182,36 +191,18 @@ public class TikOssInitServiceImpl implements TikOssInitService {
|
||||
* @return OSS目录路径
|
||||
*/
|
||||
public String getOssDirectoryByCategoryAndGroup(Long userId, String category, String groupName, String dateStr) {
|
||||
// 确保OSS已初始化
|
||||
ensureOssInitialized(userId);
|
||||
TikUserOssInitDO ossInit = getRequiredOssInit(userId);
|
||||
|
||||
// 构建新格式的路径
|
||||
// 路径格式:{mobileMd5}/{tenantId}/user-files/{category}/{date}/{groupName}
|
||||
TikUserOssInitDO ossInit = ossInitMapper.selectByUserId(userId);
|
||||
if (ossInit == null || ossInit.getInitStatus() == 0) {
|
||||
throw exception(OSS_INIT_FAILED);
|
||||
String path = ossInit.getMobileMd5() + "/user-files/" + category.toLowerCase() + "/" + dateStr;
|
||||
|
||||
if (StrUtil.isNotBlank(groupName)) {
|
||||
// 对分组名进行URL安全处理:保留中文、字母、数字、下划线和连字符
|
||||
String safeGroupName = groupName.trim().replaceAll("[^a-zA-Z0-9一-鿿_-]", "_");
|
||||
path += "/" + safeGroupName;
|
||||
}
|
||||
|
||||
// 获取基础路径(去掉tenantId部分)
|
||||
String basePath = ossInit.getMobileMd5();
|
||||
|
||||
// 构建完整路径
|
||||
StringBuilder pathBuilder = new StringBuilder();
|
||||
pathBuilder.append(basePath)
|
||||
.append("/user-files/")
|
||||
.append(category.toLowerCase())
|
||||
.append("/")
|
||||
.append(dateStr);
|
||||
|
||||
// 如果有分组名,添加到路径
|
||||
if (groupName != null && !groupName.trim().isEmpty()) {
|
||||
// 对分组名进行URL安全处理
|
||||
String safeGroupName = groupName.trim()
|
||||
.replaceAll("[^a-zA-Z0-9一-鿿_-]", "_"); // 保留中文、字母、数字、下划线和连字符
|
||||
pathBuilder.append("/").append(safeGroupName);
|
||||
}
|
||||
|
||||
return pathBuilder.toString();
|
||||
return path;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -24,9 +24,9 @@ public class MixTaskConstants {
|
||||
|
||||
/**
|
||||
* 定时任务配置
|
||||
* 改为每30秒检查一次,提供更实时的进度更新
|
||||
* 每1分钟检查一次,平衡响应速度和系统压力
|
||||
*/
|
||||
public static final String CRON_CHECK_STATUS = "*/30 * * * * ?";
|
||||
public static final String CRON_CHECK_STATUS = "0 */1 * * * ?";
|
||||
|
||||
/**
|
||||
* 任务状态检查优化配置
|
||||
|
||||
@@ -3,8 +3,11 @@ package cn.iocoder.yudao.module.tik.mix.service;
|
||||
import cn.iocoder.yudao.framework.common.pojo.PageResult;
|
||||
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
|
||||
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.iocoder.yudao.module.infra.service.file.FileService;
|
||||
import com.alibaba.ttl.TtlRunnable;
|
||||
import cn.iocoder.yudao.module.tik.mix.client.IceClient;
|
||||
import cn.iocoder.yudao.module.tik.mix.constants.MixTaskConstants;
|
||||
import cn.iocoder.yudao.module.tik.mix.dal.dataobject.MixTaskDO;
|
||||
@@ -65,15 +68,15 @@ public class MixTaskServiceImpl implements MixTaskService {
|
||||
// 2. 保存到数据库
|
||||
mixTaskMapper.insert(task);
|
||||
|
||||
// 3. 异步提交到阿里云 ICE
|
||||
CompletableFuture.runAsync(() -> {
|
||||
// 3. 异步提交到阿里云 ICE(使用 TTL 自动传递上下文)
|
||||
CompletableFuture.runAsync(TtlRunnable.get(() -> {
|
||||
try {
|
||||
submitToICE(task.getId(), createReqVO, userId);
|
||||
} catch (Exception e) {
|
||||
log.error("[MixTask][提交ICE失败] taskId={}", task.getId(), e);
|
||||
updateTaskError(task.getId(), "提交任务失败: " + e.getMessage());
|
||||
}
|
||||
});
|
||||
}));
|
||||
|
||||
return task.getId();
|
||||
}
|
||||
@@ -175,8 +178,8 @@ public class MixTaskServiceImpl implements MixTaskService {
|
||||
updateTask.setOutputUrlList(null);
|
||||
mixTaskMapper.updateById(updateTask);
|
||||
|
||||
// 3. 重新提交到ICE
|
||||
CompletableFuture.runAsync(() -> {
|
||||
// 3. 重新提交到ICE(使用 TTL 自动传递上下文)
|
||||
CompletableFuture.runAsync(TtlRunnable.get(() -> {
|
||||
try {
|
||||
// 从 materialsJson 重建请求对象
|
||||
List<MixTaskSaveReqVO.MaterialItem> materials = null;
|
||||
@@ -207,7 +210,7 @@ public class MixTaskServiceImpl implements MixTaskService {
|
||||
log.error("[MixTask][重新提交失败] taskId={}", id, e);
|
||||
updateTaskError(id, "重新提交失败: " + e.getMessage());
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -247,20 +250,22 @@ public class MixTaskServiceImpl implements MixTaskService {
|
||||
// 4. 频率优化:定时任务频率从30秒改为2分钟
|
||||
LocalDateTime startTime = LocalDateTime.now().minusHours(MixTaskConstants.CHECK_HOURS_LIMIT);
|
||||
|
||||
// 查询运行中的任务(限制时间和数量)
|
||||
List<MixTaskDO> runningTasks = mixTaskMapper.selectList(
|
||||
new cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX<MixTaskDO>()
|
||||
.eq(MixTaskDO::getStatus, MixTaskConstants.STATUS_RUNNING)
|
||||
.ge(MixTaskDO::getCreateTime, startTime)
|
||||
.orderByDesc(MixTaskDO::getCreateTime)
|
||||
.last("LIMIT " + MixTaskConstants.CHECK_BATCH_SIZE) // 限制数量
|
||||
// 查询运行中的任务(忽略租户过滤,因为定时任务没有租户上下文)
|
||||
List<MixTaskDO> runningTasks = TenantUtils.executeIgnore(() ->
|
||||
mixTaskMapper.selectList(
|
||||
new cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX<MixTaskDO>()
|
||||
.eq(MixTaskDO::getStatus, MixTaskConstants.STATUS_RUNNING)
|
||||
.ge(MixTaskDO::getCreateTime, startTime)
|
||||
.orderByDesc(MixTaskDO::getCreateTime)
|
||||
.last("LIMIT " + MixTaskConstants.CHECK_BATCH_SIZE)
|
||||
)
|
||||
);
|
||||
|
||||
if (runningTasks.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 逐个检查任务状态
|
||||
// 逐个检查任务状态(每个任务使用自己的租户上下文)
|
||||
int failureCount = 0;
|
||||
for (MixTaskDO task : runningTasks) {
|
||||
try {
|
||||
@@ -268,7 +273,8 @@ public class MixTaskServiceImpl implements MixTaskService {
|
||||
if (jobIds != null && !jobIds.isEmpty()) {
|
||||
// 每个任务可能有多个jobId,取第一个进行检查
|
||||
String jobId = jobIds.get(0);
|
||||
syncTaskStatus(task.getId(), jobId);
|
||||
// 使用任务的租户ID执行状态同步
|
||||
TenantUtils.execute(task.getTenantId(), () -> syncTaskStatus(task.getId(), jobId));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[MixTask][单个任务检查失败] taskId={}", task.getId(), e);
|
||||
|
||||
Reference in New Issue
Block a user