refactor(video-from-script): 提取轮询重试逻辑为共享工具
提取三个视频生成器中重复的 `pollWithRetry` 函数到共享模块 `video-poll-utils`,消除代码重复。新增两层重试机制:轮询级(处理网络瞬断)和任务级(创建新任务 + 提示词优化)。同时优化 `phase-videos` 中视频状态管理和 manifest 保存逻辑。
This commit is contained in:
@@ -402,57 +402,28 @@ async function batchGenerate(tasks, options = {}) {
|
|||||||
return results
|
return results
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
const { makePollWithRetry } = require('./lib/video-poll-utils')
|
||||||
* 轮询 + 失败重试(单任务)
|
|
||||||
*/
|
const pollWithRetryBase = makePollWithRetry({
|
||||||
|
Api: GrokApi,
|
||||||
|
suffix: '_grok',
|
||||||
|
duration: 6,
|
||||||
|
maxRetries: Config.maxRetries,
|
||||||
|
optimizePrompt: (prompt, failReason, attempt) => PromptOptimizer.optimize(prompt, failReason, attempt),
|
||||||
|
buildCreateOpts: (options) => ({ aspectRatio: options.aspectRatio, size: options.size }),
|
||||||
|
})
|
||||||
|
|
||||||
async function pollWithRetry(taskId, prompt, options = {}) {
|
async function pollWithRetry(taskId, prompt, options = {}) {
|
||||||
let currentTaskId = taskId
|
const result = await pollWithRetryBase(taskId, prompt, options)
|
||||||
let currentPrompt = prompt
|
|
||||||
let lastError = null
|
|
||||||
|
|
||||||
for (let attempt = 0; attempt <= Config.maxRetries; attempt++) {
|
let thumbnailFile = null
|
||||||
try {
|
if (result.thumbnailUrl) {
|
||||||
if (attempt > 0) {
|
const timestamp = new Date().toISOString().replace(/[:.]/g, '-')
|
||||||
currentPrompt = PromptOptimizer.optimize(prompt, lastError, attempt)
|
thumbnailFile = path.join(options.outputDir || './output', `${timestamp}_thumb.jpg`)
|
||||||
console.log(`\n 🔄 重试 (任务 ${currentTaskId.substring(0, 8)}...): ${currentPrompt.substring(0, 50)}`)
|
try { await download(result.thumbnailUrl, thumbnailFile) } catch (_) {}
|
||||||
currentTaskId = await GrokApi.create(
|
|
||||||
options.imageUrl || '',
|
|
||||||
currentPrompt,
|
|
||||||
{ aspectRatio: options.aspectRatio, size: options.size }
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
const result = await GrokApi.poll(currentTaskId)
|
|
||||||
|
|
||||||
const timestamp = new Date().toISOString().replace(/[:.]/g, '-')
|
|
||||||
const videoFile = path.join(options.outputDir || './output', `${timestamp}_grok.mp4`)
|
|
||||||
await download(result.videoUrl, videoFile)
|
|
||||||
|
|
||||||
let thumbnailFile = null
|
|
||||||
if (result.thumbnailUrl) {
|
|
||||||
thumbnailFile = path.join(options.outputDir || './output', `${timestamp}_thumb.jpg`)
|
|
||||||
try { await download(result.thumbnailUrl, thumbnailFile) } catch (_) {}
|
|
||||||
}
|
|
||||||
|
|
||||||
return {
|
|
||||||
taskId: currentTaskId,
|
|
||||||
prompt: currentPrompt,
|
|
||||||
originalPrompt: prompt,
|
|
||||||
attempts: attempt + 1,
|
|
||||||
file: videoFile,
|
|
||||||
files: [videoFile],
|
|
||||||
duration: 6,
|
|
||||||
thumbnail: thumbnailFile,
|
|
||||||
}
|
|
||||||
} catch (err) {
|
|
||||||
lastError = err.message
|
|
||||||
if (attempt < Config.maxRetries) {
|
|
||||||
await new Promise(r => setTimeout(r, 5000))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
throw new Error(`重试 ${Config.maxRetries} 次后仍失败: ${lastError}`)
|
return { ...result, thumbnail: thumbnailFile }
|
||||||
}
|
}
|
||||||
|
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
|
|||||||
@@ -503,51 +503,16 @@ async function batchGenerate(tasks, options = {}) {
|
|||||||
return results
|
return results
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
const { makePollWithRetry } = require('./lib/video-poll-utils')
|
||||||
* 轮询 + 失败重试(单任务)
|
|
||||||
*/
|
|
||||||
async function pollWithRetry(taskId, prompt, options = {}) {
|
|
||||||
let currentTaskId = taskId
|
|
||||||
let currentPrompt = prompt
|
|
||||||
let lastError = null
|
|
||||||
|
|
||||||
for (let attempt = 0; attempt <= Config.maxRetries; attempt++) {
|
const pollWithRetry = makePollWithRetry({
|
||||||
try {
|
Api: KlingApi,
|
||||||
if (attempt > 0) {
|
suffix: '_kling',
|
||||||
currentPrompt = PromptOptimizer.optimize(prompt, lastError, attempt)
|
duration: 6,
|
||||||
console.log(`\n 🔄 重试 (任务 ${currentTaskId.substring(0, 8)}...): ${currentPrompt.substring(0, 50)}`)
|
maxRetries: Config.maxRetries,
|
||||||
currentTaskId = await KlingApi.create(
|
optimizePrompt: (prompt, failReason, attempt) => PromptOptimizer.optimize(prompt, failReason, attempt),
|
||||||
options.imageUrl || '',
|
buildCreateOpts: (options) => ({ duration: options.duration, mode: options.mode, lastFrameUrl: options.lastFrameUrl || '' }),
|
||||||
currentPrompt,
|
})
|
||||||
{ duration: options.duration, mode: options.mode, lastFrameUrl: options.lastFrameUrl || '' }
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
const result = await KlingApi.poll(currentTaskId)
|
|
||||||
|
|
||||||
const timestamp = new Date().toISOString().replace(/[:.]/g, '-')
|
|
||||||
const videoFile = path.join(options.outputDir || './output', `${timestamp}_kling.mp4`)
|
|
||||||
await download(result.videoUrl, videoFile)
|
|
||||||
|
|
||||||
return {
|
|
||||||
taskId: currentTaskId,
|
|
||||||
prompt: currentPrompt,
|
|
||||||
originalPrompt: prompt,
|
|
||||||
attempts: attempt + 1,
|
|
||||||
file: videoFile,
|
|
||||||
files: [videoFile],
|
|
||||||
duration: 6,
|
|
||||||
}
|
|
||||||
} catch (err) {
|
|
||||||
lastError = err.message
|
|
||||||
if (attempt < Config.maxRetries) {
|
|
||||||
await new Promise(r => setTimeout(r, 5000))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
throw new Error(`重试 ${Config.maxRetries} 次后仍失败: ${lastError}`)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
// CLI
|
// CLI
|
||||||
|
|||||||
@@ -5,7 +5,6 @@
|
|||||||
* 支持 task ID 恢复:中断后重跑时优先恢复已有任务
|
* 支持 task ID 恢复:中断后重跑时优先恢复已有任务
|
||||||
*/
|
*/
|
||||||
|
|
||||||
const fs = require('fs')
|
|
||||||
const path = require('path')
|
const path = require('path')
|
||||||
const { saveManifest, ensureDir, log, getManifestDir } = require('./pipeline-utils')
|
const { saveManifest, ensureDir, log, getManifestDir } = require('./pipeline-utils')
|
||||||
|
|
||||||
@@ -24,15 +23,15 @@ async function phaseVideos(manifest, manifestPath, options) {
|
|||||||
const videoCandidates = manifest.items.filter(it => {
|
const videoCandidates = manifest.items.filter(it => {
|
||||||
if (it.confirmed === false) return false
|
if (it.confirmed === false) return false
|
||||||
if (!it.url || !it.videoPrompt) return false
|
if (!it.url || !it.videoPrompt) return false
|
||||||
if (['done', 'pending', 'failed'].includes(it.status)) return true
|
return ['done', 'pending', 'failed'].includes(it.status)
|
||||||
return false
|
|
||||||
})
|
})
|
||||||
// 对重试 item 自动清理旧视频引用,无需 agent 手动删除
|
|
||||||
|
// 已有视频(本地或 OSS)且状态为 done 的跳过,其余清理后重新生成
|
||||||
const items = []
|
const items = []
|
||||||
for (const it of videoCandidates) {
|
for (const it of videoCandidates) {
|
||||||
if (it.video) {
|
if (it.video || it.videoUrl) {
|
||||||
if (it.status === 'done') continue // 已有视频且完成,跳过
|
if (it.status === 'done') continue
|
||||||
delete it.video // pending/failed 但有旧 video → 清理重来
|
delete it.video
|
||||||
delete it.videoUrl
|
delete it.videoUrl
|
||||||
delete it.videoDuration
|
delete it.videoDuration
|
||||||
delete it.videoTaskId
|
delete it.videoTaskId
|
||||||
@@ -70,7 +69,6 @@ async function phaseVideos(manifest, manifestPath, options) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 轮询恢复的任务
|
|
||||||
if (recovered.length > 0) {
|
if (recovered.length > 0) {
|
||||||
log('videos', `尝试恢复 ${recovered.length} 个中断任务...`)
|
log('videos', `尝试恢复 ${recovered.length} 个中断任务...`)
|
||||||
await Promise.allSettled(
|
await Promise.allSettled(
|
||||||
@@ -86,6 +84,7 @@ async function phaseVideos(manifest, manifestPath, options) {
|
|||||||
if (result.file) {
|
if (result.file) {
|
||||||
item.video = path.relative(dir, result.file).replace(/\\/g, '/')
|
item.video = path.relative(dir, result.file).replace(/\\/g, '/')
|
||||||
item.videoDuration = result.duration
|
item.videoDuration = result.duration
|
||||||
|
item.status = 'done'
|
||||||
delete item.videoTaskId
|
delete item.videoTaskId
|
||||||
log('videos', ` item ${item.id} 恢复成功`)
|
log('videos', ` item ${item.id} 恢复成功`)
|
||||||
}
|
}
|
||||||
@@ -94,9 +93,9 @@ async function phaseVideos(manifest, manifestPath, options) {
|
|||||||
delete item.videoTaskId
|
delete item.videoTaskId
|
||||||
needSubmit.push(item)
|
needSubmit.push(item)
|
||||||
}
|
}
|
||||||
saveManifest(manifestPath, manifest)
|
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
|
saveManifest(manifestPath, manifest)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (needSubmit.length === 0) { log('videos', '全部通过恢复完成'); return }
|
if (needSubmit.length === 0) { log('videos', '全部通过恢复完成'); return }
|
||||||
@@ -110,13 +109,9 @@ async function phaseVideos(manifest, manifestPath, options) {
|
|||||||
const batch = needSubmit.slice(i, i + concurrency)
|
const batch = needSubmit.slice(i, i + concurrency)
|
||||||
const batchResults = await Promise.allSettled(
|
const batchResults = await Promise.allSettled(
|
||||||
batch.map(async (item) => {
|
batch.map(async (item) => {
|
||||||
const images = item.lastFrameUrl
|
|
||||||
? [item.url, item.lastFrameUrl]
|
|
||||||
: [item.url]
|
|
||||||
const extraOpts = item.lastFrameUrl
|
const extraOpts = item.lastFrameUrl
|
||||||
? { aspectRatio: ratio, lastFrameUrl: item.lastFrameUrl, mode: 'pro' }
|
? { aspectRatio: ratio, lastFrameUrl: item.lastFrameUrl, mode: 'pro' }
|
||||||
: { aspectRatio: ratio }
|
: { aspectRatio: ratio }
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const taskId = await Api.create(item.url, item.videoPrompt, extraOpts)
|
const taskId = await Api.create(item.url, item.videoPrompt, extraOpts)
|
||||||
return { item, taskId, error: null }
|
return { item, taskId, error: null }
|
||||||
@@ -169,14 +164,15 @@ async function phaseVideos(manifest, manifestPath, options) {
|
|||||||
if (val.ok && val.result.file) {
|
if (val.ok && val.result.file) {
|
||||||
val.item.video = path.relative(dir, val.result.file).replace(/\\/g, '/')
|
val.item.video = path.relative(dir, val.result.file).replace(/\\/g, '/')
|
||||||
val.item.videoDuration = val.result.duration
|
val.item.videoDuration = val.result.duration
|
||||||
|
val.item.status = 'done'
|
||||||
delete val.item.videoTaskId
|
delete val.item.videoTaskId
|
||||||
} else if (val.item) {
|
} else if (val.item) {
|
||||||
val.item.status = 'failed'
|
val.item.status = 'failed'
|
||||||
val.item.error = val.error || '视频生成未返回文件'
|
val.item.error = val.error || '视频生成未返回文件'
|
||||||
delete val.item.videoTaskId
|
delete val.item.videoTaskId
|
||||||
}
|
}
|
||||||
saveManifest(manifestPath, manifest)
|
|
||||||
}
|
}
|
||||||
|
saveManifest(manifestPath, manifest)
|
||||||
|
|
||||||
// 上传视频到 OSS
|
// 上传视频到 OSS
|
||||||
const { uploadFile } = require('../oss-upload')
|
const { uploadFile } = require('../oss-upload')
|
||||||
@@ -192,11 +188,9 @@ async function phaseVideos(manifest, manifestPath, options) {
|
|||||||
} catch (err) {
|
} catch (err) {
|
||||||
log('videos', ` ${item.video} 上传失败: ${err.message}`)
|
log('videos', ` ${item.video} 上传失败: ${err.message}`)
|
||||||
}
|
}
|
||||||
saveManifest(manifestPath, manifest)
|
|
||||||
}
|
}
|
||||||
|
saveManifest(manifestPath, manifest)
|
||||||
}
|
}
|
||||||
|
|
||||||
saveManifest(manifestPath, manifest)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = { phaseVideos }
|
module.exports = { phaseVideos }
|
||||||
|
|||||||
@@ -0,0 +1,85 @@
|
|||||||
|
/**
|
||||||
|
* 共享视频轮询重试工具
|
||||||
|
*
|
||||||
|
* 提供 pollWithRetry 工厂函数,供 kling/veo/grok 三个视频生成器共用。
|
||||||
|
* 两层重试:轮询级(同一 taskId,处理网络瞬断)→ 任务级(创建新 task + 优化提示词)
|
||||||
|
*/
|
||||||
|
|
||||||
|
const TRANSIENT_RE = /timeout|ECONNRESET|ETIMEDOUT|network|socket/i
|
||||||
|
|
||||||
|
const POLL_RETRIES = 2 // 同一 task 轮询重试次数
|
||||||
|
const POLL_RETRY_DELAY = 5000 // 轮询重试间隔 ms
|
||||||
|
const TASK_RETRY_DELAY = 5000 // 任务级重试间隔 ms
|
||||||
|
|
||||||
|
function isTransientError(err) {
|
||||||
|
return TRANSIENT_RE.test(err.message || '')
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建 pollWithRetry 函数
|
||||||
|
*
|
||||||
|
* @param {object} opts
|
||||||
|
* @param {object} opts.Api - 有 create() 和 poll() 方法的 API 对象
|
||||||
|
* @param {string} opts.suffix - 输出文件后缀(如 '_kling')
|
||||||
|
* @param {number} opts.duration - 视频时长(秒)
|
||||||
|
* @param {number} [opts.maxRetries=3] - 任务级最大重试次数
|
||||||
|
* @param {function} [opts.optimizePrompt] - 提示词优化函数 (prompt, failReason, attempt) => optimizedPrompt
|
||||||
|
* @param {function} opts.buildCreateOpts - (item_options) => create() 的第三个参数
|
||||||
|
* @returns {function} pollWithRetry(taskId, prompt, options)
|
||||||
|
*/
|
||||||
|
function makePollWithRetry({ Api, suffix, duration, maxRetries = 3, optimizePrompt, buildCreateOpts }) {
|
||||||
|
return async function pollWithRetry(taskId, prompt, options = {}) {
|
||||||
|
let currentTaskId = taskId
|
||||||
|
let currentPrompt = prompt
|
||||||
|
let lastError = null
|
||||||
|
|
||||||
|
for (let attempt = 0; attempt <= maxRetries; attempt++) {
|
||||||
|
if (attempt > 0) {
|
||||||
|
if (optimizePrompt) {
|
||||||
|
currentPrompt = optimizePrompt(prompt, lastError, attempt)
|
||||||
|
}
|
||||||
|
console.log(`\n 🔄 重试 (任务 ${currentTaskId.substring(0, 8)}...): ${currentPrompt.substring(0, 50)}`)
|
||||||
|
const createOpts = buildCreateOpts(options)
|
||||||
|
currentTaskId = await Api.create(options.imageUrl || '', currentPrompt, createOpts)
|
||||||
|
}
|
||||||
|
|
||||||
|
const outputDir = options.outputDir || './output'
|
||||||
|
|
||||||
|
for (let pollAttempt = 0; pollAttempt <= POLL_RETRIES; pollAttempt++) {
|
||||||
|
try {
|
||||||
|
const result = await Api.poll(currentTaskId)
|
||||||
|
|
||||||
|
const timestamp = new Date().toISOString().replace(/[:.]/g, '-')
|
||||||
|
const videoFile = path.join(outputDir, `${timestamp}${suffix}.mp4`)
|
||||||
|
await download(result.videoUrl, videoFile)
|
||||||
|
|
||||||
|
return {
|
||||||
|
taskId: currentTaskId,
|
||||||
|
prompt: currentPrompt,
|
||||||
|
originalPrompt: prompt,
|
||||||
|
attempts: attempt + 1,
|
||||||
|
file: videoFile,
|
||||||
|
files: [videoFile],
|
||||||
|
duration,
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
lastError = err.message
|
||||||
|
if (isTransientError(err) && pollAttempt < POLL_RETRIES) {
|
||||||
|
console.log(` ⚠ 轮询瞬断 (${pollAttempt + 1}/${POLL_RETRIES}): ${err.message.slice(0, 60)}`)
|
||||||
|
await new Promise(r => setTimeout(r, POLL_RETRY_DELAY))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (attempt < maxRetries) {
|
||||||
|
await new Promise(r => setTimeout(r, TASK_RETRY_DELAY))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new Error(`重试 ${maxRetries} 次后仍失败: ${lastError}`)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = { makePollWithRetry, POLL_RETRIES, POLL_RETRY_DELAY, TASK_RETRY_DELAY }
|
||||||
@@ -406,51 +406,16 @@ async function batchGenerate(tasks, options = {}) {
|
|||||||
return results
|
return results
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
const { makePollWithRetry } = require('./lib/video-poll-utils')
|
||||||
* 轮询 + 失败重试(单任务)
|
|
||||||
*/
|
|
||||||
async function pollWithRetry(taskId, prompt, options = {}) {
|
|
||||||
let currentTaskId = taskId
|
|
||||||
let currentPrompt = prompt
|
|
||||||
let lastError = null
|
|
||||||
|
|
||||||
for (let attempt = 0; attempt <= Config.maxRetries; attempt++) {
|
const pollWithRetry = makePollWithRetry({
|
||||||
try {
|
Api: VeoApi,
|
||||||
if (attempt > 0) {
|
suffix: '_veo',
|
||||||
currentPrompt = PromptOptimizer.optimize(prompt, lastError, attempt)
|
duration: 8,
|
||||||
console.log(`\n 🔄 重试 (任务 ${currentTaskId.substring(0, 8)}...): ${currentPrompt.substring(0, 50)}`)
|
maxRetries: Config.maxRetries,
|
||||||
currentTaskId = await VeoApi.create(
|
optimizePrompt: (prompt, failReason, attempt) => PromptOptimizer.optimize(prompt, failReason, attempt),
|
||||||
options.imageUrl || '',
|
buildCreateOpts: (options) => ({ aspectRatio: options.aspectRatio, lastFrameUrl: options.lastFrameUrl || '' }),
|
||||||
currentPrompt,
|
})
|
||||||
{ aspectRatio: options.aspectRatio, lastFrameUrl: options.lastFrameUrl || '' }
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
const result = await VeoApi.poll(currentTaskId)
|
|
||||||
|
|
||||||
const timestamp = new Date().toISOString().replace(/[:.]/g, '-')
|
|
||||||
const videoFile = path.join(options.outputDir || './output', `${timestamp}_veo.mp4`)
|
|
||||||
await download(result.videoUrl, videoFile)
|
|
||||||
|
|
||||||
return {
|
|
||||||
taskId: currentTaskId,
|
|
||||||
prompt: currentPrompt,
|
|
||||||
originalPrompt: prompt,
|
|
||||||
attempts: attempt + 1,
|
|
||||||
file: videoFile,
|
|
||||||
files: [videoFile],
|
|
||||||
duration: 8,
|
|
||||||
}
|
|
||||||
} catch (err) {
|
|
||||||
lastError = err.message
|
|
||||||
if (attempt < Config.maxRetries) {
|
|
||||||
await new Promise(r => setTimeout(r, 5000))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
throw new Error(`重试 ${Config.maxRetries} 次后仍失败: ${lastError}`)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
// CLI
|
// CLI
|
||||||
|
|||||||
Reference in New Issue
Block a user