diff --git a/package.json b/package.json index 84225b2..129a0f9 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@kevisual/file-listener", - "version": "0.0.1", + "version": "0.0.2", "description": "", "main": "index.js", "basename": "/root/file-listener", @@ -43,6 +43,7 @@ "@kevisual/logger": "^0.0.4", "@kevisual/oss": "^0.0.12", "@kevisual/storage": "^0.0.4", + "@kevisual/task": "^0.0.1", "@kevisual/types": "^0.0.10", "@kevisual/use-config": "^1.0.18", "@types/bun": "^1.2.15", @@ -50,6 +51,7 @@ "@types/formidable": "^3.4.5", "@types/lodash-es": "^4.17.12", "@types/node": "^22.15.30", + "bullmq": "^5.53.2", "commander": "^14.0.0", "concurrently": "^9.1.2", "cross-env": "^7.0.3", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 7258d6f..359a2d6 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -39,6 +39,9 @@ importers: '@kevisual/storage': specifier: ^0.0.4 version: 0.0.4 + '@kevisual/task': + specifier: ^0.0.1 + version: 0.0.1 '@kevisual/types': specifier: ^0.0.10 version: 0.0.10 @@ -57,6 +60,9 @@ importers: '@types/node': specifier: ^22.15.30 version: 22.15.30 + bullmq: + specifier: ^5.53.2 + version: 5.53.2 commander: specifier: ^14.0.0 version: 14.0.0 @@ -130,6 +136,9 @@ packages: '@kevisual/storage@0.0.4': resolution: {integrity: sha512-Pbk89sKHo6xk/bMja5RWOLi3W+xYnrMTQ1fURlC1SlPjGg88/FwAP8fJ8KBlpV1AOskoCj/cCB1pQ9AcB3Vjjw==} + '@kevisual/task@0.0.1': + resolution: {integrity: sha512-LyCxxFVQx+JpO53srtkcpJn5nD/l8NE/DtJv+FQ/iVXozWbnlP+p0ueIOoisDvRbUYBe7uShF6C5Icdq6Tqz1w==} + '@kevisual/types@0.0.10': resolution: {integrity: sha512-Q73uzzjk9UidumnmCvOpgzqDDvQxsblz22bIFuoiioUFJWwaparx8bpd8ArRyFojicYL1YJoFDzDZ9j9NN8grA==} @@ -146,6 +155,36 @@ packages: resolution: {integrity: sha512-ajBvlKpWucBB17FuQYUShqpqy8GRgYEpJW0vWJbUu1CV9lWyrDCapy0lScU8T8Z6qn49sSwJB3+M+evYIdGg+A==} engines: {node: '>= 0.4'} + '@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.3': + resolution: {integrity: sha512-QZHtlVgbAdy2zAqNA9Gu1UpIuI8Xvsd1v8ic6B2pZmeFnFcMWiPLfWXh7TVw4eGEZ/C9TH281KwhVoeQUKbyjw==} + cpu: [arm64] + os: [darwin] + + '@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.3': + resolution: {integrity: sha512-mdzd3AVzYKuUmiWOQ8GNhl64/IoFGol569zNRdkLReh6LRLHOXxU4U8eq0JwaD8iFHdVGqSy4IjFL4reoWCDFw==} + cpu: [x64] + os: [darwin] + + '@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.3': + resolution: {integrity: sha512-YxQL+ax0XqBJDZiKimS2XQaf+2wDGVa1enVRGzEvLLVFeqa5kx2bWbtcSXgsxjQB7nRqqIGFIcLteF/sHeVtQg==} + cpu: [arm64] + os: [linux] + + '@msgpackr-extract/msgpackr-extract-linux-arm@3.0.3': + resolution: {integrity: sha512-fg0uy/dG/nZEXfYilKoRe7yALaNmHoYeIoJuJ7KJ+YyU2bvY8vPv27f7UKhGRpY6euFYqEVhxCFZgAUNQBM3nw==} + cpu: [arm] + os: [linux] + + '@msgpackr-extract/msgpackr-extract-linux-x64@3.0.3': + resolution: {integrity: sha512-cvwNfbP07pKUfq1uH+S6KJ7dT9K8WOE4ZiAcsrSes+UY55E/0jLYc+vq+DO7jlmqRb5zAggExKm0H7O/CBaesg==} + cpu: [x64] + os: [linux] + + '@msgpackr-extract/msgpackr-extract-win32-x64@3.0.3': + resolution: {integrity: sha512-x0fWaQtYp4E6sktbsdAqnehxDgEc/VwM7uLsRCYWaiGu0ykYdZPiS8zCWdnjHwyiumousxfBm4SO31eXqwEZhQ==} + cpu: [x64] + os: [win32] + '@noble/hashes@1.8.0': resolution: {integrity: sha512-jCs9ldd7NwzpgXDIf6P3+NrHh9/sD6CQdxHyjQI+h/6rDNo88ypBxxz45UDuZHz9r3tNz7N/VInSVoVdtXEI4A==} engines: {node: ^14.21.3 || >=16} @@ -273,6 +312,9 @@ packages: resolution: {integrity: sha512-Db1SbgBS/fg/392AblrMJk97KggmvYhr4pB5ZIMTWtaivCPMWLkmb7m21cJvpvgK+J3nsU2CmmixNBZx4vFj/w==} engines: {node: '>=8.0.0'} + bullmq@5.53.2: + resolution: {integrity: sha512-xHgxrP/yNJHD7VCw1h+eRBh+2TCPBCM39uC9gCyksYc6ufcJP+HTZ/A2lzB2x7qMFWrvsX7tM40AT2BmdkYL/Q==} + bun-types@1.2.15: resolution: {integrity: sha512-NarRIaS+iOaQU1JPfyKhZm4AsUOrwUOqRNHY0XxI8GI8jYxiLXLcdjYMG9UKS+fwWasc1uw1htV9AX24dD+p4w==} @@ -335,6 +377,10 @@ packages: resolution: {integrity: sha512-KIHbLJqu73RGr/hnbrO9uBeixNGuvSQjul/jdFvS/KFSIH1hWVd1ng7zOHx+YrEfInLG7q4n6GHQ9cDtxv/P6g==} engines: {node: '>= 0.10'} + cron-parser@4.9.0: + resolution: {integrity: sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==} + engines: {node: '>=12.0.0'} + cross-env@7.0.3: resolution: {integrity: sha512-+/HKd6EgcQCJGh2PSjZuUitQBQynKor4wrFbRg4DtAgS1aWO+gU52xpH7M9ScGgXSYmAVS9bIJ8EzuaGw0oNAw==} engines: {node: '>=10.14', npm: '>=6', yarn: '>=1'} @@ -400,6 +446,10 @@ packages: resolution: {integrity: sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==} engines: {node: '>=0.10'} + detect-libc@2.0.4: + resolution: {integrity: sha512-3UDv+G9CsCKO1WKMGw9fwq/SWJYbI0c5Y7LU1AXYoDdbhE2AHQ6N6Nb34sG8Fj7T5APy8qXDCKuuIHd1BR0tVA==} + engines: {node: '>=8'} + dezalgo@1.0.4: resolution: {integrity: sha512-rXSP0bf+5n0Qonsb+SVVfNfIsimO4HEtmnIpPHY8Q1UCzKlQrDMfdobr8nJOOsRgWCyMRqeSBQzmWUMq7zvVig==} @@ -755,6 +805,10 @@ packages: resolution: {integrity: sha512-QIXZUBJUx+2zHUdQujWejBkcD9+cs94tLn0+YL8UrCh+D5sCXZ4c7LaEH48pNwRY3MLDgqUFyhlCyjJPf1WP0A==} engines: {node: 20 || >=22} + luxon@3.6.1: + resolution: {integrity: sha512-tJLxrKJhO2ukZ5z0gyjY1zPh3Rh88Ej9P7jNrZiHMUXHae1yvI2imgOZtL1TO8TW6biMMKfTtAOoEJANgtWBMQ==} + engines: {node: '>=12'} + math-intrinsics@1.1.0: resolution: {integrity: sha512-/IXtbwEk5HTPyEwyKX6hGkYXxM9nbj64B+ilVJnC/R6B0pH5G4V3b0pVbL7DBj4tkhBAppbQUlf6F6Xl9LHu1g==} engines: {node: '>= 0.4'} @@ -798,6 +852,13 @@ packages: ms@2.1.3: resolution: {integrity: sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==} + msgpackr-extract@3.0.3: + resolution: {integrity: sha512-P0efT1C9jIdVRefqjzOQ9Xml57zpOXnIuS+csaB4MdZbTdmGDLo8XhzBG1N7aO11gKDDkJvBLULeFTo46wwreA==} + hasBin: true + + msgpackr@1.11.4: + resolution: {integrity: sha512-uaff7RG9VIC4jacFW9xzL3jc0iM32DNHe4jYVycBcjUePT/Klnfj7pqtWJt9khvDFizmjN2TlYniYmSS2LIaZg==} + nanoid@5.1.5: resolution: {integrity: sha512-Ir/+ZpE9fDsNH0hQ3C68uyThDXzYcim2EqcZ8zn8Chtt1iylPT9xXJB0kPCnqzgcEGikO9RxSrh63MsmVCU7Fw==} engines: {node: ^18 || >=20} @@ -807,10 +868,17 @@ packages: resolution: {integrity: sha512-+EUsqGPLsM+j/zdChZjsnX51g4XrHFOIXwfnCVPGlQk/k5giakcKsuxCObBRu6DSm9opw/O6slWbJdghQM4bBg==} engines: {node: '>= 0.6'} + node-abort-controller@3.1.1: + resolution: {integrity: sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==} + node-forge@1.3.1: resolution: {integrity: sha512-dPEtOeMvF9VMcYV/1Wb8CPoVAXtp6MKMlcbAt4ddqmGqUJ6fQZFXkNZNkNlfevtNkGtaSoXf/vNNNSvgrdXwtA==} engines: {node: '>= 6.13.0'} + node-gyp-build-optional-packages@5.2.2: + resolution: {integrity: sha512-s+w+rBWnpTMwSFbaE0UXsRlg7hU4FjekKU4eyAih5T8nJuNZT1nNsskXpxmeqSK9UzkBl6UgRlnKc8hz8IEqOw==} + hasBin: true + nodemon@3.1.10: resolution: {integrity: sha512-WDjw3pJ0/0jMFmyNDp3gvY2YizjLmmOUQo6DEBY+JgdvW/yQ9mEeSw6H5ythl5Ny2ytb7f9C2nIbjSxMNzbJXw==} engines: {node: '>=10'} @@ -1248,6 +1316,10 @@ packages: resolution: {integrity: sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==} hasBin: true + uuid@9.0.1: + resolution: {integrity: sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==} + hasBin: true + validator@13.15.15: resolution: {integrity: sha512-BgWVbCI72aIQy937xbawcs+hrVaN/CZ2UwutgaJ36hGqRrLNM+f5LUT/YPRbo8IV/ASeFzXszezV+y2+rq3l8A==} engines: {node: '>= 0.10'} @@ -1394,6 +1466,8 @@ snapshots: '@kevisual/storage@0.0.4': {} + '@kevisual/task@0.0.1': {} + '@kevisual/types@0.0.10': {} '@kevisual/use-config@1.0.18(dotenv@16.5.0)': @@ -1410,6 +1484,24 @@ snapshots: dependencies: call-bind: 1.0.8 + '@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.3': + optional: true + + '@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.3': + optional: true + + '@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.3': + optional: true + + '@msgpackr-extract/msgpackr-extract-linux-arm@3.0.3': + optional: true + + '@msgpackr-extract/msgpackr-extract-linux-x64@3.0.3': + optional: true + + '@msgpackr-extract/msgpackr-extract-win32-x64@3.0.3': + optional: true + '@noble/hashes@1.8.0': {} '@paralleldrive/cuid2@2.2.2': @@ -1537,6 +1629,18 @@ snapshots: buffer-crc32@1.0.0: {} + bullmq@5.53.2: + dependencies: + cron-parser: 4.9.0 + ioredis: 5.6.1 + msgpackr: 1.11.4 + node-abort-controller: 3.1.1 + semver: 7.7.2 + tslib: 2.8.1 + uuid: 9.0.1 + transitivePeerDependencies: + - supports-color + bun-types@1.2.15: dependencies: '@types/node': 22.15.30 @@ -1612,6 +1716,10 @@ snapshots: object-assign: 4.1.1 vary: 1.1.2 + cron-parser@4.9.0: + dependencies: + luxon: 3.6.1 + cross-env@7.0.3: dependencies: cross-spawn: 7.0.6 @@ -1691,6 +1799,9 @@ snapshots: denque@2.1.0: {} + detect-libc@2.0.4: + optional: true + dezalgo@1.0.4: dependencies: asap: 2.0.6 @@ -2129,6 +2240,8 @@ snapshots: lru-cache@11.1.0: {} + luxon@3.6.1: {} + math-intrinsics@1.1.0: {} mime-db@1.52.0: {} @@ -2184,12 +2297,35 @@ snapshots: ms@2.1.3: {} + msgpackr-extract@3.0.3: + dependencies: + node-gyp-build-optional-packages: 5.2.2 + optionalDependencies: + '@msgpackr-extract/msgpackr-extract-darwin-arm64': 3.0.3 + '@msgpackr-extract/msgpackr-extract-darwin-x64': 3.0.3 + '@msgpackr-extract/msgpackr-extract-linux-arm': 3.0.3 + '@msgpackr-extract/msgpackr-extract-linux-arm64': 3.0.3 + '@msgpackr-extract/msgpackr-extract-linux-x64': 3.0.3 + '@msgpackr-extract/msgpackr-extract-win32-x64': 3.0.3 + optional: true + + msgpackr@1.11.4: + optionalDependencies: + msgpackr-extract: 3.0.3 + nanoid@5.1.5: {} negotiator@0.6.3: {} + node-abort-controller@3.1.1: {} + node-forge@1.3.1: {} + node-gyp-build-optional-packages@5.2.2: + dependencies: + detect-libc: 2.0.4 + optional: true + nodemon@3.1.10: dependencies: chokidar: 3.6.0 @@ -2700,6 +2836,8 @@ snapshots: uuid@8.3.2: {} + uuid@9.0.1: {} + validator@13.15.15: {} vary@1.1.2: {} diff --git a/src/dev.ts b/src/dev.ts index 911aaca..830a354 100644 --- a/src/dev.ts +++ b/src/dev.ts @@ -1,8 +1,10 @@ import { app, runCheck } from './index.ts'; +import { runCorn } from './tasks/index.ts'; -app.listen('./file-listen.sock', () => { +app.listen(30000, () => { console.log('Server is running on http://localhost:30000'); setTimeout(() => { runCheck(); + runCorn(); }, 1000); -}); \ No newline at end of file +}); diff --git a/src/minio-listen/minio.ts b/src/minio-listen/minio.ts index 1176915..ce5b44d 100644 --- a/src/minio-listen/minio.ts +++ b/src/minio-listen/minio.ts @@ -53,7 +53,7 @@ export const listen = async () => { if (data.type) { app.call({ path: 'file-listener', - key: 'sync-file', + key: 'sync-file-task', payload: { data: data, }, diff --git a/src/modules/redis.ts b/src/modules/redis.ts new file mode 100644 index 0000000..e035af8 --- /dev/null +++ b/src/modules/redis.ts @@ -0,0 +1,36 @@ +import { Redis } from 'ioredis'; +import { config } from './config.ts'; + +const redisConfig = { + host: config.REDIS_HOST || 'localhost', + port: parseInt(config.REDIS_PORT || '6379'), + password: config.REDIS_PASSWORD, +}; +export const createRedisClient = (options = {}) => { + const redisClient = new Redis({ + host: 'localhost', // Redis 服务器的主机名或 IP 地址 + port: 6379, // Redis 服务器的端口号 + // password: 'your_password', // Redis 的密码 (如果有) + db: 0, // 要使用的 Redis 数据库索引 (0-15) + keyPrefix: '', // key 前缀 + retryStrategy(times) { + // 连接重试策略 + return Math.min(times * 50, 2000); // 每次重试时延迟增加 + }, + maxRetriesPerRequest: null, // 允许请求重试的次数 (如果需要无限次重试) + ...redisConfig, + ...options, + }); + return redisClient; +}; +// 配置 Redis 连接 +export const redis = createRedisClient(); + +// 监听连接事件 +redis.on('connect', () => { + console.log('Redis 连接成功'); +}); + +redis.on('error', (err) => { + console.error('Redis 连接错误', err); +}); diff --git a/src/routes/sync.ts b/src/routes/sync.ts index 831903e..17916a6 100644 --- a/src/routes/sync.ts +++ b/src/routes/sync.ts @@ -3,6 +3,7 @@ import { FileSyncModel } from '@/file-sync/model.ts'; import { logger } from '@/modules/logger.ts'; import { oss, SyncFile, SyncType } from '@/modules/minio.ts'; import { BucketItemStat } from 'minio'; +const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); type Result = { code: number; message?: string; @@ -10,10 +11,67 @@ type Result = { duration?: number; error?: string; }; + +app + .route({ + path: 'file-listener', + key: 'db-check', + description: '检查数据库中的文件, 获取minio的文件,如果不一致则更新', + }) + .define(async (ctx) => { + const where = { + $or: [ + { + checkedAt: { + $lt: new Date(Date.now() - 1000 * 60 * 60 * 24 * 30), // 检查30天前未检查的文件 + }, + }, + { + checkedAt: null, // 也检查从未检查过的文件 + }, + ], + }; + const count = await FileSyncModel.count({ where }); + + for (let i = 0; i < count; i += 10000) { + // 每次处理10000个文件,避免过载 + // 分页查询所有的数据库,获取对应的name + const dbFiles = await FileSyncModel.findAll({ + where, + attributes: ['name'], + raw: true, + limit: 10000, + offset: i, + }); + if (!dbFiles || dbFiles.length === 0) { + logger.info('No files to check in the database'); + continue; + } + for (const file of dbFiles) { + const name = file.name; + if (!name) continue; + // 调用同步文件的任务 + app.call({ + path: 'file-listener', + key: 'sync-file-task', + payload: { + data: { + type: 'sync', + name: name, + }, + }, + }); + } + sleep(100); + } + }) + .addTo(app); + app .route({ path: 'file-listener', key: 'sync-all', + description: '同步minio的所有文件到数据库', }) .define(async (ctx) => { const startTime = Date.now(); @@ -24,7 +82,7 @@ app if (data.name?.startsWith?.('private')) return; app.call({ path: 'file-listener', - key: 'sync-file', + key: 'sync-file-task', payload: { data: data, }, @@ -50,6 +108,25 @@ app }) .addTo(app); +app + .route({ + path: 'file-listener', + key: 'sync-file-task', + }) + .define(async (ctx) => { + const query = ctx.query; + console.log('queue', query); + const result = await app.call({ + path: 'file-listener', + key: 'sync-file', + payload: query, + }); + if (result.code !== 200) { + ctx.throw(result.code, result.message || 'Sync file task failed'); + } + ctx.body = result.data; + }) + .addTo(app); app .route({ path: 'file-listener', diff --git a/src/tasks/index.ts b/src/tasks/index.ts new file mode 100644 index 0000000..8f678f6 --- /dev/null +++ b/src/tasks/index.ts @@ -0,0 +1,48 @@ +import { Task } from '@kevisual/task'; +import { app } from '@/app.ts'; +import { Queue } from 'bullmq'; +import { redis } from '@/modules/redis.ts'; +import { logger } from '@/modules/logger.ts'; +export const task = new Task({ + name: 'file-listener-tasks', + app, + redis, +}); + +export const cornTask = new Task({ + name: 'file-listener-tasks-corn', + app, + redis, +}); +const corn = '0 3 1 * *'; // 设置每个月1号3点执行一次的定时任务的cron +export const runCorn = async () => { + const queue: Queue = cornTask.getQueue(); + + try { + const key = 'file-listener-sync-all'; + const existId = '8a815571879a730532471d952958fc71'; // 确保唯一性 + const pattern = corn; + // const existingJobs = await queue.getJobScheduler(existId); + const job = await queue.add( + 'file-listener-tasks-corn', + { + path: 'file-listener', + key: 'db-check', + }, + { + jobId: key, + repeat: { + pattern: pattern, + key: '8a815571879a730532471d952958fc71', // 确保唯一性 + }, + }, + ); + logger.info('Created new repeatable job:', job.id, job.name, job.repeatJobKey); + logger.info(`Created new repeatable job with cron pattern: ${pattern}`); + } catch (error) { + logger.error('Error managing job schedulers:', error); + } + + const worker = cornTask.getWorker(); + return worker; +};