feat: 优化
This commit is contained in:
285
frontend/app/web-gold/src/services/ai-bridge/stream-adapter.ts
Normal file
285
frontend/app/web-gold/src/services/ai-bridge/stream-adapter.ts
Normal file
@@ -0,0 +1,285 @@
|
||||
/**
|
||||
* AI 桥接服务 - 流式适配器
|
||||
* 负责 SSE 到 AI SDK 协议转换,保持与现有系统的兼容性
|
||||
*/
|
||||
|
||||
import type { ReadableStream } from 'stream/web'
|
||||
|
||||
// SSE 事件类型
|
||||
interface SSEEvent {
|
||||
data: string
|
||||
event?: string
|
||||
id?: string
|
||||
retry?: number
|
||||
}
|
||||
|
||||
// 流式数据处理配置
|
||||
interface StreamAdapterConfig {
|
||||
enableVisibilityOptimization?: boolean
|
||||
enablePerformanceTracking?: boolean
|
||||
bufferSize?: number
|
||||
}
|
||||
|
||||
/**
|
||||
* 流式适配器类
|
||||
* 将 SSE 响应转换为 AI SDK 兼容的格式
|
||||
*/
|
||||
export class StreamAdapter {
|
||||
private buffer: string = ''
|
||||
private isVisible: boolean = true
|
||||
private config: StreamAdapterConfig
|
||||
private performanceMetrics: {
|
||||
chunkCount: number
|
||||
totalBytes: number
|
||||
startTime: number
|
||||
}
|
||||
|
||||
constructor(config: StreamAdapterConfig = {}) {
|
||||
this.config = {
|
||||
enableVisibilityOptimization: true,
|
||||
enablePerformanceTracking: true,
|
||||
bufferSize: 1024,
|
||||
...config
|
||||
}
|
||||
this.performanceMetrics = {
|
||||
chunkCount: 0,
|
||||
totalBytes: 0,
|
||||
startTime: Date.now()
|
||||
}
|
||||
|
||||
// 监听页面可见性变化
|
||||
if (this.config.enableVisibilityOptimization) {
|
||||
this.setupVisibilityListener()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置页面可见性监听器
|
||||
*/
|
||||
private setupVisibilityListener(): void {
|
||||
const handleVisibilityChange = () => {
|
||||
const wasVisible = this.isVisible
|
||||
this.isVisible = !document.hidden
|
||||
|
||||
if (!wasVisible && this.isVisible) {
|
||||
console.log('[AI Bridge] 页面重新可见,重新连接流')
|
||||
} else if (wasVisible && !this.isVisible) {
|
||||
console.log('[AI Bridge] 页面进入后台,优化性能')
|
||||
}
|
||||
}
|
||||
|
||||
document.addEventListener('visibilitychange', handleVisibilityChange)
|
||||
}
|
||||
|
||||
/**
|
||||
* 将 SSE ReadableStream 转换为 AI SDK 可消费的异步迭代器
|
||||
*/
|
||||
async *convertSSEResponse(sseStream: ReadableStream): AsyncIterable<string> {
|
||||
console.log('[AI Bridge] 开始转换 SSE 响应')
|
||||
|
||||
const reader = sseStream.getReader()
|
||||
const decoder = new TextDecoder()
|
||||
let isFirstChunk = true
|
||||
|
||||
try {
|
||||
while (true) {
|
||||
const { done, value } = await reader.read()
|
||||
if (done) break
|
||||
|
||||
const chunk = decoder.decode(value, { stream: true })
|
||||
this.buffer += chunk
|
||||
this.performanceMetrics.totalBytes += chunk.length
|
||||
|
||||
// 提取完整的 SSE 事件(以 \n\n 分隔)
|
||||
const events = this.buffer.split('\n\n')
|
||||
this.buffer = events.pop() || '' // 保存不完整的事件
|
||||
|
||||
for (const event of events) {
|
||||
if (event.trim()) {
|
||||
const parsedEvent = this.parseSSEEvent(event)
|
||||
if (parsedEvent && this.shouldProcessEvent(parsedEvent)) {
|
||||
const content = this.extractContent(parsedEvent)
|
||||
if (content) {
|
||||
yield content
|
||||
this.performanceMetrics.chunkCount++
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 第一块数据特殊处理
|
||||
if (isFirstChunk) {
|
||||
console.log('[AI Bridge] 接收到第一块数据')
|
||||
isFirstChunk = false
|
||||
}
|
||||
}
|
||||
|
||||
// 处理缓冲区中剩余的数据
|
||||
if (this.buffer.trim()) {
|
||||
const lastEvent = this.parseSSEEvent(this.buffer)
|
||||
if (lastEvent) {
|
||||
const content = this.extractContent(lastEvent)
|
||||
if (content) {
|
||||
yield content
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('[AI Bridge] SSE 转换错误:', error)
|
||||
throw error
|
||||
} finally {
|
||||
reader.releaseLock()
|
||||
this.logPerformanceMetrics()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析 SSE 事件
|
||||
*/
|
||||
private parseSSEEvent(event: string): SSEEvent | null {
|
||||
const lines = event.split('\n')
|
||||
const parsedEvent: SSEEvent = {
|
||||
data: ''
|
||||
}
|
||||
|
||||
for (const line of lines) {
|
||||
if (line.startsWith('data: ')) {
|
||||
parsedEvent.data = line.substring(6)
|
||||
} else if (line.startsWith('event: ')) {
|
||||
parsedEvent.event = line.substring(7)
|
||||
} else if (line.startsWith('id: ')) {
|
||||
parsedEvent.id = line.substring(4)
|
||||
} else if (line.startsWith('retry: ')) {
|
||||
parsedEvent.retry = parseInt(line.substring(8), 10)
|
||||
}
|
||||
}
|
||||
|
||||
return parsedEvent.data ? parsedEvent : null
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否应该处理此事件
|
||||
*/
|
||||
private shouldProcessEvent(event: SSEEvent): boolean {
|
||||
// 页面不可见时跳过非关键事件
|
||||
if (!this.isVisible && this.config.enableVisibilityOptimization) {
|
||||
return event.event !== 'ping'
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
/**
|
||||
* 从 SSE 事件中提取内容
|
||||
*/
|
||||
private extractContent(event: SSEEvent): string | null {
|
||||
try {
|
||||
// 尝试解析 JSON 格式
|
||||
const data = JSON.parse(event.data)
|
||||
|
||||
// 处理不同的响应格式
|
||||
if (data.content) {
|
||||
return data.content
|
||||
} else if (data.text) {
|
||||
return data.text
|
||||
} else if (data.message?.content) {
|
||||
return data.message.content
|
||||
} else if (data.receive?.content) {
|
||||
return data.receive.content
|
||||
} else if (data.delta) {
|
||||
return data.delta
|
||||
}
|
||||
|
||||
// 如果是纯文本数据
|
||||
if (typeof data === 'string') {
|
||||
return data
|
||||
}
|
||||
|
||||
return null
|
||||
} catch {
|
||||
// 非 JSON 数据直接返回
|
||||
return event.data
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理流式增量数据
|
||||
*/
|
||||
processStreamingDelta(newFullContent: string, previousContent: string = ''): string {
|
||||
if (newFullContent.startsWith(previousContent)) {
|
||||
// 正常情况:增量更新
|
||||
return newFullContent.slice(previousContent.length)
|
||||
} else {
|
||||
// 异常情况:内容乱序,返回完整内容
|
||||
console.warn('[AI Bridge] 流式内容乱序,使用完整内容')
|
||||
return newFullContent
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取性能指标
|
||||
*/
|
||||
getPerformanceMetrics() {
|
||||
const duration = Date.now() - this.performanceMetrics.startTime
|
||||
return {
|
||||
...this.performanceMetrics,
|
||||
duration,
|
||||
averageChunkSize: this.performanceMetrics.chunkCount > 0
|
||||
? this.performanceMetrics.totalBytes / this.performanceMetrics.chunkCount
|
||||
: 0,
|
||||
chunksPerSecond: this.performanceMetrics.chunkCount / (duration / 1000)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 记录性能指标
|
||||
*/
|
||||
private logPerformanceMetrics(): void {
|
||||
if (this.config.enablePerformanceTracking) {
|
||||
const metrics = this.getPerformanceMetrics()
|
||||
console.log('[AI Bridge] 性能指标:', {
|
||||
总字节数: metrics.totalBytes,
|
||||
数据块数: metrics.chunkCount,
|
||||
持续时间: `${metrics.duration}ms`,
|
||||
平均块大小: `${Math.round(metrics.averageChunkSize)} bytes`,
|
||||
处理速度: `${metrics.chunksPerSecond.toFixed(2)} chunks/s`
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 重置适配器状态
|
||||
*/
|
||||
reset(): void {
|
||||
this.buffer = ''
|
||||
this.performanceMetrics = {
|
||||
chunkCount: 0,
|
||||
totalBytes: 0,
|
||||
startTime: Date.now()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理资源
|
||||
*/
|
||||
dispose(): void {
|
||||
this.buffer = ''
|
||||
this.reset()
|
||||
// 移除事件监听器(在实际实现中需要保存引用以便移除)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建流式适配器实例的工厂函数
|
||||
*/
|
||||
export function createStreamAdapter(config?: StreamAdapterConfig): StreamAdapter {
|
||||
return new StreamAdapter(config)
|
||||
}
|
||||
|
||||
/**
|
||||
* 默认配置
|
||||
*/
|
||||
export const DEFAULT_STREAM_CONFIG: StreamAdapterConfig = {
|
||||
enableVisibilityOptimization: true,
|
||||
enablePerformanceTracking: true,
|
||||
bufferSize: 1024
|
||||
}
|
||||
Reference in New Issue
Block a user