/**
 * Phase: videos — video generation (VEO / Grok / Kling)
 *
 * Image-to-video, submitted in batches; generated videos are then uploaded to OSS.
 * Supports task-ID recovery: when re-run after an interruption, tasks that already
 * have a task id are resumed before anything new is submitted.
 */

const fs = require('fs')
const path = require('path')
const { saveManifest, ensureDir, log, getManifestDir } = require('./pipeline-utils')

/** Item statuses that are allowed to enter the video phase. */
const VIDEO_STATUSES = ['done', 'pending', 'failed']

/** Maximum number of tasks submitted (and then polled) together. */
const SUBMIT_CONCURRENCY = 5

/**
 * Heuristically decide whether an error message describes a quota / rate-limit problem.
 *
 * @param {*} msg - Error message; non-string values (e.g. an Error) are coerced to text.
 * @returns {boolean} true when the lower-cased text matches one of the quota keywords.
 */
function isQuotaError(msg) {
  if (!msg) return false
  // Accept Error objects and other non-strings instead of throwing on .toLowerCase().
  const text = typeof msg === 'string' ? msg : String(msg.message || msg)
  return /quota|limit|exceed|insufficient|余额|额度|超限|rate.?limit|too.?many/.test(text.toLowerCase())
}

/**
 * Write the outcome of one poll back onto its manifest item.
 *
 * - success with a file: store the path relative to `dir` (forward slashes), the
 *   duration, mark the item `done` and drop its task id;
 * - task failure: mark the item `failed`, store the error and drop its task id;
 * - anything else: keep the task id and mark the item `pending` so a later run can
 *   resume it.
 *
 * @param {object} item - Manifest item to mutate.
 * @param {object} val - Poll outcome: `{ item, ok, result?, error?, isTaskFailure? }`.
 * @param {string} dir - Manifest directory used to relativize the video path.
 */
function applyPollResult(item, val, dir) {
  if (val.ok && val.result?.file) {
    item.video = path.relative(dir, val.result.file).replace(/\\/g, '/')
    item.videoDuration = val.result.duration
    item.status = 'done'
    delete item.videoTaskId
    return
  }

  // Outcomes that carry no item (rejected promise wrapper) have nothing to update.
  if (!val.item) return

  if (val.isTaskFailure) {
    item.status = 'failed'
    item.error = val.error || '视频生成未返回文件'
    delete item.videoTaskId
    return
  }

  // A poll that resolved without a file has no error text; avoid logging "undefined".
  const reason = val.error || '视频生成未返回文件'
  log('videos', ` item ${item.id} 生成超时(保留 taskId 待恢复): ${reason}`)
  item.status = 'pending'
}

/**
 * Whether a manifest item should be considered for video generation.
 *
 * @param {object} it - Manifest item.
 * @returns {boolean}
 */
function isVideoCandidate(it) {
  if (it.confirmed === false) return false
  if (!it.url || !it.videoPrompt) return false
  // Already has a video (local file or remote URL) and is done: skip to avoid regenerating.
  if (it.status === 'done' && (it.video || it.videoUrl)) return false
  return VIDEO_STATUSES.includes(it.status)
}

/**
 * Print a per-item diagnosis explaining why no item qualified for video generation.
 *
 * @param {object} manifest - Manifest whose `items` are inspected.
 * @param {string} manifestPath - Path echoed in the suggested repair command.
 */
function reportNoCandidates(manifest, manifestPath) {
  console.log("\n⚠️ [videos] 没有符合条件的 item 进入视频生成阶段")
  console.log(" manifest 中共有", manifest.items.length, "个 item,逐一诊断:")
  manifest.items.forEach((it, index) => {
    const reasons = []
    if (it.confirmed === false) reasons.push("confirmed=false")
    if (!it.url) reasons.push("缺少 url(图片未上传)")
    if (!it.videoPrompt) reasons.push("缺少 videoPrompt")
    if (it.status === 'done' && (it.video || it.videoUrl)) {
      reasons.push("视频已生成,已跳过")
    } else if (!VIDEO_STATUSES.includes(it.status)) {
      reasons.push("status=" + (it.status || "undefined") + "(不在 done/pending/failed 中)")
    }
    console.log(" - item", it.id || index, ":", reasons.length > 0 ? reasons.join(", ") : "已满足全部条件(不应在此)")
  })
  console.log("\n 修复命令:")
  console.log(" node .claude/skills/video-from-script/scripts/pipeline.js confirm --manifest", manifestPath, "--all")
  console.log()
}

/**
 * Split candidates into items that still need generation and items whose video file
 * is already on disk. The latter are marked `done` in place and not returned.
 *
 * @param {object[]} candidates - Items accepted by `isVideoCandidate`.
 * @param {string} videosDir - Directory scanned for existing video files.
 * @returns {{ items: object[], restoredFromDisk: number }}
 */
function collectPendingItems(candidates, videosDir) {
  const items = []
  let restoredFromDisk = 0
  let videoFiles = null // directory listing, read at most once and only when needed

  for (const it of candidates) {
    // Disk fallback: a local video file already exists, so restore the reference and skip.
    if (!it.video && it.id) {
      if (videoFiles === null) videoFiles = fs.readdirSync(videosDir)
      const existingVideos = videoFiles.filter(f =>
        f.includes('_item' + it.id + '_') || f.includes('_item' + it.id + '.')
      )
      if (existingVideos.length > 0) {
        it.video = 'videos/' + existingVideos[existingVideos.length - 1]
        it.status = 'done'
        delete it.videoTaskId
        log('videos', ` item ${it.id} 发现已有视频文件 ${it.video},跳过生成`)
        restoredFromDisk += 1
        continue
      }
    }

    if (it.video || it.videoUrl) {
      if (it.status === 'done') continue
      // The item is being regenerated: drop the references to the stale video.
      delete it.video
      delete it.videoUrl
      delete it.videoDuration
    }
    items.push(it)
  }

  return { items, restoredFromDisk }
}

/**
 * Pick the generator module matching the model name (grok / kling, otherwise veo).
 *
 * @param {string} videoModel - Model name; matched case-insensitively by substring.
 * @returns {{ Api: object, pollFn: Function }}
 */
function loadGenerator(videoModel) {
  const modelLower = videoModel.toLowerCase()
  if (modelLower.includes('grok')) {
    const gen = require('../grok-video-generator')
    return { Api: gen.GrokApi, pollFn: gen.pollWithRetry }
  }
  if (modelLower.includes('kling')) {
    const gen = require('../kling-video-generator')
    return { Api: gen.KlingApi, pollFn: gen.pollWithRetry }
  }
  const gen = require('../veo-video-generator')
  return { Api: gen.VeoApi, pollFn: gen.pollWithRetry }
}

/**
 * Resume polling for items that already carry a task id.
 *
 * Items whose task failed (`err.isTaskFailure === true`) lose their task id and are
 * appended to `needSubmit`; other errors keep the task id and mark the item `pending`.
 *
 * @param {object[]} recovered - Items with a `videoTaskId`.
 * @param {object[]} needSubmit - Queue that receives items needing re-submission.
 * @param {{ dir: string, pollFn: Function, pollOpts: Function }} ctx
 */
async function recoverTasks(recovered, needSubmit, ctx) {
  const { dir, pollFn, pollOpts } = ctx
  log('videos', `尝试恢复 ${recovered.length} 个中断任务...`)
  await Promise.allSettled(
    recovered.map(async (item) => {
      try {
        log('videos', ` 恢复 item ${item.id}: ${item.videoTaskId}`)
        const result = await pollFn(item.videoTaskId, item.videoPrompt, pollOpts(item))
        if (result.file) {
          item.video = path.relative(dir, result.file).replace(/\\/g, '/')
          item.videoDuration = result.duration
          item.status = 'done'
          delete item.videoTaskId
          log('videos', ` item ${item.id} 恢复成功`)
        }
      } catch (err) {
        if (err?.isTaskFailure === true) {
          log('videos', ` item ${item.id} 恢复失败(任务失败): ${err.message},将重新提交`)
          delete item.videoTaskId
          needSubmit.push(item)
        } else {
          log('videos', ` item ${item.id} 恢复超时(保留 taskId 下次重试): ${err?.message}`)
          item.status = 'pending'
        }
      }
    })
  )
}

/**
 * Submit new tasks in batches of `SUBMIT_CONCURRENCY`, polling each batch to completion
 * before the next one is submitted. Stops submitting once a quota error is seen.
 * The manifest is saved after each submit step and after each poll step.
 *
 * @param {object[]} needSubmit - Items without a task id.
 * @param {object} manifest
 * @param {string} manifestPath
 * @param {{ dir: string, ratio: string, Api: object, pollFn: Function, pollOpts: Function }} ctx
 */
async function submitAndPoll(needSubmit, manifest, manifestPath, ctx) {
  const { dir, ratio, Api, pollFn, pollOpts } = ctx
  const concurrency = SUBMIT_CONCURRENCY
  log('videos', `提交 ${needSubmit.length} 个新任务(并发: ${concurrency})...`)

  for (let i = 0; i < needSubmit.length; i += concurrency) {
    const end = Math.min(i + concurrency, needSubmit.length)
    const batch = needSubmit.slice(i, end).filter(item => !item.videoTaskId)
    if (batch.length === 0) continue

    // Submit this batch.
    const submitResults = await Promise.allSettled(
      batch.map(async (item) => {
        const extraOpts = item.lastFrameUrl
          ? { aspectRatio: ratio, lastFrameUrl: item.lastFrameUrl, mode: 'pro' }
          : { aspectRatio: ratio }
        try {
          const taskId = await Api.create(item.url, item.videoPrompt, extraOpts)
          return { item, taskId, error: null }
        } catch (err) {
          return { item, taskId: null, error: err?.message || String(err) }
        }
      })
    )

    const submitted = []
    let hitQuota = false
    for (const r of submitResults) {
      const val = r.status === 'fulfilled' ? r.value : { item: null, taskId: null, error: r.reason }
      submitted.push(val)
      if (val.item && val.taskId) {
        val.item.videoTaskId = val.taskId
      } else if (val.item && !val.taskId) {
        val.item.status = 'failed'
        val.item.error = val.error || '提交失败'
        if (isQuotaError(val.error)) hitQuota = true
      }
    }
    // Persist task ids right away so an interrupted run can resume them.
    saveManifest(manifestPath, manifest)

    if (hitQuota) {
      // Remaining = everything after this batch's slice, independent of the filter above.
      log('videos', ` ⚠️ 额度不足,停止提交(跳过剩余 ${needSubmit.length - end} 个任务)`)
      break
    }

    // Poll this batch.
    const pending = submitted.filter(s => s.taskId)
    if (pending.length === 0) continue

    log('videos', ` [${i + 1}-${end}/${needSubmit.length}] 等待 ${pending.length} 个视频生成...`)
    const pollResults = await Promise.allSettled(
      pending.map(async ({ item, taskId }) => {
        try {
          const result = await pollFn(taskId, item.videoPrompt, pollOpts(item))
          return { item, result, ok: true }
        } catch (err) {
          return { item, error: err?.message, ok: false, isTaskFailure: err?.isTaskFailure === true }
        }
      })
    )

    for (const r of pollResults) {
      const val = r.status === 'fulfilled'
        ? r.value
        : { ok: false, error: r.reason?.message || String(r.reason), isTaskFailure: r.reason?.isTaskFailure === true }
      applyPollResult(val.item || {}, val, dir)
    }
    saveManifest(manifestPath, manifest)
  }
}

/**
 * Run recovery followed by batched submission for the given items.
 *
 * @param {object[]} items - Items that still need a video.
 * @param {object} manifest
 * @param {string} manifestPath
 * @param {{ dir: string, videosDir: string, videoModel: string }} env
 */
async function generateVideos(items, manifest, manifestPath, env) {
  const { dir, videosDir, videoModel } = env
  const { Api, pollFn } = loadGenerator(videoModel)
  const ratio = manifest.format || '9:16'
  const pollOpts = (item) => ({
    outputDir: videosDir,
    aspectRatio: ratio,
    imageUrl: item.url,
    lastFrameUrl: item.lastFrameUrl || '',
  })
  const ctx = { dir, ratio, Api, pollFn, pollOpts }

  log('videos', `共 ${items.length} 个, 模型: ${videoModel}`)

  // Phase 1: resume existing tasks (items that have a videoTaskId).
  const recovered = items.filter(item => item.videoTaskId)
  const needSubmit = items.filter(item => !item.videoTaskId)

  if (recovered.length > 0) {
    await recoverTasks(recovered, needSubmit, ctx)
    saveManifest(manifestPath, manifest)
  }

  if (needSubmit.length === 0) {
    log('videos', '全部通过恢复完成')
    return
  }

  // Phase 2+3: submit and poll in batches (at most SUBMIT_CONCURRENCY in flight).
  await submitAndPoll(needSubmit, manifest, manifestPath, ctx)
}

/**
 * Upload every item that has a local `video` but no `videoUrl`, then save the manifest.
 * A failed upload is logged and leaves the item without `videoUrl`, so a later run
 * picks it up again.
 *
 * @param {object} manifest
 * @param {string} manifestPath
 * @param {string} dir - Manifest directory; `item.video` is resolved against it.
 */
async function uploadVideos(manifest, manifestPath, dir) {
  const videoItems = manifest.items.filter(it => it.video && !it.videoUrl)
  if (videoItems.length === 0) return

  const { uploadFile } = require('../oss-upload')
  log('videos', `上传 ${videoItems.length} 个视频到 OSS...`)
  for (const item of videoItems) {
    const videoPath = path.resolve(dir, item.video)
    try {
      const { url } = await uploadFile(videoPath)
      item.videoUrl = url
      log('videos', ` ${item.video} → OK`)
    } catch (err) {
      log('videos', ` ${item.video} 上传失败: ${err.message}`)
    }
  }
  saveManifest(manifestPath, manifest)
}

/**
 * Video phase entry point: select items, restore videos already on disk, resume or
 * submit generation tasks, and upload the resulting videos to OSS.
 *
 * The upload step runs on every exit path, so videos restored from disk, recovered
 * by task id, or left over from a previously failed upload are uploaded too.
 *
 * @param {object} manifest - Manifest object; its `items` are mutated in place.
 * @param {string} manifestPath - Path of the manifest file, used for saving.
 * @param {object} [options] - Optional settings; `options.accountConfig.videoModel`
 *   is used when the manifest does not name a model.
 * @returns {Promise<void>}
 */
async function phaseVideos(manifest, manifestPath, options) {
  const dir = getManifestDir(manifestPath)
  const videosDir = path.join(dir, 'videos')
  ensureDir(videosDir)

  const accountConfig = options?.accountConfig || {}
  const videoModel = manifest.videoModel || accountConfig.videoModel || 'veo3-fast-frames'

  const videoCandidates = manifest.items.filter(isVideoCandidate)
  if (videoCandidates.length === 0) reportNoCandidates(manifest, manifestPath)

  const { items, restoredFromDisk } = collectPendingItems(videoCandidates, videosDir)
  // Persist references restored from disk even if nothing else changes in this run.
  if (restoredFromDisk > 0) saveManifest(manifestPath, manifest)

  if (items.length === 0) {
    log('videos', '无待处理 item,跳过')
  } else {
    await generateVideos(items, manifest, manifestPath, { dir, videosDir, videoModel })
  }

  // Upload videos to OSS.
  await uploadVideos(manifest, manifestPath, dir)
}

module.exports = { phaseVideos }