优化
This commit is contained in:
@@ -8,6 +8,7 @@ import org.springframework.http.MediaType;
|
|||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.springframework.web.reactive.function.client.WebClient;
|
import org.springframework.web.reactive.function.client.WebClient;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@@ -62,8 +63,7 @@ public class DifyClient {
|
|||||||
.accept(MediaType.TEXT_EVENT_STREAM)
|
.accept(MediaType.TEXT_EVENT_STREAM)
|
||||||
.retrieve()
|
.retrieve()
|
||||||
.bodyToFlux(String.class)
|
.bodyToFlux(String.class)
|
||||||
.map(this::parseSSEEvent)
|
.flatMap(event -> Mono.justOrEmpty(parseSSEEvent(event)))
|
||||||
.filter(resp -> resp != null)
|
|
||||||
.doOnNext(resp -> {
|
.doOnNext(resp -> {
|
||||||
if (resp.getConversationId() != null) {
|
if (resp.getConversationId() != null) {
|
||||||
responseConversationId.set(resp.getConversationId());
|
responseConversationId.set(resp.getConversationId());
|
||||||
@@ -83,59 +83,64 @@ public class DifyClient {
|
|||||||
* 解析 SSE 事件
|
* 解析 SSE 事件
|
||||||
*/
|
*/
|
||||||
private DifyChatRespVO parseSSEEvent(String event) {
|
private DifyChatRespVO parseSSEEvent(String event) {
|
||||||
if (event == null || event.isEmpty()) {
|
if (event == null || !event.startsWith("data:")) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
String jsonStr = event.substring(5).trim();
|
||||||
|
if (jsonStr.isEmpty() || "[DONE]".equals(jsonStr)) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// 解析 SSE 事件格式
|
String eventType = extractJsonValue(jsonStr, "event");
|
||||||
// data: {"event": "message", "answer": "xxx", "conversation_id": "xxx"}
|
String answer = extractJsonValue(jsonStr, "answer");
|
||||||
if (event.startsWith("data:")) {
|
String conversationId = extractJsonValue(jsonStr, "conversation_id");
|
||||||
String jsonStr = event.substring(5).trim();
|
|
||||||
if (jsonStr.isEmpty() || jsonStr.equals("[DONE]")) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 简单解析 JSON(实际项目建议使用 Jackson/Gson)
|
return switch (eventType) {
|
||||||
String eventType = extractJsonValue(jsonStr, "event");
|
case "message", "agent_message" -> DifyChatRespVO.message(answer, conversationId);
|
||||||
String answer = extractJsonValue(jsonStr, "answer");
|
case "workflow_finished", "message_end" -> DifyChatRespVO.done(conversationId, null);
|
||||||
String conversationId = extractJsonValue(jsonStr, "conversation_id");
|
case "error" -> DifyChatRespVO.error(answer);
|
||||||
|
default -> null;
|
||||||
if ("message".equals(eventType) || "agent_message".equals(eventType)) {
|
};
|
||||||
return DifyChatRespVO.message(answer, conversationId);
|
|
||||||
} else if ("workflow_finished".equals(eventType) || "message_end".equals(eventType)) {
|
|
||||||
return DifyChatRespVO.done(conversationId, null);
|
|
||||||
} else if ("error".equals(eventType)) {
|
|
||||||
return DifyChatRespVO.error(answer);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.warn("[parseSSEEvent] 解析 SSE 事件失败: {}", event, e);
|
log.warn("[parseSSEEvent] 解析 SSE 事件失败: {}", event, e);
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 简单提取 JSON 值
|
* 简单提取 JSON 值(支持字符串和非字符串格式)
|
||||||
*/
|
*/
|
||||||
private String extractJsonValue(String json, String key) {
|
private String extractJsonValue(String json, String key) {
|
||||||
String pattern = "\"" + key + "\"\\s*:\\s*\"";
|
// 查找 "key": 后的位置,兼容带空格和不带空格的格式
|
||||||
int start = json.indexOf(pattern);
|
int keyStart = json.indexOf("\"" + key + "\":");
|
||||||
if (start == -1) {
|
if (keyStart == -1) {
|
||||||
// 尝试非字符串格式
|
return null;
|
||||||
pattern = "\"" + key + "\"\\s*:\\s*";
|
|
||||||
start = json.indexOf(pattern);
|
|
||||||
if (start == -1) return null;
|
|
||||||
start += pattern.length();
|
|
||||||
int end = json.indexOf(",", start);
|
|
||||||
if (end == -1) end = json.indexOf("}", start);
|
|
||||||
if (end == -1) return null;
|
|
||||||
return json.substring(start, end).trim();
|
|
||||||
}
|
}
|
||||||
start += pattern.length();
|
|
||||||
int end = json.indexOf("\"", start);
|
int valueStart = keyStart + key.length() + 3; // 跳过 "key":
|
||||||
if (end == -1) return null;
|
// 跳过空白字符
|
||||||
return json.substring(start, end);
|
while (valueStart < json.length() && Character.isWhitespace(json.charAt(valueStart))) {
|
||||||
|
valueStart++;
|
||||||
|
}
|
||||||
|
if (valueStart >= json.length()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
char firstChar = json.charAt(valueStart);
|
||||||
|
// 字符串值 "value"
|
||||||
|
if (firstChar == '"') {
|
||||||
|
int end = json.indexOf('"', valueStart + 1);
|
||||||
|
return end != -1 ? json.substring(valueStart + 1, end) : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 非字符串值(数字、布尔等)
|
||||||
|
int end = json.indexOf(',', valueStart);
|
||||||
|
if (end == -1) {
|
||||||
|
end = json.indexOf('}', valueStart);
|
||||||
|
}
|
||||||
|
return end != -1 ? json.substring(valueStart, end).trim() : null;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -53,7 +53,7 @@ public interface MemberUserProfileMapper extends BaseMapperX<MemberUserProfileDO
|
|||||||
* @param points 扣减积分数量(正数)
|
* @param points 扣减积分数量(正数)
|
||||||
* @return 影响行数,0表示余额不足
|
* @return 影响行数,0表示余额不足
|
||||||
*/
|
*/
|
||||||
@Update("UPDATE muey_member_user_profile " +
|
@Update("UPDATE muye_member_user_profile " +
|
||||||
"SET remaining_points = remaining_points - #{points}, " +
|
"SET remaining_points = remaining_points - #{points}, " +
|
||||||
" used_points = used_points + #{points}, " +
|
" used_points = used_points + #{points}, " +
|
||||||
" update_time = NOW() " +
|
" update_time = NOW() " +
|
||||||
|
|||||||
@@ -1,299 +0,0 @@
|
|||||||
# AI 服务积分扣减公共服务 - 实现计划
|
|
||||||
|
|
||||||
> 版本: v1.0
|
|
||||||
> 日期: 2026-02-22
|
|
||||||
> 基于设计文档: points-service-integration.md
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## 一、实现概览
|
|
||||||
|
|
||||||
### 当前状态分析
|
|
||||||
|
|
||||||
| 组件 | 文件路径 | 当前状态 | 需要修改 |
|
|
||||||
|------|---------|---------|---------|
|
|
||||||
| PointRecordDO | `muye/pointrecord/dal/PointRecordDO.java` | 完整 | 新增 status 字段 |
|
|
||||||
| MemberUserProfileMapper | `muye/memberuserprofile/mapper/MemberUserProfileMapper.java` | 仅查询 | 新增原子扣减方法 |
|
|
||||||
| AiModelConfigMapper | `muye/aimodelconfig/mapper/AiModelConfigMapper.java` | 仅分页 | 新增按平台查询方法 |
|
|
||||||
| PointsService | 不存在 | **需新建** | 新建接口+实现 |
|
|
||||||
| DifyService | 不存在 | **需新建** | 新建服务 |
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## 二、实现步骤
|
|
||||||
|
|
||||||
### 步骤 1: 数据库层修改
|
|
||||||
|
|
||||||
#### 1.1 PointRecordDO 新增 status 字段
|
|
||||||
|
|
||||||
**文件**: `muye/pointrecord/dal/PointRecordDO.java`
|
|
||||||
|
|
||||||
```java
|
|
||||||
// 新增字段
|
|
||||||
private String status; // 状态:pending(预扣) / confirmed(已确认) / canceled(已取消)
|
|
||||||
```
|
|
||||||
|
|
||||||
**数据库迁移**:
|
|
||||||
```sql
|
|
||||||
ALTER TABLE muey_point_record ADD COLUMN status VARCHAR(20) DEFAULT 'confirmed' COMMENT '状态:pending-预扣 confirmed-已确认 canceled-已取消';
|
|
||||||
```
|
|
||||||
|
|
||||||
#### 1.2 MemberUserProfileMapper 新增方法
|
|
||||||
|
|
||||||
**文件**: `muye/memberuserprofile/mapper/MemberUserProfileMapper.java`
|
|
||||||
|
|
||||||
```java
|
|
||||||
/**
|
|
||||||
* 根据用户ID查询档案
|
|
||||||
*/
|
|
||||||
default MemberUserProfileDO selectByUserId(Long userId) {
|
|
||||||
return selectOne(new LambdaQueryWrapperX<MemberUserProfileDO>()
|
|
||||||
.eq(MemberUserProfileDO::getUserId, userId));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 原子扣减积分(乐观锁)
|
|
||||||
* @return 影响行数,0表示余额不足
|
|
||||||
*/
|
|
||||||
@Update("UPDATE muey_member_user_profile " +
|
|
||||||
"SET remaining_points = remaining_points - #{points}, " +
|
|
||||||
" used_points = used_points + #{points}, " +
|
|
||||||
" update_time = NOW() " +
|
|
||||||
"WHERE user_id = #{userId} AND remaining_points >= #{points}")
|
|
||||||
int updatePointsDeduct(@Param("userId") Long userId, @Param("points") Integer points);
|
|
||||||
```
|
|
||||||
|
|
||||||
#### 1.3 AiModelConfigMapper 新增方法
|
|
||||||
|
|
||||||
**文件**: `muye/aimodelconfig/mapper/AiModelConfigMapper.java`
|
|
||||||
|
|
||||||
```java
|
|
||||||
/**
|
|
||||||
* 根据平台和模型类型查询配置
|
|
||||||
*/
|
|
||||||
default AiModelConfigDO selectByPlatformAndModelType(String platform, String modelType) {
|
|
||||||
return selectOne(new LambdaQueryWrapperX<AiModelConfigDO>()
|
|
||||||
.eq(AiModelConfigDO::getPlatform, platform)
|
|
||||||
.eq(AiModelConfigDO::getModelType, modelType)
|
|
||||||
.eq(AiModelConfigDO::getStatus, 1));
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
#### 1.4 PointRecordMapper 新增方法
|
|
||||||
|
|
||||||
**文件**: `muye/pointrecord/mapper/PointRecordMapper.java`
|
|
||||||
|
|
||||||
```java
|
|
||||||
/**
|
|
||||||
* 取消过期的预扣记录(30分钟前)
|
|
||||||
*/
|
|
||||||
@Update("UPDATE muey_point_record SET status = 'canceled', update_time = NOW() " +
|
|
||||||
"WHERE status = 'pending' AND create_time < DATE_SUB(NOW(), INTERVAL 30 MINUTE)")
|
|
||||||
int cancelExpiredPendingRecords();
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
### 步骤 2: 公共积分服务
|
|
||||||
|
|
||||||
#### 2.1 新建 PointsService 接口
|
|
||||||
|
|
||||||
**文件**: `muye/points/service/PointsService.java`(新建)
|
|
||||||
|
|
||||||
```java
|
|
||||||
public interface PointsService {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 获取积分配置
|
|
||||||
*/
|
|
||||||
AiModelConfigDO getConfig(String platform, String modelType);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 预检积分(余额不足抛异常)
|
|
||||||
*/
|
|
||||||
void checkPoints(Long userId, Integer points);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 即时扣减(同步场景)
|
|
||||||
*/
|
|
||||||
Long deductPoints(Long userId, Integer points, String bizType, String bizId);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 创建预扣(流式/异步场景)
|
|
||||||
* @return 预扣记录ID
|
|
||||||
*/
|
|
||||||
Long createPendingDeduct(Long userId, Integer points, String bizType, String bizId);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 确认预扣(实际扣减)
|
|
||||||
*/
|
|
||||||
void confirmPendingDeduct(Long recordId);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 取消预扣(不扣费)
|
|
||||||
*/
|
|
||||||
void cancelPendingDeduct(Long recordId);
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
#### 2.2 新建 PointsServiceImpl 实现
|
|
||||||
|
|
||||||
**文件**: `muye/points/service/PointsServiceImpl.java`(新建)
|
|
||||||
|
|
||||||
核心逻辑:
|
|
||||||
- `deductPoints`: 调用 Mapper 原子扣减,失败抛 `POINTS_DEDUCT_FAILED`
|
|
||||||
- `createPendingDeduct`: 创建 status=pending 的记录
|
|
||||||
- `confirmPendingDeduct`: 执行实际扣减 + 更新 status=confirmed
|
|
||||||
- `cancelPendingDeduct`: 更新 status=canceled
|
|
||||||
|
|
||||||
#### 2.3 新建错误码常量
|
|
||||||
|
|
||||||
**文件**: `ErrorCodeConstants.java`(修改)
|
|
||||||
|
|
||||||
```java
|
|
||||||
// 积分相关错误码 1001001-1001003
|
|
||||||
ErrorCode POINTS_INSUFFICIENT = new ErrorCode(1001001, "积分不足");
|
|
||||||
ErrorCode POINTS_CONFIG_NOT_FOUND = new ErrorCode(1001002, "积分配置不存在");
|
|
||||||
ErrorCode POINTS_DEDUCT_FAILED = new ErrorCode(1001003, "积分扣减失败");
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
### 步骤 3: Dify 工作流集成
|
|
||||||
|
|
||||||
#### 3.1 新建 Dify 配置类
|
|
||||||
|
|
||||||
**文件**: `dify/config/DifyProperties.java`(新建)
|
|
||||||
|
|
||||||
```java
|
|
||||||
@ConfigurationProperties(prefix = "yudao.dify")
|
|
||||||
public class DifyProperties {
|
|
||||||
private String apiUrl;
|
|
||||||
private Integer timeout = 60;
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
#### 3.2 新建 DifyClient
|
|
||||||
|
|
||||||
**文件**: `dify/client/DifyClient.java`(新建)
|
|
||||||
|
|
||||||
- 调用 Dify 工作流 API
|
|
||||||
- 支持流式响应
|
|
||||||
- 传入 sysPrompt 参数
|
|
||||||
|
|
||||||
#### 3.3 新建 DifyService
|
|
||||||
|
|
||||||
**文件**: `dify/service/DifyService.java`(新建)
|
|
||||||
|
|
||||||
```java
|
|
||||||
public interface DifyService {
|
|
||||||
/**
|
|
||||||
* 流式聊天(带积分扣减)
|
|
||||||
*/
|
|
||||||
Flux<DifyChatRespVO> chatStream(DifyChatReqVO reqVO, Long userId);
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
#### 3.4 新建 DifyController
|
|
||||||
|
|
||||||
**文件**: `dify/controller/AppDifyController.java`(新建)
|
|
||||||
|
|
||||||
```java
|
|
||||||
@PostMapping("/api/tik/dify/chat/stream")
|
|
||||||
public Flux<CommonResult<DifyChatRespVO>> chatStream(@RequestBody DifyChatReqVO reqVO);
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
### 步骤 4: 集成到现有服务
|
|
||||||
|
|
||||||
#### 4.1 AiChatMessageService 集成
|
|
||||||
|
|
||||||
**文件**: `service/chat/AiChatMessageServiceImpl.java`(修改)
|
|
||||||
|
|
||||||
在 `sendChatMessageStream` 方法中:
|
|
||||||
1. 调用前:`pointsService.checkPoints()`
|
|
||||||
2. 创建预扣:`pointsService.createPendingDeduct()`
|
|
||||||
3. 流结束:`pointsService.confirmPendingDeduct()`
|
|
||||||
4. 出错/取消:`pointsService.cancelPendingDeduct()`
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
### 步骤 5: 定时任务
|
|
||||||
|
|
||||||
#### 5.1 预扣过期清理任务
|
|
||||||
|
|
||||||
**文件**: `job/PointsPendingCleanJob.java`(新建)
|
|
||||||
|
|
||||||
- 每 5 分钟执行
|
|
||||||
- 调用 `pointRecordMapper.cancelExpiredPendingRecords()`
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## 三、文件清单
|
|
||||||
|
|
||||||
### 新建文件
|
|
||||||
|
|
||||||
| 文件 | 路径 |
|
|
||||||
|------|------|
|
|
||||||
| PointsService | `muye/points/service/PointsService.java` |
|
|
||||||
| PointsServiceImpl | `muye/points/service/PointsServiceImpl.java` |
|
|
||||||
| DifyProperties | `dify/config/DifyProperties.java` |
|
|
||||||
| DifyClient | `dify/client/DifyClient.java` |
|
|
||||||
| DifyService | `dify/service/DifyService.java` |
|
|
||||||
| DifyServiceImpl | `dify/service/DifyServiceImpl.java` |
|
|
||||||
| DifyReqVO | `dify/vo/DifyChatReqVO.java` |
|
|
||||||
| DifyRespVO | `dify/vo/DifyChatRespVO.java` |
|
|
||||||
| AppDifyController | `dify/controller/AppDifyController.java` |
|
|
||||||
| PointsPendingCleanJob | `job/PointsPendingCleanJob.java` |
|
|
||||||
|
|
||||||
### 修改文件
|
|
||||||
|
|
||||||
| 文件 | 修改内容 |
|
|
||||||
|------|---------|
|
|
||||||
| PointRecordDO | 新增 status 字段 |
|
|
||||||
| MemberUserProfileMapper | 新增 selectByUserId、updatePointsDeduct 方法 |
|
|
||||||
| AiModelConfigMapper | 新增 selectByPlatformAndModelType 方法 |
|
|
||||||
| PointRecordMapper | 新增 cancelExpiredPendingRecords 方法 |
|
|
||||||
| ErrorCodeConstants | 新增积分相关错误码 |
|
|
||||||
| AiChatMessageServiceImpl | 集成积分扣减逻辑 |
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## 四、数据库变更
|
|
||||||
|
|
||||||
```sql
|
|
||||||
-- 1. 积分记录表新增状态字段
|
|
||||||
ALTER TABLE muey_point_record
|
|
||||||
ADD COLUMN status VARCHAR(20) DEFAULT 'confirmed'
|
|
||||||
COMMENT '状态:pending-预扣 confirmed-已确认 canceled-已取消';
|
|
||||||
|
|
||||||
-- 2. 添加索引(可选优化)
|
|
||||||
CREATE INDEX idx_point_record_status_time
|
|
||||||
ON muey_point_record(status, create_time);
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## 五、依赖关系
|
|
||||||
|
|
||||||
```
|
|
||||||
业务层
|
|
||||||
├── DifyService ──────┐
|
|
||||||
├── AiChatMessageService ──┼──→ PointsService(公共服务)
|
|
||||||
├── TikHubService ─────────┤ │
|
|
||||||
├── VoiceService ──────────┤ ├── AiModelConfigMapper(配置查询)
|
|
||||||
└── DigitalHumanService ───┘ ├── MemberUserProfileMapper(积分扣减)
|
|
||||||
└── PointRecordMapper(流水记录)
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## 六、验收标准
|
|
||||||
|
|
||||||
- [ ] PointsService 单元测试通过
|
|
||||||
- [ ] Dify 流式接口正常返回
|
|
||||||
- [ ] 积分不足时抛出正确异常
|
|
||||||
- [ ] 流式中断时预扣正确取消
|
|
||||||
- [ ] 预扣过期定时任务正常运行
|
|
||||||
- [ ] 积分扣减原子性(并发不超扣)
|
|
||||||
Reference in New Issue
Block a user