diff --git a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/client/DifyClient.java b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/client/DifyClient.java new file mode 100644 index 0000000000..03e6167ce5 --- /dev/null +++ b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/client/DifyClient.java @@ -0,0 +1,140 @@ +package cn.iocoder.yudao.module.tik.dify.client; + +import cn.iocoder.yudao.module.tik.dify.config.DifyProperties; +import cn.iocoder.yudao.module.tik.dify.vo.DifyChatRespVO; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.MediaType; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Dify API 客户端 + * + * @author 芋道源码 + */ +@Component +@Slf4j +public class DifyClient { + + @Resource + private DifyProperties difyProperties; + + /** + * 调用 Dify 工作流流式 API + * + * @param apiKey Dify API Key + * @param content 用户输入 + * @param systemPrompt 系统提示词 + * @param conversationId 会话ID(可选) + * @return 流式响应 + */ + public Flux chatStream(String apiKey, String content, String systemPrompt, String conversationId) { + String apiUrl = difyProperties.getApiUrl() + "/v1/workflows/run"; + + // 构建请求体 + Map inputs = new HashMap<>(); + inputs.put("sysPrompt", systemPrompt); + inputs.put("userInput", content); + + Map requestBody = new HashMap<>(); + requestBody.put("inputs", inputs); + requestBody.put("response_mode", "streaming"); + requestBody.put("user", "user-" + System.currentTimeMillis()); + + AtomicReference responseConversationId = new AtomicReference<>(conversationId); + StringBuilder fullContent = new StringBuilder(); + + WebClient webClient = WebClient.builder() + .baseUrl(apiUrl) + .defaultHeader("Authorization", "Bearer " + apiKey) + .defaultHeader("Content-Type", "application/json") + .build(); + + return webClient.post() + .bodyValue(requestBody) + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(String.class) + .map(this::parseSSEEvent) + .filter(resp -> resp != null) + .doOnNext(resp -> { + if (resp.getConversationId() != null) { + responseConversationId.set(resp.getConversationId()); + } + if (resp.getContent() != null) { + fullContent.append(resp.getContent()); + } + }) + .doOnComplete(() -> { + log.info("[chatStream] Dify 流式响应完成,会话ID: {}, 内容长度: {}", + responseConversationId.get(), fullContent.length()); + }) + .doOnError(e -> log.error("[chatStream] Dify 流式响应错误", e)); + } + + /** + * 解析 SSE 事件 + */ + private DifyChatRespVO parseSSEEvent(String event) { + if (event == null || event.isEmpty()) { + return null; + } + + try { + // 解析 SSE 事件格式 + // data: {"event": "message", "answer": "xxx", "conversation_id": "xxx"} + if (event.startsWith("data:")) { + String jsonStr = event.substring(5).trim(); + if (jsonStr.isEmpty() || jsonStr.equals("[DONE]")) { + return null; + } + + // 简单解析 JSON(实际项目建议使用 Jackson/Gson) + String eventType = extractJsonValue(jsonStr, "event"); + String answer = extractJsonValue(jsonStr, "answer"); + String conversationId = extractJsonValue(jsonStr, "conversation_id"); + + if ("message".equals(eventType) || "agent_message".equals(eventType)) { + return DifyChatRespVO.message(answer, conversationId); + } else if ("workflow_finished".equals(eventType) || "message_end".equals(eventType)) { + return DifyChatRespVO.done(conversationId, null); + } else if ("error".equals(eventType)) { + return DifyChatRespVO.error(answer); + } + } + } catch (Exception e) { + log.warn("[parseSSEEvent] 解析 SSE 事件失败: {}", event, e); + } + return null; + } + + /** + * 简单提取 JSON 值 + */ + private String extractJsonValue(String json, String key) { + String pattern = "\"" + key + "\"\\s*:\\s*\""; + int start = json.indexOf(pattern); + if (start == -1) { + // 尝试非字符串格式 + pattern = "\"" + key + "\"\\s*:\\s*"; + start = json.indexOf(pattern); + if (start == -1) return null; + start += pattern.length(); + int end = json.indexOf(",", start); + if (end == -1) end = json.indexOf("}", start); + if (end == -1) return null; + return json.substring(start, end).trim(); + } + start += pattern.length(); + int end = json.indexOf("\"", start); + if (end == -1) return null; + return json.substring(start, end); + } + +} diff --git a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/config/DifyProperties.java b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/config/DifyProperties.java new file mode 100644 index 0000000000..07d975fd82 --- /dev/null +++ b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/config/DifyProperties.java @@ -0,0 +1,27 @@ +package cn.iocoder.yudao.module.tik.dify.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * Dify 配置属性 + * + * @author 芋道源码 + */ +@Data +@Component +@ConfigurationProperties(prefix = "yudao.dify") +public class DifyProperties { + + /** + * Dify API 地址 + */ + private String apiUrl; + + /** + * 请求超时时间(秒) + */ + private Integer timeout = 60; + +} diff --git a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/controller/AppDifyController.java b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/controller/AppDifyController.java new file mode 100644 index 0000000000..22f4ee7994 --- /dev/null +++ b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/controller/AppDifyController.java @@ -0,0 +1,42 @@ +package cn.iocoder.yudao.module.tik.dify.controller; + +import cn.iocoder.yudao.framework.common.pojo.CommonResult; +import cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils; +import cn.iocoder.yudao.module.tik.dify.service.DifyService; +import cn.iocoder.yudao.module.tik.dify.vo.DifyChatReqVO; +import cn.iocoder.yudao.module.tik.dify.vo.DifyChatRespVO; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import jakarta.annotation.Resource; +import jakarta.validation.Valid; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Flux; + +/** + * Dify 工作流控制器 + * + * @author 芋道源码 + */ +@Tag(name = "用户 App - Dify 工作流") +@RestController +@RequestMapping("/api/tik/dify") +public class AppDifyController { + + @Resource + private DifyService difyService; + + @PostMapping(value = "/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + @Operation(summary = "流式聊天") + public Flux> chatStream(@Valid @RequestBody DifyChatReqVO reqVO) { + Long loginUserId = SecurityFrameworkUtils.getLoginUserId(); + String userId = loginUserId != null ? loginUserId.toString() : "1"; // 默认用户ID + + return difyService.chatStream(reqVO, userId) + .map(CommonResult::success); + } + +} diff --git a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/service/DifyService.java b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/service/DifyService.java new file mode 100644 index 0000000000..900e292bae --- /dev/null +++ b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/service/DifyService.java @@ -0,0 +1,23 @@ +package cn.iocoder.yudao.module.tik.dify.service; + +import cn.iocoder.yudao.module.tik.dify.vo.DifyChatReqVO; +import cn.iocoder.yudao.module.tik.dify.vo.DifyChatRespVO; +import reactor.core.publisher.Flux; + +/** + * Dify 服务接口 + * + * @author 芋道源码 + */ +public interface DifyService { + + /** + * 流式聊天(带积分扣减) + * + * @param reqVO 请求参数 + * @param userId 用户ID + * @return 流式响应 + */ + Flux chatStream(DifyChatReqVO reqVO, String userId); + +} diff --git a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/service/DifyServiceImpl.java b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/service/DifyServiceImpl.java new file mode 100644 index 0000000000..e6b26c6ca8 --- /dev/null +++ b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/service/DifyServiceImpl.java @@ -0,0 +1,139 @@ +package cn.iocoder.yudao.module.tik.dify.service; + +import cn.iocoder.yudao.module.tik.dify.client.DifyClient; +import cn.iocoder.yudao.module.tik.dify.vo.DifyChatReqVO; +import cn.iocoder.yudao.module.tik.dify.vo.DifyChatRespVO; +import cn.iocoder.yudao.module.tik.muye.aiagent.dal.AiAgentDO; +import cn.iocoder.yudao.module.tik.muye.aiagent.service.AiAgentService; +import cn.iocoder.yudao.module.tik.muye.aimodelconfig.dal.AiModelConfigDO; +import cn.iocoder.yudao.module.tik.muye.points.service.PointsService; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.validation.annotation.Validated; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Dify 服务实现类 + * + * @author 芋道源码 + */ +@Service +@Validated +@Slf4j +public class DifyServiceImpl implements DifyService { + + /** Dify 平台标识 */ + private static final String PLATFORM_DIFY = "dify"; + /** Dify 模型类型 */ + private static final String MODEL_TYPE_WRITING = "writing"; + + @Resource + private AiAgentService aiAgentService; + + @Resource + private PointsService pointsService; + + @Resource + private DifyClient difyClient; + + @Override + public Flux chatStream(DifyChatReqVO reqVO, String userId) { + // 用于存储预扣记录ID + AtomicLong pendingRecordId = new AtomicLong(); + // 用于存储会话ID + AtomicReference conversationIdRef = new AtomicReference<>(reqVO.getConversationId()); + + return Mono.fromCallable(() -> { + // 1. 获取智能体配置 + AiAgentDO agent = aiAgentService.getAiAgent(reqVO.getAgentId()); + if (agent == null) { + throw new RuntimeException("智能体不存在"); + } + + // 2. 获取积分配置 + AiModelConfigDO config = pointsService.getConfig(PLATFORM_DIFY, MODEL_TYPE_WRITING); + + // 3. 预检积分 + pointsService.checkPoints(userId, config.getConsumePoints()); + + // 4. 创建预扣记录 + Long recordId = pointsService.createPendingDeduct( + userId, + config.getConsumePoints(), + "dify_chat", + reqVO.getAgentId().toString() + ); + pendingRecordId.set(recordId); + + // 5. 返回调用参数 + return new DifyChatContext(agent.getSystemPrompt(), config.getApiKey(), config.getConsumePoints()); + }) + .flatMapMany(context -> { + // 6. 调用 Dify 流式 API + return difyClient.chatStream( + context.apiKey(), + reqVO.getContent(), + context.systemPrompt(), + reqVO.getConversationId() + ) + .doOnNext(resp -> { + if (resp.getConversationId() != null) { + conversationIdRef.set(resp.getConversationId()); + } + }) + // 7. 流结束时确认扣费 + .doOnComplete(() -> { + if (pendingRecordId.get() > 0) { + try { + pointsService.confirmPendingDeduct(pendingRecordId.get()); + log.info("[chatStream] 流结束,确认扣费,记录ID: {}", pendingRecordId.get()); + } catch (Exception e) { + log.error("[chatStream] 确认扣费失败", e); + } + } + }) + // 8. 流出错时取消预扣 + .doOnError(e -> { + if (pendingRecordId.get() > 0) { + try { + pointsService.cancelPendingDeduct(pendingRecordId.get()); + log.info("[chatStream] 流出错,取消预扣,记录ID: {}", pendingRecordId.get()); + } catch (Exception ex) { + log.error("[chatStream] 取消预扣失败", ex); + } + } + }) + // 9. 用户取消时确认扣费(已消费的部分) + .doOnCancel(() -> { + if (pendingRecordId.get() > 0) { + try { + // 用户主动取消,仍然扣费(按最低消费) + pointsService.confirmPendingDeduct(pendingRecordId.get()); + log.info("[chatStream] 用户取消,确认扣费,记录ID: {}", pendingRecordId.get()); + } catch (Exception e) { + log.error("[chatStream] 用户取消后扣费失败", e); + } + } + }); + }) + // 10. 在最后添加 done 事件 + .concatWith(Mono.defer(() -> { + return Mono.just(DifyChatRespVO.done(conversationIdRef.get(), null)); + })) + .onErrorResume(e -> { + log.error("[chatStream] Dify 聊天异常", e); + return Flux.just(DifyChatRespVO.error(e.getMessage())); + }); + } + + /** + * Dify 聊天上下文 + */ + private record DifyChatContext(String systemPrompt, String apiKey, Integer consumePoints) {} + +} diff --git a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/vo/DifyChatReqVO.java b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/vo/DifyChatReqVO.java new file mode 100644 index 0000000000..1ad6eb551b --- /dev/null +++ b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/vo/DifyChatReqVO.java @@ -0,0 +1,26 @@ +package cn.iocoder.yudao.module.tik.dify.vo; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +/** + * Dify 聊天请求 VO + */ +@Schema(description = "Dify 聊天请求") +@Data +public class DifyChatReqVO { + + @Schema(description = "智能体ID", requiredMode = Schema.RequiredMode.REQUIRED) + @NotNull(message = "智能体ID不能为空") + private Long agentId; + + @Schema(description = "用户输入内容", requiredMode = Schema.RequiredMode.REQUIRED) + @NotEmpty(message = "内容不能为空") + private String content; + + @Schema(description = "会话ID(可选,首次对话不传)") + private String conversationId; + +} diff --git a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/vo/DifyChatRespVO.java b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/vo/DifyChatRespVO.java new file mode 100644 index 0000000000..3e9a6da2b9 --- /dev/null +++ b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/vo/DifyChatRespVO.java @@ -0,0 +1,62 @@ +package cn.iocoder.yudao.module.tik.dify.vo; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Dify 聊天响应 VO + */ +@Schema(description = "Dify 聊天响应") +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class DifyChatRespVO { + + @Schema(description = "事件类型:message/done/error") + private String event; + + @Schema(description = "消息内容") + private String content; + + @Schema(description = "会话ID") + private String conversationId; + + @Schema(description = "消耗积分") + private Integer consumePoints; + + @Schema(description = "错误信息") + private String errorMessage; + + /** 事件类型常量 */ + public static final String EVENT_MESSAGE = "message"; + public static final String EVENT_DONE = "done"; + public static final String EVENT_ERROR = "error"; + + public static DifyChatRespVO message(String content, String conversationId) { + return DifyChatRespVO.builder() + .event(EVENT_MESSAGE) + .content(content) + .conversationId(conversationId) + .build(); + } + + public static DifyChatRespVO done(String conversationId, Integer consumePoints) { + return DifyChatRespVO.builder() + .event(EVENT_DONE) + .conversationId(conversationId) + .consumePoints(consumePoints) + .build(); + } + + public static DifyChatRespVO error(String errorMessage) { + return DifyChatRespVO.builder() + .event(EVENT_ERROR) + .errorMessage(errorMessage) + .build(); + } + +} diff --git a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/muye/points/job/PointsPendingCleanJob.java b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/muye/points/job/PointsPendingCleanJob.java new file mode 100644 index 0000000000..c32e41d406 --- /dev/null +++ b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/muye/points/job/PointsPendingCleanJob.java @@ -0,0 +1,38 @@ +package cn.iocoder.yudao.module.tik.muye.points.job; + +import cn.iocoder.yudao.module.tik.muye.pointrecord.mapper.PointRecordMapper; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +/** + * 预扣过期清理定时任务 + * + * 功能:清理超过30分钟的预扣记录,将其状态更新为已取消 + * 执行频率:每5分钟执行一次 + * + * @author 芋道源码 + */ +@Component +@Slf4j +public class PointsPendingCleanJob { + + @Resource + private PointRecordMapper pointRecordMapper; + + /** + * 每5分钟清理一次过期的预扣记录 + */ + @Scheduled(fixedRate = 5 * 60 * 1000) + public void cleanExpiredPendingRecords() { + log.info("[cleanExpiredPendingRecords][开始清理过期预扣记录]"); + try { + int affectedRows = pointRecordMapper.cancelExpiredPendingRecords(); + log.info("[cleanExpiredPendingRecords][清理完成,共取消 {} 条过期预扣记录]", affectedRows); + } catch (Exception e) { + log.error("[cleanExpiredPendingRecords][清理过期预扣记录失败]", e); + } + } + +} diff --git a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/muye/points/service/PointsService.java b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/muye/points/service/PointsService.java new file mode 100644 index 0000000000..61ebb3250d --- /dev/null +++ b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/muye/points/service/PointsService.java @@ -0,0 +1,69 @@ +package cn.iocoder.yudao.module.tik.muye.points.service; + +import cn.iocoder.yudao.module.tik.muye.aimodelconfig.dal.AiModelConfigDO; + +/** + * 积分扣减公共服务 + * + * @author 芋道源码 + */ +public interface PointsService { + + /** + * 获取积分配置 + * + * @param platform 平台标识(dify/tikhub/voice/digital_human) + * @param modelType 模型类型 + * @return 积分配置 + */ + AiModelConfigDO getConfig(String platform, String modelType); + + /** + * 预检积分(余额不足抛异常) + * + * @param userId 用户ID + * @param points 所需积分 + */ + void checkPoints(String userId, Integer points); + + /** + * 即时扣减(同步场景) + * 直接扣减积分并记录流水 + * + * @param userId 用户ID + * @param points 扣减积分数量 + * @param bizType 业务类型 + * @param bizId 业务关联ID + * @return 记录ID + */ + Long deductPoints(String userId, Integer points, String bizType, String bizId); + + /** + * 创建预扣(流式/异步场景) + * 创建待确认的扣减记录,不实际扣减积分 + * + * @param userId 用户ID + * @param points 预扣积分数量 + * @param bizType 业务类型 + * @param bizId 业务关联ID + * @return 预扣记录ID + */ + Long createPendingDeduct(String userId, Integer points, String bizType, String bizId); + + /** + * 确认预扣(实际扣减) + * 执行实际积分扣减,更新预扣记录状态 + * + * @param recordId 预扣记录ID + */ + void confirmPendingDeduct(Long recordId); + + /** + * 取消预扣(不扣费) + * 更新预扣记录状态为已取消 + * + * @param recordId 预扣记录ID + */ + void cancelPendingDeduct(Long recordId); + +} diff --git a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/muye/points/service/PointsServiceImpl.java b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/muye/points/service/PointsServiceImpl.java new file mode 100644 index 0000000000..eaedcf8cfe --- /dev/null +++ b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/muye/points/service/PointsServiceImpl.java @@ -0,0 +1,179 @@ +package cn.iocoder.yudao.module.tik.muye.points.service; + +import cn.iocoder.yudao.module.tik.muye.aimodelconfig.dal.AiModelConfigDO; +import cn.iocoder.yudao.module.tik.muye.aimodelconfig.mapper.AiModelConfigMapper; +import cn.iocoder.yudao.module.tik.muye.memberuserprofile.dal.MemberUserProfileDO; +import cn.iocoder.yudao.module.tik.muye.memberuserprofile.mapper.MemberUserProfileMapper; +import cn.iocoder.yudao.module.tik.muye.pointrecord.dal.PointRecordDO; +import cn.iocoder.yudao.module.tik.muye.pointrecord.mapper.PointRecordMapper; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.validation.annotation.Validated; + +import java.util.UUID; + +import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception; +import static cn.iocoder.yudao.module.tik.enums.ErrorCodeConstants.*; + +/** + * 积分扣减公共服务 实现类 + * + * @author 芋道源码 + */ +@Service +@Validated +@Slf4j +public class PointsServiceImpl implements PointsService { + + /** 预扣状态:待确认 */ + private static final String STATUS_PENDING = "pending"; + /** 预扣状态:已确认 */ + private static final String STATUS_CONFIRMED = "confirmed"; + /** 预扣状态:已取消 */ + private static final String STATUS_CANCELED = "canceled"; + + @Resource + private AiModelConfigMapper aiModelConfigMapper; + + @Resource + private MemberUserProfileMapper memberUserProfileMapper; + + @Resource + private PointRecordMapper pointRecordMapper; + + @Override + public AiModelConfigDO getConfig(String platform, String modelType) { + AiModelConfigDO config = aiModelConfigMapper.selectByPlatformAndModelType(platform, modelType); + if (config == null) { + throw exception(POINTS_CONFIG_NOT_FOUND); + } + return config; + } + + @Override + public void checkPoints(String userId, Integer points) { + MemberUserProfileDO profile = memberUserProfileMapper.selectByUserId(userId); + if (profile == null || profile.getRemainingPoints() == null || profile.getRemainingPoints() < points) { + throw exception(POINTS_INSUFFICIENT); + } + } + + @Override + @Transactional(rollbackFor = Exception.class) + public Long deductPoints(String userId, Integer points, String bizType, String bizId) { + // 1. 原子扣减积分 + int affectedRows = memberUserProfileMapper.updatePointsDeduct(userId, points); + if (affectedRows == 0) { + throw exception(POINTS_DEDUCT_FAILED); + } + + // 2. 查询扣减后余额 + MemberUserProfileDO profile = memberUserProfileMapper.selectByUserId(userId); + + // 3. 创建积分记录(已确认状态) + PointRecordDO record = PointRecordDO.builder() + .userId(Long.parseLong(userId)) + .type("decrease") + .pointAmount(-points) + .balance(profile.getRemainingPoints()) + .reason(bizType) + .bizType(bizType) + .bizId(bizId) + .status(STATUS_CONFIRMED) + .build(); + pointRecordMapper.insert(record); + + log.info("[deductPoints] 用户 {} 扣减积分 {},业务类型 {},记录ID {}", userId, points, bizType, record.getId()); + return record.getId(); + } + + @Override + @Transactional(rollbackFor = Exception.class) + public Long createPendingDeduct(String userId, Integer points, String bizType, String bizId) { + // 1. 预检积分 + checkPoints(userId, points); + + // 2. 查询当前余额 + MemberUserProfileDO profile = memberUserProfileMapper.selectByUserId(userId); + + // 3. 创建预扣记录(待确认状态) + PointRecordDO record = PointRecordDO.builder() + .userId(Long.parseLong(userId)) + .type("decrease") + .pointAmount(-points) + .balance(profile.getRemainingPoints()) + .reason(bizType + "(预扣)") + .bizType(bizType) + .bizId(bizId != null ? bizId : UUID.randomUUID().toString()) + .status(STATUS_PENDING) + .build(); + pointRecordMapper.insert(record); + + log.info("[createPendingDeduct] 用户 {} 创建预扣 {} 积分,业务类型 {},记录ID {}", + userId, points, bizType, record.getId()); + return record.getId(); + } + + @Override + @Transactional(rollbackFor = Exception.class) + public void confirmPendingDeduct(Long recordId) { + // 1. 查询预扣记录 + PointRecordDO record = pointRecordMapper.selectById(recordId); + if (record == null) { + throw exception(POINTS_PENDING_NOT_FOUND); + } + + // 2. 校验状态 + if (!STATUS_PENDING.equals(record.getStatus())) { + throw exception(POINTS_PENDING_ALREADY_CONFIRMED); + } + + // 3. 获取扣减信息 + String userId = record.getUserId().toString(); + Integer points = Math.abs(record.getPointAmount()); + + // 4. 原子扣减积分 + int affectedRows = memberUserProfileMapper.updatePointsDeduct(userId, points); + if (affectedRows == 0) { + log.warn("[confirmPendingDeduct] 积分扣减失败,可能余额不足,记录ID {}", recordId); + throw exception(POINTS_DEDUCT_FAILED); + } + + // 5. 查询扣减后余额 + MemberUserProfileDO profile = memberUserProfileMapper.selectByUserId(userId); + + // 6. 更新预扣记录状态 + record.setStatus(STATUS_CONFIRMED); + record.setBalance(profile.getRemainingPoints()); + record.setReason(record.getReason().replace("(预扣)", "")); + pointRecordMapper.updateById(record); + + log.info("[confirmPendingDeduct] 确认预扣记录 {},用户 {} 扣减 {} 积分", + recordId, userId, points); + } + + @Override + @Transactional(rollbackFor = Exception.class) + public void cancelPendingDeduct(Long recordId) { + // 1. 查询预扣记录 + PointRecordDO record = pointRecordMapper.selectById(recordId); + if (record == null) { + throw exception(POINTS_PENDING_NOT_FOUND); + } + + // 2. 校验状态 + if (!STATUS_PENDING.equals(record.getStatus())) { + throw exception(POINTS_PENDING_ALREADY_CONFIRMED); + } + + // 3. 更新为已取消状态 + record.setStatus(STATUS_CANCELED); + pointRecordMapper.updateById(record); + + log.info("[cancelPendingDeduct] 取消预扣记录 {},用户 {},积分 {}", + recordId, record.getUserId(), Math.abs(record.getPointAmount())); + } + +} diff --git a/yudao-module-tik/src/main/resources/IMPLEMENTATION_PLAN.md b/yudao-module-tik/src/main/resources/IMPLEMENTATION_PLAN.md new file mode 100644 index 0000000000..3a03893496 --- /dev/null +++ b/yudao-module-tik/src/main/resources/IMPLEMENTATION_PLAN.md @@ -0,0 +1,299 @@ +# AI 服务积分扣减公共服务 - 实现计划 + +> 版本: v1.0 +> 日期: 2026-02-22 +> 基于设计文档: points-service-integration.md + +--- + +## 一、实现概览 + +### 当前状态分析 + +| 组件 | 文件路径 | 当前状态 | 需要修改 | +|------|---------|---------|---------| +| PointRecordDO | `muye/pointrecord/dal/PointRecordDO.java` | 完整 | 新增 status 字段 | +| MemberUserProfileMapper | `muye/memberuserprofile/mapper/MemberUserProfileMapper.java` | 仅查询 | 新增原子扣减方法 | +| AiModelConfigMapper | `muye/aimodelconfig/mapper/AiModelConfigMapper.java` | 仅分页 | 新增按平台查询方法 | +| PointsService | 不存在 | **需新建** | 新建接口+实现 | +| DifyService | 不存在 | **需新建** | 新建服务 | + +--- + +## 二、实现步骤 + +### 步骤 1: 数据库层修改 + +#### 1.1 PointRecordDO 新增 status 字段 + +**文件**: `muye/pointrecord/dal/PointRecordDO.java` + +```java +// 新增字段 +private String status; // 状态:pending(预扣) / confirmed(已确认) / canceled(已取消) +``` + +**数据库迁移**: +```sql +ALTER TABLE muey_point_record ADD COLUMN status VARCHAR(20) DEFAULT 'confirmed' COMMENT '状态:pending-预扣 confirmed-已确认 canceled-已取消'; +``` + +#### 1.2 MemberUserProfileMapper 新增方法 + +**文件**: `muye/memberuserprofile/mapper/MemberUserProfileMapper.java` + +```java +/** + * 根据用户ID查询档案 + */ +default MemberUserProfileDO selectByUserId(Long userId) { + return selectOne(new LambdaQueryWrapperX() + .eq(MemberUserProfileDO::getUserId, userId)); +} + +/** + * 原子扣减积分(乐观锁) + * @return 影响行数,0表示余额不足 + */ +@Update("UPDATE muey_member_user_profile " + + "SET remaining_points = remaining_points - #{points}, " + + " used_points = used_points + #{points}, " + + " update_time = NOW() " + + "WHERE user_id = #{userId} AND remaining_points >= #{points}") +int updatePointsDeduct(@Param("userId") Long userId, @Param("points") Integer points); +``` + +#### 1.3 AiModelConfigMapper 新增方法 + +**文件**: `muye/aimodelconfig/mapper/AiModelConfigMapper.java` + +```java +/** + * 根据平台和模型类型查询配置 + */ +default AiModelConfigDO selectByPlatformAndModelType(String platform, String modelType) { + return selectOne(new LambdaQueryWrapperX() + .eq(AiModelConfigDO::getPlatform, platform) + .eq(AiModelConfigDO::getModelType, modelType) + .eq(AiModelConfigDO::getStatus, 1)); +} +``` + +#### 1.4 PointRecordMapper 新增方法 + +**文件**: `muye/pointrecord/mapper/PointRecordMapper.java` + +```java +/** + * 取消过期的预扣记录(30分钟前) + */ +@Update("UPDATE muey_point_record SET status = 'canceled', update_time = NOW() " + + "WHERE status = 'pending' AND create_time < DATE_SUB(NOW(), INTERVAL 30 MINUTE)") +int cancelExpiredPendingRecords(); +``` + +--- + +### 步骤 2: 公共积分服务 + +#### 2.1 新建 PointsService 接口 + +**文件**: `muye/points/service/PointsService.java`(新建) + +```java +public interface PointsService { + + /** + * 获取积分配置 + */ + AiModelConfigDO getConfig(String platform, String modelType); + + /** + * 预检积分(余额不足抛异常) + */ + void checkPoints(Long userId, Integer points); + + /** + * 即时扣减(同步场景) + */ + Long deductPoints(Long userId, Integer points, String bizType, String bizId); + + /** + * 创建预扣(流式/异步场景) + * @return 预扣记录ID + */ + Long createPendingDeduct(Long userId, Integer points, String bizType, String bizId); + + /** + * 确认预扣(实际扣减) + */ + void confirmPendingDeduct(Long recordId); + + /** + * 取消预扣(不扣费) + */ + void cancelPendingDeduct(Long recordId); +} +``` + +#### 2.2 新建 PointsServiceImpl 实现 + +**文件**: `muye/points/service/PointsServiceImpl.java`(新建) + +核心逻辑: +- `deductPoints`: 调用 Mapper 原子扣减,失败抛 `POINTS_DEDUCT_FAILED` +- `createPendingDeduct`: 创建 status=pending 的记录 +- `confirmPendingDeduct`: 执行实际扣减 + 更新 status=confirmed +- `cancelPendingDeduct`: 更新 status=canceled + +#### 2.3 新建错误码常量 + +**文件**: `ErrorCodeConstants.java`(修改) + +```java +// 积分相关错误码 1001001-1001003 +ErrorCode POINTS_INSUFFICIENT = new ErrorCode(1001001, "积分不足"); +ErrorCode POINTS_CONFIG_NOT_FOUND = new ErrorCode(1001002, "积分配置不存在"); +ErrorCode POINTS_DEDUCT_FAILED = new ErrorCode(1001003, "积分扣减失败"); +``` + +--- + +### 步骤 3: Dify 工作流集成 + +#### 3.1 新建 Dify 配置类 + +**文件**: `dify/config/DifyProperties.java`(新建) + +```java +@ConfigurationProperties(prefix = "yudao.dify") +public class DifyProperties { + private String apiUrl; + private Integer timeout = 60; +} +``` + +#### 3.2 新建 DifyClient + +**文件**: `dify/client/DifyClient.java`(新建) + +- 调用 Dify 工作流 API +- 支持流式响应 +- 传入 sysPrompt 参数 + +#### 3.3 新建 DifyService + +**文件**: `dify/service/DifyService.java`(新建) + +```java +public interface DifyService { + /** + * 流式聊天(带积分扣减) + */ + Flux chatStream(DifyChatReqVO reqVO, Long userId); +} +``` + +#### 3.4 新建 DifyController + +**文件**: `dify/controller/AppDifyController.java`(新建) + +```java +@PostMapping("/api/tik/dify/chat/stream") +public Flux> chatStream(@RequestBody DifyChatReqVO reqVO); +``` + +--- + +### 步骤 4: 集成到现有服务 + +#### 4.1 AiChatMessageService 集成 + +**文件**: `service/chat/AiChatMessageServiceImpl.java`(修改) + +在 `sendChatMessageStream` 方法中: +1. 调用前:`pointsService.checkPoints()` +2. 创建预扣:`pointsService.createPendingDeduct()` +3. 流结束:`pointsService.confirmPendingDeduct()` +4. 出错/取消:`pointsService.cancelPendingDeduct()` + +--- + +### 步骤 5: 定时任务 + +#### 5.1 预扣过期清理任务 + +**文件**: `job/PointsPendingCleanJob.java`(新建) + +- 每 5 分钟执行 +- 调用 `pointRecordMapper.cancelExpiredPendingRecords()` + +--- + +## 三、文件清单 + +### 新建文件 + +| 文件 | 路径 | +|------|------| +| PointsService | `muye/points/service/PointsService.java` | +| PointsServiceImpl | `muye/points/service/PointsServiceImpl.java` | +| DifyProperties | `dify/config/DifyProperties.java` | +| DifyClient | `dify/client/DifyClient.java` | +| DifyService | `dify/service/DifyService.java` | +| DifyServiceImpl | `dify/service/DifyServiceImpl.java` | +| DifyReqVO | `dify/vo/DifyChatReqVO.java` | +| DifyRespVO | `dify/vo/DifyChatRespVO.java` | +| AppDifyController | `dify/controller/AppDifyController.java` | +| PointsPendingCleanJob | `job/PointsPendingCleanJob.java` | + +### 修改文件 + +| 文件 | 修改内容 | +|------|---------| +| PointRecordDO | 新增 status 字段 | +| MemberUserProfileMapper | 新增 selectByUserId、updatePointsDeduct 方法 | +| AiModelConfigMapper | 新增 selectByPlatformAndModelType 方法 | +| PointRecordMapper | 新增 cancelExpiredPendingRecords 方法 | +| ErrorCodeConstants | 新增积分相关错误码 | +| AiChatMessageServiceImpl | 集成积分扣减逻辑 | + +--- + +## 四、数据库变更 + +```sql +-- 1. 积分记录表新增状态字段 +ALTER TABLE muey_point_record +ADD COLUMN status VARCHAR(20) DEFAULT 'confirmed' +COMMENT '状态:pending-预扣 confirmed-已确认 canceled-已取消'; + +-- 2. 添加索引(可选优化) +CREATE INDEX idx_point_record_status_time +ON muey_point_record(status, create_time); +``` + +--- + +## 五、依赖关系 + +``` +业务层 + ├── DifyService ──────┐ + ├── AiChatMessageService ──┼──→ PointsService(公共服务) + ├── TikHubService ─────────┤ │ + ├── VoiceService ──────────┤ ├── AiModelConfigMapper(配置查询) + └── DigitalHumanService ───┘ ├── MemberUserProfileMapper(积分扣减) + └── PointRecordMapper(流水记录) +``` + +--- + +## 六、验收标准 + +- [ ] PointsService 单元测试通过 +- [ ] Dify 流式接口正常返回 +- [ ] 积分不足时抛出正确异常 +- [ ] 流式中断时预扣正确取消 +- [ ] 预扣过期定时任务正常运行 +- [ ] 积分扣减原子性(并发不超扣)