Files
video-create/.claude/skills/video-from-script/scripts/lib/phase-videos.js

251 lines
9.0 KiB
JavaScript
Raw Normal View History

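/**
 * Phase: videos — video generation (VEO / Grok / Kling)
 *
 * Image-to-video: tasks are submitted in batches, and generated videos are
 * uploaded to OSS automatically.
 * Supports task ID recovery: when re-running after an interruption, existing
 * tasks are resumed first.
 */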
const path = require('path')
const { saveManifest, ensureDir, log, getManifestDir } = require('./pipeline-utils')
/**
 * Heuristically decide whether an error message indicates that the account
 * ran out of quota/balance or hit a rate limit.
 *
 * @param {string|Error|*} msg - Error text. An Error (or any object carrying a
 *   string `message`) is accepted too; other non-string values are coerced
 *   with String().
 * @returns {boolean} true when the lower-cased text matches one of the quota /
 *   rate-limit keywords, false otherwise (including for empty input).
 */
function isQuotaError(msg) {
  if (!msg) return false
  // A caller may hand over the Error itself rather than its message; calling
  // toLowerCase() on a non-string would throw a TypeError.
  let text
  if (typeof msg === 'string') {
    text = msg
  } else if (typeof msg.message === 'string') {
    text = msg.message
  } else {
    text = String(msg)
  }
  return /quota|limit|exceed|insufficient|余额|额度|超限|rate.?limit|too.?many/.test(text.toLowerCase())
}
/**
 * Fold the outcome of one polled video task back into its manifest item.
 *
 * Outcomes:
 * - Success (`val.ok` and `val.result.file` set): store the file path relative
 *   to `dir` (with forward slashes) and the duration, mark the item 'done',
 *   and drop the task id together with any error left from an earlier attempt.
 * - Task failure (`val.item` set and `val.isTaskFailure` truthy): mark the
 *   item 'failed', record the error and drop the task id.
 * - Any other non-success with `val.item` set: log it, mark the item
 *   'pending' and keep `videoTaskId` so the task can be resumed later.
 * - Non-success without `val.item`: the item is left untouched.
 *
 * @param {object} item - Manifest item to mutate in place.
 * @param {object} val - Poll outcome: `{ ok, result?, item?, error?, isTaskFailure? }`.
 * @param {string} dir - Directory that `item.video` is made relative to.
 * @returns {void}
 */
function applyPollResult(item, val, dir) {
  if (val.ok && val.result?.file) {
    item.video = path.relative(dir, val.result.file).replace(/\\/g, '/')
    item.videoDuration = val.result.duration
    item.status = 'done'
    delete item.videoTaskId
    // A retried item may still carry the error of its previous failed run;
    // it must not linger next to status 'done'.
    delete item.error
    return
  }

  if (!val.item) return

  if (val.isTaskFailure) {
    item.status = 'failed'
    item.error = val.error || '视频生成未返回文件'
    delete item.videoTaskId
    return
  }

  // Not a definitive failure: keep the task id so a later run can resume it.
  log('videos', ` item ${item.id} 生成超时(保留 taskId 待恢复): ${val.error}`)
  item.status = 'pending'
}
/**
 * Decide whether a manifest item may enter the video generation phase.
 *
 * @param {object} it - Manifest item.
 * @returns {boolean} true when the item is not explicitly unconfirmed, has both
 *   `url` and `videoPrompt`, has no finished video yet, and its status is one
 *   of done / pending / failed.
 */
function isVideoCandidate(it) {
  if (it.confirmed === false) return false
  if (!it.url || !it.videoPrompt) return false
  // Already has a video (local file or remote URL) and is done → skip to
  // avoid generating it again.
  if (it.status === 'done' && (it.video || it.videoUrl)) return false
  return ['done', 'pending', 'failed'].includes(it.status)
}

/**
 * Print, for every manifest item, why it did not qualify for video generation,
 * followed by the suggested repair command.
 *
 * @param {object} manifest - Manifest whose `items` are diagnosed.
 * @param {string} manifestPath - Path echoed in the repair command.
 * @returns {void}
 */
function reportNoVideoCandidates(manifest, manifestPath) {
  console.log("\n⚠ [videos] 没有符合条件的 item 进入视频生成阶段")
  console.log(" manifest 中共有", manifest.items.length, "个 item逐一诊断:")
  manifest.items.forEach((it, index) => {
    const reasons = []
    if (it.confirmed === false) reasons.push("confirmed=false")
    if (!it.url) reasons.push("缺少 url图片未上传")
    if (!it.videoPrompt) reasons.push("缺少 videoPrompt")
    if (it.status === 'done' && (it.video || it.videoUrl)) {
      reasons.push("视频已生成,已跳过")
    } else if (!["done","pending","failed"].includes(it.status)) {
      reasons.push("status=" + (it.status || "undefined") + "(不在 done/pending/failed 中)")
    }
    console.log(" - item", it.id || index, ":", reasons.length > 0 ? reasons.join(", ") : "已满足全部条件(不应在此)")
  })
  console.log("\n 修复命令:")
  console.log(" node .claude/skills/video-from-script/scripts/pipeline.js confirm --manifest", manifestPath, "--all")
  console.log()
}

/**
 * Split the candidates into items that still need generation and items whose
 * video file already exists in `videosDir` (disk fallback).
 *
 * Items restored from disk get `video`, `status = 'done'` and lose their
 * `videoTaskId`. Items that go on to generation have stale `video`,
 * `videoUrl` and `videoDuration` fields removed.
 *
 * @param {object[]} videoCandidates - Items accepted by isVideoCandidate.
 * @param {string} videosDir - Directory holding generated video files.
 * @returns {{items: object[], restoredFromDisk: number}} Items to generate and
 *   the number of items whose reference was restored from disk.
 */
function collectItemsToGenerate(videoCandidates, videosDir) {
  const fs = require('fs')
  // The directory is listed at most once; nothing in this loop writes to it.
  let existingFiles = null
  let restoredFromDisk = 0
  const items = []
  for (const it of videoCandidates) {
    // Disk fallback: a local video file already exists → restore the
    // reference and skip generation.
    if (!it.video && it.id) {
      if (existingFiles === null) existingFiles = fs.readdirSync(videosDir)
      const existingVideos = existingFiles.filter(f =>
        f.includes('_item' + it.id + '_') || f.includes('_item' + it.id + '.')
      )
      if (existingVideos.length > 0) {
        it.video = 'videos/' + existingVideos[existingVideos.length - 1]
        it.status = 'done'
        delete it.videoTaskId
        restoredFromDisk += 1
        log('videos', ` item ${it.id} 发现已有视频文件 ${it.video},跳过生成`)
        continue
      }
    }
    if (it.video || it.videoUrl) {
      if (it.status === 'done') continue
      // Not done: the old references are stale and the video is regenerated.
      delete it.video
      delete it.videoUrl
      delete it.videoDuration
    }
    items.push(it)
  }
  return { items, restoredFromDisk }
}

/**
 * Load the generator module matching the model name.
 *
 * @param {string} videoModel - Model name; matched case-insensitively against
 *   'grok' and 'kling', anything else selects the VEO generator.
 * @returns {{Api: object, pollFn: Function}} The module's API object (its
 *   `create` method is used to submit tasks) and its `pollWithRetry` function.
 */
function loadVideoGenerator(videoModel) {
  const modelLower = videoModel.toLowerCase()
  if (modelLower.includes('grok')) {
    const gen = require('../grok-video-generator')
    return { Api: gen.GrokApi, pollFn: gen.pollWithRetry }
  }
  if (modelLower.includes('kling')) {
    const gen = require('../kling-video-generator')
    return { Api: gen.KlingApi, pollFn: gen.pollWithRetry }
  }
  const gen = require('../veo-video-generator')
  return { Api: gen.VeoApi, pollFn: gen.pollWithRetry }
}

/**
 * Resume interrupted tasks, then submit and poll the remaining items in
 * batches. The manifest is saved after recovery, after each batch submission
 * and after each batch poll.
 *
 * @param {object[]} items - Items that still need a video.
 * @param {object} ctx - `{ manifest, manifestPath, dir, videosDir, videoModel }`.
 * @returns {Promise<void>}
 */
async function generateVideos(items, ctx) {
  const { manifest, manifestPath, dir, videosDir, videoModel } = ctx
  const { Api, pollFn } = loadVideoGenerator(videoModel)
  const ratio = manifest.format || '9:16'
  const pollOpts = (item) => ({
    outputDir: videosDir, aspectRatio: ratio,
    imageUrl: item.url, lastFrameUrl: item.lastFrameUrl || '',
  })
  log('videos', `${items.length} 个, 模型: ${videoModel}`)

  // Phase 1: resume existing tasks (items that still carry a videoTaskId).
  const recovered = items.filter(item => item.videoTaskId)
  const needSubmit = items.filter(item => !item.videoTaskId)
  if (recovered.length > 0) {
    log('videos', `尝试恢复 ${recovered.length} 个中断任务...`)
    await Promise.allSettled(
      recovered.map(async (item) => {
        try {
          log('videos', ` 恢复 item ${item.id}: ${item.videoTaskId}`)
          const result = await pollFn(item.videoTaskId, item.videoPrompt, pollOpts(item))
          if (result.file) {
            item.video = path.relative(dir, result.file).replace(/\\/g, '/')
            item.videoDuration = result.duration
            item.status = 'done'
            delete item.videoTaskId
            // Drop the error left by an earlier failed attempt.
            delete item.error
            log('videos', ` item ${item.id} 恢复成功`)
          }
        } catch (err) {
          if (err.isTaskFailure === true) {
            // The task itself failed: forget it and submit a fresh one below.
            log('videos', ` item ${item.id} 恢复失败(任务失败): ${err.message},将重新提交`)
            delete item.videoTaskId
            needSubmit.push(item)
          } else {
            // Keep the task id so the next run can try to resume again.
            log('videos', ` item ${item.id} 恢复超时(保留 taskId 下次重试): ${err.message}`)
            item.status = 'pending'
          }
        }
      })
    )
    saveManifest(manifestPath, manifest)
  }
  if (needSubmit.length === 0) { log('videos', '全部通过恢复完成'); return }

  // Phase 2+3: submit and poll batch by batch (at most 5 in flight; the next
  // batch is only submitted once the current one has finished).
  const concurrency = 5
  log('videos', `提交 ${needSubmit.length} 个新任务(并发: ${concurrency}...`)
  for (let i = 0; i < needSubmit.length; i += concurrency) {
    const batch = needSubmit.slice(i, i + concurrency).filter(item => !item.videoTaskId)
    if (batch.length === 0) continue

    // Submit this batch.
    const submitResults = await Promise.allSettled(
      batch.map(async (item) => {
        const extraOpts = item.lastFrameUrl
          ? { aspectRatio: ratio, lastFrameUrl: item.lastFrameUrl, mode: 'pro' }
          : { aspectRatio: ratio }
        try {
          const taskId = await Api.create(item.url, item.videoPrompt, extraOpts)
          return { item, taskId, error: null }
        } catch (err) {
          return { item, taskId: null, error: err.message }
        }
      })
    )
    const submitted = []
    let hitQuota = false
    for (const r of submitResults) {
      const val = r.status === 'fulfilled' ? r.value : { item: null, taskId: null, error: r.reason }
      submitted.push(val)
      if (val.item && val.taskId) {
        val.item.videoTaskId = val.taskId
      } else if (val.item && !val.taskId) {
        val.item.status = 'failed'
        val.item.error = val.error || '提交失败'
        if (isQuotaError(val.error)) hitQuota = true
      }
    }
    // Persist task ids right away so an interruption while polling can be
    // resumed on the next run.
    saveManifest(manifestPath, manifest)
    if (hitQuota) {
      log('videos', ` ⚠️ 额度不足,停止提交(跳过剩余 ${needSubmit.length - i - batch.length} 个任务)`)
      break
    }

    // Poll this batch.
    const pending = submitted.filter(s => s.taskId)
    if (pending.length === 0) continue
    const end = Math.min(i + concurrency, needSubmit.length)
    log('videos', ` [${i + 1}-${end}/${needSubmit.length}] 等待 ${pending.length} 个视频生成...`)
    const pollResults = await Promise.allSettled(
      pending.map(async ({ item, taskId }) => {
        try {
          const result = await pollFn(taskId, item.videoPrompt, pollOpts(item))
          return { item, result, ok: true }
        } catch (err) {
          return { item, error: err.message, ok: false, isTaskFailure: err.isTaskFailure === true }
        }
      })
    )
    for (const r of pollResults) {
      const val = r.status === 'fulfilled'
        ? r.value
        : { ok: false, error: r.reason?.message || String(r.reason), isTaskFailure: r.reason?.isTaskFailure === true }
      applyPollResult(val.item || {}, val, dir)
    }
    saveManifest(manifestPath, manifest)
  }
}

/**
 * Upload every manifest item that has a local `video` but no `videoUrl` to
 * OSS, one at a time, and save the manifest afterwards. A failed upload is
 * logged and leaves the item without `videoUrl`.
 *
 * @param {object} manifest - Manifest whose items are scanned and updated.
 * @param {string} manifestPath - Where the manifest is saved.
 * @param {string} dir - Directory that `item.video` paths are relative to.
 * @returns {Promise<void>}
 */
async function uploadPendingVideos(manifest, manifestPath, dir) {
  const videoItems = manifest.items.filter(it => it.video && !it.videoUrl)
  if (videoItems.length === 0) return
  // Loaded only when there is something to upload.
  const { uploadFile } = require('../oss-upload')
  log('videos', `上传 ${videoItems.length} 个视频到 OSS...`)
  for (const item of videoItems) {
    const videoPath = path.resolve(dir, item.video)
    try {
      const { url } = await uploadFile(videoPath)
      item.videoUrl = url
      log('videos', ` ${item.video} → OK`)
    } catch (err) {
      log('videos', ` ${item.video} 上传失败: ${err.message}`)
    }
  }
  saveManifest(manifestPath, manifest)
}

/**
 * Phase "videos": generate a video for every eligible manifest item and upload
 * the results to OSS.
 *
 * Steps:
 * 1. Select candidates; when there are none, print per-item diagnostics.
 * 2. Restore references to video files that already exist on disk.
 * 3. Resume tasks that still carry a `videoTaskId`, then submit and poll the
 *    remaining items in batches of at most 5.
 * 4. Upload every item that has a local `video` but no `videoUrl`.
 *
 * The upload step always runs, also when nothing had to be generated or when
 * everything was completed through recovery, so that restored videos and
 * uploads that failed on an earlier run are not left behind.
 *
 * @param {object} manifest - Manifest with an `items` array and optional
 *   `videoModel` / `format` fields; mutated in place.
 * @param {string} manifestPath - Path of the manifest file; its directory is
 *   the base for the `videos` directory and for relative video paths.
 * @param {object} [options] - May carry `accountConfig.videoModel` as the
 *   fallback model name.
 * @returns {Promise<void>}
 */
async function phaseVideos(manifest, manifestPath, options) {
  const dir = getManifestDir(manifestPath)
  const videosDir = path.join(dir, 'videos')
  ensureDir(videosDir)
  const accountConfig = options?.accountConfig || {}
  const videoModel = manifest.videoModel || accountConfig.videoModel || 'veo3-fast-frames'

  const videoCandidates = manifest.items.filter(isVideoCandidate)
  if (videoCandidates.length === 0) reportNoVideoCandidates(manifest, manifestPath)

  const { items, restoredFromDisk } = collectItemsToGenerate(videoCandidates, videosDir)
  // References restored from disk must be persisted even when nothing else
  // in this run saves the manifest.
  if (restoredFromDisk > 0) saveManifest(manifestPath, manifest)

  if (items.length === 0) {
    log('videos', '无待处理 item跳过')
  } else {
    await generateVideos(items, { manifest, manifestPath, dir, videosDir, videoModel })
  }

  // Upload videos to OSS.
  await uploadPendingVideos(manifest, manifestPath, dir)
}
// Public API of this module: only the phase entry point is exported.
module.exports = { phaseVideos }