/**
 * Phase: videos — video generation (VEO / Grok / Kling)
 *
 * Image-to-video generation, submitted in batches; finished videos are
 * uploaded to OSS automatically.
 * Supports task-ID recovery: when re-run after an interruption, items that
 * already carry a task ID are resumed before anything new is submitted.
 */

const fs = require('fs')
const path = require('path')
const { saveManifest, ensureDir, log, getManifestDir } = require('./pipeline-utils')

// Number of create-task requests sent to the video API at the same time.
const SUBMIT_CONCURRENCY = 3

// Error text stored on an item whose poll finished without producing a file.
const NO_FILE_ERROR = '视频生成未返回文件'

// Error text stored on an item whose task creation failed without a message.
const SUBMIT_FAILED_ERROR = '提交失败'

/** Returns a printable message for whatever value was thrown. */
function errorMessage(err) {
  return err?.message || String(err)
}

/**
 * Picks the generator module matching the model name.
 * Names containing "grok" or "kling" select those generators; anything else
 * falls back to the VEO generator. The module is required lazily so only the
 * generator actually used gets loaded.
 *
 * @param {string} videoModel - Model name (matched case-insensitively).
 * @returns {{ Api: object, pollFn: Function }} `Api.create` submits a task,
 *   `pollFn` waits for a submitted task.
 */
function selectGenerator(videoModel) {
  const modelLower = String(videoModel).toLowerCase()
  if (modelLower.includes('grok')) {
    const gen = require('../grok-video-generator')
    return { Api: gen.GrokApi, pollFn: gen.pollWithRetry }
  }
  if (modelLower.includes('kling')) {
    const gen = require('../kling-video-generator')
    return { Api: gen.KlingApi, pollFn: gen.pollWithRetry }
  }
  const gen = require('../veo-video-generator')
  return { Api: gen.VeoApi, pollFn: gen.pollWithRetry }
}

/**
 * Polls one task and returns its result.
 *
 * @throws {Error} When polling rejects, or when it resolves without a `file`
 *   (so callers have a single failure path to handle).
 */
async function pollForVideo(item, taskId, ctx) {
  const result = await ctx.pollFn(taskId, item.videoPrompt, {
    outputDir: ctx.videosDir,
    aspectRatio: ctx.ratio,
    imageUrl: item.url,
    lastFrameUrl: item.lastFrameUrl || '',
  })
  if (!result?.file) {
    throw new Error(NO_FILE_ERROR)
  }
  return result
}

/** Stores a finished video on the item and clears its task ID. */
function recordVideo(item, result, dir) {
  // Stored relative to the manifest directory, always with forward slashes.
  item.video = path.relative(dir, result.file).replace(/\\/g, '/')
  item.videoDuration = result.duration
  delete item.videoTaskId
}

/** Marks an item as failed with the given message and clears its task ID. */
function markFailed(item, message) {
  item.status = 'failed'
  item.error = message
  delete item.videoTaskId
}

/**
 * Resumes polling for items that already carry a `videoTaskId`.
 * The manifest is saved after every item settles.
 *
 * @returns {Promise<object[]>} Items whose recovery failed; their task ID has
 *   been cleared and they must be submitted again.
 */
async function recoverTasks(recovered, ctx) {
  const resubmit = []
  await Promise.allSettled(
    recovered.map(async (item) => {
      try {
        log('videos', ` 恢复 item ${item.id}: ${item.videoTaskId}`)
        const result = await pollForVideo(item, item.videoTaskId, ctx)
        recordVideo(item, result, ctx.dir)
        log('videos', ` item ${item.id} 恢复成功`)
      } catch (err) {
        // Covers both a rejected poll and a poll that produced no file:
        // either way the old task is unusable, so drop it and resubmit.
        log('videos', ` item ${item.id} 恢复失败: ${errorMessage(err)},将重新提交`)
        delete item.videoTaskId
        resubmit.push(item)
      }
      saveManifest(ctx.manifestPath, ctx.manifest)
    })
  )
  return resubmit
}

/**
 * Creates generation tasks in batches of SUBMIT_CONCURRENCY.
 * Each obtained task ID is written to its item and the manifest is saved
 * after every batch, so an interrupted run can be recovered later.
 *
 * @returns {Promise<Array<{ item: object, taskId: *, error: ?string }>>}
 *   One entry per item; `taskId` is null when creation failed.
 */
async function submitTasks(needSubmit, ctx) {
  const submitted = []
  for (let i = 0; i < needSubmit.length; i += SUBMIT_CONCURRENCY) {
    const batch = needSubmit.slice(i, i + SUBMIT_CONCURRENCY)
    // The callback never rejects (everything is inside try/catch), so
    // Promise.all cannot short-circuit here.
    const outcomes = await Promise.all(
      batch.map(async (item) => {
        try {
          const extraOpts = item.lastFrameUrl
            ? { aspectRatio: ctx.ratio, lastFrameUrl: item.lastFrameUrl }
            : { aspectRatio: ctx.ratio }
          const taskId = await ctx.Api.create(item.url, item.videoPrompt, extraOpts)
          return { item, taskId: taskId || null, error: null }
        } catch (err) {
          return { item, taskId: null, error: errorMessage(err) }
        }
      })
    )
    for (const outcome of outcomes) {
      if (outcome.taskId) {
        outcome.item.videoTaskId = outcome.taskId
      }
      submitted.push(outcome)
    }
    saveManifest(ctx.manifestPath, ctx.manifest)
  }
  return submitted
}

/**
 * Waits for all freshly submitted tasks, then records each outcome on its
 * item (video path + duration, or failed status) and saves the manifest.
 */
async function pollSubmitted(pending, ctx) {
  const outcomes = await Promise.all(
    pending.map(async ({ item, taskId }) => {
      try {
        const result = await pollForVideo(item, taskId, ctx)
        return { item, result, error: null }
      } catch (err) {
        return { item, result: null, error: errorMessage(err) }
      }
    })
  )
  for (const { item, result, error } of outcomes) {
    if (result) {
      recordVideo(item, result, ctx.dir)
    } else {
      markFailed(item, error || NO_FILE_ERROR)
      log('videos', ` item ${item.id} 生成失败: ${item.error}`)
    }
    saveManifest(ctx.manifestPath, ctx.manifest)
  }
}

/**
 * Runs recovery, submission and polling for the given items.
 * Returning early from here only ends the generation part; the caller still
 * runs the upload stage afterwards.
 */
async function generateVideos(items, ctx) {
  // Stage 1: split into tasks to resume and items needing a new task.
  const recovered = items.filter(it => it.videoTaskId)
  const needSubmit = items.filter(it => !it.videoTaskId)

  if (recovered.length > 0) {
    log('videos', `尝试恢复 ${recovered.length} 个中断任务...`)
    const resubmit = await recoverTasks(recovered, ctx)
    needSubmit.push(...resubmit)
  }
  if (needSubmit.length === 0) {
    log('videos', '全部通过恢复完成')
    return
  }

  // Stage 2: submit new tasks.
  log('videos', `提交 ${needSubmit.length} 个新任务(并发: ${SUBMIT_CONCURRENCY})...`)
  const submitted = await submitTasks(needSubmit, ctx)
  const pending = submitted.filter(s => s.taskId)
  const rejected = submitted.filter(s => !s.taskId)

  if (rejected.length > 0) {
    if (pending.length === 0) {
      log('videos', '所有任务提交失败')
    }
    // Every failed submission is recorded, not only when the whole run failed.
    for (const { item, error } of rejected) {
      markFailed(item, error || SUBMIT_FAILED_ERROR)
      log('videos', ` item ${item.id} 提交失败: ${item.error}`)
    }
    saveManifest(ctx.manifestPath, ctx.manifest)
  }
  if (pending.length === 0) {
    return
  }

  // Stage 3: poll the new tasks.
  log('videos', `等待 ${pending.length} 个视频生成...`)
  await pollSubmitted(pending, ctx)
}

/**
 * Uploads every manifest item that has a local `video` but no `videoUrl`,
 * one at a time, saving the manifest after each attempt. A failed upload is
 * logged and left without `videoUrl`, so a later run picks it up again.
 */
async function uploadPendingVideos(manifest, manifestPath, dir) {
  const videoItems = manifest.items.filter(it => it.video && !it.videoUrl)
  if (videoItems.length === 0) {
    return
  }

  // Loaded only when there is something to upload.
  const { uploadFile } = require('../oss-upload')
  log('videos', `上传 ${videoItems.length} 个视频到 OSS...`)
  for (const item of videoItems) {
    const videoPath = path.resolve(dir, item.video)
    try {
      const { url } = await uploadFile(videoPath)
      item.videoUrl = url
      log('videos', ` ${item.video} → OK`)
    } catch (err) {
      log('videos', ` ${item.video} 上传失败: ${errorMessage(err)}`)
    }
    saveManifest(manifestPath, manifest)
  }
}

/**
 * Generates videos for all eligible manifest items and uploads the results.
 *
 * An item is eligible when `status === 'done'`, `confirmed !== false`, it has
 * both `url` and `videoPrompt`, and it has no `video` yet. Results are written
 * back onto the items (`video`, `videoDuration`, `videoUrl`, or
 * `status: 'failed'` + `error`) and persisted with `saveManifest`.
 *
 * The upload stage always runs, even when there is nothing to generate or
 * generation ended early, so videos left without `videoUrl` are retried.
 *
 * @param {object} manifest - Reads `items`, `videoModel` and `format`
 *   (aspect ratio, defaults to '9:16'); items are mutated in place.
 * @param {string} manifestPath - Manifest file path; videos are written to a
 *   `videos` directory next to it.
 * @param {object} [options] - `options.accountConfig.videoModel` is used when
 *   the manifest does not name a model.
 * @returns {Promise<void>}
 */
async function phaseVideos(manifest, manifestPath, options = {}) {
  const dir = getManifestDir(manifestPath)
  const videosDir = path.join(dir, 'videos')
  ensureDir(videosDir)

  const accountConfig = options?.accountConfig || {}
  const videoModel = manifest.videoModel || accountConfig.videoModel || 'veo3-fast-frames'

  const items = manifest.items.filter(it =>
    it.status === 'done' &&
    it.confirmed !== false &&
    it.url &&
    it.videoPrompt &&
    !it.video
  )

  if (items.length === 0) {
    log('videos', '无待处理 item,跳过')
  } else {
    const { Api, pollFn } = selectGenerator(videoModel)
    const ratio = manifest.format || '9:16'
    log('videos', `共 ${items.length} 个, 模型: ${videoModel}`)
    await generateVideos(items, {
      manifest,
      manifestPath,
      dir,
      videosDir,
      ratio,
      Api,
      pollFn,
    })
  }

  await uploadPendingVideos(manifest, manifestPath, dir)
}

module.exports = { phaseVideos }