From 3680ab0efd4a36e169b6fdead81261e2e42e67f8 Mon Sep 17 00:00:00 2001 From: sion123 <450702724@qq.com> Date: Sun, 22 Mar 2026 20:32:45 +0800 Subject: [PATCH] =?UTF-8?q?refactor(tik):=20=E9=87=8D=E6=9E=84=20Dify=20?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E5=AE=9E=E7=8E=B0=EF=BC=8C=E6=8F=90=E5=8F=96?= =?UTF-8?q?=E9=87=8D=E5=A4=8D=E9=80=BB=E8=BE=91=E4=B8=BA=E7=A7=81=E6=9C=89?= =?UTF-8?q?=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 将聊天、改写、对标分析、提示词分析四个流式方法中的积分处理逻辑提取为通用私有方法 - 新增 `handlePointsDeduction`、`handlePointsCancellation`、`handleUserCancellation` 方法处理积分扣减、取消和用户取消场景 - 新增 `buildDoneMono` 方法统一构建流结束事件 - 新增 `resolveSystemPrompt` 方法处理系统提示词解析逻辑 - 简化方法实现,移除重复代码,提高代码可维护性 - 统一日志前缀使用,增强日志可读性 - 删除未使用的 `Consumer` 导入 --- .../tik/dify/service/DifyServiceImpl.java | 534 ++++-------------- 1 file changed, 100 insertions(+), 434 deletions(-) diff --git a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/service/DifyServiceImpl.java b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/service/DifyServiceImpl.java index 39452e4639..dac1e60ae2 100644 --- a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/service/DifyServiceImpl.java +++ b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/service/DifyServiceImpl.java @@ -25,7 +25,6 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; /** * Dify 服务实现类 @@ -48,287 +47,126 @@ public class DifyServiceImpl implements DifyService { @Override public Flux chatStream(DifyChatReqVO reqVO, String userId) { - // 用于存储预扣记录ID AtomicLong pendingRecordId = new AtomicLong(); - // 用于存储会话ID AtomicReference conversationIdRef = new AtomicReference<>(reqVO.getConversationId()); - // 用于存储 token 使用信息 AtomicReference tokenUsageRef = new AtomicReference<>(); - // Dify 用户标识(按 agentId 隔离会话) String difyUserId = "user-" + userId + "-agent-" + reqVO.getAgentId(); + String logPrefix = "chatStream"; return Mono.fromCallable(() -> { - // 1. 获取智能体配置 AiAgentDO agent = aiAgentService.getAiAgent(reqVO.getAgentId()); if (agent == null) { throw new RuntimeException("智能体不存在"); } - // 2. 根据 modelMode 获取对应的积分配置 AiModelTypeEnum modelTypeEnum = "standard".equals(reqVO.getModelMode()) ? AiModelTypeEnum.DIFY_WRITING_STANDARD : AiModelTypeEnum.DIFY_WRITING_PRO; AiServiceConfigDO config = pointsService.getConfig( - AiPlatformEnum.DIFY.getPlatform(), - modelTypeEnum.getModelCode()); + AiPlatformEnum.DIFY.getPlatform(), modelTypeEnum.getModelCode()); - // 3. 预检积分 pointsService.checkPoints(userId, config.getConsumePoints()); - - // 4. 创建预扣记录(带 serviceCode) Long recordId = pointsService.createPendingDeduct( - userId, - config.getConsumePoints(), - "dify_chat", - reqVO.getAgentId().toString(), - config.getServiceCode() - ); + userId, config.getConsumePoints(), "dify_chat", + reqVO.getAgentId().toString(), config.getServiceCode()); pendingRecordId.set(recordId); - // 5. 返回调用参数 return new DifyChatContext(agent.getSystemPrompt(), config.getApiKey(), config.getConsumePoints()); }) - .flatMapMany(context -> { - // 6. 调用 Dify 流式 API - return difyClient.chatStream( - context.apiKey(), - reqVO.getContent(), - context.systemPrompt(), - reqVO.getConversationId(), - difyUserId - ) - .doOnNext(resp -> { - if (resp.getConversationId() != null) { - conversationIdRef.set(resp.getConversationId()); - } - // 捕获 token 使用信息 - if (resp.getTotalTokens() != null && resp.getTotalTokens() > 0) { - tokenUsageRef.set(resp); - } - }) - // 7. 流结束时确认扣费(带 token) - .doOnComplete(() -> { - if (pendingRecordId.get() > 0) { - try { - DifyChatRespVO tokenUsage = tokenUsageRef.get(); - if (tokenUsage != null) { - pointsService.confirmPendingDeductWithTokens( - pendingRecordId.get(), - tokenUsage.getInputTokens(), - tokenUsage.getOutputTokens(), - tokenUsage.getTotalTokens() - ); - log.info("[chatStream] 流结束,确认扣费(带token),记录ID: {}, tokens: {}", - pendingRecordId.get(), tokenUsage.getTotalTokens()); - } else { - pointsService.confirmPendingDeduct(pendingRecordId.get()); - log.info("[chatStream] 流结束,确认扣费(无token),记录ID: {}", pendingRecordId.get()); - } - } catch (Exception e) { - log.error("[chatStream] 确认扣费失败", e); - } - } - }) - // 8. 流出错时取消预扣 - .doOnError(e -> { - if (pendingRecordId.get() > 0) { - try { - pointsService.cancelPendingDeduct(pendingRecordId.get()); - log.info("[chatStream] 流出错,取消预扣,记录ID: {}", pendingRecordId.get()); - } catch (Exception ex) { - log.error("[chatStream] 取消预扣失败", ex); - } - } - }) - // 9. 用户取消时确认扣费(已消费的部分) - .doOnCancel(() -> { - if (pendingRecordId.get() > 0) { - try { - // 用户主动取消,仍然扣费(按最低消费) - pointsService.confirmPendingDeduct(pendingRecordId.get()); - log.info("[chatStream] 用户取消,确认扣费,记录ID: {}", pendingRecordId.get()); - } catch (Exception e) { - log.error("[chatStream] 用户取消后扣费失败", e); - } - } - }); - }) - // 10. 在最后添加 done 事件 - .concatWith(Mono.defer(() -> { - DifyChatRespVO tokenUsage = tokenUsageRef.get(); - if (tokenUsage != null) { - return Mono.just(DifyChatRespVO.doneWithTokens( - conversationIdRef.get(), null, - tokenUsage.getInputTokens(), - tokenUsage.getOutputTokens(), - tokenUsage.getTotalTokens() - )); - } - return Mono.just(DifyChatRespVO.done(conversationIdRef.get(), null)); - })) + .flatMapMany(context -> difyClient.chatStream( + context.apiKey(), reqVO.getContent(), context.systemPrompt(), + reqVO.getConversationId(), difyUserId) + .doOnNext(resp -> { + if (resp.getConversationId() != null) { + conversationIdRef.set(resp.getConversationId()); + } + if (resp.getTotalTokens() != null && resp.getTotalTokens() > 0) { + tokenUsageRef.set(resp); + } + }) + .doOnComplete(() -> handlePointsDeduction(pendingRecordId, tokenUsageRef, logPrefix)) + .doOnError(e -> handlePointsCancellation(pendingRecordId, logPrefix)) + .doOnCancel(() -> handleUserCancellation(pendingRecordId, logPrefix))) + .concatWith(buildDoneMono(conversationIdRef, tokenUsageRef)) .onErrorResume(e -> { - log.error("[chatStream] Dify 聊天异常", e); + log.error("[{}] Dify 聊天异常", logPrefix, e); return Flux.just(DifyChatRespVO.error(e.getMessage())); }); } @Override public Flux rewriteStream(ForecastRewriteReqVO reqVO, String userId) { - // 用于存储预扣记录ID AtomicLong pendingRecordId = new AtomicLong(); - // 用于存储会话ID AtomicReference conversationIdRef = new AtomicReference<>(""); - // 用于存储 token 使用信息 AtomicReference tokenUsageRef = new AtomicReference<>(); - // Dify 用户标识(按 agentId 隔离会话,无 agentId 时使用默认) String difyUserId = reqVO.getAgentId() != null - ? "user-" + userId + "-agent-" + reqVO.getAgentId() - : "user-" + userId; + ? "user-" + userId + "-agent-" + reqVO.getAgentId() + : "user-" + userId; + String logPrefix = "rewriteStream"; return Mono.fromCallable(() -> { - // 1. 获取系统提示词 - String systemPrompt; - if (reqVO.getCustomSystemPrompt() != null && !reqVO.getCustomSystemPrompt().isEmpty()) { - // 使用自定义系统提示词(用户风格) - systemPrompt = reqVO.getCustomSystemPrompt(); - log.info("[rewriteStream] 使用自定义系统提示词,长度: {}", systemPrompt.length()); - } else if (reqVO.getAgentId() != null) { - // 从智能体获取系统提示词 - AiAgentDO agent = aiAgentService.getAiAgent(reqVO.getAgentId()); - if (agent == null) { - throw new RuntimeException("智能体不存在"); - } - systemPrompt = agent.getSystemPrompt(); - log.info("[rewriteStream] 使用智能体系统提示词,agentId: {}, 长度: {}", - reqVO.getAgentId(), systemPrompt != null ? systemPrompt.length() : 0); - } else { - throw new RuntimeException("必须提供 agentId 或 customSystemPrompt"); - } - - // 2. 根据 modelType 获取对应的积分配置 + String systemPrompt = resolveSystemPrompt(reqVO); AiModelTypeEnum modelTypeEnum = "forecast_pro".equals(reqVO.getModelType()) ? AiModelTypeEnum.FORECAST_PRO : AiModelTypeEnum.FORECAST_STANDARD; AiServiceConfigDO config = pointsService.getConfig( - AiPlatformEnum.DIFY.getPlatform(), - modelTypeEnum.getModelCode()); + AiPlatformEnum.DIFY.getPlatform(), modelTypeEnum.getModelCode()); - // 3. 预检积分 pointsService.checkPoints(userId, config.getConsumePoints()); - - // 4. 创建预扣记录(带 serviceCode) Long recordId = pointsService.createPendingDeduct( - userId, - config.getConsumePoints(), - "forecast_rewrite", - reqVO.getModelType(), - config.getServiceCode() - ); + userId, config.getConsumePoints(), "forecast_rewrite", + reqVO.getModelType(), config.getServiceCode()); pendingRecordId.set(recordId); - // 5. 构建 inputs 参数 Map inputs = new HashMap<>(); inputs.put("sysPrompt", systemPrompt); inputs.put("userText", reqVO.getUserText()); inputs.put("level", reqVO.getLevel()); - // 调试日志 - log.info("[rewriteStream] 请求参数 - agentId: {}, modelType: {}", reqVO.getAgentId(), reqVO.getModelType()); - log.info("[rewriteStream] inputs参数 - userText长度: {}, level: {}", - reqVO.getUserText() != null ? reqVO.getUserText().length() : 0, - reqVO.getLevel()); - log.info("[rewriteStream] API配置 - apiKey前缀: {}..., consumePoints: {}", - config.getApiKey() != null && config.getApiKey().length() > 10 - ? config.getApiKey().substring(0, 10) + "***" - : "null", - config.getConsumePoints()); - - // 6. 返回调用参数 + log.info("[{}] agentId: {}, modelType: {}, userText长度: {}", + logPrefix, reqVO.getAgentId(), reqVO.getModelType(), + reqVO.getUserText() != null ? reqVO.getUserText().length() : 0); return new ForecastRewriteContext(inputs, config.getApiKey(), config.getConsumePoints()); }) - .flatMapMany(context -> { - // 7. 调用 Dify 流式 API - return difyClient.chatStreamWithInputs( - context.apiKey(), - context.inputs(), - reqVO.getUserText(), - null, - difyUserId - ) - .doOnNext(resp -> { - if (resp.getConversationId() != null) { - conversationIdRef.set(resp.getConversationId()); - } - // 捕获 token 使用信息 - if (resp.getTotalTokens() != null && resp.getTotalTokens() > 0) { - tokenUsageRef.set(resp); - } - }) - // 8. 流结束时确认扣费(带 token) - .doOnComplete(() -> { - if (pendingRecordId.get() > 0) { - try { - DifyChatRespVO tokenUsage = tokenUsageRef.get(); - if (tokenUsage != null) { - pointsService.confirmPendingDeductWithTokens( - pendingRecordId.get(), - tokenUsage.getInputTokens(), - tokenUsage.getOutputTokens(), - tokenUsage.getTotalTokens() - ); - log.info("[rewriteStream] 流结束,确认扣费(带token),记录ID: {}, tokens: {}", - pendingRecordId.get(), tokenUsage.getTotalTokens()); - } else { - pointsService.confirmPendingDeduct(pendingRecordId.get()); - log.info("[rewriteStream] 流结束,确认扣费(无token),记录ID: {}", pendingRecordId.get()); - } - } catch (Exception e) { - log.error("[rewriteStream] 确认扣费失败", e); - } - } - }) - // 9. 流出错时取消预扣 - .doOnError(e -> { - if (pendingRecordId.get() > 0) { - try { - pointsService.cancelPendingDeduct(pendingRecordId.get()); - log.info("[rewriteStream] 流出错,取消预扣,记录ID: {}", pendingRecordId.get()); - } catch (Exception ex) { - log.error("[rewriteStream] 取消预扣失败", ex); - } - } - }) - // 10. 用户取消时确认扣费(已消费的部分) - .doOnCancel(() -> { - if (pendingRecordId.get() > 0) { - try { - pointsService.confirmPendingDeduct(pendingRecordId.get()); - log.info("[rewriteStream] 用户取消,确认扣费,记录ID: {}", pendingRecordId.get()); - } catch (Exception e) { - log.error("[rewriteStream] 用户取消后扣费失败", e); - } - } - }); - }) - // 11. 在最后添加 done 事件 - .concatWith(Mono.defer(() -> { - DifyChatRespVO tokenUsage = tokenUsageRef.get(); - if (tokenUsage != null) { - return Mono.just(DifyChatRespVO.doneWithTokens( - conversationIdRef.get(), null, - tokenUsage.getInputTokens(), - tokenUsage.getOutputTokens(), - tokenUsage.getTotalTokens() - )); - } - return Mono.just(DifyChatRespVO.done(conversationIdRef.get(), null)); - })) + .flatMapMany(context -> difyClient.chatStreamWithInputs( + context.apiKey(), context.inputs(), reqVO.getUserText(), null, difyUserId) + .doOnNext(resp -> { + if (resp.getConversationId() != null) { + conversationIdRef.set(resp.getConversationId()); + } + if (resp.getTotalTokens() != null && resp.getTotalTokens() > 0) { + tokenUsageRef.set(resp); + } + }) + .doOnComplete(() -> handlePointsDeduction(pendingRecordId, tokenUsageRef, logPrefix)) + .doOnError(e -> handlePointsCancellation(pendingRecordId, logPrefix)) + .doOnCancel(() -> handleUserCancellation(pendingRecordId, logPrefix))) + .concatWith(buildDoneMono(conversationIdRef, tokenUsageRef)) .onErrorResume(e -> { - log.error("[rewriteStream] Forecast 文案改写异常", e); + log.error("[{}] Forecast 文案改写异常", logPrefix, e); return Flux.just(DifyChatRespVO.error(e.getMessage())); }); } + /** + * 解析系统提示词 + */ + private String resolveSystemPrompt(ForecastRewriteReqVO reqVO) { + if (reqVO.getCustomSystemPrompt() != null && !reqVO.getCustomSystemPrompt().isEmpty()) { + log.info("[rewriteStream] 使用自定义系统提示词,长度: {}", reqVO.getCustomSystemPrompt().length()); + return reqVO.getCustomSystemPrompt(); + } + if (reqVO.getAgentId() != null) { + AiAgentDO agent = aiAgentService.getAiAgent(reqVO.getAgentId()); + if (agent == null) { + throw new RuntimeException("智能体不存在"); + } + log.info("[rewriteStream] 使用智能体系统提示词,agentId: {}", reqVO.getAgentId()); + return agent.getSystemPrompt(); + } + throw new RuntimeException("必须提供 agentId 或 customSystemPrompt"); + } + /** * Dify 聊天上下文 */ @@ -341,128 +179,42 @@ public class DifyServiceImpl implements DifyService { @Override public Flux benchmarkAnalyzeStream(DifyBenchmarkReqVO reqVO, String userId) { - // 用于存储预扣记录ID AtomicLong pendingRecordId = new AtomicLong(); - // 用于存储会话ID AtomicReference conversationIdRef = new AtomicReference<>(""); - // 用于存储 token 使用信息 AtomicReference tokenUsageRef = new AtomicReference<>(); - // Dify 用户标识(固定格式) String difyUserId = "user-" + userId; + String logPrefix = "benchmarkAnalyzeStream"; return Mono.fromCallable(() -> { - // 1. 获取积分配置(复用提示词分析配置) AiServiceConfigDO config = pointsService.getConfig( AiPlatformEnum.DIFY.getPlatform(), AiModelTypeEnum.PROMPT_ANALYSIS.getModelCode()); - - // 2. 预检积分 pointsService.checkPoints(userId, config.getConsumePoints()); - // 3. 创建预扣记录(带 serviceCode) Long recordId = pointsService.createPendingDeduct( - userId, - config.getConsumePoints(), - "benchmark_analyze", - String.valueOf(reqVO.getVideoCount()), - config.getServiceCode() - ); + userId, config.getConsumePoints(), "benchmark_analyze", + String.valueOf(reqVO.getVideoCount()), config.getServiceCode()); pendingRecordId.set(recordId); - // 4. 构建对标分析提示词 String benchmarkPrompt = buildBenchmarkPrompt(reqVO.getContent(), reqVO.getVideoCount()); - - log.info("[benchmarkAnalyzeStream] 请求参数 - videoCount: {}", reqVO.getVideoCount()); - log.info("[benchmarkAnalyzeStream] 提示词长度: {}", benchmarkPrompt.length()); - log.info("[benchmarkAnalyzeStream] API配置 - apiKey前缀: {}..., consumePoints: {}", - config.getApiKey() != null && config.getApiKey().length() > 10 - ? config.getApiKey().substring(0, 10) + "***" - : "null", - config.getConsumePoints()); - - // 5. 返回调用参数 + log.info("[{}] 请求参数 - videoCount: {}, 提示词长度: {}", logPrefix, reqVO.getVideoCount(), benchmarkPrompt.length()); return new BenchmarkAnalyzeContext(benchmarkPrompt, config.getApiKey(), config.getConsumePoints()); }) - .flatMapMany(context -> { - // 6. 调用 Dify 流式 API - return difyClient.chatStream( - context.apiKey(), - context.prompt(), - null, // systemPrompt 为空,使用 Dify 工作流中的配置 - null, // conversationId 为空,新会话 - difyUserId - ) - .doOnNext(resp -> { - if (resp.getConversationId() != null) { - conversationIdRef.set(resp.getConversationId()); - } - // 捕获 token 使用信息 - if (resp.getTotalTokens() != null && resp.getTotalTokens() > 0) { - tokenUsageRef.set(resp); - } - }) - // 7. 流结束时确认扣费(带 token) - .doOnComplete(() -> { - if (pendingRecordId.get() > 0) { - try { - DifyChatRespVO tokenUsage = tokenUsageRef.get(); - if (tokenUsage != null) { - pointsService.confirmPendingDeductWithTokens( - pendingRecordId.get(), - tokenUsage.getInputTokens(), - tokenUsage.getOutputTokens(), - tokenUsage.getTotalTokens() - ); - log.info("[benchmarkAnalyzeStream] 流结束,确认扣费(带token),记录ID: {}, tokens: {}", - pendingRecordId.get(), tokenUsage.getTotalTokens()); - } else { - pointsService.confirmPendingDeduct(pendingRecordId.get()); - log.info("[benchmarkAnalyzeStream] 流结束,确认扣费(无token),记录ID: {}", pendingRecordId.get()); - } - } catch (Exception e) { - log.error("[benchmarkAnalyzeStream] 确认扣费失败", e); - } - } - }) - // 8. 流出错时取消预扣 - .doOnError(e -> { - if (pendingRecordId.get() > 0) { - try { - pointsService.cancelPendingDeduct(pendingRecordId.get()); - log.info("[benchmarkAnalyzeStream] 流出错,取消预扣,记录ID: {}", pendingRecordId.get()); - } catch (Exception ex) { - log.error("[benchmarkAnalyzeStream] 取消预扣失败", ex); - } - } - }) - // 9. 用户取消时确认扣费(已消费的部分) - .doOnCancel(() -> { - if (pendingRecordId.get() > 0) { - try { - // 用户主动取消,仍然扣费(按最低消费) - pointsService.confirmPendingDeduct(pendingRecordId.get()); - log.info("[benchmarkAnalyzeStream] 用户取消,确认扣费,记录ID: {}", pendingRecordId.get()); - } catch (Exception e) { - log.error("[benchmarkAnalyzeStream] 用户取消后扣费失败", e); - } - } - }); - }) - // 10. 在最后添加 done 事件 - .concatWith(Mono.defer(() -> { - DifyChatRespVO tokenUsage = tokenUsageRef.get(); - if (tokenUsage != null) { - return Mono.just(DifyChatRespVO.doneWithTokens( - conversationIdRef.get(), null, - tokenUsage.getInputTokens(), - tokenUsage.getOutputTokens(), - tokenUsage.getTotalTokens() - )); - } - return Mono.just(DifyChatRespVO.done(conversationIdRef.get(), null)); - })) + .flatMapMany(context -> difyClient.chatStream(context.apiKey(), context.prompt(), null, null, difyUserId) + .doOnNext(resp -> { + if (resp.getConversationId() != null) { + conversationIdRef.set(resp.getConversationId()); + } + if (resp.getTotalTokens() != null && resp.getTotalTokens() > 0) { + tokenUsageRef.set(resp); + } + }) + .doOnComplete(() -> handlePointsDeduction(pendingRecordId, tokenUsageRef, logPrefix)) + .doOnError(e -> handlePointsCancellation(pendingRecordId, logPrefix)) + .doOnCancel(() -> handleUserCancellation(pendingRecordId, logPrefix))) + .concatWith(buildDoneMono(conversationIdRef, tokenUsageRef)) .onErrorResume(e -> { - log.error("[benchmarkAnalyzeStream] 对标分析异常", e); + log.error("[{}] 对标分析异常", logPrefix, e); return Flux.just(DifyChatRespVO.error(e.getMessage())); }); } @@ -498,128 +250,42 @@ public class DifyServiceImpl implements DifyService { @Override public Flux promptAnalysisStream(PromptAnalysisReqVO reqVO, String userId) { - // 用于存储预扣记录ID AtomicLong pendingRecordId = new AtomicLong(); - // 用于存储会话ID AtomicReference conversationIdRef = new AtomicReference<>(""); - // 用于存储 token 使用信息 AtomicReference tokenUsageRef = new AtomicReference<>(); - // Dify 用户标识(固定格式) String difyUserId = "user-" + userId; + String logPrefix = "promptAnalysisStream"; return Mono.fromCallable(() -> { - // 1. 获取积分配置 AiServiceConfigDO config = pointsService.getConfig( AiPlatformEnum.DIFY.getPlatform(), AiModelTypeEnum.PROMPT_ANALYSIS.getModelCode()); - - // 2. 预检积分 pointsService.checkPoints(userId, config.getConsumePoints()); - // 3. 创建预扣记录(带 serviceCode) Long recordId = pointsService.createPendingDeduct( - userId, - config.getConsumePoints(), - "prompt_analysis", - String.valueOf(reqVO.getVideoCount()), - config.getServiceCode() - ); + userId, config.getConsumePoints(), "prompt_analysis", + String.valueOf(reqVO.getVideoCount()), config.getServiceCode()); pendingRecordId.set(recordId); - // 4. 构建提示词分析 prompt String analysisPrompt = buildPromptAnalysisPrompt(reqVO.getContent(), reqVO.getVideoCount()); - - log.info("[promptAnalysisStream] 请求参数 - videoCount: {}", reqVO.getVideoCount()); - log.info("[promptAnalysisStream] 提示词长度: {}", analysisPrompt.length()); - log.info("[promptAnalysisStream] API配置 - apiKey前缀: {}..., consumePoints: {}", - config.getApiKey() != null && config.getApiKey().length() > 10 - ? config.getApiKey().substring(0, 10) + "***" - : "null", - config.getConsumePoints()); - - // 5. 返回调用参数 + log.info("[{}] 请求参数 - videoCount: {}, 提示词长度: {}", logPrefix, reqVO.getVideoCount(), analysisPrompt.length()); return new PromptAnalysisContext(analysisPrompt, config.getApiKey(), config.getConsumePoints()); }) - .flatMapMany(context -> { - // 6. 调用 Dify 流式 API - return difyClient.chatStream( - context.apiKey(), - context.prompt(), - null, // systemPrompt 为空,使用 Dify 工作流中的配置 - null, // conversationId 为空,新会话 - difyUserId - ) - .doOnNext(resp -> { - if (resp.getConversationId() != null) { - conversationIdRef.set(resp.getConversationId()); - } - // 捕获 token 使用信息 - if (resp.getTotalTokens() != null && resp.getTotalTokens() > 0) { - tokenUsageRef.set(resp); - } - }) - // 7. 流结束时确认扣费(带 token) - .doOnComplete(() -> { - if (pendingRecordId.get() > 0) { - try { - DifyChatRespVO tokenUsage = tokenUsageRef.get(); - if (tokenUsage != null) { - pointsService.confirmPendingDeductWithTokens( - pendingRecordId.get(), - tokenUsage.getInputTokens(), - tokenUsage.getOutputTokens(), - tokenUsage.getTotalTokens() - ); - log.info("[promptAnalysisStream] 流结束,确认扣费(带token),记录ID: {}, tokens: {}", - pendingRecordId.get(), tokenUsage.getTotalTokens()); - } else { - pointsService.confirmPendingDeduct(pendingRecordId.get()); - log.info("[promptAnalysisStream] 流结束,确认扣费(无token),记录ID: {}", pendingRecordId.get()); - } - } catch (Exception e) { - log.error("[promptAnalysisStream] 确认扣费失败", e); - } - } - }) - // 8. 流出错时取消预扣 - .doOnError(e -> { - if (pendingRecordId.get() > 0) { - try { - pointsService.cancelPendingDeduct(pendingRecordId.get()); - log.info("[promptAnalysisStream] 流出错,取消预扣,记录ID: {}", pendingRecordId.get()); - } catch (Exception ex) { - log.error("[promptAnalysisStream] 取消预扣失败", ex); - } - } - }) - // 9. 用户取消时确认扣费(已消费的部分) - .doOnCancel(() -> { - if (pendingRecordId.get() > 0) { - try { - // 用户主动取消,仍然扣费(按最低消费) - pointsService.confirmPendingDeduct(pendingRecordId.get()); - log.info("[promptAnalysisStream] 用户取消,确认扣费,记录ID: {}", pendingRecordId.get()); - } catch (Exception e) { - log.error("[promptAnalysisStream] 用户取消后扣费失败", e); - } - } - }); - }) - // 10. 在最后添加 done 事件 - .concatWith(Mono.defer(() -> { - DifyChatRespVO tokenUsage = tokenUsageRef.get(); - if (tokenUsage != null) { - return Mono.just(DifyChatRespVO.doneWithTokens( - conversationIdRef.get(), null, - tokenUsage.getInputTokens(), - tokenUsage.getOutputTokens(), - tokenUsage.getTotalTokens() - )); - } - return Mono.just(DifyChatRespVO.done(conversationIdRef.get(), null)); - })) + .flatMapMany(context -> difyClient.chatStream(context.apiKey(), context.prompt(), null, null, difyUserId) + .doOnNext(resp -> { + if (resp.getConversationId() != null) { + conversationIdRef.set(resp.getConversationId()); + } + if (resp.getTotalTokens() != null && resp.getTotalTokens() > 0) { + tokenUsageRef.set(resp); + } + }) + .doOnComplete(() -> handlePointsDeduction(pendingRecordId, tokenUsageRef, logPrefix)) + .doOnError(e -> handlePointsCancellation(pendingRecordId, logPrefix)) + .doOnCancel(() -> handleUserCancellation(pendingRecordId, logPrefix))) + .concatWith(buildDoneMono(conversationIdRef, tokenUsageRef)) .onErrorResume(e -> { - log.error("[promptAnalysisStream] 提示词分析异常", e); + log.error("[{}] 提示词分析异常", logPrefix, e); return Flux.just(DifyChatRespVO.error(e.getMessage())); }); }