import { Agent } from '@earendil-works/pi-agent-core';
import type { AgentEvent } from '@earendil-works/pi-agent-core';
import { streamSimple } from '@earendil-works/pi-ai';
import type { AssistantMessage } from '@earendil-works/pi-ai';
import { WebSocket } from 'ws';
import { randomUUID } from 'crypto';
import fs from 'fs';
import path from 'path';
import { fileURLToPath } from 'url';
import { createPiModel } from './pi-model';
import { createPiTools } from './pi-tools';
import { tools } from './tools/index';
import { videoAgent } from './index';
import {
  dbToPiMessages,
  saveUserMessage,
  saveAssistantMessage,
  saveToolResult,
  type DbMessage,
} from './pi-persist';
import { getDb } from '../db';
import { manifestRelToUrl } from './tools/shared';

// ESM has no __filename/__dirname; derive them from the module URL.
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);

/** Directory scanned for `<skill>/SKILL.md` files. */
const SKILLS_DIR = path.join(__dirname, 'skills');

/** Maximum number of UTF-16 code units of user text kept in a conversation title. */
const TITLE_MAX_LENGTH = 30;

/**
 * Reads every `SKILL.md` found one level below {@link SKILLS_DIR}, strips a
 * leading `---`-delimited frontmatter block, and joins the trimmed bodies with
 * a `---` separator.
 *
 * @returns The concatenated skill text, or `''` when the skills directory
 *          does not exist or contains no skill files.
 */
function loadSkills(): string {
  const parts: string[] = [];
  if (!fs.existsSync(SKILLS_DIR)) return '';

  const skillDirs = fs
    .readdirSync(SKILLS_DIR, { withFileTypes: true })
    .filter((d) => d.isDirectory());

  for (const dir of skillDirs) {
    const skillFile = path.join(SKILLS_DIR, dir.name, 'SKILL.md');
    if (fs.existsSync(skillFile)) {
      let content = fs.readFileSync(skillFile, 'utf-8');
      // Strip YAML frontmatter
      content = content.replace(/^---[\s\S]*?---\n*/, '');
      parts.push(content.trim());
    }
  }

  return parts.join('\n\n---\n\n');
}

/** Skill text loaded once at module import and appended to every system prompt. */
const cachedSkillContent = loadSkills();

/** Mutable state shared between the agent events of a single chat run. */
interface RunContext {
  /** Id announced in `message_start` for the assistant message currently streaming, if any. */
  currentAssistantMsgId: string | null;
}

/** Shape of a media asset attached to a `tool_result` WebSocket message. */
type ToolAsset = { type: 'image' | 'video'; url: string; name: string };

/** Serializes `{ type, data }` as JSON and sends it over the socket. */
function emit(ws: WebSocket, type: string, data: unknown): void {
  ws.send(JSON.stringify({ type, data }));
}

/**
 * Builds a conversation title from the first user message: at most
 * {@link TITLE_MAX_LENGTH} code units followed by `...` when truncated.
 *
 * The cut is moved back by one unit when it would land between the two halves
 * of a surrogate pair, so the title never ends in a lone surrogate.
 */
function buildConversationTitle(content: string): string {
  if (content.length <= TITLE_MAX_LENGTH) return content;

  let end = TITLE_MAX_LENGTH;
  const lastUnit = content.charCodeAt(end - 1);
  const isHighSurrogate = lastUnit >= 0xd800 && lastUnit <= 0xdbff;
  if (isHighSurrogate) end -= 1;

  return content.slice(0, end) + '...';
}

/**
 * Extracts a printable message from an arbitrary thrown value.
 *
 * Anything carrying a string `message` (including `Error` instances) yields
 * that message; every other value is stringified, so `throw 'boom'` or
 * `throw null` cannot produce `undefined` or a secondary TypeError.
 */
function describeError(err: unknown): string {
  if (err instanceof Error) return err.message;
  if (typeof err === 'object' && err !== null) {
    const message = (err as { message?: unknown }).message;
    if (typeof message === 'string') return message;
  }
  return String(err);
}

/**
 * Runs one user turn of a conversation through the agent and streams the
 * outcome to the client.
 *
 * Steps, in order:
 * 1. Persist the user message and echo it back over the socket.
 * 2. If it is the first message of the conversation, derive the title from it;
 *    always bump the conversation's `updated_at`.
 * 3. Load the prior messages (excluding the one just saved, since it is passed
 *    to `agent.prompt` separately) and build the agent.
 * 4. Forward agent events to the socket via {@link handleAgentEvent}.
 *
 * Errors raised by `agent.prompt` are logged and reported to the client as an
 * assistant message; they are not rethrown.
 *
 * @param ws          Socket the events are streamed to.
 * @param convId      Conversation id used for persistence.
 * @param userContent Raw text of the user's message.
 */
export async function runAgentChat(
  ws: WebSocket,
  convId: string,
  userContent: string,
): Promise<void> {
  const userMsgId = saveUserMessage(convId, userContent);
  emit(ws, 'message', { id: userMsgId, role: 'user', content: userContent });

  const msgCount = getDb()
    .prepare('SELECT COUNT(*) as count FROM messages WHERE conversation_id = ?')
    .get(convId) as { count: number };

  if (msgCount.count <= 1) {
    const title = buildConversationTitle(userContent);
    getDb()
      .prepare("UPDATE conversations SET title = ?, updated_at = datetime('now') WHERE id = ?")
      .run(title, convId);
  }
  getDb()
    .prepare("UPDATE conversations SET updated_at = datetime('now') WHERE id = ?")
    .run(convId);

  const history = getDb()
    .prepare('SELECT * FROM messages WHERE conversation_id = ? AND id != ? ORDER BY created_at')
    .all(convId, userMsgId) as DbMessage[];
  const piMessages = dbToPiMessages(history);

  const { model, apiKey } = createPiModel();
  const piTools = createPiTools(tools);

  const agent = new Agent({
    initialState: {
      systemPrompt:
        videoAgent.getSystemPrompt() + (cachedSkillContent ? '\n\n' + cachedSkillContent : ''),
      model,
      thinkingLevel: 'off',
      tools: piTools,
      messages: piMessages,
    },
    streamFn: streamSimple,
    getApiKey: () => apiKey,
  });

  const ctx: RunContext = { currentAssistantMsgId: null };
  agent.subscribe((event: AgentEvent) => {
    handleAgentEvent(ws, convId, event, ctx);
  });

  emit(ws, 'status', { status: 'thinking' });

  try {
    await agent.prompt(userContent);
  } catch (err: unknown) {
    const errMsg = describeError(err);
    console.error('[pi-bridge] Agent error:', errMsg);
    emit(ws, 'message', { id: '', role: 'assistant', content: `抱歉,出错了:${errMsg}` });
  }
}

/**
 * Translates a single agent event into WebSocket messages and persistence
 * calls. Event types not listed in the switch are ignored.
 *
 * @param ws     Socket the translated messages are sent to.
 * @param convId Conversation id passed to the persistence helpers.
 * @param event  Event emitted by the agent.
 * @param ctx    Per-run state; tracks the id of the assistant message in flight.
 */
function handleAgentEvent(
  ws: WebSocket,
  convId: string,
  event: AgentEvent,
  ctx: RunContext,
): void {
  switch (event.type) {
    case 'message_start': {
      if (event.message.role === 'assistant') {
        // Explicit import so id generation does not depend on the Web Crypto
        // global being exposed by the runtime.
        const id = randomUUID();
        ctx.currentAssistantMsgId = id;
        emit(ws, 'message_start', { id });
      }
      break;
    }

    case 'message_update': {
      const piEvent = event.assistantMessageEvent;
      const id = ctx.currentAssistantMsgId ?? '';
      if (piEvent.type === 'text_delta') {
        emit(ws, 'text_delta', { id, text: piEvent.delta });
      } else if (piEvent.type === 'thinking_delta') {
        // The client-facing name for thinking output is `reasoning_delta`.
        emit(ws, 'reasoning_delta', { id, text: piEvent.delta });
      }
      break;
    }

    case 'message_end': {
      if (event.message.role === 'assistant') {
        const id = ctx.currentAssistantMsgId ?? '';
        emit(ws, 'message_end', { id });
        saveAssistantMessage(convId, event.message as AssistantMessage);
        ctx.currentAssistantMsgId = null;
      }
      break;
    }

    case 'tool_execution_start': {
      emit(ws, 'tool_start', { tool: event.toolName, input: event.args });
      break;
    }

    case 'tool_execution_end': {
      // Concatenate the text of every content part; parts without text contribute ''.
      const resultText = event.result?.content?.map((c: any) => c.text || '').join('') || '';
      const assets = extractAssets(event.toolName, resultText);
      if (event.isError) {
        emit(ws, 'tool_error', { tool: event.toolName, error: resultText });
      } else {
        // Only the first 1000 characters go to the client; the full text is persisted below.
        emit(ws, 'tool_result', {
          tool: event.toolName,
          result: resultText.slice(0, 1000),
          assets,
        });
      }
      saveToolResult(convId, event.toolCallId, event.toolName, resultText, event.isError);
      break;
    }
  }
}

/**
 * Collects displayable image/video assets from the JSON result of the
 * `generate_images` and `generate_videos` tools.
 *
 * - `generate_images`: each entry's `file` and every path in its `candidates`
 *   array becomes an image asset, resolved with `manifestRelToUrl`.
 * - `generate_videos`: an entry's `videoUrl` is used as-is when present;
 *   otherwise its `video` path is resolved with `manifestRelToUrl`.
 *
 * Extraction is best-effort: if `resultText` is not JSON, or an entry has an
 * unexpected shape, the assets gathered up to that point are returned and the
 * error is discarded.
 *
 * @param toolName   Name of the tool that produced the result.
 * @param resultText Raw text of the tool result.
 * @returns Assets found in the result; empty for other tools or unparsable text.
 */
function extractAssets(toolName: string, resultText: string): ToolAsset[] {
  const assets: ToolAsset[] = [];
  try {
    const data = JSON.parse(resultText);

    if (toolName === 'generate_images' && data.manifestPath && Array.isArray(data.images)) {
      for (const img of data.images) {
        if (img.file) {
          assets.push({
            type: 'image',
            url: manifestRelToUrl(data.manifestPath, img.file),
            name: img.file.split('/').pop() || '',
          });
        }
        if (Array.isArray(img.candidates)) {
          for (const c of img.candidates) {
            assets.push({
              type: 'image',
              url: manifestRelToUrl(data.manifestPath, c),
              name: c.split('/').pop() || '',
            });
          }
        }
      }
    }

    if (toolName === 'generate_videos' && data.manifestPath && Array.isArray(data.videos)) {
      for (const vid of data.videos) {
        if (vid.videoUrl) {
          assets.push({
            type: 'video',
            url: vid.videoUrl,
            name: vid.video?.split('/').pop() || 'video',
          });
        } else if (vid.video) {
          assets.push({
            type: 'video',
            url: manifestRelToUrl(data.manifestPath, vid.video),
            name: vid.video.split('/').pop() || '',
          });
        }
      }
    }
  } catch {
    // Deliberately ignored: tool output is not guaranteed to be JSON.
  }
  return assets;
}