Files
video-create/web/server/ws/chat.ts

229 lines
8.2 KiB
TypeScript
Raw Normal View History

import { WebSocket } from 'ws';
import { randomUUID } from 'crypto';
import { getDb } from '../db';
import { videoAgent } from '../agent';
import type Anthropic from '@anthropic-ai/sdk';
type MessageParam = Anthropic.MessageParam;
type ContentBlock = Anthropic.ContentBlock;
interface DbMessage {
id: string;
conversation_id: string;
role: string;
content: string;
tool_calls: string | null;
created_at: string;
}
function filterContent(blocks: ContentBlock[]): ContentBlock[] {
return blocks.filter((b) => b.type === 'text' || b.type === 'tool_use');
}
function dbToAnthropic(msg: DbMessage): MessageParam {
if (msg.role === 'user') {
return { role: 'user', content: msg.content };
}
if (msg.role === 'assistant') {
if (msg.tool_calls) {
try {
const parsed = JSON.parse(msg.tool_calls) as ContentBlock[];
return { role: 'assistant', content: filterContent(parsed) };
} catch {
return { role: 'assistant', content: msg.content };
}
}
return { role: 'assistant', content: msg.content };
}
if (msg.role === 'tool') {
try {
const { tool_use_id, content } = JSON.parse(msg.content);
return {
role: 'user',
content: [{ type: 'tool_result' as const, tool_use_id, content }],
};
} catch {
return { role: 'user', content: msg.content };
}
}
return { role: 'user', content: msg.content };
}
export function handleChat(ws: WebSocket) {
let conversationId: string | null = null;
ws.on('message', async (raw) => {
try {
const msg = JSON.parse(raw.toString());
if (msg.type === 'init') {
conversationId = msg.conversationId || randomUUID();
const history = getDb().prepare(
'SELECT * FROM messages WHERE conversation_id = ? ORDER BY created_at'
).all(conversationId) as DbMessage[];
ws.send(JSON.stringify({ type: 'history', data: { conversationId, messages: history } }));
return;
}
if (msg.type === 'create_conversation') {
const { title, accountId } = msg;
conversationId = randomUUID();
getDb().prepare(
'INSERT INTO conversations (id, title, account_id) VALUES (?, ?, ?)'
).run(conversationId, title || '新对话', accountId || null);
ws.send(JSON.stringify({ type: 'conversation_created', data: { id: conversationId, title } }));
return;
}
if (msg.type === 'chat') {
if (!conversationId) {
ws.send(JSON.stringify({ type: 'error', data: { message: '没有活跃对话,请先创建或选择一个对话' } }));
return;
}
await handleChatMessage(ws, conversationId, msg.content);
}
} catch (e) {
console.error('WebSocket error:', e);
ws.send(JSON.stringify({ type: 'error', data: { message: (e as Error).message } }));
}
});
ws.on('close', () => {});
}
async function handleChatMessage(ws: WebSocket, convId: string, content: string) {
const userMsgId = randomUUID();
getDb().prepare(
'INSERT INTO messages (id, conversation_id, role, content) VALUES (?, ?, ?, ?)'
).run(userMsgId, convId, 'user', content);
ws.send(JSON.stringify({ type: 'message', data: { id: userMsgId, role: 'user', content } }));
const msgCount = getDb().prepare(
'SELECT COUNT(*) as count FROM messages WHERE conversation_id = ?'
).get(convId) as { count: number };
if (msgCount.count <= 1) {
const title = content.slice(0, 30) + (content.length > 30 ? '...' : '');
getDb().prepare('UPDATE conversations SET title = ?, updated_at = datetime(\'now\') WHERE id = ?')
.run(title, convId);
}
getDb().prepare('UPDATE conversations SET updated_at = datetime(\'now\') WHERE id = ?').run(convId);
const history = getDb().prepare(
'SELECT * FROM messages WHERE conversation_id = ? AND id != ? ORDER BY created_at'
).all(convId, userMsgId) as DbMessage[];
const messages: MessageParam[] = history.map(dbToAnthropic);
const client = videoAgent.getClient();
const model = videoAgent.getModel();
const systemPrompt = videoAgent.getSystemPrompt();
ws.send(JSON.stringify({ type: 'status', data: { status: 'thinking' } }));
try {
let currentMessages = messages;
let maxLoops = 10;
while (maxLoops-- > 0) {
console.log(`[chat] Calling LLM, loop ${9 - maxLoops}, messages: ${currentMessages.length}`);
const stream = client.messages.stream({
model,
max_tokens: 4096,
system: systemPrompt,
tools: videoAgent.getAnthropicTools(),
messages: currentMessages,
});
const assistantMsgId = randomUUID();
ws.send(JSON.stringify({ type: 'message_start', data: { id: assistantMsgId } }));
for await (const event of stream) {
if (event.type === 'content_block_delta') {
if (event.delta.type === 'text_delta') {
ws.send(JSON.stringify({
type: 'text_delta',
data: { id: assistantMsgId, text: event.delta.text },
}));
}
}
}
const finalMsg = await stream.finalMessage();
ws.send(JSON.stringify({ type: 'message_end', data: { id: assistantMsgId } }));
const toolUses = finalMsg.content.filter((b): b is Anthropic.ToolUseBlock => b.type === 'tool_use');
const textBlocks = finalMsg.content.filter((b): b is Anthropic.TextBlock => b.type === 'text');
const finalText = textBlocks.map((b) => b.text).join('');
// No tool calls — save and done
if (toolUses.length === 0) {
getDb().prepare(
'INSERT INTO messages (id, conversation_id, role, content) VALUES (?, ?, ?, ?)'
).run(assistantMsgId, convId, 'assistant', finalText);
console.log(`[chat] Done, response: ${finalText.slice(0, 80)}`);
return;
}
// Save assistant message with filtered content (no thinking blocks)
const cleanContent = filterContent(finalMsg.content as ContentBlock[]);
getDb().prepare(
'INSERT INTO messages (id, conversation_id, role, content, tool_calls) VALUES (?, ?, ?, ?, ?)'
).run(assistantMsgId, convId, 'assistant', finalText || '(调用工具)', JSON.stringify(cleanContent));
currentMessages.push({ role: 'assistant', content: cleanContent });
// Execute tools
const toolResults: Anthropic.ToolResultBlockParam[] = [];
for (const tool of toolUses) {
ws.send(JSON.stringify({
type: 'tool_start',
data: { tool: tool.name, input: tool.input },
}));
console.log(`[chat] Executing tool: ${tool.name}`);
try {
const result = await videoAgent.executeTool(tool.name, tool.input as Record<string, unknown>);
toolResults.push({ type: 'tool_result', tool_use_id: tool.id, content: result });
const toolMsgId = randomUUID();
getDb().prepare(
'INSERT INTO messages (id, conversation_id, role, content) VALUES (?, ?, ?, ?)'
).run(toolMsgId, convId, 'tool', JSON.stringify({ tool_use_id: tool.id, content: result }));
ws.send(JSON.stringify({
type: 'tool_result',
data: { tool: tool.name, result: result.slice(0, 1000) },
}));
} catch (err) {
const errMsg = (err as Error).message;
toolResults.push({ type: 'tool_result', tool_use_id: tool.id, content: `Error: ${errMsg}` });
const toolMsgId = randomUUID();
getDb().prepare(
'INSERT INTO messages (id, conversation_id, role, content) VALUES (?, ?, ?, ?)'
).run(toolMsgId, convId, 'tool', JSON.stringify({ tool_use_id: tool.id, content: `Error: ${errMsg}` }));
ws.send(JSON.stringify({
type: 'tool_error',
data: { tool: tool.name, error: errMsg },
}));
}
}
currentMessages.push({ role: 'user', content: toolResults });
}
} catch (err) {
const errMsg = (err as Error).message;
console.error('[chat] LLM error:', errMsg);
const errId = randomUUID();
getDb().prepare(
'INSERT INTO messages (id, conversation_id, role, content) VALUES (?, ?, ?, ?)'
).run(errId, convId, 'assistant', `抱歉,出错了:${errMsg}`);
ws.send(JSON.stringify({
type: 'message',
data: { id: errId, role: 'assistant', content: `抱歉,出错了:${errMsg}` },
}));
}
}