Files
video-create/.claude/skills/video-from-script/scripts/lib/phase-videos.js
sion123 65af6c92fc feat(video-from-script): 视频生成阶段增加重复任务检测与原子状态标记
在视频生成流水线中添加三项关键改进:

1. 在 `cmdNext` 中提前将条目状态标记为 `processing` 并持久化,防止同一行被多个并行进程重复取出处理
2. 在 `phaseVideos` 中增加磁盘兜底检测:对无 `video` 字段的条目,根据 `id` 扫描本地视频目录,若发现已有视频文件则恢复引用并跳过生成
3. 优化状态过滤逻辑:`done` 状态且已有视频文件的条目明确跳过并输出原因,减少冗余日志
2026-05-17 16:50:26 +08:00

251 lines
9.0 KiB
JavaScript
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
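/**
 * Phase: videos — video generation (VEO / Grok / Kling)
 *
 * Image-to-video generation, submitted in batches; finished videos are uploaded to OSS automatically.
 * Supports task-ID recovery: when re-run after an interruption, existing tasks are resumed first.
 */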
const path = require('path')
const { saveManifest, ensureDir, log, getManifestDir } = require('./pipeline-utils')
/**
 * Heuristically decide whether an error reports an exhausted quota or a
 * rate limit, based on English and Chinese keywords in its text.
 *
 * @param {string|Error|{message?: string}|null|undefined} msg - Error text, or
 *   an error-like value whose `message` (or string form) is inspected.
 * @returns {boolean} `true` when a quota / rate-limit keyword is present;
 *   `false` for empty input or when no keyword matches.
 */
function isQuotaError(msg) {
  if (!msg) return false
  // Callers may hold an Error (or another non-string) instead of its message;
  // calling toLowerCase() on it directly would throw a TypeError.
  const text = typeof msg === 'string' ? msg : String(msg.message ?? msg)
  return /quota|limit|exceed|insufficient|余额|额度|超限|rate.?limit|too.?many/.test(text.toLowerCase())
}
/**
 * Fold the outcome of polling one video task back into its manifest item.
 *
 * - Success with a file: store the file path relative to `dir`, the duration,
 *   mark the item `done`, and drop the task id and any stale `error`.
 * - Task failure: mark the item `failed`, record the error, drop the task id.
 * - Anything else with an item attached: log it as a timeout, keep the task id
 *   and set the item back to `pending`.
 * - Outcomes that carry no `item` leave `item` untouched.
 *
 * @param {object} item - Manifest item, mutated in place.
 * @param {object} val - Poll outcome `{ item, ok, result?, error?, isTaskFailure? }`.
 * @param {string} dir - Manifest directory that `item.video` is made relative to.
 * @returns {void}
 */
function applyPollResult(item, val, dir) {
  if (val.ok && val.result?.file) {
    // Normalise Windows separators so the stored path always uses '/'.
    item.video = path.relative(dir, val.result.file).replace(/\\/g, '/')
    item.videoDuration = val.result.duration
    item.status = 'done'
    delete item.videoTaskId
    // A retried item may still carry the error of its earlier failed attempt;
    // leaving it would show a finished item with a failure reason.
    delete item.error
    return
  }

  // Without an item there is nothing to update.
  if (!val.item) return

  if (val.isTaskFailure) {
    item.status = 'failed'
    item.error = val.error || '视频生成未返回文件'
    delete item.videoTaskId
    return
  }

  // The task id is kept on purpose so a later run can resume polling.
  log('videos', ` item ${item.id} 生成超时(保留 taskId 待恢复): ${val.error}`)
  item.status = 'pending'
}
async function phaseVideos(manifest, manifestPath, options) {
const dir = getManifestDir(manifestPath)
const videosDir = path.join(dir, 'videos')
ensureDir(videosDir)
const accountConfig = options.accountConfig || {}
const videoModel = manifest.videoModel || accountConfig.videoModel || 'veo3-fast-frames'
const videoCandidates = manifest.items.filter(it => {
if (it.confirmed === false) return false
if (!it.url || !it.videoPrompt) return false
// 已有视频(本地文件或远程 URL且状态为 done → 跳过,避免重复生成
if (it.status === 'done' && (it.video || it.videoUrl)) return false
return ['done', 'pending', 'failed'].includes(it.status)
})
if (videoCandidates.length === 0) {
console.log("\n⚠ [videos] 没有符合条件的 item 进入视频生成阶段")
console.log(" manifest 中共有", manifest.items.length, "个 item逐一诊断:")
for (const it of manifest.items) {
const reasons = []
if (it.confirmed === false) reasons.push("confirmed=false")
if (!it.url) reasons.push("缺少 url图片未上传")
if (!it.videoPrompt) reasons.push("缺少 videoPrompt")
if (it.status === 'done' && (it.video || it.videoUrl)) {
reasons.push("视频已生成,已跳过")
} else if (!["done","pending","failed"].includes(it.status)) {
reasons.push("status=" + (it.status || "undefined") + "(不在 done/pending/failed 中)")
}
console.log(" - item", it.id || manifest.items.indexOf(it), ":", reasons.length > 0 ? reasons.join(", ") : "已满足全部条件(不应在此)")
}
console.log("\n 修复命令:")
console.log(" node .claude/skills/video-from-script/scripts/pipeline.js confirm --manifest", manifestPath, "--all")
console.log()
}
const items = []
for (const it of videoCandidates) {
// 磁盘兜底:本地视频文件已存在则恢复引用并跳过
if (!it.video && it.id) {
const fs = require('fs')
const existingVideos = fs.readdirSync(videosDir).filter(f =>
f.includes('_item' + it.id + '_') || f.includes('_item' + it.id + '.')
)
if (existingVideos.length > 0) {
it.video = 'videos/' + existingVideos[existingVideos.length - 1]
it.status = 'done'
delete it.videoTaskId
log('videos', ` item ${it.id} 发现已有视频文件 ${it.video},跳过生成`)
continue
}
}
if (it.video || it.videoUrl) {
if (it.status === 'done') continue
delete it.video
delete it.videoUrl
delete it.videoDuration
}
items.push(it)
}
if (items.length === 0) { log('videos', '无待处理 item跳过'); return }
let Api, pollFn
const modelLower = videoModel.toLowerCase()
if (modelLower.includes('grok')) {
const gen = require('../grok-video-generator')
Api = gen.GrokApi; pollFn = gen.pollWithRetry
} else if (modelLower.includes('kling')) {
const gen = require('../kling-video-generator')
Api = gen.KlingApi; pollFn = gen.pollWithRetry
} else {
const gen = require('../veo-video-generator')
Api = gen.VeoApi; pollFn = gen.pollWithRetry
}
const ratio = manifest.format || '9:16'
const pollOpts = (item) => ({
outputDir: videosDir, aspectRatio: ratio,
imageUrl: item.url, lastFrameUrl: item.lastFrameUrl || '',
})
log('videos', `${items.length} 个, 模型: ${videoModel}`)
// Phase 1: 恢复已有任务(有 videoTaskId 的 item
const recovered = []
const needSubmit = []
for (const item of items) {
if (item.videoTaskId) {
recovered.push(item)
} else {
needSubmit.push(item)
}
}
if (recovered.length > 0) {
log('videos', `尝试恢复 ${recovered.length} 个中断任务...`)
await Promise.allSettled(
recovered.map(async (item) => {
try {
log('videos', ` 恢复 item ${item.id}: ${item.videoTaskId}`)
const result = await pollFn(item.videoTaskId, item.videoPrompt, pollOpts(item))
if (result.file) {
item.video = path.relative(dir, result.file).replace(/\\/g, '/')
item.videoDuration = result.duration
item.status = 'done'
delete item.videoTaskId
log('videos', ` item ${item.id} 恢复成功`)
}
} catch (err) {
if (err.isTaskFailure === true) {
log('videos', ` item ${item.id} 恢复失败(任务失败): ${err.message},将重新提交`)
delete item.videoTaskId
needSubmit.push(item)
} else {
log('videos', ` item ${item.id} 恢复超时(保留 taskId 下次重试): ${err.message}`)
item.status = 'pending'
}
}
})
)
saveManifest(manifestPath, manifest)
}
if (needSubmit.length === 0) { log('videos', '全部通过恢复完成'); return }
// Phase 2+3: 分批提交+轮询(严格并发 ≤ 5等一批完成再提交下一批
const concurrency = 5
log('videos', `提交 ${needSubmit.length} 个新任务(并发: ${concurrency}...`)
for (let i = 0; i < needSubmit.length; i += concurrency) {
const batch = needSubmit.slice(i, i + concurrency).filter(item => !item.videoTaskId)
if (batch.length === 0) continue
// 提交本批
const submitResults = await Promise.allSettled(
batch.map(async (item) => {
const extraOpts = item.lastFrameUrl
? { aspectRatio: ratio, lastFrameUrl: item.lastFrameUrl, mode: 'pro' }
: { aspectRatio: ratio }
try {
const taskId = await Api.create(item.url, item.videoPrompt, extraOpts)
return { item, taskId, error: null }
} catch (err) {
return { item, taskId: null, error: err.message }
}
})
)
const submitted = []
let hitQuota = false
for (const r of submitResults) {
const val = r.status === 'fulfilled' ? r.value : { item: null, taskId: null, error: r.reason }
submitted.push(val)
if (val.item && val.taskId) {
val.item.videoTaskId = val.taskId
} else if (val.item && !val.taskId) {
val.item.status = 'failed'
val.item.error = val.error || '提交失败'
if (isQuotaError(val.error)) hitQuota = true
}
}
saveManifest(manifestPath, manifest)
if (hitQuota) {
log('videos', ` ⚠️ 额度不足,停止提交(跳过剩余 ${needSubmit.length - i - batch.length} 个任务)`)
break
}
// 轮询本批
const pending = submitted.filter(s => s.taskId)
if (pending.length === 0) continue
const end = Math.min(i + concurrency, needSubmit.length)
log('videos', ` [${i + 1}-${end}/${needSubmit.length}] 等待 ${pending.length} 个视频生成...`)
const pollResults = await Promise.allSettled(
pending.map(async ({ item, taskId }) => {
try {
const result = await pollFn(taskId, item.videoPrompt, pollOpts(item))
return { item, result, ok: true }
} catch (err) {
return { item, error: err.message, ok: false, isTaskFailure: err.isTaskFailure === true }
}
})
)
for (const r of pollResults) {
const val = r.status === 'fulfilled'
? r.value
: { ok: false, error: r.reason?.message || String(r.reason), isTaskFailure: r.reason?.isTaskFailure === true }
applyPollResult(val.item || {}, val, dir)
}
saveManifest(manifestPath, manifest)
}
// 上传视频到 OSS
const { uploadFile } = require('../oss-upload')
const videoItems = manifest.items.filter(it => it.video && !it.videoUrl)
if (videoItems.length > 0) {
log('videos', `上传 ${videoItems.length} 个视频到 OSS...`)
for (const item of videoItems) {
const videoPath = path.resolve(dir, item.video)
try {
const { url } = await uploadFile(videoPath)
item.videoUrl = url
log('videos', ` ${item.video} → OK`)
} catch (err) {
log('videos', ` ${item.video} 上传失败: ${err.message}`)
}
}
saveManifest(manifestPath, manifest)
}
}
// Public API of this module: only the phase entry point is exported.
module.exports = { phaseVideos }