refactor(tik): 重构 Dify 服务实现,提取重复逻辑为私有方法
Some checks failed
Build and Deploy / deploy (push) Has been cancelled

- 将聊天、改写、对标分析、提示词分析四个流式方法中的积分处理逻辑提取为通用私有方法
- 新增 `handlePointsDeduction`、`handlePointsCancellation`、`handleUserCancellation` 方法处理积分扣减、取消和用户取消场景
- 新增 `buildDoneMono` 方法统一构建流结束事件
- 新增 `resolveSystemPrompt` 方法处理系统提示词解析逻辑
- 简化方法实现,移除重复代码,提高代码可维护性
- 统一日志前缀使用,增强日志可读性
- 删除未使用的 `Consumer` 导入
This commit is contained in:
2026-03-22 20:32:45 +08:00
parent e437ed699e
commit 3680ab0efd

View File

@@ -25,7 +25,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
/**
* Dify 服务实现类
@@ -48,287 +47,126 @@ public class DifyServiceImpl implements DifyService {
@Override
public Flux<DifyChatRespVO> chatStream(DifyChatReqVO reqVO, String userId) {
// 用于存储预扣记录ID
AtomicLong pendingRecordId = new AtomicLong();
// 用于存储会话ID
AtomicReference<String> conversationIdRef = new AtomicReference<>(reqVO.getConversationId());
// 用于存储 token 使用信息
AtomicReference<DifyChatRespVO> tokenUsageRef = new AtomicReference<>();
// Dify 用户标识(按 agentId 隔离会话)
String difyUserId = "user-" + userId + "-agent-" + reqVO.getAgentId();
String logPrefix = "chatStream";
return Mono.fromCallable(() -> {
// 1. 获取智能体配置
AiAgentDO agent = aiAgentService.getAiAgent(reqVO.getAgentId());
if (agent == null) {
throw new RuntimeException("智能体不存在");
}
// 2. 根据 modelMode 获取对应的积分配置
AiModelTypeEnum modelTypeEnum = "standard".equals(reqVO.getModelMode())
? AiModelTypeEnum.DIFY_WRITING_STANDARD
: AiModelTypeEnum.DIFY_WRITING_PRO;
AiServiceConfigDO config = pointsService.getConfig(
AiPlatformEnum.DIFY.getPlatform(),
modelTypeEnum.getModelCode());
AiPlatformEnum.DIFY.getPlatform(), modelTypeEnum.getModelCode());
// 3. 预检积分
pointsService.checkPoints(userId, config.getConsumePoints());
// 4. 创建预扣记录(带 serviceCode
Long recordId = pointsService.createPendingDeduct(
userId,
config.getConsumePoints(),
"dify_chat",
reqVO.getAgentId().toString(),
config.getServiceCode()
);
userId, config.getConsumePoints(), "dify_chat",
reqVO.getAgentId().toString(), config.getServiceCode());
pendingRecordId.set(recordId);
// 5. 返回调用参数
return new DifyChatContext(agent.getSystemPrompt(), config.getApiKey(), config.getConsumePoints());
})
.flatMapMany(context -> {
// 6. 调用 Dify 流式 API
return difyClient.chatStream(
context.apiKey(),
reqVO.getContent(),
context.systemPrompt(),
reqVO.getConversationId(),
difyUserId
)
.doOnNext(resp -> {
if (resp.getConversationId() != null) {
conversationIdRef.set(resp.getConversationId());
}
// 捕获 token 使用信息
if (resp.getTotalTokens() != null && resp.getTotalTokens() > 0) {
tokenUsageRef.set(resp);
}
})
// 7. 流结束时确认扣费(带 token
.doOnComplete(() -> {
if (pendingRecordId.get() > 0) {
try {
DifyChatRespVO tokenUsage = tokenUsageRef.get();
if (tokenUsage != null) {
pointsService.confirmPendingDeductWithTokens(
pendingRecordId.get(),
tokenUsage.getInputTokens(),
tokenUsage.getOutputTokens(),
tokenUsage.getTotalTokens()
);
log.info("[chatStream] 流结束确认扣费带token记录ID: {}, tokens: {}",
pendingRecordId.get(), tokenUsage.getTotalTokens());
} else {
pointsService.confirmPendingDeduct(pendingRecordId.get());
log.info("[chatStream] 流结束确认扣费无token记录ID: {}", pendingRecordId.get());
}
} catch (Exception e) {
log.error("[chatStream] 确认扣费失败", e);
}
}
})
// 8. 流出错时取消预扣
.doOnError(e -> {
if (pendingRecordId.get() > 0) {
try {
pointsService.cancelPendingDeduct(pendingRecordId.get());
log.info("[chatStream] 流出错取消预扣记录ID: {}", pendingRecordId.get());
} catch (Exception ex) {
log.error("[chatStream] 取消预扣失败", ex);
}
}
})
// 9. 用户取消时确认扣费(已消费的部分)
.doOnCancel(() -> {
if (pendingRecordId.get() > 0) {
try {
// 用户主动取消,仍然扣费(按最低消费)
pointsService.confirmPendingDeduct(pendingRecordId.get());
log.info("[chatStream] 用户取消确认扣费记录ID: {}", pendingRecordId.get());
} catch (Exception e) {
log.error("[chatStream] 用户取消后扣费失败", e);
}
}
});
})
// 10. 在最后添加 done 事件
.concatWith(Mono.defer(() -> {
DifyChatRespVO tokenUsage = tokenUsageRef.get();
if (tokenUsage != null) {
return Mono.just(DifyChatRespVO.doneWithTokens(
conversationIdRef.get(), null,
tokenUsage.getInputTokens(),
tokenUsage.getOutputTokens(),
tokenUsage.getTotalTokens()
));
}
return Mono.just(DifyChatRespVO.done(conversationIdRef.get(), null));
}))
.flatMapMany(context -> difyClient.chatStream(
context.apiKey(), reqVO.getContent(), context.systemPrompt(),
reqVO.getConversationId(), difyUserId)
.doOnNext(resp -> {
if (resp.getConversationId() != null) {
conversationIdRef.set(resp.getConversationId());
}
if (resp.getTotalTokens() != null && resp.getTotalTokens() > 0) {
tokenUsageRef.set(resp);
}
})
.doOnComplete(() -> handlePointsDeduction(pendingRecordId, tokenUsageRef, logPrefix))
.doOnError(e -> handlePointsCancellation(pendingRecordId, logPrefix))
.doOnCancel(() -> handleUserCancellation(pendingRecordId, logPrefix)))
.concatWith(buildDoneMono(conversationIdRef, tokenUsageRef))
.onErrorResume(e -> {
log.error("[chatStream] Dify 聊天异常", e);
log.error("[{}] Dify 聊天异常", logPrefix, e);
return Flux.just(DifyChatRespVO.error(e.getMessage()));
});
}
@Override
public Flux<DifyChatRespVO> rewriteStream(ForecastRewriteReqVO reqVO, String userId) {
// 用于存储预扣记录ID
AtomicLong pendingRecordId = new AtomicLong();
// 用于存储会话ID
AtomicReference<String> conversationIdRef = new AtomicReference<>("");
// 用于存储 token 使用信息
AtomicReference<DifyChatRespVO> tokenUsageRef = new AtomicReference<>();
// Dify 用户标识(按 agentId 隔离会话,无 agentId 时使用默认)
String difyUserId = reqVO.getAgentId() != null
? "user-" + userId + "-agent-" + reqVO.getAgentId()
: "user-" + userId;
? "user-" + userId + "-agent-" + reqVO.getAgentId()
: "user-" + userId;
String logPrefix = "rewriteStream";
return Mono.fromCallable(() -> {
// 1. 获取系统提示词
String systemPrompt;
if (reqVO.getCustomSystemPrompt() != null && !reqVO.getCustomSystemPrompt().isEmpty()) {
// 使用自定义系统提示词(用户风格)
systemPrompt = reqVO.getCustomSystemPrompt();
log.info("[rewriteStream] 使用自定义系统提示词,长度: {}", systemPrompt.length());
} else if (reqVO.getAgentId() != null) {
// 从智能体获取系统提示词
AiAgentDO agent = aiAgentService.getAiAgent(reqVO.getAgentId());
if (agent == null) {
throw new RuntimeException("智能体不存在");
}
systemPrompt = agent.getSystemPrompt();
log.info("[rewriteStream] 使用智能体系统提示词agentId: {}, 长度: {}",
reqVO.getAgentId(), systemPrompt != null ? systemPrompt.length() : 0);
} else {
throw new RuntimeException("必须提供 agentId 或 customSystemPrompt");
}
// 2. 根据 modelType 获取对应的积分配置
String systemPrompt = resolveSystemPrompt(reqVO);
AiModelTypeEnum modelTypeEnum = "forecast_pro".equals(reqVO.getModelType())
? AiModelTypeEnum.FORECAST_PRO
: AiModelTypeEnum.FORECAST_STANDARD;
AiServiceConfigDO config = pointsService.getConfig(
AiPlatformEnum.DIFY.getPlatform(),
modelTypeEnum.getModelCode());
AiPlatformEnum.DIFY.getPlatform(), modelTypeEnum.getModelCode());
// 3. 预检积分
pointsService.checkPoints(userId, config.getConsumePoints());
// 4. 创建预扣记录(带 serviceCode
Long recordId = pointsService.createPendingDeduct(
userId,
config.getConsumePoints(),
"forecast_rewrite",
reqVO.getModelType(),
config.getServiceCode()
);
userId, config.getConsumePoints(), "forecast_rewrite",
reqVO.getModelType(), config.getServiceCode());
pendingRecordId.set(recordId);
// 5. 构建 inputs 参数
Map<String, Object> inputs = new HashMap<>();
inputs.put("sysPrompt", systemPrompt);
inputs.put("userText", reqVO.getUserText());
inputs.put("level", reqVO.getLevel());
// 调试日志
log.info("[rewriteStream] 请求参数 - agentId: {}, modelType: {}", reqVO.getAgentId(), reqVO.getModelType());
log.info("[rewriteStream] inputs参数 - userText长度: {}, level: {}",
reqVO.getUserText() != null ? reqVO.getUserText().length() : 0,
reqVO.getLevel());
log.info("[rewriteStream] API配置 - apiKey前缀: {}..., consumePoints: {}",
config.getApiKey() != null && config.getApiKey().length() > 10
? config.getApiKey().substring(0, 10) + "***"
: "null",
config.getConsumePoints());
// 6. 返回调用参数
log.info("[{}] agentId: {}, modelType: {}, userText长度: {}",
logPrefix, reqVO.getAgentId(), reqVO.getModelType(),
reqVO.getUserText() != null ? reqVO.getUserText().length() : 0);
return new ForecastRewriteContext(inputs, config.getApiKey(), config.getConsumePoints());
})
.flatMapMany(context -> {
// 7. 调用 Dify 流式 API
return difyClient.chatStreamWithInputs(
context.apiKey(),
context.inputs(),
reqVO.getUserText(),
null,
difyUserId
)
.doOnNext(resp -> {
if (resp.getConversationId() != null) {
conversationIdRef.set(resp.getConversationId());
}
// 捕获 token 使用信息
if (resp.getTotalTokens() != null && resp.getTotalTokens() > 0) {
tokenUsageRef.set(resp);
}
})
// 8. 流结束时确认扣费(带 token
.doOnComplete(() -> {
if (pendingRecordId.get() > 0) {
try {
DifyChatRespVO tokenUsage = tokenUsageRef.get();
if (tokenUsage != null) {
pointsService.confirmPendingDeductWithTokens(
pendingRecordId.get(),
tokenUsage.getInputTokens(),
tokenUsage.getOutputTokens(),
tokenUsage.getTotalTokens()
);
log.info("[rewriteStream] 流结束确认扣费带token记录ID: {}, tokens: {}",
pendingRecordId.get(), tokenUsage.getTotalTokens());
} else {
pointsService.confirmPendingDeduct(pendingRecordId.get());
log.info("[rewriteStream] 流结束确认扣费无token记录ID: {}", pendingRecordId.get());
}
} catch (Exception e) {
log.error("[rewriteStream] 确认扣费失败", e);
}
}
})
// 9. 流出错时取消预扣
.doOnError(e -> {
if (pendingRecordId.get() > 0) {
try {
pointsService.cancelPendingDeduct(pendingRecordId.get());
log.info("[rewriteStream] 流出错取消预扣记录ID: {}", pendingRecordId.get());
} catch (Exception ex) {
log.error("[rewriteStream] 取消预扣失败", ex);
}
}
})
// 10. 用户取消时确认扣费(已消费的部分)
.doOnCancel(() -> {
if (pendingRecordId.get() > 0) {
try {
pointsService.confirmPendingDeduct(pendingRecordId.get());
log.info("[rewriteStream] 用户取消确认扣费记录ID: {}", pendingRecordId.get());
} catch (Exception e) {
log.error("[rewriteStream] 用户取消后扣费失败", e);
}
}
});
})
// 11. 在最后添加 done 事件
.concatWith(Mono.defer(() -> {
DifyChatRespVO tokenUsage = tokenUsageRef.get();
if (tokenUsage != null) {
return Mono.just(DifyChatRespVO.doneWithTokens(
conversationIdRef.get(), null,
tokenUsage.getInputTokens(),
tokenUsage.getOutputTokens(),
tokenUsage.getTotalTokens()
));
}
return Mono.just(DifyChatRespVO.done(conversationIdRef.get(), null));
}))
.flatMapMany(context -> difyClient.chatStreamWithInputs(
context.apiKey(), context.inputs(), reqVO.getUserText(), null, difyUserId)
.doOnNext(resp -> {
if (resp.getConversationId() != null) {
conversationIdRef.set(resp.getConversationId());
}
if (resp.getTotalTokens() != null && resp.getTotalTokens() > 0) {
tokenUsageRef.set(resp);
}
})
.doOnComplete(() -> handlePointsDeduction(pendingRecordId, tokenUsageRef, logPrefix))
.doOnError(e -> handlePointsCancellation(pendingRecordId, logPrefix))
.doOnCancel(() -> handleUserCancellation(pendingRecordId, logPrefix)))
.concatWith(buildDoneMono(conversationIdRef, tokenUsageRef))
.onErrorResume(e -> {
log.error("[rewriteStream] Forecast 文案改写异常", e);
log.error("[{}] Forecast 文案改写异常", logPrefix, e);
return Flux.just(DifyChatRespVO.error(e.getMessage()));
});
}
/**
* 解析系统提示词
*/
private String resolveSystemPrompt(ForecastRewriteReqVO reqVO) {
if (reqVO.getCustomSystemPrompt() != null && !reqVO.getCustomSystemPrompt().isEmpty()) {
log.info("[rewriteStream] 使用自定义系统提示词,长度: {}", reqVO.getCustomSystemPrompt().length());
return reqVO.getCustomSystemPrompt();
}
if (reqVO.getAgentId() != null) {
AiAgentDO agent = aiAgentService.getAiAgent(reqVO.getAgentId());
if (agent == null) {
throw new RuntimeException("智能体不存在");
}
log.info("[rewriteStream] 使用智能体系统提示词agentId: {}", reqVO.getAgentId());
return agent.getSystemPrompt();
}
throw new RuntimeException("必须提供 agentId 或 customSystemPrompt");
}
/**
* Dify 聊天上下文
*/
@@ -341,128 +179,42 @@ public class DifyServiceImpl implements DifyService {
@Override
public Flux<DifyChatRespVO> benchmarkAnalyzeStream(DifyBenchmarkReqVO reqVO, String userId) {
// 用于存储预扣记录ID
AtomicLong pendingRecordId = new AtomicLong();
// 用于存储会话ID
AtomicReference<String> conversationIdRef = new AtomicReference<>("");
// 用于存储 token 使用信息
AtomicReference<DifyChatRespVO> tokenUsageRef = new AtomicReference<>();
// Dify 用户标识(固定格式)
String difyUserId = "user-" + userId;
String logPrefix = "benchmarkAnalyzeStream";
return Mono.fromCallable(() -> {
// 1. 获取积分配置(复用提示词分析配置)
AiServiceConfigDO config = pointsService.getConfig(
AiPlatformEnum.DIFY.getPlatform(),
AiModelTypeEnum.PROMPT_ANALYSIS.getModelCode());
// 2. 预检积分
pointsService.checkPoints(userId, config.getConsumePoints());
// 3. 创建预扣记录(带 serviceCode
Long recordId = pointsService.createPendingDeduct(
userId,
config.getConsumePoints(),
"benchmark_analyze",
String.valueOf(reqVO.getVideoCount()),
config.getServiceCode()
);
userId, config.getConsumePoints(), "benchmark_analyze",
String.valueOf(reqVO.getVideoCount()), config.getServiceCode());
pendingRecordId.set(recordId);
// 4. 构建对标分析提示词
String benchmarkPrompt = buildBenchmarkPrompt(reqVO.getContent(), reqVO.getVideoCount());
log.info("[benchmarkAnalyzeStream] 请求参数 - videoCount: {}", reqVO.getVideoCount());
log.info("[benchmarkAnalyzeStream] 提示词长度: {}", benchmarkPrompt.length());
log.info("[benchmarkAnalyzeStream] API配置 - apiKey前缀: {}..., consumePoints: {}",
config.getApiKey() != null && config.getApiKey().length() > 10
? config.getApiKey().substring(0, 10) + "***"
: "null",
config.getConsumePoints());
// 5. 返回调用参数
log.info("[{}] 请求参数 - videoCount: {}, 提示词长度: {}", logPrefix, reqVO.getVideoCount(), benchmarkPrompt.length());
return new BenchmarkAnalyzeContext(benchmarkPrompt, config.getApiKey(), config.getConsumePoints());
})
.flatMapMany(context -> {
// 6. 调用 Dify 流式 API
return difyClient.chatStream(
context.apiKey(),
context.prompt(),
null, // systemPrompt 为空,使用 Dify 工作流中的配置
null, // conversationId 为空,新会话
difyUserId
)
.doOnNext(resp -> {
if (resp.getConversationId() != null) {
conversationIdRef.set(resp.getConversationId());
}
// 捕获 token 使用信息
if (resp.getTotalTokens() != null && resp.getTotalTokens() > 0) {
tokenUsageRef.set(resp);
}
})
// 7. 流结束时确认扣费(带 token
.doOnComplete(() -> {
if (pendingRecordId.get() > 0) {
try {
DifyChatRespVO tokenUsage = tokenUsageRef.get();
if (tokenUsage != null) {
pointsService.confirmPendingDeductWithTokens(
pendingRecordId.get(),
tokenUsage.getInputTokens(),
tokenUsage.getOutputTokens(),
tokenUsage.getTotalTokens()
);
log.info("[benchmarkAnalyzeStream] 流结束确认扣费带token记录ID: {}, tokens: {}",
pendingRecordId.get(), tokenUsage.getTotalTokens());
} else {
pointsService.confirmPendingDeduct(pendingRecordId.get());
log.info("[benchmarkAnalyzeStream] 流结束确认扣费无token记录ID: {}", pendingRecordId.get());
}
} catch (Exception e) {
log.error("[benchmarkAnalyzeStream] 确认扣费失败", e);
}
}
})
// 8. 流出错时取消预扣
.doOnError(e -> {
if (pendingRecordId.get() > 0) {
try {
pointsService.cancelPendingDeduct(pendingRecordId.get());
log.info("[benchmarkAnalyzeStream] 流出错取消预扣记录ID: {}", pendingRecordId.get());
} catch (Exception ex) {
log.error("[benchmarkAnalyzeStream] 取消预扣失败", ex);
}
}
})
// 9. 用户取消时确认扣费(已消费的部分)
.doOnCancel(() -> {
if (pendingRecordId.get() > 0) {
try {
// 用户主动取消,仍然扣费(按最低消费)
pointsService.confirmPendingDeduct(pendingRecordId.get());
log.info("[benchmarkAnalyzeStream] 用户取消确认扣费记录ID: {}", pendingRecordId.get());
} catch (Exception e) {
log.error("[benchmarkAnalyzeStream] 用户取消后扣费失败", e);
}
}
});
})
// 10. 在最后添加 done 事件
.concatWith(Mono.defer(() -> {
DifyChatRespVO tokenUsage = tokenUsageRef.get();
if (tokenUsage != null) {
return Mono.just(DifyChatRespVO.doneWithTokens(
conversationIdRef.get(), null,
tokenUsage.getInputTokens(),
tokenUsage.getOutputTokens(),
tokenUsage.getTotalTokens()
));
}
return Mono.just(DifyChatRespVO.done(conversationIdRef.get(), null));
}))
.flatMapMany(context -> difyClient.chatStream(context.apiKey(), context.prompt(), null, null, difyUserId)
.doOnNext(resp -> {
if (resp.getConversationId() != null) {
conversationIdRef.set(resp.getConversationId());
}
if (resp.getTotalTokens() != null && resp.getTotalTokens() > 0) {
tokenUsageRef.set(resp);
}
})
.doOnComplete(() -> handlePointsDeduction(pendingRecordId, tokenUsageRef, logPrefix))
.doOnError(e -> handlePointsCancellation(pendingRecordId, logPrefix))
.doOnCancel(() -> handleUserCancellation(pendingRecordId, logPrefix)))
.concatWith(buildDoneMono(conversationIdRef, tokenUsageRef))
.onErrorResume(e -> {
log.error("[benchmarkAnalyzeStream] 对标分析异常", e);
log.error("[{}] 对标分析异常", logPrefix, e);
return Flux.just(DifyChatRespVO.error(e.getMessage()));
});
}
@@ -498,128 +250,42 @@ public class DifyServiceImpl implements DifyService {
@Override
public Flux<DifyChatRespVO> promptAnalysisStream(PromptAnalysisReqVO reqVO, String userId) {
// 用于存储预扣记录ID
AtomicLong pendingRecordId = new AtomicLong();
// 用于存储会话ID
AtomicReference<String> conversationIdRef = new AtomicReference<>("");
// 用于存储 token 使用信息
AtomicReference<DifyChatRespVO> tokenUsageRef = new AtomicReference<>();
// Dify 用户标识(固定格式)
String difyUserId = "user-" + userId;
String logPrefix = "promptAnalysisStream";
return Mono.fromCallable(() -> {
// 1. 获取积分配置
AiServiceConfigDO config = pointsService.getConfig(
AiPlatformEnum.DIFY.getPlatform(),
AiModelTypeEnum.PROMPT_ANALYSIS.getModelCode());
// 2. 预检积分
pointsService.checkPoints(userId, config.getConsumePoints());
// 3. 创建预扣记录(带 serviceCode
Long recordId = pointsService.createPendingDeduct(
userId,
config.getConsumePoints(),
"prompt_analysis",
String.valueOf(reqVO.getVideoCount()),
config.getServiceCode()
);
userId, config.getConsumePoints(), "prompt_analysis",
String.valueOf(reqVO.getVideoCount()), config.getServiceCode());
pendingRecordId.set(recordId);
// 4. 构建提示词分析 prompt
String analysisPrompt = buildPromptAnalysisPrompt(reqVO.getContent(), reqVO.getVideoCount());
log.info("[promptAnalysisStream] 请求参数 - videoCount: {}", reqVO.getVideoCount());
log.info("[promptAnalysisStream] 提示词长度: {}", analysisPrompt.length());
log.info("[promptAnalysisStream] API配置 - apiKey前缀: {}..., consumePoints: {}",
config.getApiKey() != null && config.getApiKey().length() > 10
? config.getApiKey().substring(0, 10) + "***"
: "null",
config.getConsumePoints());
// 5. 返回调用参数
log.info("[{}] 请求参数 - videoCount: {}, 提示词长度: {}", logPrefix, reqVO.getVideoCount(), analysisPrompt.length());
return new PromptAnalysisContext(analysisPrompt, config.getApiKey(), config.getConsumePoints());
})
.flatMapMany(context -> {
// 6. 调用 Dify 流式 API
return difyClient.chatStream(
context.apiKey(),
context.prompt(),
null, // systemPrompt 为空,使用 Dify 工作流中的配置
null, // conversationId 为空,新会话
difyUserId
)
.doOnNext(resp -> {
if (resp.getConversationId() != null) {
conversationIdRef.set(resp.getConversationId());
}
// 捕获 token 使用信息
if (resp.getTotalTokens() != null && resp.getTotalTokens() > 0) {
tokenUsageRef.set(resp);
}
})
// 7. 流结束时确认扣费(带 token
.doOnComplete(() -> {
if (pendingRecordId.get() > 0) {
try {
DifyChatRespVO tokenUsage = tokenUsageRef.get();
if (tokenUsage != null) {
pointsService.confirmPendingDeductWithTokens(
pendingRecordId.get(),
tokenUsage.getInputTokens(),
tokenUsage.getOutputTokens(),
tokenUsage.getTotalTokens()
);
log.info("[promptAnalysisStream] 流结束确认扣费带token记录ID: {}, tokens: {}",
pendingRecordId.get(), tokenUsage.getTotalTokens());
} else {
pointsService.confirmPendingDeduct(pendingRecordId.get());
log.info("[promptAnalysisStream] 流结束确认扣费无token记录ID: {}", pendingRecordId.get());
}
} catch (Exception e) {
log.error("[promptAnalysisStream] 确认扣费失败", e);
}
}
})
// 8. 流出错时取消预扣
.doOnError(e -> {
if (pendingRecordId.get() > 0) {
try {
pointsService.cancelPendingDeduct(pendingRecordId.get());
log.info("[promptAnalysisStream] 流出错取消预扣记录ID: {}", pendingRecordId.get());
} catch (Exception ex) {
log.error("[promptAnalysisStream] 取消预扣失败", ex);
}
}
})
// 9. 用户取消时确认扣费(已消费的部分)
.doOnCancel(() -> {
if (pendingRecordId.get() > 0) {
try {
// 用户主动取消,仍然扣费(按最低消费)
pointsService.confirmPendingDeduct(pendingRecordId.get());
log.info("[promptAnalysisStream] 用户取消确认扣费记录ID: {}", pendingRecordId.get());
} catch (Exception e) {
log.error("[promptAnalysisStream] 用户取消后扣费失败", e);
}
}
});
})
// 10. 在最后添加 done 事件
.concatWith(Mono.defer(() -> {
DifyChatRespVO tokenUsage = tokenUsageRef.get();
if (tokenUsage != null) {
return Mono.just(DifyChatRespVO.doneWithTokens(
conversationIdRef.get(), null,
tokenUsage.getInputTokens(),
tokenUsage.getOutputTokens(),
tokenUsage.getTotalTokens()
));
}
return Mono.just(DifyChatRespVO.done(conversationIdRef.get(), null));
}))
.flatMapMany(context -> difyClient.chatStream(context.apiKey(), context.prompt(), null, null, difyUserId)
.doOnNext(resp -> {
if (resp.getConversationId() != null) {
conversationIdRef.set(resp.getConversationId());
}
if (resp.getTotalTokens() != null && resp.getTotalTokens() > 0) {
tokenUsageRef.set(resp);
}
})
.doOnComplete(() -> handlePointsDeduction(pendingRecordId, tokenUsageRef, logPrefix))
.doOnError(e -> handlePointsCancellation(pendingRecordId, logPrefix))
.doOnCancel(() -> handleUserCancellation(pendingRecordId, logPrefix)))
.concatWith(buildDoneMono(conversationIdRef, tokenUsageRef))
.onErrorResume(e -> {
log.error("[promptAnalysisStream] 提示词分析异常", e);
log.error("[{}] 提示词分析异常", logPrefix, e);
return Flux.just(DifyChatRespVO.error(e.getMessage()));
});
}