feat: 添加一个功能模块,实现获取minio监听

This commit is contained in:
熊潇 2025-06-07 11:11:19 +08:00
parent f6374764a7
commit 5dc4990e26
7 changed files with 308 additions and 5 deletions

View File

@ -1,6 +1,6 @@
{
"name": "@kevisual/file-listener",
"version": "0.0.1",
"version": "0.0.2",
"description": "",
"main": "index.js",
"basename": "/root/file-listener",
@ -43,6 +43,7 @@
"@kevisual/logger": "^0.0.4",
"@kevisual/oss": "^0.0.12",
"@kevisual/storage": "^0.0.4",
"@kevisual/task": "^0.0.1",
"@kevisual/types": "^0.0.10",
"@kevisual/use-config": "^1.0.18",
"@types/bun": "^1.2.15",
@ -50,6 +51,7 @@
"@types/formidable": "^3.4.5",
"@types/lodash-es": "^4.17.12",
"@types/node": "^22.15.30",
"bullmq": "^5.53.2",
"commander": "^14.0.0",
"concurrently": "^9.1.2",
"cross-env": "^7.0.3",

138
pnpm-lock.yaml generated
View File

@ -39,6 +39,9 @@ importers:
'@kevisual/storage':
specifier: ^0.0.4
version: 0.0.4
'@kevisual/task':
specifier: ^0.0.1
version: 0.0.1
'@kevisual/types':
specifier: ^0.0.10
version: 0.0.10
@ -57,6 +60,9 @@ importers:
'@types/node':
specifier: ^22.15.30
version: 22.15.30
bullmq:
specifier: ^5.53.2
version: 5.53.2
commander:
specifier: ^14.0.0
version: 14.0.0
@ -130,6 +136,9 @@ packages:
'@kevisual/storage@0.0.4':
resolution: {integrity: sha512-Pbk89sKHo6xk/bMja5RWOLi3W+xYnrMTQ1fURlC1SlPjGg88/FwAP8fJ8KBlpV1AOskoCj/cCB1pQ9AcB3Vjjw==}
'@kevisual/task@0.0.1':
resolution: {integrity: sha512-LyCxxFVQx+JpO53srtkcpJn5nD/l8NE/DtJv+FQ/iVXozWbnlP+p0ueIOoisDvRbUYBe7uShF6C5Icdq6Tqz1w==}
'@kevisual/types@0.0.10':
resolution: {integrity: sha512-Q73uzzjk9UidumnmCvOpgzqDDvQxsblz22bIFuoiioUFJWwaparx8bpd8ArRyFojicYL1YJoFDzDZ9j9NN8grA==}
@ -146,6 +155,36 @@ packages:
resolution: {integrity: sha512-ajBvlKpWucBB17FuQYUShqpqy8GRgYEpJW0vWJbUu1CV9lWyrDCapy0lScU8T8Z6qn49sSwJB3+M+evYIdGg+A==}
engines: {node: '>= 0.4'}
'@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.3':
resolution: {integrity: sha512-QZHtlVgbAdy2zAqNA9Gu1UpIuI8Xvsd1v8ic6B2pZmeFnFcMWiPLfWXh7TVw4eGEZ/C9TH281KwhVoeQUKbyjw==}
cpu: [arm64]
os: [darwin]
'@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.3':
resolution: {integrity: sha512-mdzd3AVzYKuUmiWOQ8GNhl64/IoFGol569zNRdkLReh6LRLHOXxU4U8eq0JwaD8iFHdVGqSy4IjFL4reoWCDFw==}
cpu: [x64]
os: [darwin]
'@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.3':
resolution: {integrity: sha512-YxQL+ax0XqBJDZiKimS2XQaf+2wDGVa1enVRGzEvLLVFeqa5kx2bWbtcSXgsxjQB7nRqqIGFIcLteF/sHeVtQg==}
cpu: [arm64]
os: [linux]
'@msgpackr-extract/msgpackr-extract-linux-arm@3.0.3':
resolution: {integrity: sha512-fg0uy/dG/nZEXfYilKoRe7yALaNmHoYeIoJuJ7KJ+YyU2bvY8vPv27f7UKhGRpY6euFYqEVhxCFZgAUNQBM3nw==}
cpu: [arm]
os: [linux]
'@msgpackr-extract/msgpackr-extract-linux-x64@3.0.3':
resolution: {integrity: sha512-cvwNfbP07pKUfq1uH+S6KJ7dT9K8WOE4ZiAcsrSes+UY55E/0jLYc+vq+DO7jlmqRb5zAggExKm0H7O/CBaesg==}
cpu: [x64]
os: [linux]
'@msgpackr-extract/msgpackr-extract-win32-x64@3.0.3':
resolution: {integrity: sha512-x0fWaQtYp4E6sktbsdAqnehxDgEc/VwM7uLsRCYWaiGu0ykYdZPiS8zCWdnjHwyiumousxfBm4SO31eXqwEZhQ==}
cpu: [x64]
os: [win32]
'@noble/hashes@1.8.0':
resolution: {integrity: sha512-jCs9ldd7NwzpgXDIf6P3+NrHh9/sD6CQdxHyjQI+h/6rDNo88ypBxxz45UDuZHz9r3tNz7N/VInSVoVdtXEI4A==}
engines: {node: ^14.21.3 || >=16}
@ -273,6 +312,9 @@ packages:
resolution: {integrity: sha512-Db1SbgBS/fg/392AblrMJk97KggmvYhr4pB5ZIMTWtaivCPMWLkmb7m21cJvpvgK+J3nsU2CmmixNBZx4vFj/w==}
engines: {node: '>=8.0.0'}
bullmq@5.53.2:
resolution: {integrity: sha512-xHgxrP/yNJHD7VCw1h+eRBh+2TCPBCM39uC9gCyksYc6ufcJP+HTZ/A2lzB2x7qMFWrvsX7tM40AT2BmdkYL/Q==}
bun-types@1.2.15:
resolution: {integrity: sha512-NarRIaS+iOaQU1JPfyKhZm4AsUOrwUOqRNHY0XxI8GI8jYxiLXLcdjYMG9UKS+fwWasc1uw1htV9AX24dD+p4w==}
@ -335,6 +377,10 @@ packages:
resolution: {integrity: sha512-KIHbLJqu73RGr/hnbrO9uBeixNGuvSQjul/jdFvS/KFSIH1hWVd1ng7zOHx+YrEfInLG7q4n6GHQ9cDtxv/P6g==}
engines: {node: '>= 0.10'}
cron-parser@4.9.0:
resolution: {integrity: sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==}
engines: {node: '>=12.0.0'}
cross-env@7.0.3:
resolution: {integrity: sha512-+/HKd6EgcQCJGh2PSjZuUitQBQynKor4wrFbRg4DtAgS1aWO+gU52xpH7M9ScGgXSYmAVS9bIJ8EzuaGw0oNAw==}
engines: {node: '>=10.14', npm: '>=6', yarn: '>=1'}
@ -400,6 +446,10 @@ packages:
resolution: {integrity: sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==}
engines: {node: '>=0.10'}
detect-libc@2.0.4:
resolution: {integrity: sha512-3UDv+G9CsCKO1WKMGw9fwq/SWJYbI0c5Y7LU1AXYoDdbhE2AHQ6N6Nb34sG8Fj7T5APy8qXDCKuuIHd1BR0tVA==}
engines: {node: '>=8'}
dezalgo@1.0.4:
resolution: {integrity: sha512-rXSP0bf+5n0Qonsb+SVVfNfIsimO4HEtmnIpPHY8Q1UCzKlQrDMfdobr8nJOOsRgWCyMRqeSBQzmWUMq7zvVig==}
@ -755,6 +805,10 @@ packages:
resolution: {integrity: sha512-QIXZUBJUx+2zHUdQujWejBkcD9+cs94tLn0+YL8UrCh+D5sCXZ4c7LaEH48pNwRY3MLDgqUFyhlCyjJPf1WP0A==}
engines: {node: 20 || >=22}
luxon@3.6.1:
resolution: {integrity: sha512-tJLxrKJhO2ukZ5z0gyjY1zPh3Rh88Ej9P7jNrZiHMUXHae1yvI2imgOZtL1TO8TW6biMMKfTtAOoEJANgtWBMQ==}
engines: {node: '>=12'}
math-intrinsics@1.1.0:
resolution: {integrity: sha512-/IXtbwEk5HTPyEwyKX6hGkYXxM9nbj64B+ilVJnC/R6B0pH5G4V3b0pVbL7DBj4tkhBAppbQUlf6F6Xl9LHu1g==}
engines: {node: '>= 0.4'}
@ -798,6 +852,13 @@ packages:
ms@2.1.3:
resolution: {integrity: sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==}
msgpackr-extract@3.0.3:
resolution: {integrity: sha512-P0efT1C9jIdVRefqjzOQ9Xml57zpOXnIuS+csaB4MdZbTdmGDLo8XhzBG1N7aO11gKDDkJvBLULeFTo46wwreA==}
hasBin: true
msgpackr@1.11.4:
resolution: {integrity: sha512-uaff7RG9VIC4jacFW9xzL3jc0iM32DNHe4jYVycBcjUePT/Klnfj7pqtWJt9khvDFizmjN2TlYniYmSS2LIaZg==}
nanoid@5.1.5:
resolution: {integrity: sha512-Ir/+ZpE9fDsNH0hQ3C68uyThDXzYcim2EqcZ8zn8Chtt1iylPT9xXJB0kPCnqzgcEGikO9RxSrh63MsmVCU7Fw==}
engines: {node: ^18 || >=20}
@ -807,10 +868,17 @@ packages:
resolution: {integrity: sha512-+EUsqGPLsM+j/zdChZjsnX51g4XrHFOIXwfnCVPGlQk/k5giakcKsuxCObBRu6DSm9opw/O6slWbJdghQM4bBg==}
engines: {node: '>= 0.6'}
node-abort-controller@3.1.1:
resolution: {integrity: sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==}
node-forge@1.3.1:
resolution: {integrity: sha512-dPEtOeMvF9VMcYV/1Wb8CPoVAXtp6MKMlcbAt4ddqmGqUJ6fQZFXkNZNkNlfevtNkGtaSoXf/vNNNSvgrdXwtA==}
engines: {node: '>= 6.13.0'}
node-gyp-build-optional-packages@5.2.2:
resolution: {integrity: sha512-s+w+rBWnpTMwSFbaE0UXsRlg7hU4FjekKU4eyAih5T8nJuNZT1nNsskXpxmeqSK9UzkBl6UgRlnKc8hz8IEqOw==}
hasBin: true
nodemon@3.1.10:
resolution: {integrity: sha512-WDjw3pJ0/0jMFmyNDp3gvY2YizjLmmOUQo6DEBY+JgdvW/yQ9mEeSw6H5ythl5Ny2ytb7f9C2nIbjSxMNzbJXw==}
engines: {node: '>=10'}
@ -1248,6 +1316,10 @@ packages:
resolution: {integrity: sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==}
hasBin: true
uuid@9.0.1:
resolution: {integrity: sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==}
hasBin: true
validator@13.15.15:
resolution: {integrity: sha512-BgWVbCI72aIQy937xbawcs+hrVaN/CZ2UwutgaJ36hGqRrLNM+f5LUT/YPRbo8IV/ASeFzXszezV+y2+rq3l8A==}
engines: {node: '>= 0.10'}
@ -1394,6 +1466,8 @@ snapshots:
'@kevisual/storage@0.0.4': {}
'@kevisual/task@0.0.1': {}
'@kevisual/types@0.0.10': {}
'@kevisual/use-config@1.0.18(dotenv@16.5.0)':
@ -1410,6 +1484,24 @@ snapshots:
dependencies:
call-bind: 1.0.8
'@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.3':
optional: true
'@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.3':
optional: true
'@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.3':
optional: true
'@msgpackr-extract/msgpackr-extract-linux-arm@3.0.3':
optional: true
'@msgpackr-extract/msgpackr-extract-linux-x64@3.0.3':
optional: true
'@msgpackr-extract/msgpackr-extract-win32-x64@3.0.3':
optional: true
'@noble/hashes@1.8.0': {}
'@paralleldrive/cuid2@2.2.2':
@ -1537,6 +1629,18 @@ snapshots:
buffer-crc32@1.0.0: {}
bullmq@5.53.2:
dependencies:
cron-parser: 4.9.0
ioredis: 5.6.1
msgpackr: 1.11.4
node-abort-controller: 3.1.1
semver: 7.7.2
tslib: 2.8.1
uuid: 9.0.1
transitivePeerDependencies:
- supports-color
bun-types@1.2.15:
dependencies:
'@types/node': 22.15.30
@ -1612,6 +1716,10 @@ snapshots:
object-assign: 4.1.1
vary: 1.1.2
cron-parser@4.9.0:
dependencies:
luxon: 3.6.1
cross-env@7.0.3:
dependencies:
cross-spawn: 7.0.6
@ -1691,6 +1799,9 @@ snapshots:
denque@2.1.0: {}
detect-libc@2.0.4:
optional: true
dezalgo@1.0.4:
dependencies:
asap: 2.0.6
@ -2129,6 +2240,8 @@ snapshots:
lru-cache@11.1.0: {}
luxon@3.6.1: {}
math-intrinsics@1.1.0: {}
mime-db@1.52.0: {}
@ -2184,12 +2297,35 @@ snapshots:
ms@2.1.3: {}
msgpackr-extract@3.0.3:
dependencies:
node-gyp-build-optional-packages: 5.2.2
optionalDependencies:
'@msgpackr-extract/msgpackr-extract-darwin-arm64': 3.0.3
'@msgpackr-extract/msgpackr-extract-darwin-x64': 3.0.3
'@msgpackr-extract/msgpackr-extract-linux-arm': 3.0.3
'@msgpackr-extract/msgpackr-extract-linux-arm64': 3.0.3
'@msgpackr-extract/msgpackr-extract-linux-x64': 3.0.3
'@msgpackr-extract/msgpackr-extract-win32-x64': 3.0.3
optional: true
msgpackr@1.11.4:
optionalDependencies:
msgpackr-extract: 3.0.3
nanoid@5.1.5: {}
negotiator@0.6.3: {}
node-abort-controller@3.1.1: {}
node-forge@1.3.1: {}
node-gyp-build-optional-packages@5.2.2:
dependencies:
detect-libc: 2.0.4
optional: true
nodemon@3.1.10:
dependencies:
chokidar: 3.6.0
@ -2700,6 +2836,8 @@ snapshots:
uuid@8.3.2: {}
uuid@9.0.1: {}
validator@13.15.15: {}
vary@1.1.2: {}

View File

@ -1,8 +1,10 @@
import { app, runCheck } from './index.ts';
import { runCorn } from './tasks/index.ts';
app.listen('./file-listen.sock', () => {
app.listen(30000, () => {
console.log('Server is running on http://localhost:30000');
setTimeout(() => {
runCheck();
runCorn();
}, 1000);
});
});

View File

@ -53,7 +53,7 @@ export const listen = async () => {
if (data.type) {
app.call({
path: 'file-listener',
key: 'sync-file',
key: 'sync-file-task',
payload: {
data: data,
},

36
src/modules/redis.ts Normal file
View File

@ -0,0 +1,36 @@
import { Redis } from 'ioredis';
import { config } from './config.ts';
const redisConfig = {
host: config.REDIS_HOST || 'localhost',
port: parseInt(config.REDIS_PORT || '6379'),
password: config.REDIS_PASSWORD,
};
export const createRedisClient = (options = {}) => {
const redisClient = new Redis({
host: 'localhost', // Redis 服务器的主机名或 IP 地址
port: 6379, // Redis 服务器的端口号
// password: 'your_password', // Redis 的密码 (如果有)
db: 0, // 要使用的 Redis 数据库索引 (0-15)
keyPrefix: '', // key 前缀
retryStrategy(times) {
// 连接重试策略
return Math.min(times * 50, 2000); // 每次重试时延迟增加
},
maxRetriesPerRequest: null, // 允许请求重试的次数 (如果需要无限次重试)
...redisConfig,
...options,
});
return redisClient;
};
// 配置 Redis 连接
export const redis = createRedisClient();
// 监听连接事件
redis.on('connect', () => {
console.log('Redis 连接成功');
});
redis.on('error', (err) => {
console.error('Redis 连接错误', err);
});

View File

@ -3,6 +3,7 @@ import { FileSyncModel } from '@/file-sync/model.ts';
import { logger } from '@/modules/logger.ts';
import { oss, SyncFile, SyncType } from '@/modules/minio.ts';
import { BucketItemStat } from 'minio';
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
type Result = {
code: number;
message?: string;
@ -10,10 +11,67 @@ type Result = {
duration?: number;
error?: string;
};
app
.route({
path: 'file-listener',
key: 'db-check',
description: '检查数据库中的文件, 获取minio的文件如果不一致则更新',
})
.define(async (ctx) => {
const where = {
$or: [
{
checkedAt: {
$lt: new Date(Date.now() - 1000 * 60 * 60 * 24 * 30), // 检查30天前未检查的文件
},
},
{
checkedAt: null, // 也检查从未检查过的文件
},
],
};
const count = await FileSyncModel.count({ where });
for (let i = 0; i < count; i += 10000) {
// 每次处理10000个文件避免过载
// 分页查询所有的数据库获取对应的name
const dbFiles = await FileSyncModel.findAll({
where,
attributes: ['name'],
raw: true,
limit: 10000,
offset: i,
});
if (!dbFiles || dbFiles.length === 0) {
logger.info('No files to check in the database');
continue;
}
for (const file of dbFiles) {
const name = file.name;
if (!name) continue;
// 调用同步文件的任务
app.call({
path: 'file-listener',
key: 'sync-file-task',
payload: {
data: {
type: 'sync',
name: name,
},
},
});
}
sleep(100);
}
})
.addTo(app);
app
.route({
path: 'file-listener',
key: 'sync-all',
description: '同步minio的所有文件到数据库',
})
.define(async (ctx) => {
const startTime = Date.now();
@ -24,7 +82,7 @@ app
if (data.name?.startsWith?.('private')) return;
app.call({
path: 'file-listener',
key: 'sync-file',
key: 'sync-file-task',
payload: {
data: data,
},
@ -50,6 +108,25 @@ app
})
.addTo(app);
app
.route({
path: 'file-listener',
key: 'sync-file-task',
})
.define(async (ctx) => {
const query = ctx.query;
console.log('queue', query);
const result = await app.call({
path: 'file-listener',
key: 'sync-file',
payload: query,
});
if (result.code !== 200) {
ctx.throw(result.code, result.message || 'Sync file task failed');
}
ctx.body = result.data;
})
.addTo(app);
app
.route({
path: 'file-listener',

48
src/tasks/index.ts Normal file
View File

@ -0,0 +1,48 @@
import { Task } from '@kevisual/task';
import { app } from '@/app.ts';
import { Queue } from 'bullmq';
import { redis } from '@/modules/redis.ts';
import { logger } from '@/modules/logger.ts';
export const task = new Task({
name: 'file-listener-tasks',
app,
redis,
});
export const cornTask = new Task({
name: 'file-listener-tasks-corn',
app,
redis,
});
const corn = '0 3 1 * *'; // 设置每个月1号3点执行一次的定时任务的cron
export const runCorn = async () => {
const queue: Queue = cornTask.getQueue();
try {
const key = 'file-listener-sync-all';
const existId = '8a815571879a730532471d952958fc71'; // 确保唯一性
const pattern = corn;
// const existingJobs = await queue.getJobScheduler(existId);
const job = await queue.add(
'file-listener-tasks-corn',
{
path: 'file-listener',
key: 'db-check',
},
{
jobId: key,
repeat: {
pattern: pattern,
key: '8a815571879a730532471d952958fc71', // 确保唯一性
},
},
);
logger.info('Created new repeatable job:', job.id, job.name, job.repeatJobKey);
logger.info(`Created new repeatable job with cron pattern: ${pattern}`);
} catch (error) {
logger.error('Error managing job schedulers:', error);
}
const worker = cornTask.getWorker();
return worker;
};