refactor(video-pipeline): 将 MJ 生图拆分为提交和收割两阶段策略
将 `phaseImages` 中的图片生成逻辑重构为模块化的调度策略: - Gemini/Kling 使用滑动窗口并发策略,完成一个立即补一个 - MJ 使用两阶段策略:先串行提交所有任务拿 taskId,再滑动窗口收割 - 提取 `submitMJ`、`harvestMJ`、`processItem` 等可复用函数 - 减少 Promise.allSettled 的冗余日志和状态维护
This commit is contained in:
@@ -9,8 +9,6 @@ const fs = require('fs')
|
|||||||
const path = require('path')
|
const path = require('path')
|
||||||
const { saveManifest, getReferences, ensureDir, renameGeneratedFile, log, getManifestDir } = require('./pipeline-utils')
|
const { saveManifest, getReferences, ensureDir, renameGeneratedFile, log, getManifestDir } = require('./pipeline-utils')
|
||||||
|
|
||||||
const IMAGE_CONCURRENCY = 3
|
|
||||||
|
|
||||||
async function phaseImages(manifest, manifestPath, options) {
|
async function phaseImages(manifest, manifestPath, options) {
|
||||||
const dir = getManifestDir(manifestPath)
|
const dir = getManifestDir(manifestPath)
|
||||||
const imagesDir = path.join(dir, 'images')
|
const imagesDir = path.join(dir, 'images')
|
||||||
@@ -33,66 +31,14 @@ async function phaseImages(manifest, manifestPath, options) {
|
|||||||
}
|
}
|
||||||
const refs = getReferences(manifest, accountConfig)
|
const refs = getReferences(manifest, accountConfig)
|
||||||
|
|
||||||
log('images', `共 ${items.length} 张, 模型: ${model}, 画幅: ${ratio}, 参考图: ${refs.localPaths.length}本地/${refs.urls.length}URL, 并发: ${IMAGE_CONCURRENCY}`)
|
log('images', `共 ${items.length} 张, 模型: ${model}, 画幅: ${ratio}, 参考图: ${refs.localPaths.length}本地/${refs.urls.length}URL, 并发: 全并行`)
|
||||||
|
|
||||||
// 分批并发处理
|
if (model === 'mj') {
|
||||||
for (let batchStart = 0; batchStart < items.length; batchStart += IMAGE_CONCURRENCY) {
|
// MJ 两阶段策略:先全部提交拿 taskId,再滑动窗口收割
|
||||||
const batch = items.slice(batchStart, batchStart + IMAGE_CONCURRENCY)
|
await phaseImagesMJ(items, manifest, manifestPath, dir, imagesDir, model, ratio, refs)
|
||||||
|
|
||||||
const results = await Promise.allSettled(
|
|
||||||
batch.map(async (item) => {
|
|
||||||
const idx = item.id
|
|
||||||
try {
|
|
||||||
item.status = 'generating'
|
|
||||||
saveManifest(manifestPath, manifest)
|
|
||||||
|
|
||||||
// 仅补 lastFrame:首帧已存在,跳过首帧生成
|
|
||||||
if (item.file && manifest.mode === 'framePair' && item.lastFramePrompt && !item.lastFrame) {
|
|
||||||
log('images', `[${idx}] 补生成 lastFrame(首帧已有: ${item.file})`)
|
|
||||||
await generateLastFrame(item, idx, manifest, dir, imagesDir, model, ratio, manifestPath)
|
|
||||||
saveManifest(manifestPath, manifest)
|
|
||||||
return { ok: true }
|
|
||||||
}
|
|
||||||
|
|
||||||
let result
|
|
||||||
if (model === 'gemini') {
|
|
||||||
result = await generateGemini(item, idx, dir, imagesDir, ratio, refs)
|
|
||||||
} else if (model === 'mj') {
|
|
||||||
result = await generateMJ(item, idx, dir, imagesDir, ratio, refs, manifestPath, manifest)
|
|
||||||
} else if (model === 'kling') {
|
|
||||||
result = await generateKling(item, idx, dir, imagesDir, ratio, refs)
|
|
||||||
} else {
|
} else {
|
||||||
throw new Error(`不支持的模型: ${model}(支持: gemini, mj, kling)`)
|
// Gemini/Kling:滑动窗口并发
|
||||||
}
|
await phaseImagesSlidingWindow(items, manifest, manifestPath, dir, imagesDir, model, ratio, refs)
|
||||||
|
|
||||||
if (result.file) {
|
|
||||||
item.file = result.file
|
|
||||||
if (result.candidates) item.candidates = result.candidates
|
|
||||||
item.status = 'done'
|
|
||||||
log('images', `[${idx}] 完成: ${item.file}`)
|
|
||||||
} else {
|
|
||||||
item.status = 'failed'
|
|
||||||
item.error = '生成器未返回文件'
|
|
||||||
log('images', `[${idx}] 失败: 生成器未返回文件`)
|
|
||||||
}
|
|
||||||
// 每个 item 完成后立即写盘,防止崩溃丢失已完成的结果
|
|
||||||
saveManifest(manifestPath, manifest)
|
|
||||||
|
|
||||||
// 首尾帧模式:生成第二张图
|
|
||||||
if (item.status === 'done' && manifest.mode === 'framePair' && item.lastFramePrompt && !item.lastFrame) {
|
|
||||||
await generateLastFrame(item, idx, manifest, dir, imagesDir, model, ratio, manifestPath)
|
|
||||||
}
|
|
||||||
|
|
||||||
return { ok: true }
|
|
||||||
} catch (err) {
|
|
||||||
item.status = 'failed'
|
|
||||||
item.error = err.message
|
|
||||||
log('images', `[${idx}] 失败: ${err.message}`)
|
|
||||||
saveManifest(manifestPath, manifest)
|
|
||||||
return { ok: false, error: err.message }
|
|
||||||
}
|
|
||||||
})
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -125,46 +71,41 @@ async function generateGemini(item, idx, dir, imagesDir, ratio, refs) {
|
|||||||
return { file }
|
return { file }
|
||||||
}
|
}
|
||||||
|
|
||||||
async function generateMJ(item, idx, dir, imagesDir, ratio, refs, manifestPath, manifest) {
|
/**
|
||||||
const { MJApi, ImageUtils } = require('../mj-image-generator')
|
* MJ 提交阶段:仅提交任务,返回 taskId(不轮询)
|
||||||
|
* 用于与滑动窗口配合,先批量提交再并行轮询
|
||||||
|
*/
|
||||||
|
async function submitMJ(item, idx, dir, imagesDir, ratio, refs, manifestPath, manifest) {
|
||||||
|
const { MJApi } = require('../mj-image-generator')
|
||||||
const referenceImages = refs.urls.length > 0 ? refs.urls : []
|
const referenceImages = refs.urls.length > 0 ? refs.urls : []
|
||||||
const styleWeight = 200
|
const styleWeight = 200
|
||||||
|
|
||||||
let result
|
// 已有 taskId 的跳过提交(恢复场景)
|
||||||
|
|
||||||
// 尝试恢复中断的 MJ 任务
|
|
||||||
if (item.taskId && item.status === 'generating') {
|
if (item.taskId && item.status === 'generating') {
|
||||||
try {
|
log('images', `[${idx}] MJ 跳过提交,已有 taskId: ${item.taskId}`)
|
||||||
log('images', `[${idx}] 恢复 MJ 任务: ${item.taskId}`)
|
return item.taskId
|
||||||
|
}
|
||||||
|
|
||||||
|
log('images', `[${idx}] MJ 提交: ${item.imagePrompt.substring(0, 60)}...`)
|
||||||
|
const taskId = await MJApi.submit(item.imagePrompt, { referenceImages, aspectRatio: ratio, styleWeight })
|
||||||
|
item.taskId = taskId
|
||||||
|
saveManifest(manifestPath, manifest)
|
||||||
|
return taskId
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MJ 收割阶段:轮询 + 下载 + 拆分 + 重命名
|
||||||
|
*/
|
||||||
|
async function harvestMJ(item, idx, dir, imagesDir, ratio, refs, manifestPath, manifest) {
|
||||||
|
const { MJApi, ImageUtils } = require('../mj-image-generator')
|
||||||
|
|
||||||
const pollResult = await MJApi.poll(item.taskId)
|
const pollResult = await MJApi.poll(item.taskId)
|
||||||
const timestamp = new Date().toISOString().replace(/[:.]/g, '-')
|
const timestamp = new Date().toISOString().replace(/[:.]/g, '-')
|
||||||
const gridFile = path.join(imagesDir, `${timestamp}_grid.png`)
|
const gridFile = path.join(imagesDir, `${timestamp}_grid.png`)
|
||||||
await ImageUtils.download(pollResult.imageUrl, gridFile)
|
await ImageUtils.download(pollResult.imageUrl, gridFile)
|
||||||
const splitFiles = await ImageUtils.split4(gridFile, imagesDir, timestamp)
|
const splitFiles = await ImageUtils.split4(gridFile, imagesDir, timestamp)
|
||||||
fs.unlinkSync(gridFile)
|
fs.unlinkSync(gridFile)
|
||||||
result = { files: splitFiles }
|
const result = { files: splitFiles }
|
||||||
log('images', `[${idx}] MJ 任务恢复成功`)
|
|
||||||
} catch (err) {
|
|
||||||
log('images', `[${idx}] MJ 任务恢复失败: ${err.message},重新提交`)
|
|
||||||
delete item.taskId
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 新提交
|
|
||||||
if (!result) {
|
|
||||||
log('images', `[${idx}] MJ 生图: ${item.imagePrompt.substring(0, 60)}...`)
|
|
||||||
const taskId = await MJApi.submit(item.imagePrompt, { referenceImages, aspectRatio: ratio, styleWeight })
|
|
||||||
item.taskId = taskId
|
|
||||||
saveManifest(manifestPath, manifest)
|
|
||||||
|
|
||||||
const pollResult = await MJApi.poll(taskId)
|
|
||||||
const timestamp = new Date().toISOString().replace(/[:.]/g, '-')
|
|
||||||
const gridFile = path.join(imagesDir, `${timestamp}_grid.png`)
|
|
||||||
await ImageUtils.download(pollResult.imageUrl, gridFile)
|
|
||||||
const splitFiles = await ImageUtils.split4(gridFile, imagesDir, timestamp)
|
|
||||||
fs.unlinkSync(gridFile)
|
|
||||||
result = { files: splitFiles }
|
|
||||||
}
|
|
||||||
|
|
||||||
const file = (result.files && result.files.length > 0) ? result.files[0] : null
|
const file = (result.files && result.files.length > 0) ? result.files[0] : null
|
||||||
const candidates = (result.files && result.files.length > 0)
|
const candidates = (result.files && result.files.length > 0)
|
||||||
@@ -184,6 +125,11 @@ async function generateMJ(item, idx, dir, imagesDir, ratio, refs, manifestPath,
|
|||||||
return { file }
|
return { file }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function generateMJ(item, idx, dir, imagesDir, ratio, refs, manifestPath, manifest) {
|
||||||
|
await submitMJ(item, idx, dir, imagesDir, ratio, refs, manifestPath, manifest)
|
||||||
|
return harvestMJ(item, idx, dir, imagesDir, ratio, refs, manifestPath, manifest)
|
||||||
|
}
|
||||||
|
|
||||||
async function generateKling(item, idx, dir, imagesDir, ratio, refs) {
|
async function generateKling(item, idx, dir, imagesDir, ratio, refs) {
|
||||||
const { generate: klingGen } = require('../kling-image-generator')
|
const { generate: klingGen } = require('../kling-image-generator')
|
||||||
const klingOpts = { outputDir: imagesDir, aspectRatio: ratio }
|
const klingOpts = { outputDir: imagesDir, aspectRatio: ratio }
|
||||||
@@ -243,4 +189,119 @@ async function generateLastFrame(item, idx, manifest, dir, imagesDir, model, rat
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// 调度策略
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gemini/Kling:全部并行
|
||||||
|
*/
|
||||||
|
async function phaseImagesSlidingWindow(items, manifest, manifestPath, dir, imagesDir, model, ratio, refs) {
|
||||||
|
await Promise.allSettled(items.map(item =>
|
||||||
|
processItem(item, manifest, manifestPath, dir, imagesDir, model, ratio, refs)
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MJ 两阶段策略:
|
||||||
|
* 1. 先串行提交所有任务拿 taskId(MJ API 限制同时只能提交一个,但提交很快)
|
||||||
|
* 2. 滑动窗口收割:轮询+下载+拆分,完成一个立即补一个
|
||||||
|
*/
|
||||||
|
async function phaseImagesMJ(items, manifest, manifestPath, dir, imagesDir, model, ratio, refs) {
|
||||||
|
// 阶段1:全部提交
|
||||||
|
log('images', `=== MJ 阶段1: 提交 ${items.length} 个任务 ===`)
|
||||||
|
for (const item of items) {
|
||||||
|
const idx = item.id
|
||||||
|
try {
|
||||||
|
item.status = 'generating'
|
||||||
|
saveManifest(manifestPath, manifest)
|
||||||
|
await submitMJ(item, idx, dir, imagesDir, ratio, refs, manifestPath, manifest)
|
||||||
|
} catch (err) {
|
||||||
|
item.status = 'failed'
|
||||||
|
item.error = err.message
|
||||||
|
log('images', `[${idx}] MJ 提交失败: ${err.message}`)
|
||||||
|
saveManifest(manifestPath, manifest)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// 阶段2:全部并行收割(MJ 轮询是轻量 HTTP,不受本地并发限制)
|
||||||
|
const harvestItems = items.filter(it => it.taskId && it.status === 'generating')
|
||||||
|
log('images', `=== MJ 阶段2: 并行收割 ${harvestItems.length} 个任务 ===`)
|
||||||
|
|
||||||
|
await Promise.allSettled(harvestItems.map(async (item) => {
|
||||||
|
const idx = item.id
|
||||||
|
try {
|
||||||
|
const result = await harvestMJ(item, idx, dir, imagesDir, ratio, refs, manifestPath, manifest)
|
||||||
|
if (result.file) {
|
||||||
|
item.file = result.file
|
||||||
|
if (result.candidates) item.candidates = result.candidates
|
||||||
|
item.status = 'done'
|
||||||
|
log('images', `[${idx}] 完成: ${item.file}`)
|
||||||
|
} else {
|
||||||
|
item.status = 'failed'
|
||||||
|
item.error = '生成器未返回文件'
|
||||||
|
}
|
||||||
|
saveManifest(manifestPath, manifest)
|
||||||
|
|
||||||
|
if (item.status === 'done' && manifest.mode === 'framePair' && item.lastFramePrompt && !item.lastFrame) {
|
||||||
|
await generateLastFrame(item, idx, manifest, dir, imagesDir, model, ratio, manifestPath)
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
item.status = 'failed'
|
||||||
|
item.error = err.message
|
||||||
|
log('images', `[${idx}] 收割失败: ${err.message}`)
|
||||||
|
saveManifest(manifestPath, manifest)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 通用 item 处理(Gemini/Kling 用)
|
||||||
|
*/
|
||||||
|
async function processItem(item, manifest, manifestPath, dir, imagesDir, model, ratio, refs) {
|
||||||
|
const idx = item.id
|
||||||
|
try {
|
||||||
|
item.status = 'generating'
|
||||||
|
saveManifest(manifestPath, manifest)
|
||||||
|
|
||||||
|
if (item.file && manifest.mode === 'framePair' && item.lastFramePrompt && !item.lastFrame) {
|
||||||
|
log('images', `[${idx}] 补生成 lastFrame(首帧已有: ${item.file})`)
|
||||||
|
await generateLastFrame(item, idx, manifest, dir, imagesDir, model, ratio, manifestPath)
|
||||||
|
saveManifest(manifestPath, manifest)
|
||||||
|
return { ok: true }
|
||||||
|
}
|
||||||
|
|
||||||
|
let result
|
||||||
|
if (model === 'gemini') {
|
||||||
|
result = await generateGemini(item, idx, dir, imagesDir, ratio, refs)
|
||||||
|
} else if (model === 'kling') {
|
||||||
|
result = await generateKling(item, idx, dir, imagesDir, ratio, refs)
|
||||||
|
} else {
|
||||||
|
throw new Error(`不支持的模型: ${model}(支持: gemini, mj, kling)`)
|
||||||
|
}
|
||||||
|
|
||||||
|
if (result.file) {
|
||||||
|
item.file = result.file
|
||||||
|
if (result.candidates) item.candidates = result.candidates
|
||||||
|
item.status = 'done'
|
||||||
|
log('images', `[${idx}] 完成: ${item.file}`)
|
||||||
|
} else {
|
||||||
|
item.status = 'failed'
|
||||||
|
item.error = '生成器未返回文件'
|
||||||
|
log('images', `[${idx}] 失败: 生成器未返回文件`)
|
||||||
|
}
|
||||||
|
saveManifest(manifestPath, manifest)
|
||||||
|
|
||||||
|
if (item.status === 'done' && manifest.mode === 'framePair' && item.lastFramePrompt && !item.lastFrame) {
|
||||||
|
await generateLastFrame(item, idx, manifest, dir, imagesDir, model, ratio, manifestPath)
|
||||||
|
}
|
||||||
|
return { ok: true }
|
||||||
|
} catch (err) {
|
||||||
|
item.status = 'failed'
|
||||||
|
item.error = err.message
|
||||||
|
log('images', `[${idx}] 失败: ${err.message}`)
|
||||||
|
saveManifest(manifestPath, manifest)
|
||||||
|
return { ok: false, error: err.message }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
module.exports = { phaseImages }
|
module.exports = { phaseImages }
|
||||||
|
|||||||
Reference in New Issue
Block a user