import { redis } from '@/modules/redis.ts';
import { Worker } from 'bullmq';
import { add, clamp } from 'lodash-es';
import { nanoid } from 'nanoid';
import { queue, XHS_QUEUE_NAME, taskApp } from './index.ts';
import { addUnreadTask } from './task.ts';
import dayjs from 'dayjs';
import { getTimeDuration } from './utils/time.ts';
import { config, isDev } from '../modules/config.ts';

/**
 * Resolves after `ms` milliseconds.
 *
 * @param ms - Delay in milliseconds.
 */
export const sleep = (ms: number): Promise<void> => {
  return new Promise<void>((resolve) => setTimeout(resolve, ms));
};

/**
 * Small stopwatch that tracks a start/end pair plus a separate "last update"
 * timestamp. The update timestamp is used to work out how long to wait before
 * the next poll so that polls are spaced roughly `maxDuration` apart.
 */
class TimeRecorder {
  startTime: number;
  endTime: number;
  duration: number;
  updateTime: number;
  /** Target spacing between two updates, in milliseconds (30 s). */
  maxDuration: number = 30 * 1000;

  constructor() {
    const now = Date.now();
    this.startTime = now;
    this.endTime = now;
    this.updateTime = now;
    this.duration = 0;
  }

  /** Resets the start timestamp to now and returns it. */
  start() {
    this.startTime = Date.now();
    return this.startTime;
  }

  /** Records the end timestamp, recomputes `duration`, and returns the end timestamp. */
  end() {
    this.endTime = Date.now();
    this.duration = this.endTime - this.startTime;
    return this.endTime;
  }

  /** Marks "now" as the last update time and returns it. */
  update() {
    this.updateTime = Date.now();
    return this.updateTime;
  }

  /**
   * Computes how long is left until `maxDuration` has elapsed since the last
   * `update()`.
   *
   * @param random - When true, a jitter of 0, 1, 2, 3 or 4 whole seconds is
   *   added to the elapsed time. Because the jitter is added to the elapsed
   *   time, it shortens `nextTime`.
   * @returns The (possibly jittered) elapsed `duration`, the configured
   *   `maxDuration`, the `updateTime` used, and `nextTime`: the remaining wait
   *   in milliseconds, clamped to `[0, maxDuration]`.
   */
  getClampDuration(random = false) {
    let randomDuration = 0;
    if (random) {
      // Math.floor(Math.random() * 5) yields an integer in 0..4, i.e. 0-4 s of jitter.
      randomDuration = Math.floor(Math.random() * 5) * 1000;
    }
    const duration = Date.now() - this.updateTime + randomDuration;
    const nextTime = clamp(this.maxDuration - duration, 0, this.maxDuration);
    // console.log('getClampDuration', duration, this.maxDuration, 'nextTime', nextTime);
    return {
      duration: duration,
      maxDuration: this.maxDuration,
      updateTime: this.updateTime,
      nextTime: nextTime,
    };
  }

  /** Snapshot of all recorded timestamps and the last measured duration. */
  time() {
    return {
      startTime: this.startTime,
      endTime: this.endTime,
      duration: this.duration,
      updateTime: this.updateTime,
    };
  }
}

/** Module-wide recorder; its `updateTime` marks when the last unread poll started. */
const timeRecorder = new TimeRecorder();

/** Number of consecutive failed jobs; reset to 0 after any successful job. */
let errorCount = 0;

/** True when the job payload targets the `task` / `getUnread` route. */
const isUnreadPollData = (data: { path?: unknown; key?: unknown }): boolean => {
  return data.path === 'task' && data.key === 'getUnread';
};

/**
 * Queue worker: runs each job through `taskApp.call`, one at a time
 * (`concurrency: 1`) and rate-limited by `limiter` (max 1 per 2000 ms).
 *
 * A job whose response code is not 200 is treated as failed. After more than
 * three consecutive failures the queue is paused; if the failing job is the
 * unread poll the process additionally exits with code 1.
 *
 * On success the processor returns the per-job timing snapshot.
 */
export const worker = new Worker(
  XHS_QUEUE_NAME,
  async (job) => {
    const timer = new TimeRecorder();
    const data = job.data;
    if (isUnreadPollData(data)) {
      console.log('====run time start', dayjs().format('YYYY-MM-DD HH:mm:ss'));
      timeRecorder.update();
    }

    const res = await taskApp.call(data);
    if (res.code !== 200) {
      console.log('job error', job.name, job.id, res);
      errorCount++;
      if (errorCount > 3) {
        // Wait for the pause to actually be applied: the process may exit on
        // the next lines, and a fire-and-forget pause could be lost.
        try {
          await queue.pause();
        } catch (err) {
          // Keep going so the original job failure is still the error reported.
          console.log('queue pause failed', err);
        }
        console.log('error count', errorCount);
        if (isUnreadPollData(data)) {
          process.exit(1);
        }
      }
      throw new Error('job error' + job.name + ' ' + job.id);
    }

    errorCount = 0;
    timer.end();
    return timer.time();
  },
  { connection: redis, concurrency: 1, limiter: { max: 1, duration: 2000 } },
);

/**
 * After every completed job: if nothing is waiting or delayed and there is no
 * job with id `unread`, schedule the next unread poll. The delay is the time
 * remaining in the current 30 s window (with jitter) plus `getTimeDuration()`
 * (skipped in dev).
 */
worker.on('completed', async (job) => {
  const jobCounts = await queue.getJobCounts('waiting', 'wait', 'delayed');
  if (job.name !== 'unread') {
    console.log('job completed', job.name, 'run id', job.id, job.returnvalue, jobCounts.delayed, jobCounts.wait);
  }

  if (jobCounts.delayed + jobCounts.wait > 0) {
    // console.log('======has jobs, no need to add new job');
    return;
  }

  const up = timeRecorder.getClampDuration(true);
  // NOTE(review): getTimeDuration() presumably returns an extra delay in ms - confirm in ./utils/time.ts.
  const timeDuration = isDev ? 0 : getTimeDuration();
  const nextTime = up.nextTime + timeDuration;

  const unread = await queue.getJob('unread');
  if (unread) {
    return;
  }
  console.log('====add unread next-time', nextTime, timeDuration);
  await addUnreadTask(nextTime);
});

/**
 * Startup bootstrap: when the queue has no waiting/delayed jobs and no job
 * with id `unread`, enqueue an immediate unread poll and mark the recorder.
 */
const init = async () => {
  const jobCounts = await queue.getJobCounts('waiting', 'wait', 'delayed');
  if (jobCounts.delayed + jobCounts.wait > 0) {
    // console.log('======has jobs, no need to add new job');
    return;
  }

  const unread = await queue.getJob('unread');
  if (unread) {
    return;
  }
  await addUnreadTask(0);
  timeRecorder.update();
};

init();