send-stream

This commit is contained in:
wing
2026-02-22 20:30:12 +08:00
parent ca633f74b6
commit be9146499c
11 changed files with 1044 additions and 0 deletions

View File

@@ -0,0 +1,140 @@
package cn.iocoder.yudao.module.tik.dify.client;
import cn.iocoder.yudao.module.tik.dify.config.DifyProperties;
import cn.iocoder.yudao.module.tik.dify.vo.DifyChatRespVO;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
/**
* Dify API 客户端
*
* @author 芋道源码
*/
@Component
@Slf4j
public class DifyClient {
@Resource
private DifyProperties difyProperties;
/**
* 调用 Dify 工作流流式 API
*
* @param apiKey Dify API Key
* @param content 用户输入
* @param systemPrompt 系统提示词
* @param conversationId 会话ID可选
* @return 流式响应
*/
public Flux<DifyChatRespVO> chatStream(String apiKey, String content, String systemPrompt, String conversationId) {
String apiUrl = difyProperties.getApiUrl() + "/v1/workflows/run";
// 构建请求体
Map<String, Object> inputs = new HashMap<>();
inputs.put("sysPrompt", systemPrompt);
inputs.put("userInput", content);
Map<String, Object> requestBody = new HashMap<>();
requestBody.put("inputs", inputs);
requestBody.put("response_mode", "streaming");
requestBody.put("user", "user-" + System.currentTimeMillis());
AtomicReference<String> responseConversationId = new AtomicReference<>(conversationId);
StringBuilder fullContent = new StringBuilder();
WebClient webClient = WebClient.builder()
.baseUrl(apiUrl)
.defaultHeader("Authorization", "Bearer " + apiKey)
.defaultHeader("Content-Type", "application/json")
.build();
return webClient.post()
.bodyValue(requestBody)
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(String.class)
.map(this::parseSSEEvent)
.filter(resp -> resp != null)
.doOnNext(resp -> {
if (resp.getConversationId() != null) {
responseConversationId.set(resp.getConversationId());
}
if (resp.getContent() != null) {
fullContent.append(resp.getContent());
}
})
.doOnComplete(() -> {
log.info("[chatStream] Dify 流式响应完成会话ID: {}, 内容长度: {}",
responseConversationId.get(), fullContent.length());
})
.doOnError(e -> log.error("[chatStream] Dify 流式响应错误", e));
}
/**
* 解析 SSE 事件
*/
private DifyChatRespVO parseSSEEvent(String event) {
if (event == null || event.isEmpty()) {
return null;
}
try {
// 解析 SSE 事件格式
// data: {"event": "message", "answer": "xxx", "conversation_id": "xxx"}
if (event.startsWith("data:")) {
String jsonStr = event.substring(5).trim();
if (jsonStr.isEmpty() || jsonStr.equals("[DONE]")) {
return null;
}
// 简单解析 JSON实际项目建议使用 Jackson/Gson
String eventType = extractJsonValue(jsonStr, "event");
String answer = extractJsonValue(jsonStr, "answer");
String conversationId = extractJsonValue(jsonStr, "conversation_id");
if ("message".equals(eventType) || "agent_message".equals(eventType)) {
return DifyChatRespVO.message(answer, conversationId);
} else if ("workflow_finished".equals(eventType) || "message_end".equals(eventType)) {
return DifyChatRespVO.done(conversationId, null);
} else if ("error".equals(eventType)) {
return DifyChatRespVO.error(answer);
}
}
} catch (Exception e) {
log.warn("[parseSSEEvent] 解析 SSE 事件失败: {}", event, e);
}
return null;
}
/**
* 简单提取 JSON 值
*/
private String extractJsonValue(String json, String key) {
String pattern = "\"" + key + "\"\\s*:\\s*\"";
int start = json.indexOf(pattern);
if (start == -1) {
// 尝试非字符串格式
pattern = "\"" + key + "\"\\s*:\\s*";
start = json.indexOf(pattern);
if (start == -1) return null;
start += pattern.length();
int end = json.indexOf(",", start);
if (end == -1) end = json.indexOf("}", start);
if (end == -1) return null;
return json.substring(start, end).trim();
}
start += pattern.length();
int end = json.indexOf("\"", start);
if (end == -1) return null;
return json.substring(start, end);
}
}

View File

@@ -0,0 +1,27 @@
package cn.iocoder.yudao.module.tik.dify.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* Dify 配置属性
*
* @author 芋道源码
*/
@Data
@Component
@ConfigurationProperties(prefix = "yudao.dify")
public class DifyProperties {
/**
* Dify API 地址
*/
private String apiUrl;
/**
* 请求超时时间(秒)
*/
private Integer timeout = 60;
}

View File

@@ -0,0 +1,42 @@
package cn.iocoder.yudao.module.tik.dify.controller;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils;
import cn.iocoder.yudao.module.tik.dify.service.DifyService;
import cn.iocoder.yudao.module.tik.dify.vo.DifyChatReqVO;
import cn.iocoder.yudao.module.tik.dify.vo.DifyChatRespVO;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.Resource;
import jakarta.validation.Valid;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
/**
* Dify 工作流控制器
*
* @author 芋道源码
*/
@Tag(name = "用户 App - Dify 工作流")
@RestController
@RequestMapping("/api/tik/dify")
public class AppDifyController {
@Resource
private DifyService difyService;
@PostMapping(value = "/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@Operation(summary = "流式聊天")
public Flux<CommonResult<DifyChatRespVO>> chatStream(@Valid @RequestBody DifyChatReqVO reqVO) {
Long loginUserId = SecurityFrameworkUtils.getLoginUserId();
String userId = loginUserId != null ? loginUserId.toString() : "1"; // 默认用户ID
return difyService.chatStream(reqVO, userId)
.map(CommonResult::success);
}
}

View File

@@ -0,0 +1,23 @@
package cn.iocoder.yudao.module.tik.dify.service;
import cn.iocoder.yudao.module.tik.dify.vo.DifyChatReqVO;
import cn.iocoder.yudao.module.tik.dify.vo.DifyChatRespVO;
import reactor.core.publisher.Flux;
/**
* Dify 服务接口
*
* @author 芋道源码
*/
public interface DifyService {
/**
* 流式聊天(带积分扣减)
*
* @param reqVO 请求参数
* @param userId 用户ID
* @return 流式响应
*/
Flux<DifyChatRespVO> chatStream(DifyChatReqVO reqVO, String userId);
}

View File

@@ -0,0 +1,139 @@
package cn.iocoder.yudao.module.tik.dify.service;
import cn.iocoder.yudao.module.tik.dify.client.DifyClient;
import cn.iocoder.yudao.module.tik.dify.vo.DifyChatReqVO;
import cn.iocoder.yudao.module.tik.dify.vo.DifyChatRespVO;
import cn.iocoder.yudao.module.tik.muye.aiagent.dal.AiAgentDO;
import cn.iocoder.yudao.module.tik.muye.aiagent.service.AiAgentService;
import cn.iocoder.yudao.module.tik.muye.aimodelconfig.dal.AiModelConfigDO;
import cn.iocoder.yudao.module.tik.muye.points.service.PointsService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
* Dify 服务实现类
*
* @author 芋道源码
*/
@Service
@Validated
@Slf4j
public class DifyServiceImpl implements DifyService {
/** Dify 平台标识 */
private static final String PLATFORM_DIFY = "dify";
/** Dify 模型类型 */
private static final String MODEL_TYPE_WRITING = "writing";
@Resource
private AiAgentService aiAgentService;
@Resource
private PointsService pointsService;
@Resource
private DifyClient difyClient;
@Override
public Flux<DifyChatRespVO> chatStream(DifyChatReqVO reqVO, String userId) {
// 用于存储预扣记录ID
AtomicLong pendingRecordId = new AtomicLong();
// 用于存储会话ID
AtomicReference<String> conversationIdRef = new AtomicReference<>(reqVO.getConversationId());
return Mono.fromCallable(() -> {
// 1. 获取智能体配置
AiAgentDO agent = aiAgentService.getAiAgent(reqVO.getAgentId());
if (agent == null) {
throw new RuntimeException("智能体不存在");
}
// 2. 获取积分配置
AiModelConfigDO config = pointsService.getConfig(PLATFORM_DIFY, MODEL_TYPE_WRITING);
// 3. 预检积分
pointsService.checkPoints(userId, config.getConsumePoints());
// 4. 创建预扣记录
Long recordId = pointsService.createPendingDeduct(
userId,
config.getConsumePoints(),
"dify_chat",
reqVO.getAgentId().toString()
);
pendingRecordId.set(recordId);
// 5. 返回调用参数
return new DifyChatContext(agent.getSystemPrompt(), config.getApiKey(), config.getConsumePoints());
})
.flatMapMany(context -> {
// 6. 调用 Dify 流式 API
return difyClient.chatStream(
context.apiKey(),
reqVO.getContent(),
context.systemPrompt(),
reqVO.getConversationId()
)
.doOnNext(resp -> {
if (resp.getConversationId() != null) {
conversationIdRef.set(resp.getConversationId());
}
})
// 7. 流结束时确认扣费
.doOnComplete(() -> {
if (pendingRecordId.get() > 0) {
try {
pointsService.confirmPendingDeduct(pendingRecordId.get());
log.info("[chatStream] 流结束确认扣费记录ID: {}", pendingRecordId.get());
} catch (Exception e) {
log.error("[chatStream] 确认扣费失败", e);
}
}
})
// 8. 流出错时取消预扣
.doOnError(e -> {
if (pendingRecordId.get() > 0) {
try {
pointsService.cancelPendingDeduct(pendingRecordId.get());
log.info("[chatStream] 流出错取消预扣记录ID: {}", pendingRecordId.get());
} catch (Exception ex) {
log.error("[chatStream] 取消预扣失败", ex);
}
}
})
// 9. 用户取消时确认扣费(已消费的部分)
.doOnCancel(() -> {
if (pendingRecordId.get() > 0) {
try {
// 用户主动取消,仍然扣费(按最低消费)
pointsService.confirmPendingDeduct(pendingRecordId.get());
log.info("[chatStream] 用户取消确认扣费记录ID: {}", pendingRecordId.get());
} catch (Exception e) {
log.error("[chatStream] 用户取消后扣费失败", e);
}
}
});
})
// 10. 在最后添加 done 事件
.concatWith(Mono.defer(() -> {
return Mono.just(DifyChatRespVO.done(conversationIdRef.get(), null));
}))
.onErrorResume(e -> {
log.error("[chatStream] Dify 聊天异常", e);
return Flux.just(DifyChatRespVO.error(e.getMessage()));
});
}
/**
* Dify 聊天上下文
*/
private record DifyChatContext(String systemPrompt, String apiKey, Integer consumePoints) {}
}

View File

@@ -0,0 +1,26 @@
package cn.iocoder.yudao.module.tik.dify.vo;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
/**
* Dify 聊天请求 VO
*/
@Schema(description = "Dify 聊天请求")
@Data
public class DifyChatReqVO {
@Schema(description = "智能体ID", requiredMode = Schema.RequiredMode.REQUIRED)
@NotNull(message = "智能体ID不能为空")
private Long agentId;
@Schema(description = "用户输入内容", requiredMode = Schema.RequiredMode.REQUIRED)
@NotEmpty(message = "内容不能为空")
private String content;
@Schema(description = "会话ID可选首次对话不传")
private String conversationId;
}

View File

@@ -0,0 +1,62 @@
package cn.iocoder.yudao.module.tik.dify.vo;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* Dify 聊天响应 VO
*/
@Schema(description = "Dify 聊天响应")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DifyChatRespVO {
@Schema(description = "事件类型message/done/error")
private String event;
@Schema(description = "消息内容")
private String content;
@Schema(description = "会话ID")
private String conversationId;
@Schema(description = "消耗积分")
private Integer consumePoints;
@Schema(description = "错误信息")
private String errorMessage;
/** 事件类型常量 */
public static final String EVENT_MESSAGE = "message";
public static final String EVENT_DONE = "done";
public static final String EVENT_ERROR = "error";
public static DifyChatRespVO message(String content, String conversationId) {
return DifyChatRespVO.builder()
.event(EVENT_MESSAGE)
.content(content)
.conversationId(conversationId)
.build();
}
public static DifyChatRespVO done(String conversationId, Integer consumePoints) {
return DifyChatRespVO.builder()
.event(EVENT_DONE)
.conversationId(conversationId)
.consumePoints(consumePoints)
.build();
}
public static DifyChatRespVO error(String errorMessage) {
return DifyChatRespVO.builder()
.event(EVENT_ERROR)
.errorMessage(errorMessage)
.build();
}
}

View File

@@ -0,0 +1,38 @@
package cn.iocoder.yudao.module.tik.muye.points.job;
import cn.iocoder.yudao.module.tik.muye.pointrecord.mapper.PointRecordMapper;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* 预扣过期清理定时任务
*
* 功能清理超过30分钟的预扣记录将其状态更新为已取消
* 执行频率每5分钟执行一次
*
* @author 芋道源码
*/
@Component
@Slf4j
public class PointsPendingCleanJob {
@Resource
private PointRecordMapper pointRecordMapper;
/**
* 每5分钟清理一次过期的预扣记录
*/
@Scheduled(fixedRate = 5 * 60 * 1000)
public void cleanExpiredPendingRecords() {
log.info("[cleanExpiredPendingRecords][开始清理过期预扣记录]");
try {
int affectedRows = pointRecordMapper.cancelExpiredPendingRecords();
log.info("[cleanExpiredPendingRecords][清理完成,共取消 {} 条过期预扣记录]", affectedRows);
} catch (Exception e) {
log.error("[cleanExpiredPendingRecords][清理过期预扣记录失败]", e);
}
}
}

View File

@@ -0,0 +1,69 @@
package cn.iocoder.yudao.module.tik.muye.points.service;
import cn.iocoder.yudao.module.tik.muye.aimodelconfig.dal.AiModelConfigDO;
/**
* 积分扣减公共服务
*
* @author 芋道源码
*/
public interface PointsService {
/**
* 获取积分配置
*
* @param platform 平台标识dify/tikhub/voice/digital_human
* @param modelType 模型类型
* @return 积分配置
*/
AiModelConfigDO getConfig(String platform, String modelType);
/**
* 预检积分(余额不足抛异常)
*
* @param userId 用户ID
* @param points 所需积分
*/
void checkPoints(String userId, Integer points);
/**
* 即时扣减(同步场景)
* 直接扣减积分并记录流水
*
* @param userId 用户ID
* @param points 扣减积分数量
* @param bizType 业务类型
* @param bizId 业务关联ID
* @return 记录ID
*/
Long deductPoints(String userId, Integer points, String bizType, String bizId);
/**
* 创建预扣(流式/异步场景)
* 创建待确认的扣减记录,不实际扣减积分
*
* @param userId 用户ID
* @param points 预扣积分数量
* @param bizType 业务类型
* @param bizId 业务关联ID
* @return 预扣记录ID
*/
Long createPendingDeduct(String userId, Integer points, String bizType, String bizId);
/**
* 确认预扣(实际扣减)
* 执行实际积分扣减,更新预扣记录状态
*
* @param recordId 预扣记录ID
*/
void confirmPendingDeduct(Long recordId);
/**
* 取消预扣(不扣费)
* 更新预扣记录状态为已取消
*
* @param recordId 预扣记录ID
*/
void cancelPendingDeduct(Long recordId);
}

View File

@@ -0,0 +1,179 @@
package cn.iocoder.yudao.module.tik.muye.points.service;
import cn.iocoder.yudao.module.tik.muye.aimodelconfig.dal.AiModelConfigDO;
import cn.iocoder.yudao.module.tik.muye.aimodelconfig.mapper.AiModelConfigMapper;
import cn.iocoder.yudao.module.tik.muye.memberuserprofile.dal.MemberUserProfileDO;
import cn.iocoder.yudao.module.tik.muye.memberuserprofile.mapper.MemberUserProfileMapper;
import cn.iocoder.yudao.module.tik.muye.pointrecord.dal.PointRecordDO;
import cn.iocoder.yudao.module.tik.muye.pointrecord.mapper.PointRecordMapper;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.annotation.Validated;
import java.util.UUID;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
import static cn.iocoder.yudao.module.tik.enums.ErrorCodeConstants.*;
/**
* 积分扣减公共服务 实现类
*
* @author 芋道源码
*/
@Service
@Validated
@Slf4j
public class PointsServiceImpl implements PointsService {
/** 预扣状态:待确认 */
private static final String STATUS_PENDING = "pending";
/** 预扣状态:已确认 */
private static final String STATUS_CONFIRMED = "confirmed";
/** 预扣状态:已取消 */
private static final String STATUS_CANCELED = "canceled";
@Resource
private AiModelConfigMapper aiModelConfigMapper;
@Resource
private MemberUserProfileMapper memberUserProfileMapper;
@Resource
private PointRecordMapper pointRecordMapper;
@Override
public AiModelConfigDO getConfig(String platform, String modelType) {
AiModelConfigDO config = aiModelConfigMapper.selectByPlatformAndModelType(platform, modelType);
if (config == null) {
throw exception(POINTS_CONFIG_NOT_FOUND);
}
return config;
}
@Override
public void checkPoints(String userId, Integer points) {
MemberUserProfileDO profile = memberUserProfileMapper.selectByUserId(userId);
if (profile == null || profile.getRemainingPoints() == null || profile.getRemainingPoints() < points) {
throw exception(POINTS_INSUFFICIENT);
}
}
@Override
@Transactional(rollbackFor = Exception.class)
public Long deductPoints(String userId, Integer points, String bizType, String bizId) {
// 1. 原子扣减积分
int affectedRows = memberUserProfileMapper.updatePointsDeduct(userId, points);
if (affectedRows == 0) {
throw exception(POINTS_DEDUCT_FAILED);
}
// 2. 查询扣减后余额
MemberUserProfileDO profile = memberUserProfileMapper.selectByUserId(userId);
// 3. 创建积分记录(已确认状态)
PointRecordDO record = PointRecordDO.builder()
.userId(Long.parseLong(userId))
.type("decrease")
.pointAmount(-points)
.balance(profile.getRemainingPoints())
.reason(bizType)
.bizType(bizType)
.bizId(bizId)
.status(STATUS_CONFIRMED)
.build();
pointRecordMapper.insert(record);
log.info("[deductPoints] 用户 {} 扣减积分 {},业务类型 {}记录ID {}", userId, points, bizType, record.getId());
return record.getId();
}
@Override
@Transactional(rollbackFor = Exception.class)
public Long createPendingDeduct(String userId, Integer points, String bizType, String bizId) {
// 1. 预检积分
checkPoints(userId, points);
// 2. 查询当前余额
MemberUserProfileDO profile = memberUserProfileMapper.selectByUserId(userId);
// 3. 创建预扣记录(待确认状态)
PointRecordDO record = PointRecordDO.builder()
.userId(Long.parseLong(userId))
.type("decrease")
.pointAmount(-points)
.balance(profile.getRemainingPoints())
.reason(bizType + "(预扣)")
.bizType(bizType)
.bizId(bizId != null ? bizId : UUID.randomUUID().toString())
.status(STATUS_PENDING)
.build();
pointRecordMapper.insert(record);
log.info("[createPendingDeduct] 用户 {} 创建预扣 {} 积分,业务类型 {}记录ID {}",
userId, points, bizType, record.getId());
return record.getId();
}
@Override
@Transactional(rollbackFor = Exception.class)
public void confirmPendingDeduct(Long recordId) {
// 1. 查询预扣记录
PointRecordDO record = pointRecordMapper.selectById(recordId);
if (record == null) {
throw exception(POINTS_PENDING_NOT_FOUND);
}
// 2. 校验状态
if (!STATUS_PENDING.equals(record.getStatus())) {
throw exception(POINTS_PENDING_ALREADY_CONFIRMED);
}
// 3. 获取扣减信息
String userId = record.getUserId().toString();
Integer points = Math.abs(record.getPointAmount());
// 4. 原子扣减积分
int affectedRows = memberUserProfileMapper.updatePointsDeduct(userId, points);
if (affectedRows == 0) {
log.warn("[confirmPendingDeduct] 积分扣减失败可能余额不足记录ID {}", recordId);
throw exception(POINTS_DEDUCT_FAILED);
}
// 5. 查询扣减后余额
MemberUserProfileDO profile = memberUserProfileMapper.selectByUserId(userId);
// 6. 更新预扣记录状态
record.setStatus(STATUS_CONFIRMED);
record.setBalance(profile.getRemainingPoints());
record.setReason(record.getReason().replace("(预扣)", ""));
pointRecordMapper.updateById(record);
log.info("[confirmPendingDeduct] 确认预扣记录 {},用户 {} 扣减 {} 积分",
recordId, userId, points);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void cancelPendingDeduct(Long recordId) {
// 1. 查询预扣记录
PointRecordDO record = pointRecordMapper.selectById(recordId);
if (record == null) {
throw exception(POINTS_PENDING_NOT_FOUND);
}
// 2. 校验状态
if (!STATUS_PENDING.equals(record.getStatus())) {
throw exception(POINTS_PENDING_ALREADY_CONFIRMED);
}
// 3. 更新为已取消状态
record.setStatus(STATUS_CANCELED);
pointRecordMapper.updateById(record);
log.info("[cancelPendingDeduct] 取消预扣记录 {},用户 {},积分 {}",
recordId, record.getUserId(), Math.abs(record.getPointAmount()));
}
}

View File

@@ -0,0 +1,299 @@
# AI 服务积分扣减公共服务 - 实现计划
> 版本: v1.0
> 日期: 2026-02-22
> 基于设计文档: points-service-integration.md
---
## 一、实现概览
### 当前状态分析
| 组件 | 文件路径 | 当前状态 | 需要修改 |
|------|---------|---------|---------|
| PointRecordDO | `muye/pointrecord/dal/PointRecordDO.java` | 完整 | 新增 status 字段 |
| MemberUserProfileMapper | `muye/memberuserprofile/mapper/MemberUserProfileMapper.java` | 仅查询 | 新增原子扣减方法 |
| AiModelConfigMapper | `muye/aimodelconfig/mapper/AiModelConfigMapper.java` | 仅分页 | 新增按平台查询方法 |
| PointsService | 不存在 | **需新建** | 新建接口+实现 |
| DifyService | 不存在 | **需新建** | 新建服务 |
---
## 二、实现步骤
### 步骤 1: 数据库层修改
#### 1.1 PointRecordDO 新增 status 字段
**文件**: `muye/pointrecord/dal/PointRecordDO.java`
```java
// 新增字段
private String status; // 状态pending(预扣) / confirmed(已确认) / canceled(已取消)
```
**数据库迁移**:
```sql
ALTER TABLE muey_point_record ADD COLUMN status VARCHAR(20) DEFAULT 'confirmed' COMMENT '状态pending-预扣 confirmed-已确认 canceled-已取消';
```
#### 1.2 MemberUserProfileMapper 新增方法
**文件**: `muye/memberuserprofile/mapper/MemberUserProfileMapper.java`
```java
/**
* 根据用户ID查询档案
*/
default MemberUserProfileDO selectByUserId(Long userId) {
return selectOne(new LambdaQueryWrapperX<MemberUserProfileDO>()
.eq(MemberUserProfileDO::getUserId, userId));
}
/**
* 原子扣减积分(乐观锁)
* @return 影响行数0表示余额不足
*/
@Update("UPDATE muey_member_user_profile " +
"SET remaining_points = remaining_points - #{points}, " +
" used_points = used_points + #{points}, " +
" update_time = NOW() " +
"WHERE user_id = #{userId} AND remaining_points >= #{points}")
int updatePointsDeduct(@Param("userId") Long userId, @Param("points") Integer points);
```
#### 1.3 AiModelConfigMapper 新增方法
**文件**: `muye/aimodelconfig/mapper/AiModelConfigMapper.java`
```java
/**
* 根据平台和模型类型查询配置
*/
default AiModelConfigDO selectByPlatformAndModelType(String platform, String modelType) {
return selectOne(new LambdaQueryWrapperX<AiModelConfigDO>()
.eq(AiModelConfigDO::getPlatform, platform)
.eq(AiModelConfigDO::getModelType, modelType)
.eq(AiModelConfigDO::getStatus, 1));
}
```
#### 1.4 PointRecordMapper 新增方法
**文件**: `muye/pointrecord/mapper/PointRecordMapper.java`
```java
/**
* 取消过期的预扣记录30分钟前
*/
@Update("UPDATE muey_point_record SET status = 'canceled', update_time = NOW() " +
"WHERE status = 'pending' AND create_time < DATE_SUB(NOW(), INTERVAL 30 MINUTE)")
int cancelExpiredPendingRecords();
```
---
### 步骤 2: 公共积分服务
#### 2.1 新建 PointsService 接口
**文件**: `muye/points/service/PointsService.java`(新建)
```java
public interface PointsService {
/**
* 获取积分配置
*/
AiModelConfigDO getConfig(String platform, String modelType);
/**
* 预检积分(余额不足抛异常)
*/
void checkPoints(Long userId, Integer points);
/**
* 即时扣减(同步场景)
*/
Long deductPoints(Long userId, Integer points, String bizType, String bizId);
/**
* 创建预扣(流式/异步场景)
* @return 预扣记录ID
*/
Long createPendingDeduct(Long userId, Integer points, String bizType, String bizId);
/**
* 确认预扣(实际扣减)
*/
void confirmPendingDeduct(Long recordId);
/**
* 取消预扣(不扣费)
*/
void cancelPendingDeduct(Long recordId);
}
```
#### 2.2 新建 PointsServiceImpl 实现
**文件**: `muye/points/service/PointsServiceImpl.java`(新建)
核心逻辑:
- `deductPoints`: 调用 Mapper 原子扣减,失败抛 `POINTS_DEDUCT_FAILED`
- `createPendingDeduct`: 创建 status=pending 的记录
- `confirmPendingDeduct`: 执行实际扣减 + 更新 status=confirmed
- `cancelPendingDeduct`: 更新 status=canceled
#### 2.3 新建错误码常量
**文件**: `ErrorCodeConstants.java`(修改)
```java
// 积分相关错误码 1001001-1001003
ErrorCode POINTS_INSUFFICIENT = new ErrorCode(1001001, "积分不足");
ErrorCode POINTS_CONFIG_NOT_FOUND = new ErrorCode(1001002, "积分配置不存在");
ErrorCode POINTS_DEDUCT_FAILED = new ErrorCode(1001003, "积分扣减失败");
```
---
### 步骤 3: Dify 工作流集成
#### 3.1 新建 Dify 配置类
**文件**: `dify/config/DifyProperties.java`(新建)
```java
@ConfigurationProperties(prefix = "yudao.dify")
public class DifyProperties {
private String apiUrl;
private Integer timeout = 60;
}
```
#### 3.2 新建 DifyClient
**文件**: `dify/client/DifyClient.java`(新建)
- 调用 Dify 工作流 API
- 支持流式响应
- 传入 sysPrompt 参数
#### 3.3 新建 DifyService
**文件**: `dify/service/DifyService.java`(新建)
```java
public interface DifyService {
/**
* 流式聊天(带积分扣减)
*/
Flux<DifyChatRespVO> chatStream(DifyChatReqVO reqVO, Long userId);
}
```
#### 3.4 新建 DifyController
**文件**: `dify/controller/AppDifyController.java`(新建)
```java
@PostMapping("/api/tik/dify/chat/stream")
public Flux<CommonResult<DifyChatRespVO>> chatStream(@RequestBody DifyChatReqVO reqVO);
```
---
### 步骤 4: 集成到现有服务
#### 4.1 AiChatMessageService 集成
**文件**: `service/chat/AiChatMessageServiceImpl.java`(修改)
`sendChatMessageStream` 方法中:
1. 调用前:`pointsService.checkPoints()`
2. 创建预扣:`pointsService.createPendingDeduct()`
3. 流结束:`pointsService.confirmPendingDeduct()`
4. 出错/取消:`pointsService.cancelPendingDeduct()`
---
### 步骤 5: 定时任务
#### 5.1 预扣过期清理任务
**文件**: `job/PointsPendingCleanJob.java`(新建)
- 每 5 分钟执行
- 调用 `pointRecordMapper.cancelExpiredPendingRecords()`
---
## 三、文件清单
### 新建文件
| 文件 | 路径 |
|------|------|
| PointsService | `muye/points/service/PointsService.java` |
| PointsServiceImpl | `muye/points/service/PointsServiceImpl.java` |
| DifyProperties | `dify/config/DifyProperties.java` |
| DifyClient | `dify/client/DifyClient.java` |
| DifyService | `dify/service/DifyService.java` |
| DifyServiceImpl | `dify/service/DifyServiceImpl.java` |
| DifyReqVO | `dify/vo/DifyChatReqVO.java` |
| DifyRespVO | `dify/vo/DifyChatRespVO.java` |
| AppDifyController | `dify/controller/AppDifyController.java` |
| PointsPendingCleanJob | `job/PointsPendingCleanJob.java` |
### 修改文件
| 文件 | 修改内容 |
|------|---------|
| PointRecordDO | 新增 status 字段 |
| MemberUserProfileMapper | 新增 selectByUserId、updatePointsDeduct 方法 |
| AiModelConfigMapper | 新增 selectByPlatformAndModelType 方法 |
| PointRecordMapper | 新增 cancelExpiredPendingRecords 方法 |
| ErrorCodeConstants | 新增积分相关错误码 |
| AiChatMessageServiceImpl | 集成积分扣减逻辑 |
---
## 四、数据库变更
```sql
-- 1. 积分记录表新增状态字段
ALTER TABLE muey_point_record
ADD COLUMN status VARCHAR(20) DEFAULT 'confirmed'
COMMENT '状态pending-预扣 confirmed-已确认 canceled-已取消';
-- 2. 添加索引(可选优化)
CREATE INDEX idx_point_record_status_time
ON muey_point_record(status, create_time);
```
---
## 五、依赖关系
```
业务层
├── DifyService ──────┐
├── AiChatMessageService ──┼──→ PointsService公共服务
├── TikHubService ─────────┤ │
├── VoiceService ──────────┤ ├── AiModelConfigMapper配置查询
└── DigitalHumanService ───┘ ├── MemberUserProfileMapper积分扣减
└── PointRecordMapper流水记录
```
---
## 六、验收标准
- [ ] PointsService 单元测试通过
- [ ] Dify 流式接口正常返回
- [ ] 积分不足时抛出正确异常
- [ ] 流式中断时预扣正确取消
- [ ] 预扣过期定时任务正常运行
- [ ] 积分扣减原子性(并发不超扣)