feat: add listener

This commit is contained in:
熊潇 2025-06-07 00:57:31 +08:00
parent 55ddd8fca8
commit f6374764a7
21 changed files with 506 additions and 50 deletions

View File

@ -1,13 +0,0 @@
# .cnb.yml
$:
vscode:
- docker:
image: docker.cnb.cool/kevisual/dev-env:latest
services:
- vscode
- docker
imports: https://cnb.cool/kevisual/env/-/blob/main/env.yml
# 开发环境启动后会执行的任务
stages:
- name: pnpm install
script: pnpm install

1
bin/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
minio

View File

@ -2,8 +2,9 @@
import { resolvePath } from '@kevisual/use-config/env';
import { execSync } from 'node:child_process';
const entry = 'src/index.ts';
const entry = 'src/main.ts';
const naming = 'app';
const external = ['postgres','sequelize', '@kevisual/router','ph-hstore', 'minio'];
/**
* @type {import('bun').BuildConfig}
*/
@ -15,7 +16,7 @@ await Bun.build({
naming: {
entry: `${naming}.js`,
},
external: [],
external,
env: 'KEVISUAL_*',
});

View File

@ -1,19 +1,17 @@
{
"name": "demo-app",
"name": "@kevisual/file-listener",
"version": "0.0.1",
"description": "",
"main": "index.js",
"basename": "/root/demo-app",
"basename": "/root/file-listener",
"app": {
"key": "demo-app",
"key": "file-listener",
"entry": "dist/app.js",
"type": "system-app",
"files": [
"dist"
]
"type": "pm2-system-app"
},
"scripts": {
"dev": "cross-env NODE_TLS_REJECT_UNAUTHORIZED=0 bun --watch src/dev.ts ",
"minio": "MINIO_CONFIG_ENV_FILE=./.env.minio ./bin/minio server --console-address :9001",
"build": "rimraf dist && bun run bun.config.mjs",
"test": "tsx test/**/*.ts",
"clean": "rm -rf dist",
@ -27,7 +25,7 @@
"types": "types/index.d.ts",
"files": [
"dist",
"src"
"src/file-sync"
],
"publishConfig": {
"access": "public"
@ -35,15 +33,18 @@
"dependencies": {
"@kevisual/code-center-module": "0.0.20",
"@kevisual/router": "0.0.22",
"@kevisual/use-config": "^1.0.17",
"@kevisual/use-config": "^1.0.18",
"cookie": "^1.0.2",
"dayjs": "^1.11.13",
"formidable": "^3.5.4",
"lodash-es": "^4.17.21"
},
"devDependencies": {
"@kevisual/logger": "^0.0.4",
"@kevisual/oss": "^0.0.12",
"@kevisual/storage": "^0.0.4",
"@kevisual/types": "^0.0.10",
"@kevisual/use-config": "^1.0.17",
"@kevisual/use-config": "^1.0.18",
"@types/bun": "^1.2.15",
"@types/crypto-js": "^4.2.2",
"@types/formidable": "^3.4.5",
@ -57,10 +58,11 @@
"minio": "^8.0.5",
"nodemon": "^3.1.10",
"pg": "^8.16.0",
"pg-hstore": "^2.3.4",
"rimraf": "^6.0.1",
"sequelize": "^6.37.7",
"tape": "^5.9.0",
"typescript": "^5.8.3"
},
"packageManager": "pnpm@10.11.1"
}
}

63
pnpm-lock.yaml generated
View File

@ -10,13 +10,13 @@ importers:
dependencies:
'@kevisual/code-center-module':
specifier: 0.0.20
version: 0.0.20(dotenv@16.5.0)
version: 0.0.20(dotenv@16.5.0)(pg-hstore@2.3.4)
'@kevisual/router':
specifier: 0.0.22
version: 0.0.22
'@kevisual/use-config':
specifier: ^1.0.17
version: 1.0.17(dotenv@16.5.0)
specifier: ^1.0.18
version: 1.0.18(dotenv@16.5.0)
cookie:
specifier: ^1.0.2
version: 1.0.2
@ -30,6 +30,15 @@ importers:
specifier: ^4.17.21
version: 4.17.21
devDependencies:
'@kevisual/logger':
specifier: ^0.0.4
version: 0.0.4
'@kevisual/oss':
specifier: ^0.0.12
version: 0.0.12
'@kevisual/storage':
specifier: ^0.0.4
version: 0.0.4
'@kevisual/types':
specifier: ^0.0.10
version: 0.0.10
@ -72,12 +81,15 @@ importers:
pg:
specifier: ^8.16.0
version: 8.16.0
pg-hstore:
specifier: ^2.3.4
version: 2.3.4
rimraf:
specifier: ^6.0.1
version: 6.0.1
sequelize:
specifier: ^6.37.7
version: 6.37.7(pg@8.16.0)
version: 6.37.7(pg-hstore@2.3.4)(pg@8.16.0)
tape:
specifier: ^5.9.0
version: 5.9.0
@ -103,17 +115,26 @@ packages:
'@kevisual/load@0.0.6':
resolution: {integrity: sha512-+3YTFehRcZ1haGel5DKYMUwmi5i6f2psyaPZlfkKU/cOXgkpwoG9/BEqPCnPjicKqqnksEpixVRkyHJ+5bjLVA==}
'@kevisual/logger@0.0.4':
resolution: {integrity: sha512-+fpr92eokSxoGOW1SIRl/27lPuO+zyY+feR5o2Q4YCNlAdt2x64NwC/w8r/3NEC5QenLgd4K0azyKTI2mHbARw==}
'@kevisual/oss@0.0.12':
resolution: {integrity: sha512-tYr242IwwRFaxTXk3P+7Absuy+7BlNXhPwUtFco7D3q8h7cNVGOFPh8UVQEFI3i0tRMSKB2i8X/jx/y8tP5eVQ==}
'@kevisual/router@0.0.21':
resolution: {integrity: sha512-XKTxbNO924cT18UOAGplWErZ+hMze8Y53F2jYCk18v4jsdsvjRho5uXXjJb6HSVsuITMtQR4R3rG0IcM3jkDKQ==}
'@kevisual/router@0.0.22':
resolution: {integrity: sha512-Cqv2vV+hPBHrMMfvWlfDIuNrQcmd260oQZ4S5QR/R4tV35XtMKiseqhnC9uR09oVBJUh+d5rW3YucDDddheeDQ==}
'@kevisual/storage@0.0.4':
resolution: {integrity: sha512-Pbk89sKHo6xk/bMja5RWOLi3W+xYnrMTQ1fURlC1SlPjGg88/FwAP8fJ8KBlpV1AOskoCj/cCB1pQ9AcB3Vjjw==}
'@kevisual/types@0.0.10':
resolution: {integrity: sha512-Q73uzzjk9UidumnmCvOpgzqDDvQxsblz22bIFuoiioUFJWwaparx8bpd8ArRyFojicYL1YJoFDzDZ9j9NN8grA==}
'@kevisual/use-config@1.0.17':
resolution: {integrity: sha512-EsuMJ5bhAbdERvpD55td1diRxx4kSxtYVaIHo0vDvnLetuXLfq+j2DPGmWl/oRdO48op0dme5oo1DctCqpgYcQ==}
'@kevisual/use-config@1.0.18':
resolution: {integrity: sha512-v3m84iyNlXB3zdLw9/NmrZk8AEdeS6zKQkMsQmSIQntHW+v9AEpuCX7ipW3pl2yIxMxwo6sedi8NwchHADtblw==}
peerDependencies:
dotenv: ^16.4.7
@ -854,6 +875,10 @@ packages:
pg-connection-string@2.9.0:
resolution: {integrity: sha512-P2DEBKuvh5RClafLngkAuGe9OUlFV7ebu8w1kmaaOgPcpJd1RIFh7otETfI6hAR8YupOLFTY7nuvvIn7PLciUQ==}
pg-hstore@2.3.4:
resolution: {integrity: sha512-N3SGs/Rf+xA1M2/n0JBiXFDVMzdekwLZLAO0g7mpDY9ouX+fDI7jS6kTq3JujmYbtNSJ53TJ0q4G98KVZSM4EA==}
engines: {node: '>= 0.8.x'}
pg-int8@1.0.1:
resolution: {integrity: sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==}
engines: {node: '>=4.0.0'}
@ -1207,6 +1232,9 @@ packages:
undefsafe@2.0.5:
resolution: {integrity: sha512-WxONCrssBM8TSPRqN5EmsjVrsv4A8X12J4ArBiiayv3DyyG3ZlIg6yysuuSYdZsVz3TKcTg2fd//Ujd4CHV1iA==}
underscore@1.13.7:
resolution: {integrity: sha512-GMXzWtsc57XAtguZgaQViUOzs0KTkk8ojr3/xAxXLITqf/3EMwxC0inyETfDFjH/Krbhuep0HNbbjI9i/q3F3g==}
undici-types@6.21.0:
resolution: {integrity: sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==}
@ -1320,15 +1348,15 @@ snapshots:
'@kevisual/auth@1.0.5': {}
'@kevisual/code-center-module@0.0.20(dotenv@16.5.0)':
'@kevisual/code-center-module@0.0.20(dotenv@16.5.0)(pg-hstore@2.3.4)':
dependencies:
'@kevisual/auth': 1.0.5
'@kevisual/router': 0.0.21
'@kevisual/use-config': 1.0.17(dotenv@16.5.0)
'@kevisual/use-config': 1.0.18(dotenv@16.5.0)
ioredis: 5.6.1
nanoid: 5.1.5
pg: 8.16.0
sequelize: 6.37.7(pg@8.16.0)
sequelize: 6.37.7(pg-hstore@2.3.4)(pg@8.16.0)
socket.io: 4.8.1
zod: 3.25.55
transitivePeerDependencies:
@ -1350,6 +1378,10 @@ snapshots:
dependencies:
eventemitter3: 5.0.1
'@kevisual/logger@0.0.4': {}
'@kevisual/oss@0.0.12': {}
'@kevisual/router@0.0.21':
dependencies:
path-to-regexp: 8.2.0
@ -1360,9 +1392,11 @@ snapshots:
path-to-regexp: 8.2.0
selfsigned: 2.4.1
'@kevisual/storage@0.0.4': {}
'@kevisual/types@0.0.10': {}
'@kevisual/use-config@1.0.17(dotenv@16.5.0)':
'@kevisual/use-config@1.0.18(dotenv@16.5.0)':
dependencies:
'@kevisual/load': 0.0.6
dotenv: 16.5.0
@ -2221,6 +2255,10 @@ snapshots:
pg-connection-string@2.9.0: {}
pg-hstore@2.3.4:
dependencies:
underscore: 1.13.7
pg-int8@1.0.1: {}
pg-pool@3.10.0(pg@8.16.0):
@ -2361,7 +2399,7 @@ snapshots:
sequelize-pool@7.1.0: {}
sequelize@6.37.7(pg@8.16.0):
sequelize@6.37.7(pg-hstore@2.3.4)(pg@8.16.0):
dependencies:
'@types/debug': 4.1.12
'@types/validator': 13.15.1
@ -2381,6 +2419,7 @@ snapshots:
wkx: 0.5.0
optionalDependencies:
pg: 8.16.0
pg-hstore: 2.3.4
transitivePeerDependencies:
- supports-color
@ -2645,6 +2684,8 @@ snapshots:
undefsafe@2.0.5: {}
underscore@1.13.7: {}
undici-types@6.21.0: {}
util-deprecate@1.0.2: {}

5
src/app.ts Normal file
View File

@ -0,0 +1,5 @@
import { App } from '@kevisual/router';
import { sequelize as dblib } from './modules/sequelize.ts';
import { useContextKey } from '@kevisual/use-config/context';
export const app = new App();
export const sequelize = useContextKey('sequelize', () => dblib);

8
src/dev.ts Normal file
View File

@ -0,0 +1,8 @@
import { app, runCheck } from './index.ts';
app.listen('./file-listen.sock', () => {
console.log('Server is running on http://localhost:30000');
setTimeout(() => {
runCheck();
}, 1000);
});

103
src/file-sync/model.ts Normal file
View File

@ -0,0 +1,103 @@
import { useContextKey } from '@kevisual/use-config/context';
import { BucketItemStat } from 'minio';
import { Model, DataTypes } from 'sequelize';
export type FileSyncModelType = {};
export class FileSyncModel extends Model {
declare id: string;
declare name: string;
declare hash: string;
declare stat: BucketItemStat;
declare data: any;
declare createdAt: Date;
declare updatedAt: Date;
declare checkedAt: Date;
get user() {
const name = this.name || '';
const _u = name.split('/')[0];
return _u || '';
}
async updateStat(stat: BucketItemStat) {
const currentStat = this.stat || {};
const equal = JSON.stringify(currentStat) === JSON.stringify(stat);
if (!equal) {
await this.update({
hash: stat.etag || '',
stat,
checkedAt: new Date(),
});
return {
type: 'updateStat',
};
}
const checkedAt = this.checkedAt;
// 对比当前时间如果超过了15天更新为当前时间
const fifteenDays = 15 * 24 * 60 * 60 * 1000; // 15 days in milliseconds
if (Date.now() - new Date(checkedAt).getTime() > fifteenDays) {
await this.update({ checkedAt: new Date() });
return {
type: 'updateCheckedAt',
};
}
return {
type: 'noChange',
};
}
}
export const initFileSyncModel = async () => {
const sequelize = await useContextKey('sequelize');
FileSyncModel.init(
{
id: {
type: DataTypes.UUID,
primaryKey: true,
defaultValue: DataTypes.UUIDV4,
comment: 'id',
},
name: {
type: DataTypes.STRING,
allowNull: true,
},
hash: {
type: DataTypes.STRING,
allowNull: true,
},
stat: {
type: DataTypes.JSONB,
allowNull: true,
defaultValue: {},
},
data: {
type: DataTypes.JSONB,
allowNull: true,
defaultValue: {},
},
checkedAt: {
type: DataTypes.DATE,
allowNull: true,
defaultValue: DataTypes.NOW,
comment: 'Last checked time',
},
},
{
sequelize,
tableName: 'file_sync',
indexes: [
{
name: 'file_sync_name_idx',
fields: ['name'],
},
],
},
);
await FileSyncModel.sync({ alter: true, logging: false }).catch((err) => {
console.error('Error syncing FileSyncModel:', err);
});
return FileSyncModel;
};
initFileSyncModel();

22
src/index.ts Normal file
View File

@ -0,0 +1,22 @@
import { listen } from './minio-listen/minio.ts';
import { app } from './app.ts';
import './routes/index.ts';
import { storage } from './modules/storage.ts';
export { app };
listen();
export const runCheck = async () => {
const isFirst = await storage.checkIsFirst();
if (isFirst) {
console.log('This is the first run, initializing sync...');
await app.call(
{
path: 'file-listener',
key: 'sync-all',
payload: {},
},
{ timeout: 1000 * 60 * 60 }, // 1 hour timeout
);
}
};

10
src/main.ts Normal file
View File

@ -0,0 +1,10 @@
import { app, runCheck } from './index.ts';
setTimeout(() => {
runCheck();
keepServer();
console.log('app server length', app.router.routes.length);
}, 1000);
const keepServer = () => {
setInterval(() => {}, 2000);
};

View File

@ -1,23 +1,65 @@
import { Client } from 'minio';
const minioClient = new Client({
endPoint: 'localhost',
port: 9000,
useSSL: false,
accessKey: 'admin',
secretKey: 'admin123',
});
import { minioClient, bucketName } from '@/modules/minio.ts';
import utils from 'node:util';
import { app } from '@/app.ts';
// import { NotificationRecord } from 'minio';
type NotificationRecord = {
eventName: string;
s3: {
bucket: {
name: string;
arn: string;
};
object: {
key: string;
size: number;
eTag: string;
contentType: string;
userMetadata: Record<string, string>;
sequencer?: string;
};
};
};
// 监听事件
const listen = async () => {
const res = minioClient.listenBucketNotification('mark', 'common', '.md', [
export const listen = async () => {
const res = minioClient.listenBucketNotification(bucketName, '', '', [
's3:ObjectCreated:*', // 监听所有对象创建事件
// delete
's3:ObjectRemoved:*',
]);
res.on('notification', (event) => {
console.log(event);
res.on('notification', (event: NotificationRecord) => {
// console.log(event);
const eventName = event.eventName;
const ev = eventName.split(':').slice(0, 2).join(':');
let data = {
key: event.s3.object.key,
type: '',
};
if (data.key.startsWith('private/')) {
return;
}
switch (ev) {
case 's3:ObjectCreated':
data.type = 'create';
break;
case 's3:ObjectRemoved':
data.type = 'delete';
break;
default:
console.log('Unknown event:', eventName);
break;
}
// console.log('obj', utils.inspect(event.s3.object, false, null, true));
if (data.type) {
app.call({
path: 'file-listener',
key: 'sync-file',
payload: {
data: data,
},
});
}
});
// const res = await minioClient.getBucketNotification('mark');
// console.log(res);
};
listen();

3
src/modules/config.ts Normal file
View File

@ -0,0 +1,3 @@
import { useConfig } from '@kevisual/use-config/env';
export const config = useConfig();

8
src/modules/logger.ts Normal file
View File

@ -0,0 +1,8 @@
import { useConfig } from '@kevisual/use-config/env';
import { Logger } from '@kevisual/logger';
const config = useConfig();
export const logger = new Logger({
level: config.LOG_LEVEL || 'info',
showTime: true,
});

28
src/modules/minio.ts Normal file
View File

@ -0,0 +1,28 @@
import { Client } from 'minio';
import { config } from './config.ts';
import { OssBase } from '@kevisual/oss';
export const bucketName = config.MINIO_BUCKET_NAME || 'resources';
export const minioClient = new Client({
endPoint: config.MINIO_ENDPOINT || 'localhost',
port: parseInt(config.MINIO_PORT || '9000', 10),
useSSL: config.MINIO_USE_SSL === 'true',
accessKey: config.MINIO_ACCESS_KEY || 'minioadmin',
secretKey: config.MINIO_SECRET_KEY || 'minioadmin',
});
export const oss = new OssBase({
client: minioClient,
bucketName: config.MINIO_BUCKET_NAME || 'resources',
});
export type SyncFile = {
name: string;
etag: string;
size: number;
lastModified: Date;
userMetadata?: Record<string, string>;
};
export const syncTypes = ['sync', 'delete', 'create', 'update'] as const;
export type SyncType = (typeof syncTypes)[number];

41
src/modules/sequelize.ts Normal file
View File

@ -0,0 +1,41 @@
import { Sequelize } from 'sequelize';
import { config } from './config.ts';
import { logger } from './logger.ts';
export type PostgresConfig = {
postgres: {
username: string;
password: string;
host: string;
port: number;
database: string;
};
};
if (!config.POSTGRES_PASSWORD || !config.POSTGRES_USER) {
logger.error('postgres config is required password and user');
logger.error('config', config);
process.exit(1);
}
const postgresConfig = {
username: config.POSTGRES_USER,
password: config.POSTGRES_PASSWORD,
host: config.POSTGRES_HOST || 'localhost',
port: parseInt(config.POSTGRES_PORT || '5432'),
database: config.POSTGRES_DB || 'postgres',
};
// connect to db
export const sequelize = new Sequelize({
dialect: 'postgres',
...postgresConfig,
// logging: false,
});
sequelize
.authenticate({ logging: false })
.then(() => {
logger.info('Database connected');
})
.catch((err) => {
logger.error('Database connection failed', { err, config: postgresConfig });
process.exit(1);
});

22
src/modules/storage.ts Normal file
View File

@ -0,0 +1,22 @@
import { AssistantStorage } from '@kevisual/storage';
export class FileListenerStorage extends AssistantStorage {
constructor() {
super({
app: 'file-listener',
});
}
/**
* true minio获取所有的列表minio当中去
* @returns
*/
async checkIsFirst() {
const data = this.getData();
if (data['mounted'] !== true) {
this.setData(null, { mounted: true });
return true;
}
return false;
}
}
export const storage = new FileListenerStorage();

1
src/routes/index.ts Normal file
View File

@ -0,0 +1 @@
import './sync.ts';

101
src/routes/sync.ts Normal file
View File

@ -0,0 +1,101 @@
import { app } from '@/app.ts';
import { FileSyncModel } from '@/file-sync/model.ts';
import { logger } from '@/modules/logger.ts';
import { oss, SyncFile, SyncType } from '@/modules/minio.ts';
import { BucketItemStat } from 'minio';
type Result = {
code: number;
message?: string;
data?: any;
duration?: number;
error?: string;
};
app
.route({
path: 'file-listener',
key: 'sync-all',
})
.define(async (ctx) => {
const startTime = Date.now();
const runSync = async () => {
return new Promise<Result>((resolve, reject) => {
const dataStream = oss.client.listObjectsV2(oss.bucketName, '', true);
dataStream.on('data', async (data) => {
if (data.name?.startsWith?.('private')) return;
app.call({
path: 'file-listener',
key: 'sync-file',
payload: {
data: data,
},
});
});
dataStream.on('end', () => {
const endTime = Date.now();
logger.info(`Sync completed in ${endTime - startTime} ms`);
resolve({ code: 200, message: 'Sync completed successfully', duration: endTime - startTime });
});
dataStream.on('error', (err) => {
console.error('Error during sync:', err);
ctx.body = { message: 'Sync failed', error: err.message };
resolve({ code: 500, message: 'Sync failed', error: err.message });
});
});
};
const res = await runSync();
if (res.code === 200) {
ctx.body = res.data;
}
ctx.throw(res.code, res.message || 'Sync operation completed');
})
.addTo(app);
app
.route({
path: 'file-listener',
key: 'sync-file',
})
.define(async (ctx) => {
const { data } = ctx.query;
if (!data.name && !data.key) {
ctx.throw(400, 'No data provided for sync');
}
const type: SyncType = data.type || 'sync';
const name = data.name || data.key;
logger.info('Syncing file:', name, 'Type:', type);
let fileSync: FileSyncModel = null;
if (type === 'delete') {
const deleteNum = await FileSyncModel.destroy({
where: { name },
});
if (deleteNum) {
logger.info('File deleted from sync model:', name);
}
return;
}
const bucketItemStat: BucketItemStat = await oss.statObject(name);
if (type === 'create' || type === 'update' || type === 'sync') {
const [fileSync, created] = await FileSyncModel.findOrCreate({
where: { name },
defaults: {
name,
hash: bucketItemStat.etag || '',
stat: bucketItemStat || {},
data: {},
checkedAt: new Date(),
},
});
if (created) {
logger.info('File created in sync model:', name);
return;
}
const result = await fileSync.updateStat(bucketItemStat);
logger.info('File updated in sync model:', name, result);
ctx.body = result;
return;
}
logger.info('Processing file:', bucketItemStat);
})
.addTo(app);

View File

@ -0,0 +1,10 @@
import { oss } from '@/modules/minio.ts';
export class SyncAllFileServices {
emitter: any;
async sync() {
let startAfter = '';
const fileList = await oss.listObjects('/', { recursive: true });
// 处理文件列表
}
}

8
src/test/common.ts Normal file
View File

@ -0,0 +1,8 @@
import { minioClient } from '../modules/minio.ts';
import { OssBase } from '@kevisual/oss';
const oss = new OssBase({
client: minioClient,
bucketName: 'resources',
});
export { minioClient, oss };

12
src/test/get-list.ts Normal file
View File

@ -0,0 +1,12 @@
import { oss } from './common.ts';
const list = async () => {
try {
const result = await oss.listObjects('');
console.log('Objects:', result);
} catch (error) {
console.error('Error listing objects:', error);
}
};
list();