feat: Add Jimeng image generation service and related functionality

- Implemented JimengService for image generation with API integration.
- Created OSSService for handling image uploads to S3.
- Developed PBService for managing PocketBase interactions.
- Added task management for image creation and downloading using BullMQ.
- Introduced routes for creating image generation tasks.
- Implemented logging and error handling for image processing.
- Added configuration management for Redis and other services.
- Created scripts for testing image generation and PocketBase integration.
- Updated package dependencies and added new scripts for worker management.
This commit is contained in:
2026-01-09 02:55:04 +08:00
parent 7cca41b457
commit 9da3d14752
31 changed files with 3193 additions and 5 deletions

35
prompts/src/app.ts Normal file
View File

@@ -0,0 +1,35 @@
import { JimengService } from './services/jimeng.service.ts';
import { OSSService } from './services/oss.service.ts';
import { PBService } from './services/pb.service.ts';
import { useConfig } from '@kevisual/use-config';
import { App } from '@kevisual/router'
import { useContextKey } from '@kevisual/context';
import { getRedisConnection } from './module/redis.ts';
import { Kevisual } from '@kevisual/ai';
export const config = useConfig();
export const redis = useContextKey('redis', () => getRedisConnection());
export const jimengService = useContextKey('jimeng', new JimengService({
apiKey: config.JIMENG_API_KEY,
baseUrl: config.JIMENG_API_URL,
timeout: parseInt(config.JIMENG_TIMEOUT || '300000'),
}));
export const ossService = useContextKey('oss', new OSSService({
accessKeyId: config.S3_ACCESS_KEY_ID,
accessKeySecret: config.S3_ACCESS_KEY_SECRET,
bucketName: config.S3_BUCKET_NAME,
region: config.S3_REGION,
endpoint: config.S3_ENDPOINT,
prefix: 'projects/horse/',
}));
export const pbService = useContextKey('pb', new PBService({
url: config.POCKETBASE_URL,
token: config.POCKETBASE_TOKEN,
}));
export const app = useContextKey('app', new App());
export const ai = useContextKey('ai', new Kevisual({
apiKey: config.KEVISUAL_NEW_API_KEY,
}));

View File

@@ -11,4 +11,10 @@ async function saveToFile(data: Map<string, string>, outputPath: string): Promis
console.log(`Generated ${arrayData.length} prompts and saved to ${outputPath}`);
}
import './routes/index.ts';
export * from './app.ts';
// list all routes and import
export { PromptGenerator, PromptGeneratorOptions, saveToFile, Prompt };

View File

@@ -0,0 +1,15 @@
import { useConfig } from '@kevisual/use-config';
export const config = useConfig();
export const queueConfig = {
name: 'image-generation-queue',
concurrency: parseInt(config.QUEUE_CONCURRENCY || '1'),
maxFailed: parseInt(config.QUEUE_MAX_FAILED || '2'),
};
export const redisConfig = {
host: config.REDIS_HOST || 'localhost',
port: parseInt(config.REDIS_PORT || '6379'),
password: config.REDIS_PASSWORD || undefined,
db: parseInt(config.REDIS_DB || '0'),
};

View File

@@ -0,0 +1,10 @@
import { Logger } from "@kevisual/logger";
import { config } from '@/app.ts'
import { FeishuNotifier } from "@kevisual/notifier";
export const logger = new Logger({
level: config.LOG_LEVEL || 'info',
});
export const feishuNotifier = new FeishuNotifier({
webhook: config.FEISHU_NOTIFY_WEBHOOK_URL || '',
});

View File

@@ -0,0 +1,29 @@
import { Redis } from 'ioredis';
import { redisConfig } from './config.ts'
export interface RedisConfig {
host: string;
port: number;
password?: string;
db: number;
}
let redisConnection: Redis | null = null;
export const getRedisConnection = () => {
if (!redisConnection) {
redisConnection = new Redis({
...redisConfig,
maxRetriesPerRequest: null
});
redisConnection.on('connect', () => {
console.log('Redis connected');
});
redisConnection.on('error', (err) => {
console.error('Redis connection error:', err);
});
}
return redisConnection as any;
};

View File

@@ -0,0 +1,15 @@
import { app, ossService, pbService, redis } from '@/app.ts'
import { addImageGenerateJob } from '@/task/image-creator.job.ts';
app.route({
path: 'image-creator',
key: 'create-task',
}).define(async (ctx) => {
const list = await pbService.collection.getFullList({
filter: 'status="计划中"',
})
for (const item of list) {
await addImageGenerateJob(item);
}
console.log(`Added ${list.length} image generate jobs to the queue.`);
}).addTo(app);

View File

@@ -0,0 +1 @@
import './create-task.ts'

View File

@@ -0,0 +1,121 @@
import { Result } from '@kevisual/query'
export interface JimengOptions {
/** API密钥用于认证请求 */
apiKey: string;
/** API基础URL */
baseUrl: string;
/** 请求超时时间(毫秒) */
timeout: number;
}
export interface JimengGenerateOptions {
/** 图片生成提示词 */
prompt: string;
/** 使用的模型版本,默认 jimeng-4.0 */
model?: string;
/** 图片比例,默认 1:1 */
ratio?: string;
/** 图片分辨率,默认 2k */
resolution?: string;
}
interface JimengResponse {
/** 请求创建时间戳 */
created: number;
/** 生成的图片列表 */
data: Array<{
/** 图片URL */
url: string;
}>;
}
export class JimengService {
private apiKey: string;
private baseUrl: string;
private timeout: number;
constructor(options: JimengOptions) {
this.apiKey = options.apiKey;
this.baseUrl = options.baseUrl || 'https://jimeng-api.kevisual.cn/v1';
this.timeout = options.timeout;
}
async generateImage(options: JimengGenerateOptions): Promise<Result<JimengResponse>> {
const {
prompt,
model = 'jimeng-4.6',
ratio = '1:1',
resolution = '2k'
} = options;
try {
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), this.timeout);
const response = await fetch(`${this.baseUrl}/images/generations`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${this.apiKey}`,
},
body: JSON.stringify({
model,
prompt,
ratio,
resolution,
}),
signal: controller.signal,
});
clearTimeout(timeoutId);
if (!response.ok) {
throw new Error(`jimeng API error: ${response.status} ${response.statusText}`);
}
const result = await response.json() as JimengResponse;
return { code: 200, data: result };
} catch (error: any) {
return { code: 500, message: error.message || 'Unknown error' };
}
}
async downloadImage(url: string): Promise<Buffer> {
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), this.timeout);
try {
const response = await fetch(url, {
signal: controller.signal,
});
clearTimeout(timeoutId);
if (!response.ok) {
throw new Error(`Failed to download image: ${response.statusText}`);
}
const arrayBuffer = await response.arrayBuffer();
return Buffer.from(arrayBuffer);
} catch (error: any) {
clearTimeout(timeoutId);
if (error.name === 'AbortError') {
throw new Error('Image download timeout');
}
throw error;
}
}
/** 获取图片过期时间 */
async getExpiredTime(url: string): Promise<{ expiredAt: number, expired: boolean }> {
// https://p3-dreamina-sign.byteimg.com/tos-cn-i-tb4s082cfz/c018e06ee6654dd78ccacb29eff4744e~tplv-tb4s082cfz-aigc_resize:0:0.png?lk3s=43402efa&x-expires=1767852000&x-signature=34yf37N955BP37eLaYEzKeLQn0Q%3D&format=.png
const urlObj = new URL(url);
let expires = urlObj.searchParams.get('x-expires');
if (!expires) {
expires = '0';
}
const expiredAt = parseInt(expires) * 1000;
const expired = Date.now() > expiredAt;
return { expiredAt, expired };
}
}

View File

@@ -0,0 +1,36 @@
import { OssBase } from '@kevisual/oss/s3.ts';
import { S3Client } from '@aws-sdk/client-s3'
export type OSSOptions = {
accessKeyId: string;
accessKeySecret: string;
region: string;
bucketName: string;
endpoint: string;
prefix?: string;
}
export class OSSService extends OssBase {
declare client: S3Client;
endpoint: string;
constructor(options: OSSOptions) {
const client = new S3Client({
region: options.region,
endpoint: `${options.endpoint}`,
credentials: {
accessKeyId: options.accessKeyId,
secretAccessKey: options.accessKeySecret,
},
});
super({
client,
bucketName: options.bucketName,
prefix: options.prefix || '',
});
this.endpoint = options.endpoint;
}
getLink(objectName: string): string {
const endpoint = this.endpoint;
return `${endpoint}/${this.bucketName}/${this.prefix}${objectName}`;
}
}

View File

@@ -0,0 +1,67 @@
import PocketBase from 'pocketbase';
import { EventEmitter } from 'eventemitter3'
type PBOptions = {
url: string;
token?: string;
}
export class PBCore {
declare client: PocketBase;
emitter = new EventEmitter();
token?: string;
constructor(options: PBOptions) {
this.client = new PocketBase(options.url);
this.token = options.token || '';
if (this.token) {
this.client.authStore.save(this.token);
}
}
async loginAdmin(email: string, password: string) {
const authData = await this.client.collection("_superusers").authWithPassword(email, password);
this.emitter.emit('login', authData);
console.log('PocketBase admin logged in:', authData);
return authData;
}
}
export class PBService extends PBCore {
collectionName = 'images_generation_tasks';
constructor(options: PBOptions) {
super(options);
}
getCollection<T>(name: string) {
return this.client.collection<T>(name);
}
async initPbService() {
const isLogin = this.client.authStore.isValid;
console.log('PocketBase is logged in:', isLogin);
}
async importData(data: any[]) {
const collection = this.getCollection(this.collectionName);
for (const item of data) {
await collection.create(item);
}
}
get collection() {
return this.client.collection<ImageCollection>(this.collectionName);
}
}
const ImageTaskStatus = ['提示词优化中', '计划中', '生成图片中', '图片下载中', '暂停中', '已完成', '失败'] as const;
type Data = {
images: { type: 'jimeng' | 'tos', url: string }[];
}
export type ImageCollection = {
id: string;
created: string;
updated: string;
title: string;
tags: any;
summary: string;
description: string;
data: Data;
status: typeof ImageTaskStatus[number];
}

View File

@@ -0,0 +1,58 @@
import { createStorage } from 'unstorage';
import fsDriver from 'unstorage/drivers/fs';
export interface PromptData {
value: string;
id: string;
perfect: string;
imageUrl?: string;
generatedAt?: number;
}
export class StorageService {
private storage: ReturnType<typeof createStorage>;
constructor() {
this.storage = createStorage({
driver: fsDriver({ base: 'storage' }),
});
}
async get(id: string): Promise<PromptData | null> {
const filename = id.endsWith('.json') ? id : `${id}.json`;
const data = await this.storage.getItem<PromptData>(filename);
return data || null;
}
async getPendingPrompts(): Promise<PromptData[]> {
const keys = await this.storage.getKeys();
const pending: PromptData[] = [];
for (const key of keys) {
if (key === 'usage.json') continue;
const data = await this.storage.getItem<PromptData>(key);
if (data && !data.imageUrl) {
pending.push(data);
}
}
return pending;
}
async update(id: string, data: Partial<PromptData>): Promise<void> {
const filename = id.endsWith('.json') ? id : `${id}.json`;
const existing = await this.storage.getItem<PromptData>(filename);
if (existing) {
await this.storage.setItem(filename, { ...existing, ...data });
}
}
async hasImage(id: string): Promise<boolean> {
const data = await this.get(id);
return !!data?.imageUrl;
}
}
export const storageService = new StorageService();

View File

@@ -0,0 +1,253 @@
import { Worker, Queue, Job } from 'bullmq';
import { getRedisConnection } from '../module/redis.ts';
import { pbService, jimengService, ossService } from '../index.ts';
import type { ImageCollection } from '../services/pb.service.ts';
export const IMAGE_CREATOR_JOB = 'image-creator';
export const IMAGE_GENERATE_JOB = 'image-generate';
export const IMAGE_DOWNLOAD_JOB = 'image-download';
// 状态常量
export const ImageTaskStatus = {
PENDING: '提示词优化中' as const,
PLANNING: '计划中' as const,
GENERATING: '生成图片中' as const,
DOWNLOADING: '图片下载中' as const,
PAUSED: '暂停中' as const,
COMPLETED: '已完成' as const,
FAILED: '失败' as const,
};
// 生成图片任务的节流时间(毫秒)
const JIMENG_THROTTLE_DELAY = 60 * 1000;
// 下载任务最大重试次数
const DOWNLOAD_MAX_RETRIES = 3;
// 图片生成任务最大重试次数
const GENERATE_MAX_RETRIES = 3;
export interface ImageCreatorJobData {
itemId: string;
prompt: string;
collectionName?: string;
}
export interface ImageGenerateJobData {
itemId: string;
prompt: string;
collectionName?: string;
}
export interface ImageDownloadJobData {
itemId: string;
imageUrl: string;
collectionName?: string;
index: number;
}
// 更新 PB 状态
async function updateItemStatus(
itemId: string,
status: string,
extraData?: Partial<ImageCollection>
): Promise<void> {
const collection = pbService.getCollection<ImageCollection>(pbService.collectionName);
if (extraData) {
const existingItem = await pbService.collection.getOne(itemId);
const data = existingItem.data;
const existingImages = data?.images || [];
const newImages = extraData.data?.images || [];
await collection.update(itemId, {
status,
...extraData,
data: {
...extraData?.data,
...data,
images: [...existingImages, ...newImages],
},
});
} else {
await collection.update(itemId, {
status,
});
}
}
/**
* 单独添加生成图片任务
*/
export async function addImageGenerateJob(item: ImageCollection): Promise<void> {
const connection = getRedisConnection();
const queue = new Queue(IMAGE_GENERATE_JOB, { connection });
const jobData: ImageGenerateJobData = {
itemId: item.id,
prompt: item.description || item.summary || item.title,
collectionName: pbService.collectionName,
};
await queue.add(IMAGE_GENERATE_JOB, jobData, {
removeOnComplete: 100,
removeOnFail: 100,
delay: JIMENG_THROTTLE_DELAY, // 任务间隔 30 秒
});
await updateItemStatus(item.id, ImageTaskStatus.GENERATING);
// console.log(`[ImageGenerate] Job created for item: ${item.id}`);
await queue.close();
}
/**
* 单独添加下载图片任务
*/
export async function addImageDownloadJob(
itemId: string,
imageUrl: string,
index?: number
): Promise<void> {
const connection = getRedisConnection();
const queue = new Queue(IMAGE_DOWNLOAD_JOB, { connection });
const jobData: ImageDownloadJobData = {
itemId,
imageUrl,
collectionName: pbService.collectionName,
index: index ?? 0
};
// 使用 bullmq 内置重试,指数退避
await queue.add(IMAGE_DOWNLOAD_JOB, jobData, {
attempts: DOWNLOAD_MAX_RETRIES,
backoff: {
type: 'exponential',
delay: 2000, // 初始 2 秒
},
removeOnComplete: 100,
removeOnFail: 100,
});
await updateItemStatus(itemId, ImageTaskStatus.DOWNLOADING);
// console.log(`[ImageDownload] Job created for item: ${itemId}`);
await queue.close();
}
/**
* 运行独立的下载 worker
*/
export async function runImageDownloadWorker(): Promise<void> {
const connection = getRedisConnection();
const worker = new Worker(
IMAGE_DOWNLOAD_JOB,
async (job: Job<ImageDownloadJobData>) => {
const { itemId, imageUrl, index } = job.data;
const attemptsMade = job.attemptsMade;
console.log(`[ImageDownload] Processing item: ${itemId}, attempt: ${attemptsMade + 1}/${DOWNLOAD_MAX_RETRIES}`);
try {
const imageBuffer = await jimengService.downloadImage(imageUrl);
const filename = `generated_${itemId}_${index}_${Date.now()}.png`;
await ossService.putObject(filename, imageBuffer);
const ossUrl = ossService.getLink(filename)
console.log(`[ImageDownload] Image uploaded to OSS: ${ossUrl}`);
const imageData = { type: 'tos' as const, url: ossUrl };
await updateItemStatus(itemId, ImageTaskStatus.COMPLETED, {
data: {
images: [imageData],
},
});
return { success: true, ossUrl };
} catch (error: any) {
console.error(`[ImageDownload] Error: ${error.message}`);
// 重试次数用尽,暂停任务
if (job.attemptsMade >= DOWNLOAD_MAX_RETRIES - 1) {
await updateItemStatus(itemId, ImageTaskStatus.PAUSED);
}
throw error;
}
},
{
connection,
concurrency: 3,
} as any
);
worker.on('completed', (job) => {
console.log(`[ImageDownload] Job completed: ${job.id}`);
});
worker.on('failed', (job, err) => {
console.error(`[ImageDownload] Job failed: ${job?.id}, error: ${err.message}`);
});
console.log('[ImageDownload] Worker started');
}
/**
* 运行图片生成 worker使用 jimeng API
*/
export async function runImageGenerateWorker(): Promise<void> {
const connection = getRedisConnection();
const worker = new Worker(
IMAGE_GENERATE_JOB,
async (job: Job<ImageGenerateJobData>) => {
const { itemId, prompt } = job.data;
const attemptsMade = job.attemptsMade;
console.log(`[ImageGenerate] Processing item: ${itemId}, attempt: ${attemptsMade + 1}/${GENERATE_MAX_RETRIES}`);
try {
// 调用 jimeng API 生成图片
const result = await jimengService.generateImage({ prompt });
if (result.code !== 200 || !result.data?.data?.length) {
throw new Error(result.message || 'Failed to generate image');
}
const images = result.data.data;
for (const [index, img] of images.entries()) {
console.log(`[ImageGenerate] Image generated: ${img.url}`);
// 生成成功后,添加下载任务
await addImageDownloadJob(itemId, img.url, index);
}
// 更新状态为下载中
await updateItemStatus(itemId, ImageTaskStatus.DOWNLOADING, {
data: { images: images.map(img => ({ type: 'jimeng' as const, url: img.url })) },
});
return { success: true, images };
} catch (error: any) {
console.error(`[ImageGenerate] Error: ${error.message}`);
// 重试次数用尽,暂停任务并停止当前 worker
if (job.attemptsMade >= GENERATE_MAX_RETRIES - 1) {
await updateItemStatus(itemId, ImageTaskStatus.PAUSED);
console.error(`[ImageGenerate] Max retries exceeded. Stopping worker...`);
await worker.close();
}
throw error;
}
},
{
connection,
concurrency: 1, // jimeng API 有节流限制,设置为 1
}
);
worker.on('completed', (job) => {
console.log(`[ImageGenerate] Job completed: ${job.id}`);
});
worker.on('failed', (job, err) => {
console.error(`[ImageGenerate] Job failed: ${job?.id}, error: ${err.message}`);
});
console.log('[ImageGenerate] Worker started');
}
export { updateItemStatus };

View File

@@ -0,0 +1,153 @@
import { Worker, Job } from 'bullmq';
import { getRedisConnection } from '../module/redis.ts';
import { Prompt, pbService, ai } from '../index.ts';
import type { ImageCollection } from '../services/pb.service.ts';
// 重新导出 Queue因为需要在 addPerfectPromptJob 中使用
import { Queue } from 'bullmq';
export const PERFECT_PROMPT_JOB = 'perfect-prompt';
// 状态常量
export const PerfectPromptStatus = {
PENDING: '提示词优化中' as const,
COMPLETED: '已完成' as const,
FAILED: '失败' as const,
};
// 最大重试次数
const MAX_RETRIES = 3;
export interface PerfectPromptJobData {
itemId: string;
prompt: string;
collectionName?: string;
}
// 优化提示词的模板
const DEFAULT_PERFECT_PROMPT = `请你将以下提示词进行完善,使其更加详细和具体,适合用于生成高质量的像素艺术图像。要求如下:
1. 只返回完善后的提示词,不要包含任何多余的内容或解释。
2. 确保提示词专注于像素艺术风格,包括但不限于像素化角色、场景和物体的描述。
3. 使用具体的细节来增强提示词的表现力,例如颜色、构图、光影效果等。
4. 避免使用与像素艺术无关的术语或描述。
5. 保持提示词的简洁性,避免过于冗长,但要确保信息量充足。
6. 如果需要颜色,需要整个图像的颜色更少的描述,而不是复杂的颜色细节, 背景默认纯蓝色。
7. 使用中文进行描述。
`;
// 更新 PB 状态
async function updateItemStatus(
itemId: string,
status: string,
extraData?: Partial<ImageCollection>
): Promise<void> {
const collection = pbService.getCollection<ImageCollection>(pbService.collectionName);
if (extraData) {
const existingItem = await pbService.collection.getOne(itemId);
const data = existingItem.data;
await collection.update(itemId, {
status,
...extraData,
data: {
...extraData?.data,
...data,
},
});
} else {
await collection.update(itemId, {
status,
});
}
}
/**
* 单独添加优化提示词任务
*/
export async function addPerfectPromptJob(item: ImageCollection): Promise<void> {
const connection = getRedisConnection();
const queue = new Queue(PERFECT_PROMPT_JOB, { connection });
const jobData: PerfectPromptJobData = {
itemId: item.id,
prompt: item.description || item.summary || item.title || '',
collectionName: pbService.collectionName,
};
await queue.add(PERFECT_PROMPT_JOB, jobData, {
attempts: MAX_RETRIES,
backoff: {
type: 'exponential',
delay: 2000,
},
removeOnComplete: 100,
removeOnFail: 100,
});
await updateItemStatus(item.id, PerfectPromptStatus.PENDING);
await queue.close();
}
/**
* 运行优化提示词 worker
*/
export async function runPerfectPromptWorker(): Promise<void> {
const connection = getRedisConnection();
// 获取环境变量中的 API key
const worker = new Worker(
PERFECT_PROMPT_JOB,
async (job: Job<PerfectPromptJobData>) => {
const { itemId, prompt } = job.data;
const attemptsMade = job.attemptsMade;
console.log(`[PerfectPrompt] Processing item: ${itemId}, attempt: ${attemptsMade + 1}/${MAX_RETRIES}`);
try {
if (!prompt) {
throw new Error('Prompt is empty');
}
const promptTool = new Prompt({ perfectPrompt: DEFAULT_PERFECT_PROMPT });
await ai.chat([
{
role: 'user',
content: promptTool.perfect(prompt),
},
]);
const perfectText = promptTool.clearPerfectTags(ai.responseText);
if (!perfectText) {
throw new Error('Generated perfect prompt is empty');
}
console.log(`[PerfectPrompt] Perfect prompt generated for item: ${itemId}`);
// 更新状态为已完成,并保存优化后的提示词
await updateItemStatus(itemId, PerfectPromptStatus.COMPLETED, {
description: perfectText,
});
return { success: true, perfectPrompt: perfectText };
} catch (error: any) {
console.error(`[PerfectPrompt] Error: ${error.message}`);
// 重试次数用尽,标记为失败
if (job.attemptsMade >= MAX_RETRIES - 1) {
await updateItemStatus(itemId, PerfectPromptStatus.FAILED);
}
throw error;
}
},
{
connection,
concurrency: 2,
}
);
worker.on('completed', (job) => {
console.log(`[PerfectPrompt] Job completed: ${job.id}`);
});
worker.on('failed', (job, err) => {
console.error(`[PerfectPrompt] Job failed: ${job?.id}, error: ${err.message}`);
});
console.log('[PerfectPrompt] Worker started');
}

View File

@@ -0,0 +1,10 @@
import { runImageDownloadWorker, runImageGenerateWorker } from '../task/image-creator.job.ts';
runImageDownloadWorker();
runImageGenerateWorker();
// 运行半小时后停止
setTimeout(() => {
console.log('Stop timeed', new Date().toISOString());
process.exit(0);
}, 60 * 60 * 1000); // 60 minutes in milliseconds