From 0cb020aeb9ee7f43135b31c82618006e148bc2df Mon Sep 17 00:00:00 2001 From: shenaowei <450702724@qq.com> Date: Mon, 23 Feb 2026 17:36:16 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../module/tik/dify/client/DifyClient.java | 87 ++--- .../mapper/MemberUserProfileMapper.java | 2 +- .../src/main/resources/IMPLEMENTATION_PLAN.md | 299 ------------------ 3 files changed, 47 insertions(+), 341 deletions(-) delete mode 100644 yudao-module-tik/src/main/resources/IMPLEMENTATION_PLAN.md diff --git a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/client/DifyClient.java b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/client/DifyClient.java index 8b23bceb66..ba230e0bd2 100644 --- a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/client/DifyClient.java +++ b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/dify/client/DifyClient.java @@ -8,6 +8,7 @@ import org.springframework.http.MediaType; import org.springframework.stereotype.Component; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import java.util.HashMap; import java.util.Map; @@ -62,8 +63,7 @@ public class DifyClient { .accept(MediaType.TEXT_EVENT_STREAM) .retrieve() .bodyToFlux(String.class) - .map(this::parseSSEEvent) - .filter(resp -> resp != null) + .flatMap(event -> Mono.justOrEmpty(parseSSEEvent(event))) .doOnNext(resp -> { if (resp.getConversationId() != null) { responseConversationId.set(resp.getConversationId()); @@ -83,59 +83,64 @@ public class DifyClient { * 解析 SSE 事件 */ private DifyChatRespVO parseSSEEvent(String event) { - if (event == null || event.isEmpty()) { + if (event == null || !event.startsWith("data:")) { + return null; + } + + String jsonStr = event.substring(5).trim(); + if (jsonStr.isEmpty() || "[DONE]".equals(jsonStr)) { return null; } try { - // 解析 SSE 事件格式 - // data: {"event": "message", "answer": "xxx", "conversation_id": "xxx"} - if (event.startsWith("data:")) { - String jsonStr = event.substring(5).trim(); - if (jsonStr.isEmpty() || jsonStr.equals("[DONE]")) { - return null; - } + String eventType = extractJsonValue(jsonStr, "event"); + String answer = extractJsonValue(jsonStr, "answer"); + String conversationId = extractJsonValue(jsonStr, "conversation_id"); - // 简单解析 JSON(实际项目建议使用 Jackson/Gson) - String eventType = extractJsonValue(jsonStr, "event"); - String answer = extractJsonValue(jsonStr, "answer"); - String conversationId = extractJsonValue(jsonStr, "conversation_id"); - - if ("message".equals(eventType) || "agent_message".equals(eventType)) { - return DifyChatRespVO.message(answer, conversationId); - } else if ("workflow_finished".equals(eventType) || "message_end".equals(eventType)) { - return DifyChatRespVO.done(conversationId, null); - } else if ("error".equals(eventType)) { - return DifyChatRespVO.error(answer); - } - } + return switch (eventType) { + case "message", "agent_message" -> DifyChatRespVO.message(answer, conversationId); + case "workflow_finished", "message_end" -> DifyChatRespVO.done(conversationId, null); + case "error" -> DifyChatRespVO.error(answer); + default -> null; + }; } catch (Exception e) { log.warn("[parseSSEEvent] 解析 SSE 事件失败: {}", event, e); + return null; } - return null; } /** - * 简单提取 JSON 值 + * 简单提取 JSON 值(支持字符串和非字符串格式) */ private String extractJsonValue(String json, String key) { - String pattern = "\"" + key + "\"\\s*:\\s*\""; - int start = json.indexOf(pattern); - if (start == -1) { - // 尝试非字符串格式 - pattern = "\"" + key + "\"\\s*:\\s*"; - start = json.indexOf(pattern); - if (start == -1) return null; - start += pattern.length(); - int end = json.indexOf(",", start); - if (end == -1) end = json.indexOf("}", start); - if (end == -1) return null; - return json.substring(start, end).trim(); + // 查找 "key": 后的位置,兼容带空格和不带空格的格式 + int keyStart = json.indexOf("\"" + key + "\":"); + if (keyStart == -1) { + return null; } - start += pattern.length(); - int end = json.indexOf("\"", start); - if (end == -1) return null; - return json.substring(start, end); + + int valueStart = keyStart + key.length() + 3; // 跳过 "key": + // 跳过空白字符 + while (valueStart < json.length() && Character.isWhitespace(json.charAt(valueStart))) { + valueStart++; + } + if (valueStart >= json.length()) { + return null; + } + + char firstChar = json.charAt(valueStart); + // 字符串值 "value" + if (firstChar == '"') { + int end = json.indexOf('"', valueStart + 1); + return end != -1 ? json.substring(valueStart + 1, end) : null; + } + + // 非字符串值(数字、布尔等) + int end = json.indexOf(',', valueStart); + if (end == -1) { + end = json.indexOf('}', valueStart); + } + return end != -1 ? json.substring(valueStart, end).trim() : null; } } diff --git a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/muye/memberuserprofile/mapper/MemberUserProfileMapper.java b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/muye/memberuserprofile/mapper/MemberUserProfileMapper.java index f1cd53a796..4d1fbe5b6a 100644 --- a/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/muye/memberuserprofile/mapper/MemberUserProfileMapper.java +++ b/yudao-module-tik/src/main/java/cn/iocoder/yudao/module/tik/muye/memberuserprofile/mapper/MemberUserProfileMapper.java @@ -53,7 +53,7 @@ public interface MemberUserProfileMapper extends BaseMapperX 版本: v1.0 -> 日期: 2026-02-22 -> 基于设计文档: points-service-integration.md - ---- - -## 一、实现概览 - -### 当前状态分析 - -| 组件 | 文件路径 | 当前状态 | 需要修改 | -|------|---------|---------|---------| -| PointRecordDO | `muye/pointrecord/dal/PointRecordDO.java` | 完整 | 新增 status 字段 | -| MemberUserProfileMapper | `muye/memberuserprofile/mapper/MemberUserProfileMapper.java` | 仅查询 | 新增原子扣减方法 | -| AiModelConfigMapper | `muye/aimodelconfig/mapper/AiModelConfigMapper.java` | 仅分页 | 新增按平台查询方法 | -| PointsService | 不存在 | **需新建** | 新建接口+实现 | -| DifyService | 不存在 | **需新建** | 新建服务 | - ---- - -## 二、实现步骤 - -### 步骤 1: 数据库层修改 - -#### 1.1 PointRecordDO 新增 status 字段 - -**文件**: `muye/pointrecord/dal/PointRecordDO.java` - -```java -// 新增字段 -private String status; // 状态:pending(预扣) / confirmed(已确认) / canceled(已取消) -``` - -**数据库迁移**: -```sql -ALTER TABLE muey_point_record ADD COLUMN status VARCHAR(20) DEFAULT 'confirmed' COMMENT '状态:pending-预扣 confirmed-已确认 canceled-已取消'; -``` - -#### 1.2 MemberUserProfileMapper 新增方法 - -**文件**: `muye/memberuserprofile/mapper/MemberUserProfileMapper.java` - -```java -/** - * 根据用户ID查询档案 - */ -default MemberUserProfileDO selectByUserId(Long userId) { - return selectOne(new LambdaQueryWrapperX() - .eq(MemberUserProfileDO::getUserId, userId)); -} - -/** - * 原子扣减积分(乐观锁) - * @return 影响行数,0表示余额不足 - */ -@Update("UPDATE muey_member_user_profile " + - "SET remaining_points = remaining_points - #{points}, " + - " used_points = used_points + #{points}, " + - " update_time = NOW() " + - "WHERE user_id = #{userId} AND remaining_points >= #{points}") -int updatePointsDeduct(@Param("userId") Long userId, @Param("points") Integer points); -``` - -#### 1.3 AiModelConfigMapper 新增方法 - -**文件**: `muye/aimodelconfig/mapper/AiModelConfigMapper.java` - -```java -/** - * 根据平台和模型类型查询配置 - */ -default AiModelConfigDO selectByPlatformAndModelType(String platform, String modelType) { - return selectOne(new LambdaQueryWrapperX() - .eq(AiModelConfigDO::getPlatform, platform) - .eq(AiModelConfigDO::getModelType, modelType) - .eq(AiModelConfigDO::getStatus, 1)); -} -``` - -#### 1.4 PointRecordMapper 新增方法 - -**文件**: `muye/pointrecord/mapper/PointRecordMapper.java` - -```java -/** - * 取消过期的预扣记录(30分钟前) - */ -@Update("UPDATE muey_point_record SET status = 'canceled', update_time = NOW() " + - "WHERE status = 'pending' AND create_time < DATE_SUB(NOW(), INTERVAL 30 MINUTE)") -int cancelExpiredPendingRecords(); -``` - ---- - -### 步骤 2: 公共积分服务 - -#### 2.1 新建 PointsService 接口 - -**文件**: `muye/points/service/PointsService.java`(新建) - -```java -public interface PointsService { - - /** - * 获取积分配置 - */ - AiModelConfigDO getConfig(String platform, String modelType); - - /** - * 预检积分(余额不足抛异常) - */ - void checkPoints(Long userId, Integer points); - - /** - * 即时扣减(同步场景) - */ - Long deductPoints(Long userId, Integer points, String bizType, String bizId); - - /** - * 创建预扣(流式/异步场景) - * @return 预扣记录ID - */ - Long createPendingDeduct(Long userId, Integer points, String bizType, String bizId); - - /** - * 确认预扣(实际扣减) - */ - void confirmPendingDeduct(Long recordId); - - /** - * 取消预扣(不扣费) - */ - void cancelPendingDeduct(Long recordId); -} -``` - -#### 2.2 新建 PointsServiceImpl 实现 - -**文件**: `muye/points/service/PointsServiceImpl.java`(新建) - -核心逻辑: -- `deductPoints`: 调用 Mapper 原子扣减,失败抛 `POINTS_DEDUCT_FAILED` -- `createPendingDeduct`: 创建 status=pending 的记录 -- `confirmPendingDeduct`: 执行实际扣减 + 更新 status=confirmed -- `cancelPendingDeduct`: 更新 status=canceled - -#### 2.3 新建错误码常量 - -**文件**: `ErrorCodeConstants.java`(修改) - -```java -// 积分相关错误码 1001001-1001003 -ErrorCode POINTS_INSUFFICIENT = new ErrorCode(1001001, "积分不足"); -ErrorCode POINTS_CONFIG_NOT_FOUND = new ErrorCode(1001002, "积分配置不存在"); -ErrorCode POINTS_DEDUCT_FAILED = new ErrorCode(1001003, "积分扣减失败"); -``` - ---- - -### 步骤 3: Dify 工作流集成 - -#### 3.1 新建 Dify 配置类 - -**文件**: `dify/config/DifyProperties.java`(新建) - -```java -@ConfigurationProperties(prefix = "yudao.dify") -public class DifyProperties { - private String apiUrl; - private Integer timeout = 60; -} -``` - -#### 3.2 新建 DifyClient - -**文件**: `dify/client/DifyClient.java`(新建) - -- 调用 Dify 工作流 API -- 支持流式响应 -- 传入 sysPrompt 参数 - -#### 3.3 新建 DifyService - -**文件**: `dify/service/DifyService.java`(新建) - -```java -public interface DifyService { - /** - * 流式聊天(带积分扣减) - */ - Flux chatStream(DifyChatReqVO reqVO, Long userId); -} -``` - -#### 3.4 新建 DifyController - -**文件**: `dify/controller/AppDifyController.java`(新建) - -```java -@PostMapping("/api/tik/dify/chat/stream") -public Flux> chatStream(@RequestBody DifyChatReqVO reqVO); -``` - ---- - -### 步骤 4: 集成到现有服务 - -#### 4.1 AiChatMessageService 集成 - -**文件**: `service/chat/AiChatMessageServiceImpl.java`(修改) - -在 `sendChatMessageStream` 方法中: -1. 调用前:`pointsService.checkPoints()` -2. 创建预扣:`pointsService.createPendingDeduct()` -3. 流结束:`pointsService.confirmPendingDeduct()` -4. 出错/取消:`pointsService.cancelPendingDeduct()` - ---- - -### 步骤 5: 定时任务 - -#### 5.1 预扣过期清理任务 - -**文件**: `job/PointsPendingCleanJob.java`(新建) - -- 每 5 分钟执行 -- 调用 `pointRecordMapper.cancelExpiredPendingRecords()` - ---- - -## 三、文件清单 - -### 新建文件 - -| 文件 | 路径 | -|------|------| -| PointsService | `muye/points/service/PointsService.java` | -| PointsServiceImpl | `muye/points/service/PointsServiceImpl.java` | -| DifyProperties | `dify/config/DifyProperties.java` | -| DifyClient | `dify/client/DifyClient.java` | -| DifyService | `dify/service/DifyService.java` | -| DifyServiceImpl | `dify/service/DifyServiceImpl.java` | -| DifyReqVO | `dify/vo/DifyChatReqVO.java` | -| DifyRespVO | `dify/vo/DifyChatRespVO.java` | -| AppDifyController | `dify/controller/AppDifyController.java` | -| PointsPendingCleanJob | `job/PointsPendingCleanJob.java` | - -### 修改文件 - -| 文件 | 修改内容 | -|------|---------| -| PointRecordDO | 新增 status 字段 | -| MemberUserProfileMapper | 新增 selectByUserId、updatePointsDeduct 方法 | -| AiModelConfigMapper | 新增 selectByPlatformAndModelType 方法 | -| PointRecordMapper | 新增 cancelExpiredPendingRecords 方法 | -| ErrorCodeConstants | 新增积分相关错误码 | -| AiChatMessageServiceImpl | 集成积分扣减逻辑 | - ---- - -## 四、数据库变更 - -```sql --- 1. 积分记录表新增状态字段 -ALTER TABLE muey_point_record -ADD COLUMN status VARCHAR(20) DEFAULT 'confirmed' -COMMENT '状态:pending-预扣 confirmed-已确认 canceled-已取消'; - --- 2. 添加索引(可选优化) -CREATE INDEX idx_point_record_status_time -ON muey_point_record(status, create_time); -``` - ---- - -## 五、依赖关系 - -``` -业务层 - ├── DifyService ──────┐ - ├── AiChatMessageService ──┼──→ PointsService(公共服务) - ├── TikHubService ─────────┤ │ - ├── VoiceService ──────────┤ ├── AiModelConfigMapper(配置查询) - └── DigitalHumanService ───┘ ├── MemberUserProfileMapper(积分扣减) - └── PointRecordMapper(流水记录) -``` - ---- - -## 六、验收标准 - -- [ ] PointsService 单元测试通过 -- [ ] Dify 流式接口正常返回 -- [ ] 积分不足时抛出正确异常 -- [ ] 流式中断时预扣正确取消 -- [ ] 预扣过期定时任务正常运行 -- [ ] 积分扣减原子性(并发不超扣)