This commit is contained in:
2025-05-05 00:01:36 +08:00
parent d6014b3c40
commit a412c09da0
20 changed files with 2911 additions and 102 deletions

View File

@@ -1,52 +1,112 @@
import { redis } from '@/modules/redis.ts';
import { Queue, Worker } from 'bullmq';
import { clamp } from 'lodash-es';
import { Worker } from 'bullmq';
import { add, clamp } from 'lodash-es';
import { nanoid } from 'nanoid';
const XHS_QUEUE_NAME = 'XHS_QUEUE';
export const queue = new Queue(XHS_QUEUE_NAME);
import { queue, XHS_QUEUE_NAME, taskApp } from './index.ts';
import { addUnreadTask } from './task.ts';
import dayjs from 'dayjs';
export const sleep = (ms: number) => {
return new Promise((resolve) => setTimeout(resolve, ms));
};
class TimeRecorder {
startTime: number;
endTime: number;
duration: number;
updateTime: number;
maxDuration: number = 60 * 1000; // 20s;
constructor() {
const now = Date.now();
this.startTime = now;
this.endTime = now;
this.updateTime = now;
this.duration = 0;
}
start() {
this.startTime = Date.now();
return this.startTime;
}
end() {
this.endTime = Date.now();
this.duration = this.endTime - this.startTime;
return this.endTime;
}
update() {
this.updateTime = Date.now();
return this.updateTime;
}
getClampDuration() {
const duration = Date.now() - this.updateTime;
return {
duration: duration,
maxDuration: this.maxDuration,
updateTime: this.updateTime,
nextTime: clamp(this.maxDuration - duration, 0, this.maxDuration),
};
}
time() {
return {
startTime: this.startTime,
endTime: this.endTime,
duration: this.duration,
updateTime: this.updateTime,
};
}
}
const timeRecorder = new TimeRecorder();
let errorCount = 0;
export const worker = new Worker(
XHS_QUEUE_NAME,
async (job) => {
const startTime = Date.now();
console.log('job', job.name, job.data);
await sleep(1000);
const endTime = Date.now();
const duration = endTime - startTime;
const timer = new TimeRecorder();
const data = job.data;
if (data.path === 'task' && data.key === 'getUnread') {
console.log('====run time', dayjs().format('YYYY-MM-DD HH:mm:ss'));
timeRecorder.update();
}
const res = await taskApp.call(data);
if (res.code !== 200) {
console.log('job error', job.name, job.id, res);
return {
startTime: startTime,
endTime: endTime,
duration: duration,
};
errorCount++;
if (errorCount > 3) {
queue.pause();
console.log('error count', errorCount);
}
throw new Error('job error' + job.name + ' ' + job.id);
}
errorCount = 0;
timer.end();
return timer.time();
},
{ connection: redis, concurrency: 1 },
{ connection: redis, concurrency: 1, limiter: { max: 1, duration: 2000 } },
);
worker.on('completed', async (job) => {
console.log('job completed', job.name, job.id, job.returnvalue);
const duration = job.returnvalue.duration || 0;
const maxNextTime = 20 * 1000; // 5 minutes
const nextTime = clamp(maxNextTime - duration, 0, maxNextTime);
const hasJobs = await queue.getJobCounts('waiting', 'wait', 'delayed');
console.log('hasJobs', nextTime, 'joblenght', hasJobs);
if (hasJobs.delayed + hasJobs.wait > 0) {
console.log('======has jobs, no need to add new job');
const jobCounts = await queue.getJobCounts('waiting', 'wait', 'delayed');
if (job.name !== 'unread') {
console.log('job completed', job.name, job.id, job.returnvalue, jobCounts.delayed, jobCounts.wait);
}
if (jobCounts.delayed + jobCounts.wait > 0) {
// console.log('======has jobs, no need to add new job');
} else {
const id = nanoid();
queue.add(
'repeact-call-job' + id,
{},
{
delay: nextTime,
removeOnComplete: true,
removeOnFail: {
age: 24 * 3600, // keep up to 24 hours
},
jobId: id,
},
);
const up = timeRecorder.getClampDuration();
const nextTime = up.nextTime;
const unread = await queue.getJob('unread');
if (!unread) {
addUnreadTask(nextTime);
}
}
});
const init = async () => {
const jobCounts = await queue.getJobCounts('waiting', 'wait', 'delayed');
if (jobCounts.delayed + jobCounts.wait > 0) {
// console.log('======has jobs, no need to add new job');
} else {
const unread = await queue.getJob('unread');
if (!unread) {
addUnreadTask(0);
timeRecorder.update();
}
}
};
init();