This commit is contained in:
2026-01-10 09:14:37 +08:00
parent f7be1abd81
commit e525d68648
10 changed files with 249 additions and 14 deletions

View File

@@ -0,0 +1,31 @@
import { getRedisConnection } from '../src/index.ts';
import { Queue } from 'bullmq'
export async function clearAllJobs() {
const connection = getRedisConnection();
const queueNames = [
'image-download',
'image-generate',
'perfect-prompt',
'perfect-sentence-prompt'
];
for (const queueName of queueNames) {
const queue = new Queue(queueName, { connection });
await queue.drain();
await queue.clean(0, 1000, 'completed');
await queue.clean(0, 1000, 'failed');
console.log(`Cleared all jobs in queue: ${queueName}`);
await queue.close();
}
await connection.quit();
}
clearAllJobs().then(() => {
console.log('All jobs cleared.');
process.exit(0);
}).catch((error) => {
console.error('Error clearing jobs:', error);
process.exit(1);
});

View File

@@ -14,7 +14,9 @@ export const jimengService = useContextKey('jimeng', new JimengService({
baseUrl: config.JIMENG_API_URL,
timeout: parseInt(config.JIMENG_TIMEOUT || '300000'),
}));
export {
getRedisConnection
}
export const ossService = useContextKey('oss', new OSSService({
accessKeyId: config.S3_ACCESS_KEY_ID,
accessKeySecret: config.S3_ACCESS_KEY_SECRET,
@@ -32,4 +34,7 @@ export const app = useContextKey('app', new App());
export const ai = useContextKey('ai', new Kevisual({
apiKey: config.KEVISUAL_NEW_API_KEY,
}));
model: 'qwen-turbo',
}));
export const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));

View File

@@ -6,6 +6,7 @@ import './image-update.ts'
import './create-sentence.ts'
import './perfect.ts'
app.route({
path: 'auth',
id: 'auth'

View File

@@ -0,0 +1,46 @@
import { app, ossService, pbService, redis } from '@/app.ts'
import { addPerfectPromptJob } from '@/task/perfect-prompt.job.ts';
import { addPerfectSentencePromptJob } from '@/task/perfect-sentence.job.ts';
const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
app.route({
path: 'image-creator',
key: 'perfect-horse',
description: '优化关于像素马的关键字',
middleware: ['auth']
}).define(async (ctx) => {
const id = ctx.query.id as string;
if (!id) {
ctx.throw(400, '缺少 id 参数');
}
const item = await pbService.collection.getOne(id);
if (!item) {
ctx.throw(404, '未找到对应的条目');
}
addPerfectPromptJob(item);
ctx.body = { message: '已添加优化任务到队列' };
}).addTo(app);
app.route({
path: 'image-creator',
key: 'perfect-sentence',
description: '优化关于句子的关键字',
middleware: ['auth']
}).define(async (ctx) => {
const list = await pbService.collection.getFullList({
filter: `status="创建"`,
limit: 1
});
console.log(`Fetched ${list.length} items for sentence perfection.`);
const hasSentences = list.filter(item => {
return item.data && item.data.sentence;
});
for (const item of hasSentences) {
addPerfectSentencePromptJob(item);
await sleep(100); // 避免短时间内添加过多任务
}
console.log(`Added ${list.length} perfect prompt jobs to the queue.`);
ctx.body = { message: '已添加优化任务到队列' };
}).addTo(app);

View File

@@ -135,6 +135,7 @@ export class PBService extends PBCore {
collectionName = 'images_generation_tasks';
constructor(options: PBOptions) {
super(options);
this.client.autoCancellation(false)
}
getCollection<T>(name: string) {
return this.client.collection<T>(name);

View File

@@ -5,7 +5,6 @@ import type { ImageCollection } from '../services/pb.service.ts';
import { updateItemStatus } from '../routes/image-update.ts';
import { notify } from '@/module/logger.ts';
export const IMAGE_CREATOR_JOB = 'image-creator';
export const IMAGE_GENERATE_JOB = 'image-generate';
export const IMAGE_DOWNLOAD_JOB = 'image-download';
@@ -148,6 +147,8 @@ export async function runImageDownloadWorker(): Promise<void> {
{
connection,
concurrency: 3,
lockDuration: 60000 * 5, // 锁持续时间 5分钟
stalledInterval: 30000, // 每30秒检查一次 stalled
} as any
);
@@ -178,7 +179,10 @@ export async function runImageGenerateWorker(): Promise<void> {
try {
// 调用 jimeng API 生成图片
const result = await jimengService.generateImage({ prompt });
const result = await jimengService.generateImage({
prompt,
ratio: '9:16',
});
if (result.code !== 200 || !result.data?.data?.length) {
throw new Error(result.message || 'Failed to generate image');
@@ -197,20 +201,14 @@ export async function runImageGenerateWorker(): Promise<void> {
return { success: true, images };
} catch (error: any) {
console.error(`[ImageGenerate] Error: ${error.message}`);
// 重试次数用尽,暂停任务并停止当前 worker
if (job.attemptsMade >= GENERATE_MAX_RETRIES - 1) {
await updateItemStatus(itemId, ImageTaskStatus.PAUSED);
console.error(`[ImageGenerate] Max retries exceeded. Stopping worker...`);
await worker.close();
}
throw error;
}
},
{
connection,
concurrency: 1, // jimeng API 有节流限制,设置为 1
lockDuration: 60000 * 5, // 锁持续时间 5分钟
stalledInterval: 30000, // 每30秒检查一次 stalled
}
);
@@ -221,6 +219,10 @@ export async function runImageGenerateWorker(): Promise<void> {
worker.on('failed', (job, err) => {
console.error(`[ImageGenerate] Job failed: ${job?.id}, error: ${err.message}`);
notify.notify(`[ImageGenerate] \nJob failed: ${job?.id}, error: ${err.message}\n Job data: ${JSON.stringify(job?.data)}`);
if (job && job.attemptsMade >= GENERATE_MAX_RETRIES - 1) {
worker.close();
notify.notify(`[ImageGenerate] Worker stopped after reaching max retries for item ${job.data.itemId}`);
}
});
console.log('[ImageGenerate] Worker started');

View File

@@ -5,6 +5,7 @@ import type { ImageCollection } from '../services/pb.service.ts';
import { updateItemStatus } from '../routes/image-update.ts';
import { Queue } from 'bullmq';
import { notify } from '@/module/logger.ts';
import { addImageGenerateJob } from './image-creator.job.ts';
export const PERFECT_PROMPT_JOB = 'perfect-prompt';
@@ -22,6 +23,7 @@ export interface PerfectPromptJobData {
itemId: string;
prompt: string;
collectionName?: string;
data?: Record<string, any>;
}
// 优化提示词的模板
@@ -97,6 +99,11 @@ export async function runPerfectPromptWorker(): Promise<void> {
await updateItemStatus(itemId, PerfectPromptStatus.PLANNING, {
description: perfectText,
});
// 任务完成,把任务抛给下一个图片生成队列
const item = await pbService.collection.getOne(itemId);
if (item) {
addImageGenerateJob(item)
}
return { success: true, perfectPrompt: perfectText };
} catch (error: any) {
@@ -113,6 +120,8 @@ export async function runPerfectPromptWorker(): Promise<void> {
{
connection,
concurrency: 2,
lockDuration: 60000 * 5, // 锁持续时间 5分钟
stalledInterval: 30000, // 每30秒检查一次 stalled
}
);
@@ -123,6 +132,10 @@ export async function runPerfectPromptWorker(): Promise<void> {
worker.on('failed', (job, err) => {
console.error(`[PerfectPrompt] Job failed: ${job?.id}, error: ${err.message}`);
notify.notify(`[PerfectPrompt] \nJob failed: ${job?.id}, error: ${err.message}\n Job data: ${JSON.stringify(job?.data)}`);
if (job && job.attemptsMade >= MAX_RETRIES - 1) {
worker.close();
notify.notify(`[PerfectPrompt] Worker stopped after reaching max retries for item ${job.data.itemId}`);
}
});
console.log('[PerfectPrompt] Worker started');

View File

@@ -0,0 +1,121 @@
import { Worker, Job } from 'bullmq';
import { getRedisConnection } from '../module/redis.ts';
import { Prompt, pbService, ai, app } from '../index.ts';
import type { ImageCollection } from '../services/pb.service.ts';
import { updateItemStatus } from '../routes/image-update.ts';
import { Queue } from 'bullmq';
import { notify } from '@/module/logger.ts';
import { addImageGenerateJob } from './image-creator.job.ts';
import { PerfectPromptJobData } from './perfect-prompt.job.ts';
import { SentenceImage } from '@/module/sentence-image.ts';
const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
export const PERFECT_SENTENCE_PROMPT_JOB = 'perfect-sentence-prompt';
// 最大重试次数
const MAX_RETRIES = 3;
// 状态常量
export const PerfectPromptStatus = {
PENDING: '提示词优化中' as const,
PLANNING: '计划中' as const,
FAILED: '失败' as const,
};
export async function addPerfectSentencePromptJob(item: ImageCollection): Promise<void> {
const connection = getRedisConnection();
const queue = new Queue(PERFECT_SENTENCE_PROMPT_JOB, { connection });
const jobData: PerfectPromptJobData = {
itemId: item.id,
prompt: item.description || item.summary || item.title || '',
collectionName: pbService.collectionName,
data: item.data
};
await queue.add(PERFECT_SENTENCE_PROMPT_JOB, jobData, {
attempts: MAX_RETRIES,
backoff: {
type: 'exponential',
delay: 2000,
},
removeOnComplete: 100,
removeOnFail: 100,
});
await updateItemStatus(item.id, PerfectPromptStatus.PENDING);
await queue.close();
}
export const runPerfectSentencePromptWorker = () => {
const connection = getRedisConnection();
const worker = new Worker<PerfectPromptJobData>(
PERFECT_SENTENCE_PROMPT_JOB,
async (job: Job<PerfectPromptJobData>) => {
const { itemId, prompt, data } = job.data;
const perfect = new SentenceImage();
try {
let content = ''
if (data && data.sentence) {
const { prompt, ...rest } = data.sentence || {};
if (rest) {
content = JSON.stringify(rest);
}
}
if (!content) {
content = prompt;
}
if (!content) {
notify.notify(`[Sentence] 提示词优化任务跳过,条目 ${itemId}: 无内容可优化。`);
await updateItemStatus(itemId, PerfectPromptStatus.FAILED);
return;
}
const perfectText = perfect.perfect(content);
const result = await ai.chat([], {
messages: [{ role: 'user', content: perfectText }],
// model: 'qwen-plus',
// enable_thinking: true
model: 'qwen-turbo',
});
const perfectPrompt = perfect.clearPerfectTags(ai.responseText!) || '';
console.log(`[Sentence] 提示词优化 ${itemId}:\n`, perfectPrompt);
await updateItemStatus(itemId, '提示词优化完成',{
description: perfectPrompt
});
await sleep(500); // 确保数据已保存
// 任务完成,把任务抛给下一个图片生成队列
const item = await pbService.collection.getOne(itemId);
if (item) {
addImageGenerateJob(item)
}
return { success: true, perfectPrompt: perfectText };
} catch (error: any) {
notify.notify(`[Sentence] 提示词优化任务失败,条目 ${itemId}: ${error.message}`);
await updateItemStatus(itemId, PerfectPromptStatus.FAILED);
throw error;
}
},
{
connection,
lockDuration: 60000 * 5, // 锁持续时间 5分钟
stalledInterval: 30000, // 每30秒检查一次 stalled
}
);
worker.on('completed', (job) => {
// notify.notify(`[Sentence] Perfect Prompt Job Completed for item ${job.data.itemId}`);
console.log(`[Sentence] Perfect Prompt Job Completed for item ${job.data.itemId}`);
});
worker.on('failed', (job, err) => {
notify.notify(`[Sentence] 提示词优化任务失败,条目${job?.data.itemId}: ${err.message}`);
// 如果是第三次失败停止worker;
if (job && job.attemptsMade >= MAX_RETRIES - 1) {
worker.close();
notify.notify(`[Sentence] 提示词优化工作者在条目 ${job.data.itemId} 达到最大重试次数后停止`);
}
});
return worker;
};

View File

@@ -1,10 +1,12 @@
import { runImageDownloadWorker, runImageGenerateWorker } from '../task/image-creator.job.ts';
import { runPerfectSentencePromptWorker } from '../task/perfect-sentence.job.ts';
runImageDownloadWorker();
runImageGenerateWorker();
runPerfectSentencePromptWorker();
// 运行半小时后停止
setTimeout(() => {
console.log('Stop timeed', new Date().toISOString());
process.exit(0);
}, 60 * 60 * 1000); // 60 minutes in milliseconds
}, 10 * 60 * 60 * 1000); // 10 hours in milliseconds

View File

@@ -18,7 +18,20 @@ async function run() {
data: sentences
}
});
console.log('Import sentence result:', res);
}
await run();
// await run();
async function run2() {
const sentences = data?.sentences || [];
const res = await app.run({
path: 'image-creator',
key: 'perfect-sentence',
});
console.log('Import sentence result:', res);
}
await run2();