import { WebSocket } from 'ws';
import { randomUUID } from 'crypto';
import { getDb } from '../db';
import { videoAgent } from '../agent';
import type Anthropic from '@anthropic-ai/sdk';
import type OpenAI from 'openai';

type MessageParam = Anthropic.MessageParam;
type ContentBlock = Anthropic.ContentBlock;

/** Row of the `messages` table as read by this module. */
interface DbMessage {
  id: string;
  conversation_id: string;
  role: string;
  content: string;
  tool_calls: string | null;
  created_at: string;
}

/** Decoded form of the JSON stored in `messages.tool_calls`. */
interface StoredAssistantMeta {
  /** Provider "thinking" text captured by the OpenAI-protocol path. */
  reasoning?: string;
  /** Text / tool_use blocks of the assistant turn. */
  blocks?: ContentBlock[];
}

/** Upper bound on model round-trips (model -> tools -> model) per user message. */
const MAX_TOOL_LOOPS = 10;
/** Text stored as `content` when the assistant turn consists only of tool calls. */
const TOOL_CALL_PLACEHOLDER = '(调用工具)';
/** Tool output forwarded to the client is truncated to this many characters. */
const TOOL_RESULT_PREVIEW_CHARS = 1000;

/** Serializes `{ type, data }` and sends it over the socket. */
function sendEvent(ws: WebSocket, type: string, data: unknown): void {
  ws.send(JSON.stringify({ type, data }));
}

/** Extracts a printable message from anything that may have been thrown. */
function errorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Inserts one row into `messages`.
 *
 * When `toolCalls` is omitted the `tool_calls` column is left out of the INSERT
 * (so the column default applies); when it is passed, even as `null`, it is
 * written explicitly.
 */
function insertMessage(
  id: string,
  convId: string,
  role: string,
  content: string,
  toolCalls?: string | null,
): void {
  if (toolCalls === undefined) {
    getDb().prepare(
      'INSERT INTO messages (id, conversation_id, role, content) VALUES (?, ?, ?, ?)'
    ).run(id, convId, role, content);
    return;
  }
  getDb().prepare(
    'INSERT INTO messages (id, conversation_id, role, content, tool_calls) VALUES (?, ?, ?, ?, ?)'
  ).run(id, convId, role, content, toolCalls);
}

/** Keeps only the block kinds that are persisted and replayed: text and tool_use. */
function filterContent(blocks: ContentBlock[]): ContentBlock[] {
  return blocks.filter((b) => b.type === 'text' || b.type === 'tool_use');
}

/** Concatenates the text of every `text` block. */
function joinText(blocks: ContentBlock[]): string {
  return blocks
    .filter((b) => b.type === 'text')
    .map((b) => (b as { text: string }).text)
    .join('');
}

/**
 * Decodes the `tool_calls` column.
 *
 * Two layouts exist in the database:
 *  - legacy: a bare array of content blocks (written by the Anthropic path);
 *  - current: `{ reasoning_content?, content_blocks? }` (written by the OpenAI path).
 *
 * @returns `null` when the text is not valid JSON; otherwise whatever could be
 *          recognised (possibly an empty object).
 */
function parseAssistantMeta(raw: string): StoredAssistantMeta | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return null;
  }

  // Drop anything that is not an object so later `.type` reads cannot throw.
  const toBlocks = (value: unknown[]): ContentBlock[] =>
    value.filter((b) => typeof b === 'object' && b !== null) as ContentBlock[];

  if (Array.isArray(parsed)) {
    return { blocks: toBlocks(parsed) };
  }
  if (typeof parsed !== 'object' || parsed === null) {
    return {};
  }

  const { reasoning_content: reasoning, content_blocks: contentBlocks } =
    parsed as { reasoning_content?: unknown; content_blocks?: unknown };
  const meta: StoredAssistantMeta = {};
  if (typeof reasoning === 'string' && reasoning) meta.reasoning = reasoning;
  if (Array.isArray(contentBlocks)) meta.blocks = toBlocks(contentBlocks);
  return meta;
}

/**
 * Converts a stored message into an Anthropic `MessageParam`.
 *
 * Assistant rows replay their stored text/tool_use blocks (either storage
 * layout); `tool` rows become a user turn carrying a `tool_result`; anything
 * unparseable or unknown degrades to plain text.
 */
function dbToAnthropic(msg: DbMessage): MessageParam {
  if (msg.role === 'assistant') {
    if (msg.tool_calls) {
      const blocks = parseAssistantMeta(msg.tool_calls)?.blocks;
      if (blocks) {
        const replayable = filterContent(blocks);
        // An empty block list is not a usable message body; fall back to the text.
        if (replayable.length > 0) {
          return { role: 'assistant', content: replayable };
        }
      }
    }
    return { role: 'assistant', content: msg.content };
  }

  if (msg.role === 'tool') {
    try {
      const { tool_use_id, content } = JSON.parse(msg.content);
      return {
        role: 'user',
        content: [{ type: 'tool_result' as const, tool_use_id, content }],
      };
    } catch {
      return { role: 'user', content: msg.content };
    }
  }

  // 'user' and any unrecognised role are sent as a plain user turn.
  return { role: 'user', content: msg.content };
}

/**
 * Registers the chat protocol on a WebSocket connection.
 *
 * Incoming JSON messages are dispatched on `type`:
 *  - `init`: selects `msg.conversationId` (or a fresh UUID) and replies with `history`;
 *  - `create_conversation`: inserts a conversation row and replies with `conversation_created`;
 *  - `chat`: runs one user turn through the configured model.
 * Any failure while handling a message is logged and reported as an `error` event.
 */
export function handleChat(ws: WebSocket) {
  let conversationId: string | null = null;

  ws.on('message', async (raw) => {
    try {
      const msg = JSON.parse(raw.toString());

      if (msg.type === 'init') {
        conversationId = msg.conversationId || randomUUID();
        const history = getDb().prepare(
          'SELECT * FROM messages WHERE conversation_id = ? ORDER BY created_at'
        ).all(conversationId) as DbMessage[];
        sendEvent(ws, 'history', { conversationId, messages: history });
        return;
      }

      if (msg.type === 'create_conversation') {
        const { title, accountId } = msg;
        // Report the title that is actually stored, including the default.
        const storedTitle = title || '新对话';
        conversationId = randomUUID();
        getDb().prepare(
          'INSERT INTO conversations (id, title, account_id) VALUES (?, ?, ?)'
        ).run(conversationId, storedTitle, accountId || null);
        sendEvent(ws, 'conversation_created', { id: conversationId, title: storedTitle });
        return;
      }

      if (msg.type === 'chat') {
        if (!conversationId) {
          sendEvent(ws, 'error', { message: '没有活跃对话,请先创建或选择一个对话' });
          return;
        }
        await handleChatMessage(ws, conversationId, msg.content);
      }
    } catch (e) {
      console.error('WebSocket error:', e);
      sendEvent(ws, 'error', { message: errorMessage(e) });
    }
  });

  ws.on('close', () => {});
}

/** Maps stored `tool_use` blocks to OpenAI `tool_calls` entries. */
function extractToolCalls(blocks: ContentBlock[]): OpenAI.ChatCompletionMessageToolCall[] {
  return blocks
    .filter((b) => b.type === 'tool_use')
    .map((b) => ({
      id: (b as { id: string }).id,
      type: 'function' as const,
      function: {
        name: (b as { name: string }).name,
        arguments: JSON.stringify((b as { input: unknown }).input),
      },
    }));
}

/**
 * Converts a stored message into an OpenAI chat-completion message.
 *
 * For assistant rows with stored tool calls, `content` is rebuilt from the
 * stored text blocks (or `null`) and `tool_calls` is attached; stored
 * `reasoning_content` is passed back as an extra field.
 */
function dbToOpenAI(msg: DbMessage): OpenAI.ChatCompletionMessageParam {
  if (msg.role === 'assistant') {
    const result: Record<string, unknown> = { role: 'assistant', content: msg.content || null };
    const meta = msg.tool_calls ? parseAssistantMeta(msg.tool_calls) : null;

    if (meta?.reasoning) {
      result.reasoning_content = meta.reasoning;
    }
    if (meta?.blocks) {
      const toolCalls = extractToolCalls(meta.blocks);
      if (toolCalls.length > 0) {
        result.tool_calls = toolCalls;
        result.content = joinText(meta.blocks) || null;
      }
    }
    // `reasoning_content` is not part of the SDK's message type, hence the cast.
    return result as unknown as OpenAI.ChatCompletionMessageParam;
  }

  if (msg.role === 'tool') {
    try {
      const { tool_use_id, content } = JSON.parse(msg.content);
      return { role: 'tool', tool_call_id: tool_use_id, content };
    } catch {
      return { role: 'tool', tool_call_id: 'unknown', content: msg.content };
    }
  }

  // 'user' and any unrecognised role are sent as a plain user turn.
  return { role: 'user', content: msg.content };
}

/**
 * Runs one tool, persists its outcome as a `tool` message and notifies the client.
 *
 * Never rejects because of a tool failure: an error (including one thrown by
 * `resolveInput`) is stored and returned as `Error: <message>`.
 *
 * @param resolveInput Produces the tool input; may throw to report bad arguments.
 * @returns The text recorded as the tool result.
 */
async function runTool(
  ws: WebSocket,
  convId: string,
  toolUseId: string,
  name: string,
  resolveInput: () => Record<string, unknown>,
): Promise<string> {
  try {
    const result = await videoAgent.executeTool(name, resolveInput());
    insertMessage(randomUUID(), convId, 'tool', JSON.stringify({ tool_use_id: toolUseId, content: result }));
    sendEvent(ws, 'tool_result', { tool: name, result: result.slice(0, TOOL_RESULT_PREVIEW_CHARS) });
    return result;
  } catch (err) {
    const errMsg = errorMessage(err);
    const content = `Error: ${errMsg}`;
    insertMessage(randomUUID(), convId, 'tool', JSON.stringify({ tool_use_id: toolUseId, content }));
    sendEvent(ws, 'tool_error', { tool: name, error: errMsg });
    return content;
  }
}

// --- Anthropic protocol streaming ---

/**
 * Streams an assistant reply over the Anthropic Messages API, executing any
 * requested tools and re-querying the model until it answers without tool
 * calls or `MAX_TOOL_LOOPS` round-trips have been made.
 *
 * @param messages Conversation so far, ending with the user turn to answer.
 *                 The array is copied, not mutated.
 */
async function streamAnthropic(
  ws: WebSocket,
  convId: string,
  messages: MessageParam[],
): Promise<void> {
  const client = videoAgent.getAnthropicClient();
  const model = videoAgent.getModel();
  const systemPrompt = videoAgent.getSystemPrompt();
  const conversation: MessageParam[] = [...messages];

  for (let loop = 0; loop < MAX_TOOL_LOOPS; loop++) {
    console.log(`[chat:anthropic] Loop ${loop}, messages: ${conversation.length}`);

    const stream = client.messages.stream({
      model,
      max_tokens: 4096,
      system: systemPrompt,
      tools: videoAgent.getAnthropicTools(),
      messages: conversation,
    });

    const assistantMsgId = randomUUID();
    sendEvent(ws, 'message_start', { id: assistantMsgId });

    for await (const event of stream) {
      if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
        sendEvent(ws, 'text_delta', { id: assistantMsgId, text: event.delta.text });
      }
    }

    const finalMsg = await stream.finalMessage();
    sendEvent(ws, 'message_end', { id: assistantMsgId });

    const toolUses = finalMsg.content.filter((b): b is Anthropic.ToolUseBlock => b.type === 'tool_use');
    const finalText = finalMsg.content
      .filter((b): b is Anthropic.TextBlock => b.type === 'text')
      .map((b) => b.text)
      .join('');

    // Plain answer: persist it and finish the turn.
    if (toolUses.length === 0) {
      insertMessage(assistantMsgId, convId, 'assistant', finalText);
      return;
    }

    // Persist the assistant turn together with its tool calls.
    const cleanContent = filterContent(finalMsg.content as ContentBlock[]);
    insertMessage(
      assistantMsgId,
      convId,
      'assistant',
      finalText || TOOL_CALL_PLACEHOLDER,
      JSON.stringify(cleanContent),
    );
    conversation.push({ role: 'assistant', content: cleanContent });

    // Execute the tools in order; exactly one tool_result per tool_use.
    const toolResults: Anthropic.ToolResultBlockParam[] = [];
    for (const tool of toolUses) {
      sendEvent(ws, 'tool_start', { tool: tool.name, input: tool.input });
      console.log(`[chat:anthropic] Executing tool: ${tool.name}`);
      const content = await runTool(
        ws,
        convId,
        tool.id,
        tool.name,
        () => tool.input as Record<string, unknown>,
      );
      toolResults.push({ type: 'tool_result', tool_use_id: tool.id, content });
    }
    conversation.push({ role: 'user', content: toolResults });
  }

  console.warn(`[chat:anthropic] Stopped after ${MAX_TOOL_LOOPS} tool loops without a final answer`);
}

// --- OpenAI protocol streaming ---

/**
 * Parses the JSON argument string of a streamed tool call.
 * On malformed JSON returns an empty input plus an error description instead of throwing.
 */
function parseToolArguments(raw: string): { input: Record<string, unknown>; error?: string } {
  try {
    return { input: JSON.parse(raw || '{}') as Record<string, unknown> };
  } catch (err) {
    return { input: {}, error: `Invalid tool arguments JSON: ${errorMessage(err)}` };
  }
}

/**
 * Streams an assistant reply over an OpenAI-compatible chat-completions API,
 * executing any requested tools and re-querying the model until it answers
 * without tool calls or `MAX_TOOL_LOOPS` round-trips have been made.
 *
 * @param dbMessages Conversation so far, ending with the user turn to answer.
 *                   After each tool round the conversation is reloaded from the database.
 */
async function streamOpenAI(
  ws: WebSocket,
  convId: string,
  dbMessages: DbMessage[],
): Promise<void> {
  const client = videoAgent.getOpenAIClient();
  const model = videoAgent.getModel();
  const systemPrompt = videoAgent.getSystemPrompt();
  const openaiTools = videoAgent.getOpenAITools();
  let currentDbMessages = [...dbMessages];

  for (let loop = 0; loop < MAX_TOOL_LOOPS; loop++) {
    const openaiMessages: OpenAI.ChatCompletionMessageParam[] = [
      { role: 'system', content: systemPrompt },
      ...currentDbMessages.map(dbToOpenAI),
    ];
    console.log(`[chat:openai] Loop ${loop}, messages: ${openaiMessages.length}`);

    const assistantMsgId = randomUUID();
    sendEvent(ws, 'message_start', { id: assistantMsgId });

    let fullText = '';
    let reasoningContent = '';
    const toolCallsAcc: Array<{ id: string; name: string; arguments: string }> = [];

    const stream = await client.chat.completions.create({
      model,
      messages: openaiMessages,
      tools: openaiTools.length > 0 ? openaiTools : undefined,
      stream: true,
    });

    for await (const chunk of stream) {
      const delta = chunk.choices[0]?.delta;

      if (delta?.content) {
        fullText += delta.content;
        sendEvent(ws, 'text_delta', { id: assistantMsgId, text: delta.content });
      }

      // DeepSeek thinking mode: capture reasoning_content (not in the SDK's delta type).
      const reasoningDelta = (delta as Record<string, unknown>)?.reasoning_content;
      if (typeof reasoningDelta === 'string' && reasoningDelta) {
        reasoningContent += reasoningDelta;
        sendEvent(ws, 'reasoning_delta', { id: assistantMsgId, text: reasoningDelta });
      }

      // Tool calls arrive as fragments keyed by index; stitch them together.
      for (const tc of delta?.tool_calls ?? []) {
        if (tc.index === undefined) continue;
        while (toolCallsAcc.length <= tc.index) {
          toolCallsAcc.push({ id: '', name: '', arguments: '' });
        }
        const slot = toolCallsAcc[tc.index];
        if (!slot) continue;
        if (tc.id) slot.id = tc.id;
        if (tc.function?.name) slot.name = tc.function.name;
        if (tc.function?.arguments) slot.arguments += tc.function.arguments;
      }
    }

    sendEvent(ws, 'message_end', { id: assistantMsgId });

    // Extra metadata (reasoning_content, content_blocks) is stored as JSON in the tool_calls column.
    const extraMeta: Record<string, unknown> = {};
    if (reasoningContent) extraMeta.reasoning_content = reasoningContent;

    // Plain answer: persist it and finish the turn.
    if (toolCallsAcc.length === 0) {
      insertMessage(
        assistantMsgId,
        convId,
        'assistant',
        fullText,
        Object.keys(extraMeta).length > 0 ? JSON.stringify(extraMeta) : null,
      );
      return;
    }

    // Parse each argument string once; malformed JSON is reported per tool below
    // instead of aborting the whole turn.
    const calls = toolCallsAcc.map((tc) => ({ ...tc, ...parseToolArguments(tc.arguments) }));

    // Persist the assistant turn with its tool calls as Anthropic-style blocks.
    const dbToolCalls = calls.map((call) => ({
      type: 'tool_use',
      id: call.id,
      name: call.name,
      input: call.input,
    }));
    extraMeta.content_blocks = fullText
      ? [{ type: 'text', text: fullText }, ...dbToolCalls]
      : dbToolCalls;
    insertMessage(
      assistantMsgId,
      convId,
      'assistant',
      fullText || TOOL_CALL_PLACEHOLDER,
      JSON.stringify(extraMeta),
    );

    // Execute the tools in order; results are persisted by runTool.
    for (const call of calls) {
      sendEvent(ws, 'tool_start', { tool: call.name, input: call.input });
      console.log(`[chat:openai] Executing tool: ${call.name}`);
      await runTool(ws, convId, call.id, call.name, () => {
        if (call.error) throw new Error(call.error);
        return call.input;
      });
    }

    // Reload all messages for the next loop.
    // NOTE(review): ordering relies on created_at alone; if that column has
    // one-second resolution, rows written within the same second have no
    // guaranteed relative order — confirm against the schema.
    currentDbMessages = getDb().prepare(
      'SELECT * FROM messages WHERE conversation_id = ? ORDER BY created_at'
    ).all(convId) as DbMessage[];
  }

  console.warn(`[chat:openai] Stopped after ${MAX_TOOL_LOOPS} tool loops without a final answer`);
}

/**
 * Handles one user chat message: persists and echoes it, titles the
 * conversation from its first message, then streams the model's reply using
 * the protocol reported by `videoAgent.getProtocol()`.
 *
 * Model/tool-loop failures are not rethrown: they are stored and sent to the
 * client as an assistant message.
 */
async function handleChatMessage(ws: WebSocket, convId: string, content: string) {
  const userMsgId = randomUUID();
  insertMessage(userMsgId, convId, 'user', content);
  sendEvent(ws, 'message', { id: userMsgId, role: 'user', content });

  const msgCount = getDb().prepare(
    'SELECT COUNT(*) as count FROM messages WHERE conversation_id = ?'
  ).get(convId) as { count: number };

  if (msgCount.count <= 1) {
    // First message of the conversation: derive the title from it.
    const title = content.slice(0, 30) + (content.length > 30 ? '...' : '');
    getDb().prepare('UPDATE conversations SET title = ?, updated_at = datetime(\'now\') WHERE id = ?')
      .run(title, convId);
  } else {
    getDb().prepare('UPDATE conversations SET updated_at = datetime(\'now\') WHERE id = ?').run(convId);
  }

  const earlier = getDb().prepare(
    'SELECT * FROM messages WHERE conversation_id = ? AND id != ? ORDER BY created_at'
  ).all(convId, userMsgId) as DbMessage[];

  // The model must see the message it is answering. It is appended explicitly
  // (rather than re-read from the table) so it is guaranteed to be the last turn.
  const history: DbMessage[] = [
    ...earlier,
    {
      id: userMsgId,
      conversation_id: convId,
      role: 'user',
      content,
      tool_calls: null,
      created_at: new Date().toISOString(),
    },
  ];

  sendEvent(ws, 'status', { status: 'thinking' });

  try {
    if (videoAgent.getProtocol() === 'openai') {
      await streamOpenAI(ws, convId, history);
    } else {
      // Anthropic protocol (default)
      await streamAnthropic(ws, convId, history.map(dbToAnthropic));
    }
  } catch (err) {
    const errMsg = errorMessage(err);
    console.error('[chat] LLM error:', errMsg);
    const errId = randomUUID();
    const apology = `抱歉,出错了:${errMsg}`;
    insertMessage(errId, convId, 'assistant', apology);
    sendEvent(ws, 'message', { id: errId, role: 'assistant', content: apology });
  }
}