Files
sionrui/frontend/app/web-gold/src/services/ai-bridge/stream-adapter.ts
2026-01-18 18:36:37 +08:00

286 lines
7.3 KiB
TypeScript

/**
* AI 桥接服务 - 流式适配器
* 负责 SSE 到 AI SDK 协议转换,保持与现有系统的兼容性
*/
import type { ReadableStream } from 'stream/web'
// SSE 事件类型
interface SSEEvent {
data: string
event?: string
id?: string
retry?: number
}
// 流式数据处理配置
interface StreamAdapterConfig {
enableVisibilityOptimization?: boolean
enablePerformanceTracking?: boolean
bufferSize?: number
}
/**
* 流式适配器类
* 将 SSE 响应转换为 AI SDK 兼容的格式
*/
export class StreamAdapter {
private buffer: string = ''
private isVisible: boolean = true
private config: StreamAdapterConfig
private performanceMetrics: {
chunkCount: number
totalBytes: number
startTime: number
}
constructor(config: StreamAdapterConfig = {}) {
this.config = {
enableVisibilityOptimization: true,
enablePerformanceTracking: true,
bufferSize: 1024,
...config
}
this.performanceMetrics = {
chunkCount: 0,
totalBytes: 0,
startTime: Date.now()
}
// 监听页面可见性变化
if (this.config.enableVisibilityOptimization) {
this.setupVisibilityListener()
}
}
/**
* 设置页面可见性监听器
*/
private setupVisibilityListener(): void {
const handleVisibilityChange = () => {
const wasVisible = this.isVisible
this.isVisible = !document.hidden
if (!wasVisible && this.isVisible) {
console.log('[AI Bridge] 页面重新可见,重新连接流')
} else if (wasVisible && !this.isVisible) {
console.log('[AI Bridge] 页面进入后台,优化性能')
}
}
document.addEventListener('visibilitychange', handleVisibilityChange)
}
/**
* 将 SSE ReadableStream 转换为 AI SDK 可消费的异步迭代器
*/
async *convertSSEResponse(sseStream: ReadableStream): AsyncIterable<string> {
console.log('[AI Bridge] 开始转换 SSE 响应')
const reader = sseStream.getReader()
const decoder = new TextDecoder()
let isFirstChunk = true
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
const chunk = decoder.decode(value, { stream: true })
this.buffer += chunk
this.performanceMetrics.totalBytes += chunk.length
// 提取完整的 SSE 事件(以 \n\n 分隔)
const events = this.buffer.split('\n\n')
this.buffer = events.pop() || '' // 保存不完整的事件
for (const event of events) {
if (event.trim()) {
const parsedEvent = this.parseSSEEvent(event)
if (parsedEvent && this.shouldProcessEvent(parsedEvent)) {
const content = this.extractContent(parsedEvent)
if (content) {
yield content
this.performanceMetrics.chunkCount++
}
}
}
}
// 第一块数据特殊处理
if (isFirstChunk) {
console.log('[AI Bridge] 接收到第一块数据')
isFirstChunk = false
}
}
// 处理缓冲区中剩余的数据
if (this.buffer.trim()) {
const lastEvent = this.parseSSEEvent(this.buffer)
if (lastEvent) {
const content = this.extractContent(lastEvent)
if (content) {
yield content
}
}
}
} catch (error) {
console.error('[AI Bridge] SSE 转换错误:', error)
throw error
} finally {
reader.releaseLock()
this.logPerformanceMetrics()
}
}
/**
* 解析 SSE 事件
*/
private parseSSEEvent(event: string): SSEEvent | null {
const lines = event.split('\n')
const parsedEvent: SSEEvent = {
data: ''
}
for (const line of lines) {
if (line.startsWith('data: ')) {
parsedEvent.data = line.substring(6)
} else if (line.startsWith('event: ')) {
parsedEvent.event = line.substring(7)
} else if (line.startsWith('id: ')) {
parsedEvent.id = line.substring(4)
} else if (line.startsWith('retry: ')) {
parsedEvent.retry = parseInt(line.substring(8), 10)
}
}
return parsedEvent.data ? parsedEvent : null
}
/**
* 判断是否应该处理此事件
*/
private shouldProcessEvent(event: SSEEvent): boolean {
// 页面不可见时跳过非关键事件
if (!this.isVisible && this.config.enableVisibilityOptimization) {
return event.event !== 'ping'
}
return true
}
/**
* 从 SSE 事件中提取内容
*/
private extractContent(event: SSEEvent): string | null {
try {
// 尝试解析 JSON 格式
const data = JSON.parse(event.data)
// 处理不同的响应格式
if (data.content) {
return data.content
} else if (data.text) {
return data.text
} else if (data.message?.content) {
return data.message.content
} else if (data.receive?.content) {
return data.receive.content
} else if (data.delta) {
return data.delta
}
// 如果是纯文本数据
if (typeof data === 'string') {
return data
}
return null
} catch {
// 非 JSON 数据直接返回
return event.data
}
}
/**
* 处理流式增量数据
*/
processStreamingDelta(newFullContent: string, previousContent: string = ''): string {
if (newFullContent.startsWith(previousContent)) {
// 正常情况:增量更新
return newFullContent.slice(previousContent.length)
} else {
// 异常情况:内容乱序,返回完整内容
console.warn('[AI Bridge] 流式内容乱序,使用完整内容')
return newFullContent
}
}
/**
* 获取性能指标
*/
getPerformanceMetrics() {
const duration = Date.now() - this.performanceMetrics.startTime
return {
...this.performanceMetrics,
duration,
averageChunkSize: this.performanceMetrics.chunkCount > 0
? this.performanceMetrics.totalBytes / this.performanceMetrics.chunkCount
: 0,
chunksPerSecond: this.performanceMetrics.chunkCount / (duration / 1000)
}
}
/**
* 记录性能指标
*/
private logPerformanceMetrics(): void {
if (this.config.enablePerformanceTracking) {
const metrics = this.getPerformanceMetrics()
console.log('[AI Bridge] 性能指标:', {
总字节数: metrics.totalBytes,
数据块数: metrics.chunkCount,
: `${metrics.duration}ms`,
: `${Math.round(metrics.averageChunkSize)} bytes`,
: `${metrics.chunksPerSecond.toFixed(2)} chunks/s`
})
}
}
/**
* 重置适配器状态
*/
reset(): void {
this.buffer = ''
this.performanceMetrics = {
chunkCount: 0,
totalBytes: 0,
startTime: Date.now()
}
}
/**
* 清理资源
*/
dispose(): void {
this.buffer = ''
this.reset()
// 移除事件监听器(在实际实现中需要保存引用以便移除)
}
}
/**
* 创建流式适配器实例的工厂函数
*/
export function createStreamAdapter(config?: StreamAdapterConfig): StreamAdapter {
return new StreamAdapter(config)
}
/**
* 默认配置
*/
export const DEFAULT_STREAM_CONFIG: StreamAdapterConfig = {
enableVisibilityOptimization: true,
enablePerformanceTracking: true,
bufferSize: 1024
}