feat(video-from-script): 严格并行提交批次限制并增强额度错误检测
- 引入 `isQuotaError` 函数,统一检测 API 额度不足错误 - Kling 生成器改为严格分批提交并等待每批轮询完成,限制并发 ≤ 5 - phase-videos 重构为分批提交+轮询模式,支持额度不足时提前终止 - 提取 `applyPollResult` 和 `pollOpts` 工具函数,减少代码重复 - 新增批量提交的进度日志,显示当前批次和总数
This commit is contained in:
@@ -388,8 +388,14 @@ async function generate(imageUrl, prompt, options = {}) {
|
|||||||
// 批量并行生成(支持 manifest.json)
|
// 批量并行生成(支持 manifest.json)
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
|
|
||||||
|
function isQuotaError(msg) {
|
||||||
|
if (!msg) return false
|
||||||
|
const s = msg.toLowerCase()
|
||||||
|
return /quota|limit|exceed|insufficient|余额|额度|超限|rate.?limit|too.?many/.test(s)
|
||||||
|
}
|
||||||
|
|
||||||
async function batchGenerate(tasks, options = {}) {
|
async function batchGenerate(tasks, options = {}) {
|
||||||
const { outputDir = './output', concurrency = 2, duration = 5, mode = 'std' } = options
|
const { outputDir = './output', concurrency = 5, duration = 5, mode = 'std' } = options
|
||||||
|
|
||||||
fs.mkdirSync(outputDir, { recursive: true })
|
fs.mkdirSync(outputDir, { recursive: true })
|
||||||
|
|
||||||
@@ -406,14 +412,19 @@ async function batchGenerate(tasks, options = {}) {
|
|||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Phase 1: 并行提交
|
// 严格并发 ≤ 5:每批提交后等轮询完成再提交下一批
|
||||||
|
const batchSize = Math.min(concurrency, 5)
|
||||||
const modeLabel = tasks.some(t => t.lastFrameUrl) ? '首尾帧' : '单图'
|
const modeLabel = tasks.some(t => t.lastFrameUrl) ? '首尾帧' : '单图'
|
||||||
console.log(`\n📡 并行提交 ${tasks.length} 个可灵视频任务(并发: ${concurrency},模式: ${modeLabel})...`)
|
console.log(`\n📡 并行提交 ${tasks.length} 个可灵视频任务(并发: ${batchSize},模式: ${modeLabel})...`)
|
||||||
|
|
||||||
const submitted = []
|
const results = []
|
||||||
for (let i = 0; i < tasks.length; i += concurrency) {
|
|
||||||
const batch = tasks.slice(i, i + concurrency)
|
for (let i = 0; i < tasks.length; i += batchSize) {
|
||||||
const batchResults = await Promise.allSettled(
|
const batch = tasks.slice(i, i + batchSize)
|
||||||
|
const batchLabel = `[${i + 1}-${Math.min(i + batchSize, tasks.length)}/${tasks.length}]`
|
||||||
|
|
||||||
|
// 提交本批
|
||||||
|
const submitResults = await Promise.allSettled(
|
||||||
batch.map(async (task, j) => {
|
batch.map(async (task, j) => {
|
||||||
const idx = i + j
|
const idx = i + j
|
||||||
const prompt = task.videoPrompt || task.prompt
|
const prompt = task.videoPrompt || task.prompt
|
||||||
@@ -429,21 +440,32 @@ async function batchGenerate(tasks, options = {}) {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
submitted.push(...batchResults.map(r => r.value || r.reason))
|
|
||||||
}
|
|
||||||
|
|
||||||
|
const submitted = submitResults.map(r => r.status === 'fulfilled' ? r.value : r.reason)
|
||||||
|
const hitQuota = submitted.some(s => !s.taskId && isQuotaError(s.error))
|
||||||
const pendingTasks = submitted.filter(s => s.taskId)
|
const pendingTasks = submitted.filter(s => s.taskId)
|
||||||
|
|
||||||
if (pendingTasks.length === 0) {
|
// 额度不足时:记录本批失败 + 跳过剩余
|
||||||
console.error('\n❌ 所有任务提交失败')
|
if (hitQuota) {
|
||||||
return tasks.map((task, idx) => ({
|
const remaining = tasks.length - i - batch.length
|
||||||
success: false, ...task,
|
for (const s of submitted) {
|
||||||
error: (submitted.find(s => s.idx === idx) || {}).error || '提交失败',
|
results.push({ success: false, ...s.task, error: s.error || '提交失败' })
|
||||||
}))
|
}
|
||||||
|
for (let j = i + batchSize; j < tasks.length; j++) {
|
||||||
|
results.push({ success: false, ...tasks[j], error: '额度不足,未提交' })
|
||||||
|
}
|
||||||
|
console.log(`\n⚠️ 额度不足,跳过剩余 ${remaining + submitted.filter(s => !s.taskId).length} 个任务`)
|
||||||
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
// Phase 2: 并行轮询
|
// 本批全部提交失败,跳过轮询
|
||||||
console.log(`\n⏳ 并行等待 ${pendingTasks.length} 个视频生成...`)
|
for (const s of submitted) {
|
||||||
|
if (!s.taskId) results.push({ success: false, ...s.task, error: s.error || '提交失败' })
|
||||||
|
}
|
||||||
|
if (pendingTasks.length === 0) continue
|
||||||
|
|
||||||
|
// 轮询本批
|
||||||
|
console.log(`\n⏳ ${batchLabel} 等待 ${pendingTasks.length} 个视频生成...`)
|
||||||
|
|
||||||
const pollResults = await Promise.allSettled(
|
const pollResults = await Promise.allSettled(
|
||||||
pendingTasks.map(async ({ idx, taskId, task }) => {
|
pendingTasks.map(async ({ idx, taskId, task }) => {
|
||||||
@@ -456,23 +478,16 @@ async function batchGenerate(tasks, options = {}) {
|
|||||||
})
|
})
|
||||||
)
|
)
|
||||||
|
|
||||||
// 合并结果
|
// 合并本批轮询结果
|
||||||
const results = []
|
for (const s of submitted) {
|
||||||
for (let i = 0; i < tasks.length; i++) {
|
if (!s.taskId) continue
|
||||||
const submittedInfo = submitted.find(s => s.idx === i)
|
const pollResult = pollResults.find(r => r.status === 'fulfilled' && r.value.idx === s.idx)
|
||||||
if (!submittedInfo || !submittedInfo.taskId) {
|
if (pollResult) {
|
||||||
results.push({ success: false, ...tasks[i], error: submittedInfo?.error || '提交失败' })
|
results.push({ success: true, ...s.task, ...pollResult.value })
|
||||||
continue
|
|
||||||
}
|
|
||||||
const pollResult = pollResults.find(r => {
|
|
||||||
if (r.status === 'fulfilled') return r.value.idx === i
|
|
||||||
return false
|
|
||||||
})
|
|
||||||
if (pollResult && pollResult.status === 'fulfilled') {
|
|
||||||
results.push({ success: true, ...tasks[i], ...pollResult.value })
|
|
||||||
} else {
|
} else {
|
||||||
const reason = pollResult?.reason?.message || '生成失败'
|
const reason = pollResults.find(r => r.value?.idx === s.idx || r.reason?.idx === s.idx)?.reason?.message || '生成失败'
|
||||||
results.push({ success: false, ...tasks[i], error: reason })
|
results.push({ success: false, ...s.task, error: reason })
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -8,6 +8,30 @@
|
|||||||
const path = require('path')
|
const path = require('path')
|
||||||
const { saveManifest, ensureDir, log, getManifestDir } = require('./pipeline-utils')
|
const { saveManifest, ensureDir, log, getManifestDir } = require('./pipeline-utils')
|
||||||
|
|
||||||
|
function isQuotaError(msg) {
|
||||||
|
if (!msg) return false
|
||||||
|
const s = msg.toLowerCase()
|
||||||
|
return /quota|limit|exceed|insufficient|余额|额度|超限|rate.?limit|too.?many/.test(s)
|
||||||
|
}
|
||||||
|
|
||||||
|
function applyPollResult(item, val, dir) {
|
||||||
|
if (val.ok && val.result?.file) {
|
||||||
|
item.video = path.relative(dir, val.result.file).replace(/\\/g, '/')
|
||||||
|
item.videoDuration = val.result.duration
|
||||||
|
item.status = 'done'
|
||||||
|
delete item.videoTaskId
|
||||||
|
} else if (val.item) {
|
||||||
|
if (val.isTaskFailure) {
|
||||||
|
item.status = 'failed'
|
||||||
|
item.error = val.error || '视频生成未返回文件'
|
||||||
|
delete item.videoTaskId
|
||||||
|
} else {
|
||||||
|
log('videos', ` item ${item.id} 生成超时(保留 taskId 待恢复): ${val.error}`)
|
||||||
|
item.status = 'pending'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async function phaseVideos(manifest, manifestPath, options) {
|
async function phaseVideos(manifest, manifestPath, options) {
|
||||||
const dir = getManifestDir(manifestPath)
|
const dir = getManifestDir(manifestPath)
|
||||||
const videosDir = path.join(dir, 'videos')
|
const videosDir = path.join(dir, 'videos')
|
||||||
@@ -16,10 +40,6 @@ async function phaseVideos(manifest, manifestPath, options) {
|
|||||||
const accountConfig = options.accountConfig || {}
|
const accountConfig = options.accountConfig || {}
|
||||||
const videoModel = manifest.videoModel || accountConfig.videoModel || 'veo3-fast-frames'
|
const videoModel = manifest.videoModel || accountConfig.videoModel || 'veo3-fast-frames'
|
||||||
|
|
||||||
// 筛选需要生视频的 item:
|
|
||||||
// done — 正常流程,图片已确认且已上传
|
|
||||||
// pending / failed — 重试场景,agent 只需将 item 设为 pending 即可触发再生
|
|
||||||
// 前提:有 url(图片已上传)+ videoPrompt,且 confirmed 未被显式拒绝
|
|
||||||
const videoCandidates = manifest.items.filter(it => {
|
const videoCandidates = manifest.items.filter(it => {
|
||||||
if (it.confirmed === false) return false
|
if (it.confirmed === false) return false
|
||||||
if (!it.url || !it.videoPrompt) return false
|
if (!it.url || !it.videoPrompt) return false
|
||||||
@@ -44,7 +64,6 @@ async function phaseVideos(manifest, manifestPath, options) {
|
|||||||
console.log()
|
console.log()
|
||||||
}
|
}
|
||||||
|
|
||||||
// 已有视频(本地或 OSS)且状态为 done 的跳过,其余清理视频引用但保留 taskId 供恢复
|
|
||||||
const items = []
|
const items = []
|
||||||
for (const it of videoCandidates) {
|
for (const it of videoCandidates) {
|
||||||
if (it.video || it.videoUrl) {
|
if (it.video || it.videoUrl) {
|
||||||
@@ -57,7 +76,6 @@ async function phaseVideos(manifest, manifestPath, options) {
|
|||||||
}
|
}
|
||||||
if (items.length === 0) { log('videos', '无待处理 item,跳过'); return }
|
if (items.length === 0) { log('videos', '无待处理 item,跳过'); return }
|
||||||
|
|
||||||
// 选择生成器
|
|
||||||
let Api, pollFn
|
let Api, pollFn
|
||||||
const modelLower = videoModel.toLowerCase()
|
const modelLower = videoModel.toLowerCase()
|
||||||
if (modelLower.includes('grok')) {
|
if (modelLower.includes('grok')) {
|
||||||
@@ -72,6 +90,11 @@ async function phaseVideos(manifest, manifestPath, options) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const ratio = manifest.format || '9:16'
|
const ratio = manifest.format || '9:16'
|
||||||
|
const pollOpts = (item) => ({
|
||||||
|
outputDir: videosDir, aspectRatio: ratio,
|
||||||
|
imageUrl: item.url, lastFrameUrl: item.lastFrameUrl || '',
|
||||||
|
})
|
||||||
|
|
||||||
log('videos', `共 ${items.length} 个, 模型: ${videoModel}`)
|
log('videos', `共 ${items.length} 个, 模型: ${videoModel}`)
|
||||||
|
|
||||||
// Phase 1: 恢复已有任务(有 videoTaskId 的 item)
|
// Phase 1: 恢复已有任务(有 videoTaskId 的 item)
|
||||||
@@ -92,12 +115,7 @@ async function phaseVideos(manifest, manifestPath, options) {
|
|||||||
recovered.map(async (item) => {
|
recovered.map(async (item) => {
|
||||||
try {
|
try {
|
||||||
log('videos', ` 恢复 item ${item.id}: ${item.videoTaskId}`)
|
log('videos', ` 恢复 item ${item.id}: ${item.videoTaskId}`)
|
||||||
const result = await pollFn(item.videoTaskId, item.videoPrompt, {
|
const result = await pollFn(item.videoTaskId, item.videoPrompt, pollOpts(item))
|
||||||
outputDir: videosDir,
|
|
||||||
aspectRatio: ratio,
|
|
||||||
imageUrl: item.url,
|
|
||||||
lastFrameUrl: item.lastFrameUrl || '',
|
|
||||||
})
|
|
||||||
if (result.file) {
|
if (result.file) {
|
||||||
item.video = path.relative(dir, result.file).replace(/\\/g, '/')
|
item.video = path.relative(dir, result.file).replace(/\\/g, '/')
|
||||||
item.videoDuration = result.duration
|
item.videoDuration = result.duration
|
||||||
@@ -106,8 +124,7 @@ async function phaseVideos(manifest, manifestPath, options) {
|
|||||||
log('videos', ` item ${item.id} 恢复成功`)
|
log('videos', ` item ${item.id} 恢复成功`)
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
const isFail = err.isTaskFailure === true
|
if (err.isTaskFailure === true) {
|
||||||
if (isFail) {
|
|
||||||
log('videos', ` item ${item.id} 恢复失败(任务失败): ${err.message},将重新提交`)
|
log('videos', ` item ${item.id} 恢复失败(任务失败): ${err.message},将重新提交`)
|
||||||
delete item.videoTaskId
|
delete item.videoTaskId
|
||||||
needSubmit.push(item)
|
needSubmit.push(item)
|
||||||
@@ -123,14 +140,16 @@ async function phaseVideos(manifest, manifestPath, options) {
|
|||||||
|
|
||||||
if (needSubmit.length === 0) { log('videos', '全部通过恢复完成'); return }
|
if (needSubmit.length === 0) { log('videos', '全部通过恢复完成'); return }
|
||||||
|
|
||||||
// Phase 2: 提交新任务(并发 5,Kling 最大并发)
|
// Phase 2+3: 分批提交+轮询(严格并发 ≤ 5,等一批完成再提交下一批)
|
||||||
const concurrency = 5
|
const concurrency = 5
|
||||||
log('videos', `提交 ${needSubmit.length} 个新任务(并发: ${concurrency})...`)
|
log('videos', `提交 ${needSubmit.length} 个新任务(并发: ${concurrency})...`)
|
||||||
|
|
||||||
const submitted = []
|
|
||||||
for (let i = 0; i < needSubmit.length; i += concurrency) {
|
for (let i = 0; i < needSubmit.length; i += concurrency) {
|
||||||
const batch = needSubmit.slice(i, i + concurrency)
|
const batch = needSubmit.slice(i, i + concurrency).filter(item => !item.videoTaskId)
|
||||||
const batchResults = await Promise.allSettled(
|
if (batch.length === 0) continue
|
||||||
|
|
||||||
|
// 提交本批
|
||||||
|
const submitResults = await Promise.allSettled(
|
||||||
batch.map(async (item) => {
|
batch.map(async (item) => {
|
||||||
const extraOpts = item.lastFrameUrl
|
const extraOpts = item.lastFrameUrl
|
||||||
? { aspectRatio: ratio, lastFrameUrl: item.lastFrameUrl, mode: 'pro' }
|
? { aspectRatio: ratio, lastFrameUrl: item.lastFrameUrl, mode: 'pro' }
|
||||||
@@ -143,38 +162,38 @@ async function phaseVideos(manifest, manifestPath, options) {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
for (const r of batchResults) {
|
|
||||||
|
const submitted = []
|
||||||
|
let hitQuota = false
|
||||||
|
for (const r of submitResults) {
|
||||||
const val = r.status === 'fulfilled' ? r.value : { item: null, taskId: null, error: r.reason }
|
const val = r.status === 'fulfilled' ? r.value : { item: null, taskId: null, error: r.reason }
|
||||||
submitted.push(val)
|
submitted.push(val)
|
||||||
if (val.item && val.taskId) {
|
if (val.item && val.taskId) {
|
||||||
val.item.videoTaskId = val.taskId
|
val.item.videoTaskId = val.taskId
|
||||||
|
} else if (val.item && !val.taskId) {
|
||||||
|
val.item.status = 'failed'
|
||||||
|
val.item.error = val.error || '提交失败'
|
||||||
|
if (isQuotaError(val.error)) hitQuota = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
saveManifest(manifestPath, manifest)
|
saveManifest(manifestPath, manifest)
|
||||||
|
|
||||||
|
if (hitQuota) {
|
||||||
|
log('videos', ` ⚠️ 额度不足,停止提交(跳过剩余 ${needSubmit.length - i - batch.length} 个任务)`)
|
||||||
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
// Phase 3: 轮询新任务
|
// 轮询本批
|
||||||
const pending = submitted.filter(s => s.taskId)
|
const pending = submitted.filter(s => s.taskId)
|
||||||
if (pending.length === 0) {
|
if (pending.length === 0) continue
|
||||||
log('videos', '所有任务提交失败')
|
|
||||||
for (const s of submitted) {
|
|
||||||
if (s.item) { s.item.status = 'failed'; s.item.error = s.error || '提交失败' }
|
|
||||||
}
|
|
||||||
saveManifest(manifestPath, manifest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
log('videos', `等待 ${pending.length} 个视频生成...`)
|
const end = Math.min(i + concurrency, needSubmit.length)
|
||||||
|
log('videos', ` [${i + 1}-${end}/${needSubmit.length}] 等待 ${pending.length} 个视频生成...`)
|
||||||
|
|
||||||
const pollResults = await Promise.allSettled(
|
const pollResults = await Promise.allSettled(
|
||||||
pending.map(async ({ item, taskId }) => {
|
pending.map(async ({ item, taskId }) => {
|
||||||
try {
|
try {
|
||||||
const result = await pollFn(taskId, item.videoPrompt, {
|
const result = await pollFn(taskId, item.videoPrompt, pollOpts(item))
|
||||||
outputDir: videosDir,
|
|
||||||
aspectRatio: ratio,
|
|
||||||
imageUrl: item.url,
|
|
||||||
lastFrameUrl: item.lastFrameUrl || '',
|
|
||||||
})
|
|
||||||
return { item, result, ok: true }
|
return { item, result, ok: true }
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
return { item, error: err.message, ok: false, isTaskFailure: err.isTaskFailure === true }
|
return { item, error: err.message, ok: false, isTaskFailure: err.isTaskFailure === true }
|
||||||
@@ -186,20 +205,7 @@ async function phaseVideos(manifest, manifestPath, options) {
|
|||||||
const val = r.status === 'fulfilled'
|
const val = r.status === 'fulfilled'
|
||||||
? r.value
|
? r.value
|
||||||
: { ok: false, error: r.reason?.message || String(r.reason), isTaskFailure: r.reason?.isTaskFailure === true }
|
: { ok: false, error: r.reason?.message || String(r.reason), isTaskFailure: r.reason?.isTaskFailure === true }
|
||||||
if (val.ok && val.result.file) {
|
applyPollResult(val.item || {}, val, dir)
|
||||||
val.item.video = path.relative(dir, val.result.file).replace(/\\/g, '/')
|
|
||||||
val.item.videoDuration = val.result.duration
|
|
||||||
val.item.status = 'done'
|
|
||||||
delete val.item.videoTaskId
|
|
||||||
} else if (val.item) {
|
|
||||||
if (val.isTaskFailure) {
|
|
||||||
val.item.status = 'failed'
|
|
||||||
val.item.error = val.error || '视频生成未返回文件'
|
|
||||||
delete val.item.videoTaskId
|
|
||||||
} else {
|
|
||||||
log('videos', ` item ${val.item.id} 生成超时(保留 taskId 待恢复): ${val.error}`)
|
|
||||||
val.item.status = 'pending'
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
saveManifest(manifestPath, manifest)
|
saveManifest(manifestPath, manifest)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user