import { WebSocket } from 'ws';
import { randomUUID } from 'crypto';
import { getDb } from '../db';
import { videoAgent } from '../agent';
import type { MessageParam, ToolUseBlock, TextBlock } from '@anthropic-ai/sdk/resources/messages.mjs';

/** Shape of a JSON frame received from the client over the WebSocket. */
interface ChatMsg {
  type: string;
  conversationId?: string;
  content?: string;
  title?: string;
  accountId?: string;
  data?: Record<string, unknown>;
  conversation_id?: string;
  role?: string;
  tool_calls?: string;
  created_at?: string;
  id?: string;
}

/** A row of the `messages` table as read back by the queries in this file. */
interface DbMessage {
  id: string;
  conversation_id: string;
  role: string;
  content: string;
  tool_calls: string | null;
  created_at: string;
}

/** Upper bound on LLM round-trips (initial call + tool follow-ups) per user message. */
const MAX_TOOL_ROUNDS = 10;

/** Extracts a printable message from anything that may have been thrown. */
function errorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/** Serializes `payload` and sends it, skipping sockets that are no longer open. */
function send(ws: WebSocket, payload: { type: string; data?: unknown }): void {
  if (ws.readyState !== WebSocket.OPEN) return;
  ws.send(JSON.stringify(payload));
}

/**
 * Inserts one row into `messages`. The `tool_calls` column is only named in
 * the statement when `toolCalls` is provided, mirroring the two distinct
 * INSERT statements this module has always used.
 */
function insertMessage(
  id: string,
  convId: string,
  role: string,
  content: string,
  toolCalls?: string,
): void {
  if (toolCalls === undefined) {
    getDb()
      .prepare('INSERT INTO messages (id, conversation_id, role, content) VALUES (?, ?, ?, ?)')
      .run(id, convId, role, content);
    return;
  }
  getDb()
    .prepare('INSERT INTO messages (id, conversation_id, role, content, tool_calls) VALUES (?, ?, ?, ?, ?)')
    .run(id, convId, role, content, toolCalls);
}

/**
 * Converts a stored message row into an Anthropic `MessageParam`.
 *
 * - `user` rows become plain user messages.
 * - `assistant` rows use the JSON content blocks stored in `tool_calls` when
 *   that column holds a non-empty JSON array; otherwise the plain text.
 * - `tool` rows (content is JSON `{ tool_use_id, content }`) become a user
 *   message carrying a single `tool_result` block.
 * - Any row that cannot be decoded falls back to a plain-text message.
 */
function dbToAnthropic(msg: DbMessage): MessageParam {
  if (msg.role === 'assistant') {
    if (msg.tool_calls) {
      try {
        const parsed: unknown = JSON.parse(msg.tool_calls);
        if (Array.isArray(parsed) && parsed.length > 0) {
          return { role: 'assistant', content: parsed as Exclude<MessageParam['content'], string> };
        }
      } catch {
        // Malformed JSON: fall through to the plain-text form below.
      }
    }
    return { role: 'assistant', content: msg.content };
  }

  if (msg.role === 'tool') {
    try {
      const parsed: unknown = JSON.parse(msg.content);
      if (typeof parsed === 'object' && parsed !== null) {
        const { tool_use_id, content } = parsed as { tool_use_id?: unknown; content?: unknown };
        // Only emit a tool_result when the id is usable; a tool_result with a
        // missing id would be rejected by the API.
        if (typeof tool_use_id === 'string') {
          return {
            role: 'user',
            content: [
              {
                type: 'tool_result',
                tool_use_id,
                content: typeof content === 'string' ? content : JSON.stringify(content ?? ''),
              },
            ],
          };
        }
      }
    } catch {
      // Malformed JSON: fall through to the plain-text form below.
    }
    return { role: 'user', content: msg.content };
  }

  // 'user' rows and any unrecognized role are sent as plain user text.
  return { role: 'user', content: msg.content };
}

/**
 * Attaches the chat protocol to a WebSocket connection.
 *
 * Handled client frames (JSON, discriminated by `type`):
 * - `init`: selects `conversationId` (or generates one) and replies with a
 *   `history` frame containing the stored messages.
 * - `create_conversation`: inserts a conversation row and replies with
 *   `conversation_created`.
 * - `chat`: forwards `content` to the LLM tool loop for the active conversation.
 *
 * Frames of any other type are ignored. Any thrown error is logged and
 * reported to the client as an `error` frame.
 */
export function handleChat(ws: WebSocket) {
  let conversationId: string | null = null;

  ws.on('message', async (raw) => {
    try {
      const msg: ChatMsg = JSON.parse(raw.toString());

      // --- Init: load conversation history ---
      if (msg.type === 'init') {
        conversationId = msg.conversationId || randomUUID();
        const history = getDb()
          .prepare('SELECT * FROM messages WHERE conversation_id = ? ORDER BY created_at')
          .all(conversationId) as DbMessage[];
        send(ws, { type: 'history', data: { conversationId, messages: history } });
        return;
      }

      // --- Create conversation ---
      if (msg.type === 'create_conversation') {
        const { title, accountId } = msg;
        conversationId = randomUUID();
        getDb()
          .prepare('INSERT INTO conversations (id, title, account_id) VALUES (?, ?, ?)')
          .run(conversationId, title || '新对话', accountId || null);
        send(ws, { type: 'conversation_created', data: { id: conversationId, title } });
        return;
      }

      // --- Chat with LLM ---
      if (msg.type === 'chat') {
        // Reject early instead of letting null/undefined reach the DB insert.
        if (!conversationId) {
          send(ws, {
            type: 'error',
            data: { message: 'No active conversation: send "init" or "create_conversation" first' },
          });
          return;
        }
        if (typeof msg.content !== 'string' || msg.content.trim() === '') {
          send(ws, { type: 'error', data: { message: 'Chat message content must be a non-empty string' } });
          return;
        }
        await handleChatMessage(ws, conversationId, msg.content);
      }
    } catch (e: unknown) {
      console.error('WebSocket error:', e);
      send(ws, { type: 'error', data: { message: errorMessage(e) } });
    }
  });

  // Nothing is held per connection beyond closure state, so close is a no-op.
  ws.on('close', () => {});
}

/**
 * Persists a user message, runs the LLM (with tool-use loop) and streams the
 * reply to the client.
 *
 * Frames emitted: `message` (echo of the user message), `status`,
 * `message_start` / `text_delta` / `message_end` per LLM round, and
 * `tool_start` / `tool_result` / `tool_error` per tool call. On failure an
 * apology message is stored as an assistant row and sent as a `message` frame.
 *
 * @param ws      Client connection to stream events to.
 * @param convId  Conversation the message belongs to.
 * @param content Text of the user's message.
 */
async function handleChatMessage(ws: WebSocket, convId: string, content: string): Promise<void> {
  // 1. Save user message
  const userMsgId = randomUUID();
  insertMessage(userMsgId, convId, 'user', content);
  send(ws, { type: 'message', data: { id: userMsgId, role: 'user', content } });

  // First message of the conversation also sets the title; every message
  // bumps the conversation timestamp.
  const msgCount = getDb()
    .prepare('SELECT COUNT(*) as count FROM messages WHERE conversation_id = ?')
    .get(convId) as { count: number };
  if (msgCount.count <= 1) {
    const title = content.slice(0, 30) + (content.length > 30 ? '...' : '');
    getDb()
      .prepare("UPDATE conversations SET title = ?, updated_at = datetime('now') WHERE id = ?")
      .run(title, convId);
  } else {
    getDb()
      .prepare("UPDATE conversations SET updated_at = datetime('now') WHERE id = ?")
      .run(convId);
  }

  // 2. Build message history for Anthropic
  // NOTE(review): rows sharing the same created_at value have no explicit
  // tiebreaker here — confirm the column's resolution against the schema.
  const history = getDb()
    .prepare('SELECT * FROM messages WHERE conversation_id = ? AND id != ? ORDER BY created_at')
    .all(convId, userMsgId) as DbMessage[];
  const messages: MessageParam[] = history.map(dbToAnthropic);
  // The query above excludes the message just saved, so it must be appended
  // explicitly; otherwise the model never sees the user's new input.
  messages.push({ role: 'user', content });

  // 3. Call LLM with tool loop
  const client = videoAgent.getClient();
  const model = videoAgent.getModel();
  const systemPrompt = videoAgent.getSystemPrompt();

  send(ws, { type: 'status', data: { status: 'thinking' } });

  try {
    for (let round = 0; round < MAX_TOOL_ROUNDS; round++) {
      const stream = client.messages.stream({
        model,
        max_tokens: 4096,
        system: systemPrompt,
        tools: videoAgent.getAnthropicTools(),
        messages,
      });

      const assistantMsgId = randomUUID();

      // Stream text deltas to the client as they arrive.
      send(ws, { type: 'message_start', data: { id: assistantMsgId } });
      for await (const event of stream) {
        if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
          send(ws, { type: 'text_delta', data: { id: assistantMsgId, text: event.delta.text } });
        }
      }

      const finalMsg = await stream.finalMessage();
      send(ws, { type: 'message_end', data: { id: assistantMsgId } });

      // Split the final message into its text and tool-use parts.
      const finalText = finalMsg.content
        .filter((b): b is TextBlock => b.type === 'text')
        .map((b) => b.text)
        .join('');
      const toolUses = finalMsg.content.filter((b): b is ToolUseBlock => b.type === 'tool_use');

      // No tool calls — save assistant message and done
      if (toolUses.length === 0) {
        insertMessage(assistantMsgId, convId, 'assistant', finalText);
        return;
      }

      // Has tool calls — save assistant message with tool_calls, execute tools, add results
      insertMessage(
        assistantMsgId,
        convId,
        'assistant',
        finalText || '(调用工具)',
        JSON.stringify(finalMsg.content),
      );

      const assistantBlocks: (TextBlock | ToolUseBlock)[] = finalMsg.content.filter(
        (b): b is TextBlock | ToolUseBlock => b.type === 'text' || b.type === 'tool_use',
      );
      messages.push({ role: 'assistant', content: assistantBlocks });

      // Execute tools and send results
      const toolResults: { type: 'tool_result'; tool_use_id: string; content: string }[] = [];
      for (const tool of toolUses) {
        const input = tool.input as Record<string, unknown>;
        send(ws, { type: 'tool_start', data: { tool: tool.name, input } });

        // Only the tool call itself is guarded: a failure while persisting or
        // notifying must not produce a second tool_result for the same id.
        let output: string;
        let failure: string | null = null;
        try {
          output = await videoAgent.executeTool(tool.name, input);
        } catch (err: unknown) {
          failure = errorMessage(err);
          output = `Error: ${failure}`;
        }

        toolResults.push({ type: 'tool_result', tool_use_id: tool.id, content: output });
        insertMessage(
          randomUUID(),
          convId,
          'tool',
          JSON.stringify({ tool_use_id: tool.id, content: output }),
        );

        if (failure === null) {
          // The client only gets a preview; the full result is stored and sent to the LLM.
          send(ws, { type: 'tool_result', data: { tool: tool.name, result: output.slice(0, 1000) } });
        } else {
          send(ws, { type: 'tool_error', data: { tool: tool.name, error: failure } });
        }
      }

      // Add tool results to conversation; the next round lets the LLM react to them.
      messages.push({ role: 'user', content: toolResults });
    }

    // Reaching this point means every round ended in tool calls; surface that
    // through the normal error path instead of returning silently.
    throw new Error(`Tool loop limit (${MAX_TOOL_ROUNDS}) reached without a final response`);
  } catch (err: unknown) {
    const errMsg = errorMessage(err);
    console.error('LLM error:', errMsg);
    const errId = randomUUID();
    insertMessage(errId, convId, 'assistant', `抱歉,出错了:${errMsg}`);
    send(ws, {
      type: 'message',
      data: { id: errId, role: 'assistant', content: `抱歉,出错了:${errMsg}` },
    });
  }
}