diff --git a/prompts/scripts/clear-all-jobs.ts b/prompts/scripts/clear-all-jobs.ts new file mode 100644 index 0000000..4d38bcb --- /dev/null +++ b/prompts/scripts/clear-all-jobs.ts @@ -0,0 +1,31 @@ +import { getRedisConnection } from '../src/index.ts'; +import { Queue } from 'bullmq' + +export async function clearAllJobs() { + const connection = getRedisConnection(); + const queueNames = [ + 'image-download', + 'image-generate', + 'perfect-prompt', + 'perfect-sentence-prompt' + ]; + + for (const queueName of queueNames) { + const queue = new Queue(queueName, { connection }); + await queue.drain(); + await queue.clean(0, 1000, 'completed'); + await queue.clean(0, 1000, 'failed'); + console.log(`Cleared all jobs in queue: ${queueName}`); + await queue.close(); + } + + await connection.quit(); +} + +clearAllJobs().then(() => { + console.log('All jobs cleared.'); + process.exit(0); +}).catch((error) => { + console.error('Error clearing jobs:', error); + process.exit(1); +}); \ No newline at end of file diff --git a/prompts/src/app.ts b/prompts/src/app.ts index ce40d07..793e93e 100644 --- a/prompts/src/app.ts +++ b/prompts/src/app.ts @@ -14,7 +14,9 @@ export const jimengService = useContextKey('jimeng', new JimengService({ baseUrl: config.JIMENG_API_URL, timeout: parseInt(config.JIMENG_TIMEOUT || '300000'), })); - +export { + getRedisConnection +} export const ossService = useContextKey('oss', new OSSService({ accessKeyId: config.S3_ACCESS_KEY_ID, accessKeySecret: config.S3_ACCESS_KEY_SECRET, @@ -32,4 +34,7 @@ export const app = useContextKey('app', new App()); export const ai = useContextKey('ai', new Kevisual({ apiKey: config.KEVISUAL_NEW_API_KEY, -})); \ No newline at end of file + model: 'qwen-turbo', +})); + +export const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); \ No newline at end of file diff --git a/prompts/src/routes/index.ts b/prompts/src/routes/index.ts index 6e81995..ce37be1 100644 --- a/prompts/src/routes/index.ts +++ b/prompts/src/routes/index.ts @@ -6,6 +6,7 @@ import './image-update.ts' import './create-sentence.ts' +import './perfect.ts' app.route({ path: 'auth', id: 'auth' diff --git a/prompts/src/routes/perfect.ts b/prompts/src/routes/perfect.ts new file mode 100644 index 0000000..64c6b0a --- /dev/null +++ b/prompts/src/routes/perfect.ts @@ -0,0 +1,46 @@ +import { app, ossService, pbService, redis } from '@/app.ts' +import { addPerfectPromptJob } from '@/task/perfect-prompt.job.ts'; +import { addPerfectSentencePromptJob } from '@/task/perfect-sentence.job.ts'; + +const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); + +app.route({ + path: 'image-creator', + key: 'perfect-horse', + description: '优化关于像素马的关键字', + middleware: ['auth'] +}).define(async (ctx) => { + const id = ctx.query.id as string; + if (!id) { + ctx.throw(400, '缺少 id 参数'); + } + const item = await pbService.collection.getOne(id); + if (!item) { + ctx.throw(404, '未找到对应的条目'); + } + addPerfectPromptJob(item); + ctx.body = { message: '已添加优化任务到队列' }; +}).addTo(app); + + +app.route({ + path: 'image-creator', + key: 'perfect-sentence', + description: '优化关于句子的关键字', + middleware: ['auth'] +}).define(async (ctx) => { + const list = await pbService.collection.getFullList({ + filter: `status="创建"`, + limit: 1 + }); + console.log(`Fetched ${list.length} items for sentence perfection.`); + const hasSentences = list.filter(item => { + return item.data && item.data.sentence; + }); + for (const item of hasSentences) { + addPerfectSentencePromptJob(item); + await sleep(100); // 避免短时间内添加过多任务 + } + console.log(`Added ${list.length} perfect prompt jobs to the queue.`); + ctx.body = { message: '已添加优化任务到队列' }; +}).addTo(app); \ No newline at end of file diff --git a/prompts/src/services/pb.service.ts b/prompts/src/services/pb.service.ts index 452d69e..01ea405 100644 --- a/prompts/src/services/pb.service.ts +++ b/prompts/src/services/pb.service.ts @@ -135,6 +135,7 @@ export class PBService extends PBCore { collectionName = 'images_generation_tasks'; constructor(options: PBOptions) { super(options); + this.client.autoCancellation(false) } getCollection(name: string) { return this.client.collection(name); diff --git a/prompts/src/task/image-creator.job.ts b/prompts/src/task/image-creator.job.ts index 7857aea..c801200 100644 --- a/prompts/src/task/image-creator.job.ts +++ b/prompts/src/task/image-creator.job.ts @@ -5,7 +5,6 @@ import type { ImageCollection } from '../services/pb.service.ts'; import { updateItemStatus } from '../routes/image-update.ts'; import { notify } from '@/module/logger.ts'; -export const IMAGE_CREATOR_JOB = 'image-creator'; export const IMAGE_GENERATE_JOB = 'image-generate'; export const IMAGE_DOWNLOAD_JOB = 'image-download'; @@ -148,6 +147,8 @@ export async function runImageDownloadWorker(): Promise { { connection, concurrency: 3, + lockDuration: 60000 * 5, // 锁持续时间 5分钟 + stalledInterval: 30000, // 每30秒检查一次 stalled } as any ); @@ -178,7 +179,10 @@ export async function runImageGenerateWorker(): Promise { try { // 调用 jimeng API 生成图片 - const result = await jimengService.generateImage({ prompt }); + const result = await jimengService.generateImage({ + prompt, + ratio: '9:16', + }); if (result.code !== 200 || !result.data?.data?.length) { throw new Error(result.message || 'Failed to generate image'); @@ -197,20 +201,14 @@ export async function runImageGenerateWorker(): Promise { return { success: true, images }; } catch (error: any) { console.error(`[ImageGenerate] Error: ${error.message}`); - - // 重试次数用尽,暂停任务并停止当前 worker - if (job.attemptsMade >= GENERATE_MAX_RETRIES - 1) { - await updateItemStatus(itemId, ImageTaskStatus.PAUSED); - console.error(`[ImageGenerate] Max retries exceeded. Stopping worker...`); - await worker.close(); - } - throw error; } }, { connection, concurrency: 1, // jimeng API 有节流限制,设置为 1 + lockDuration: 60000 * 5, // 锁持续时间 5分钟 + stalledInterval: 30000, // 每30秒检查一次 stalled } ); @@ -221,6 +219,10 @@ export async function runImageGenerateWorker(): Promise { worker.on('failed', (job, err) => { console.error(`[ImageGenerate] Job failed: ${job?.id}, error: ${err.message}`); notify.notify(`[ImageGenerate] \nJob failed: ${job?.id}, error: ${err.message}\n Job data: ${JSON.stringify(job?.data)}`); + if (job && job.attemptsMade >= GENERATE_MAX_RETRIES - 1) { + worker.close(); + notify.notify(`[ImageGenerate] Worker stopped after reaching max retries for item ${job.data.itemId}`); + } }); console.log('[ImageGenerate] Worker started'); diff --git a/prompts/src/task/perfect-prompt.job.ts b/prompts/src/task/perfect-prompt.job.ts index bde6222..6e0373c 100644 --- a/prompts/src/task/perfect-prompt.job.ts +++ b/prompts/src/task/perfect-prompt.job.ts @@ -5,6 +5,7 @@ import type { ImageCollection } from '../services/pb.service.ts'; import { updateItemStatus } from '../routes/image-update.ts'; import { Queue } from 'bullmq'; import { notify } from '@/module/logger.ts'; +import { addImageGenerateJob } from './image-creator.job.ts'; export const PERFECT_PROMPT_JOB = 'perfect-prompt'; @@ -22,6 +23,7 @@ export interface PerfectPromptJobData { itemId: string; prompt: string; collectionName?: string; + data?: Record; } // 优化提示词的模板 @@ -97,6 +99,11 @@ export async function runPerfectPromptWorker(): Promise { await updateItemStatus(itemId, PerfectPromptStatus.PLANNING, { description: perfectText, }); + // 任务完成,把任务抛给下一个图片生成队列 + const item = await pbService.collection.getOne(itemId); + if (item) { + addImageGenerateJob(item) + } return { success: true, perfectPrompt: perfectText }; } catch (error: any) { @@ -113,6 +120,8 @@ export async function runPerfectPromptWorker(): Promise { { connection, concurrency: 2, + lockDuration: 60000 * 5, // 锁持续时间 5分钟 + stalledInterval: 30000, // 每30秒检查一次 stalled } ); @@ -123,6 +132,10 @@ export async function runPerfectPromptWorker(): Promise { worker.on('failed', (job, err) => { console.error(`[PerfectPrompt] Job failed: ${job?.id}, error: ${err.message}`); notify.notify(`[PerfectPrompt] \nJob failed: ${job?.id}, error: ${err.message}\n Job data: ${JSON.stringify(job?.data)}`); + if (job && job.attemptsMade >= MAX_RETRIES - 1) { + worker.close(); + notify.notify(`[PerfectPrompt] Worker stopped after reaching max retries for item ${job.data.itemId}`); + } }); console.log('[PerfectPrompt] Worker started'); diff --git a/prompts/src/task/perfect-sentence.job.ts b/prompts/src/task/perfect-sentence.job.ts new file mode 100644 index 0000000..61a83ed --- /dev/null +++ b/prompts/src/task/perfect-sentence.job.ts @@ -0,0 +1,121 @@ +import { Worker, Job } from 'bullmq'; +import { getRedisConnection } from '../module/redis.ts'; +import { Prompt, pbService, ai, app } from '../index.ts'; +import type { ImageCollection } from '../services/pb.service.ts'; +import { updateItemStatus } from '../routes/image-update.ts'; +import { Queue } from 'bullmq'; +import { notify } from '@/module/logger.ts'; +import { addImageGenerateJob } from './image-creator.job.ts'; +import { PerfectPromptJobData } from './perfect-prompt.job.ts'; +import { SentenceImage } from '@/module/sentence-image.ts'; +const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); +export const PERFECT_SENTENCE_PROMPT_JOB = 'perfect-sentence-prompt'; + +// 最大重试次数 +const MAX_RETRIES = 3; + +// 状态常量 +export const PerfectPromptStatus = { + PENDING: '提示词优化中' as const, + PLANNING: '计划中' as const, + FAILED: '失败' as const, +}; + +export async function addPerfectSentencePromptJob(item: ImageCollection): Promise { + const connection = getRedisConnection(); + const queue = new Queue(PERFECT_SENTENCE_PROMPT_JOB, { connection }); + const jobData: PerfectPromptJobData = { + itemId: item.id, + prompt: item.description || item.summary || item.title || '', + collectionName: pbService.collectionName, + data: item.data + }; + + await queue.add(PERFECT_SENTENCE_PROMPT_JOB, jobData, { + attempts: MAX_RETRIES, + backoff: { + type: 'exponential', + delay: 2000, + }, + removeOnComplete: 100, + removeOnFail: 100, + }); + + await updateItemStatus(item.id, PerfectPromptStatus.PENDING); + await queue.close(); +} + +export const runPerfectSentencePromptWorker = () => { + const connection = getRedisConnection(); + const worker = new Worker( + PERFECT_SENTENCE_PROMPT_JOB, + async (job: Job) => { + const { itemId, prompt, data } = job.data; + const perfect = new SentenceImage(); + try { + let content = '' + if (data && data.sentence) { + const { prompt, ...rest } = data.sentence || {}; + if (rest) { + content = JSON.stringify(rest); + } + } + if (!content) { + content = prompt; + } + if (!content) { + notify.notify(`[Sentence] 提示词优化任务跳过,条目 ${itemId}: 无内容可优化。`); + await updateItemStatus(itemId, PerfectPromptStatus.FAILED); + return; + } + + const perfectText = perfect.perfect(content); + const result = await ai.chat([], { + messages: [{ role: 'user', content: perfectText }], + // model: 'qwen-plus', + // enable_thinking: true + model: 'qwen-turbo', + }); + const perfectPrompt = perfect.clearPerfectTags(ai.responseText!) || ''; + console.log(`[Sentence] 提示词优化 ${itemId}:\n`, perfectPrompt); + + await updateItemStatus(itemId, '提示词优化完成',{ + description: perfectPrompt + }); + await sleep(500); // 确保数据已保存 + // 任务完成,把任务抛给下一个图片生成队列 + const item = await pbService.collection.getOne(itemId); + if (item) { + addImageGenerateJob(item) + } + + return { success: true, perfectPrompt: perfectText }; + } catch (error: any) { + notify.notify(`[Sentence] 提示词优化任务失败,条目 ${itemId}: ${error.message}`); + await updateItemStatus(itemId, PerfectPromptStatus.FAILED); + throw error; + } + }, + { + connection, + lockDuration: 60000 * 5, // 锁持续时间 5分钟 + stalledInterval: 30000, // 每30秒检查一次 stalled + } + ); + + worker.on('completed', (job) => { + // notify.notify(`[Sentence] Perfect Prompt Job Completed for item ${job.data.itemId}`); + console.log(`[Sentence] Perfect Prompt Job Completed for item ${job.data.itemId}`); + }); + + worker.on('failed', (job, err) => { + notify.notify(`[Sentence] 提示词优化任务失败,条目${job?.data.itemId}: ${err.message}`); + // 如果是第三次失败,停止worker; + if (job && job.attemptsMade >= MAX_RETRIES - 1) { + worker.close(); + notify.notify(`[Sentence] 提示词优化工作者在条目 ${job.data.itemId} 达到最大重试次数后停止`); + } + }); + + return worker; +}; \ No newline at end of file diff --git a/prompts/src/workers/index.ts b/prompts/src/workers/index.ts index bd6c6b5..dc12660 100644 --- a/prompts/src/workers/index.ts +++ b/prompts/src/workers/index.ts @@ -1,10 +1,12 @@ import { runImageDownloadWorker, runImageGenerateWorker } from '../task/image-creator.job.ts'; +import { runPerfectSentencePromptWorker } from '../task/perfect-sentence.job.ts'; runImageDownloadWorker(); runImageGenerateWorker(); +runPerfectSentencePromptWorker(); // 运行半小时后停止 setTimeout(() => { console.log('Stop timeed', new Date().toISOString()); process.exit(0); -}, 60 * 60 * 1000); // 60 minutes in milliseconds \ No newline at end of file +}, 10 * 60 * 60 * 1000); // 10 hours in milliseconds \ No newline at end of file diff --git a/prompts/test/import-sentence.ts b/prompts/test/import-sentence.ts index 2ed821d..6cfcd54 100644 --- a/prompts/test/import-sentence.ts +++ b/prompts/test/import-sentence.ts @@ -18,7 +18,20 @@ async function run() { data: sentences } }); + console.log('Import sentence result:', res); } -await run(); \ No newline at end of file +// await run(); + +async function run2() { + const sentences = data?.sentences || []; + const res = await app.run({ + path: 'image-creator', + key: 'perfect-sentence', + }); + + console.log('Import sentence result:', res); +} + +await run2(); \ No newline at end of file