Refactor storage integration from MinIO to S3
- Removed MinIO client and related imports from various modules. - Introduced S3 client and OSS integration for object storage. - Updated all references to MinIO methods with corresponding S3 methods. - Added new flowme table schema to the database. - Adjusted upload and download routes to utilize S3 for file operations. - Removed obsolete MinIO-related files and routes. - Ensured compatibility with existing application logic while transitioning to S3.
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
import { useConfig } from '@kevisual/use-config';
|
||||
import { useFileStore } from '@kevisual/use-config';
|
||||
import { minioResources } from './minio.ts';
|
||||
import { minioResources } from './s3.ts';
|
||||
|
||||
export const config = useConfig() as any;
|
||||
export const port = config.PORT ? Number(config.PORT) : 4005;
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { bucketName, minioClient } from '@/modules/minio.ts';
|
||||
import { oss } from '@/app.ts';
|
||||
import { IncomingMessage, ServerResponse } from 'http';
|
||||
import { filterKeys } from './http-proxy.ts';
|
||||
import { getUserFromRequest } from '../utils.ts';
|
||||
@@ -11,6 +11,7 @@ import { parseSearchValue } from '@kevisual/router/src/server/parse-body.ts';
|
||||
import { logger } from '@/modules/logger.ts';
|
||||
import { pipeBusboy } from '../pipe-busboy.ts';
|
||||
import { pipeMinioStream } from '../pipe.ts';
|
||||
import { Readable } from 'stream';
|
||||
|
||||
type FileList = {
|
||||
name: string;
|
||||
@@ -112,7 +113,8 @@ const getAiProxy = async (req: IncomingMessage, res: ServerResponse, opts: Proxy
|
||||
const etag = stat.etag;
|
||||
const lastModified = stat.lastModified.toISOString();
|
||||
|
||||
const objectStream = await minioClient.getObject(bucketName, objectName);
|
||||
const objectStream = await oss.getObject(objectName);
|
||||
// const objectStream = await minioClient.getObject(bucketName, objectName);
|
||||
const headers = {
|
||||
'Content-Length': contentLength,
|
||||
etag,
|
||||
@@ -124,8 +126,7 @@ const getAiProxy = async (req: IncomingMessage, res: ServerResponse, opts: Proxy
|
||||
...headers,
|
||||
});
|
||||
// objectStream.pipe(res, { end: true });
|
||||
// @ts-ignore
|
||||
pipeMinioStream(objectStream, res);
|
||||
pipeMinioStream(objectStream.Body as Readable, res);
|
||||
return true;
|
||||
} catch (error) {
|
||||
console.error(`Proxy request error: ${error.message}`);
|
||||
@@ -164,9 +165,6 @@ export const postProxy = async (req: IncomingMessage, res: ServerResponse, opts:
|
||||
let fileSize: number | undefined = undefined;
|
||||
if (_fileSize) {
|
||||
fileSize = parseInt(_fileSize, 10);
|
||||
} else if (req.headers['content-length']) {
|
||||
// 如果 URL 参数没有 size,尝试从请求头获取
|
||||
fileSize = parseInt(req.headers['content-length'], 10);
|
||||
}
|
||||
let meta = parseSearchValue(params.get('meta'), { decode: true });
|
||||
if (!hash && !force) {
|
||||
@@ -214,7 +212,7 @@ export const postProxy = async (req: IncomingMessage, res: ServerResponse, opts:
|
||||
...getMetadata(pathname),
|
||||
...meta,
|
||||
},
|
||||
{ check: false, isStream: true, size: fileSize },
|
||||
{ check: false, isStream: true },
|
||||
);
|
||||
end({ success: true, name, info, isNew: true, hash, meta: meta?.metaData, statMeta }, '上传成功', 200);
|
||||
|
||||
@@ -301,8 +299,9 @@ export const deleteProxy = async (req: IncomingMessage, res: ServerResponse, opt
|
||||
if (objectName.endsWith('/')) {
|
||||
const objects = await oss.listObjects<true>(objectName, { recursive: true });
|
||||
if (objects.length > 0) {
|
||||
const objectNames = objects.map((obj: any) => obj.name);
|
||||
await minioClient.removeObjects(bucketName, objectNames);
|
||||
for (const obj of objects) {
|
||||
await oss.deleteObject(obj.name);
|
||||
}
|
||||
}
|
||||
end({ success: true, objectName, deletedCount: objects.length }, 'delete success', 200);
|
||||
} else {
|
||||
@@ -357,17 +356,16 @@ export const renameProxy = async (req: IncomingMessage, res: ServerResponse, opt
|
||||
const oldKey = obj.name;
|
||||
const newKey = oldKey.replace(objectName, newObjectName);
|
||||
console.log('rename dir object', oldKey, newKey);
|
||||
await minioClient.copyObject(bucketName, newKey, `/${bucketName}/${oldKey}`);
|
||||
await oss.copyObject(oldKey, newKey);
|
||||
copiedCount++;
|
||||
}
|
||||
// 删除原对象
|
||||
const objectNames = objects.map((obj: any) => obj.name);
|
||||
if (objectNames.length > 0) {
|
||||
await minioClient.removeObjects(bucketName, objectNames);
|
||||
for (const obj of objects) {
|
||||
console.log('deleted object', obj.name);
|
||||
await oss.deleteObject(obj.name);
|
||||
}
|
||||
} else {
|
||||
// 重命名文件
|
||||
await minioClient.copyObject(bucketName, newObjectName, `/${bucketName}/${objectName}`);
|
||||
await oss.copyObject(objectName, newObjectName);
|
||||
await oss.deleteObject(objectName);
|
||||
copiedCount = 1;
|
||||
}
|
||||
@@ -384,7 +382,6 @@ type ProxyOptions = {
|
||||
oss?: OssBase;
|
||||
};
|
||||
export const aiProxy = async (req: IncomingMessage, res: ServerResponse, opts: ProxyOptions) => {
|
||||
const oss = new OssBase({ bucketName, client: minioClient });
|
||||
if (!opts.oss) {
|
||||
opts.oss = oss;
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { pipeline, Readable } from 'node:stream';
|
||||
import { promisify } from 'node:util';
|
||||
import { bucketName, minioClient, minioResources } from '@/modules/minio.ts';
|
||||
import { Readable } from 'node:stream';
|
||||
import { minioResources } from '@/modules/s3.ts';
|
||||
import { oss } from '@/app.ts';
|
||||
import fs from 'node:fs';
|
||||
import { IncomingMessage, ServerResponse } from 'node:http';
|
||||
import http from 'node:http';
|
||||
@@ -11,14 +11,18 @@ import path from 'path';
|
||||
import { getTextContentType } from '@/modules/fm-manager/index.ts';
|
||||
import { logger } from '@/modules/logger.ts';
|
||||
import { pipeStream } from '../pipe.ts';
|
||||
const pipelineAsync = promisify(pipeline);
|
||||
import { GetObjectCommandOutput } from '@aws-sdk/client-s3';
|
||||
|
||||
export async function downloadFileFromMinio(fileUrl: string, destFile: string) {
|
||||
const objectName = fileUrl.replace(minioResources + '/', '');
|
||||
const objectStream = await minioClient.getObject(bucketName, objectName);
|
||||
const destStream = fs.createWriteStream(destFile);
|
||||
await pipelineAsync(objectStream, destStream);
|
||||
console.log(`minio File downloaded to ${minioResources}/${objectName} \n ${destFile}`);
|
||||
const objectStream = await oss.getObject(objectName) as GetObjectCommandOutput;
|
||||
const body = objectStream.Body as Readable;
|
||||
|
||||
const chunks: Uint8Array[] = [];
|
||||
for await (const chunk of body) {
|
||||
chunks.push(chunk);
|
||||
}
|
||||
fs.writeFileSync(destFile, Buffer.concat(chunks));
|
||||
}
|
||||
export const filterKeys = (metaData: Record<string, string>, clearKeys: string[] = []) => {
|
||||
const keys = Object.keys(metaData);
|
||||
@@ -43,8 +47,8 @@ export async function minioProxy(
|
||||
const { createNotFoundPage, isDownload = false } = opts;
|
||||
const objectName = fileUrl.replace(minioResources + '/', '');
|
||||
try {
|
||||
const stat = await minioClient.statObject(bucketName, objectName);
|
||||
if (stat.size === 0) {
|
||||
const stat = await oss.statObject(objectName);
|
||||
if (stat?.size === 0) {
|
||||
createNotFoundPage('Invalid proxy url');
|
||||
return true;
|
||||
}
|
||||
@@ -54,7 +58,8 @@ export async function minioProxy(
|
||||
const lastModified = stat.lastModified.toISOString();
|
||||
const fileName = objectName.split('/').pop();
|
||||
const ext = path.extname(fileName || '');
|
||||
const objectStream = await minioClient.getObject(bucketName, objectName);
|
||||
const objectStreamOutput = await oss.getObject(objectName);
|
||||
const objectStream = objectStreamOutput.Body as Readable;
|
||||
const headers = {
|
||||
'Content-Length': contentLength,
|
||||
etag,
|
||||
@@ -151,6 +156,7 @@ export const httpProxy = async (
|
||||
return createNotFoundPage('Invalid proxy url:' + error.message);
|
||||
}
|
||||
} else {
|
||||
console.log('Proxying file: headers', headers);
|
||||
res.writeHead(proxyRes.statusCode, {
|
||||
...headers,
|
||||
});
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import http from 'node:http';
|
||||
import { minioClient } from '@/modules/minio.ts';
|
||||
import { pipeMinioStream } from '../pipe.ts';
|
||||
import { oss } from '@/app.ts';
|
||||
import { Readable } from 'node:stream';
|
||||
type ProxyInfo = {
|
||||
path?: string;
|
||||
target: string;
|
||||
@@ -15,9 +16,8 @@ export const minioProxyOrigin = async (req: http.IncomingMessage, res: http.Serv
|
||||
if (objectName.startsWith(bucketName)) {
|
||||
objectName = objectName.slice(bucketName.length);
|
||||
}
|
||||
const objectStream = await minioClient.getObject(bucketName, objectName);
|
||||
// objectStream.pipe(res);
|
||||
pipeMinioStream(objectStream, res);
|
||||
const objectStream = await oss.getObject(objectName);
|
||||
pipeMinioStream(objectStream.Body as Readable, res);
|
||||
} catch (error) {
|
||||
console.error('Error fetching object from MinIO:', error);
|
||||
res.statusCode = 500;
|
||||
|
||||
@@ -1,38 +1,13 @@
|
||||
import { Client, ClientOptions } from 'minio';
|
||||
import { Client, } from 'minio';
|
||||
import { useConfig } from '@kevisual/use-config';
|
||||
const config = useConfig();
|
||||
import { OssBase } from '@kevisual/oss/services';
|
||||
const minioConfig = {
|
||||
endPoint: config.MINIO_ENDPOINT || 'localhost',
|
||||
// @ts-ignore
|
||||
port: parseInt(config.MINIO_PORT || '9000'),
|
||||
useSSL: config.MINIO_USE_SSL === 'true',
|
||||
accessKey: config.MINIO_ACCESS_KEY,
|
||||
secretKey: config.MINIO_SECRET_KEY,
|
||||
};
|
||||
const { port, endPoint, useSSL } = minioConfig;
|
||||
// console.log('minioConfig', minioConfig);
|
||||
export const minioClient = new Client(minioConfig);
|
||||
export const bucketName = config.MINIO_BUCKET_NAME || 'resources';
|
||||
|
||||
export const minioUrl = `http${useSSL ? 's' : ''}://${endPoint}:${port || 9000}`;
|
||||
export const minioResources = `${minioUrl}/resources`;
|
||||
|
||||
if (!minioClient) {
|
||||
throw new Error('Minio client not initialized');
|
||||
}
|
||||
// 验证权限
|
||||
(async () => {
|
||||
const bucketExists = await minioClient.bucketExists(bucketName);
|
||||
if (!bucketExists) {
|
||||
await minioClient.makeBucket(bucketName);
|
||||
}
|
||||
console.log('bucketExists', bucketExists);
|
||||
// const res = await minioClient.putObject(bucketName, 'root/test/0.0.1/a.txt', 'test');
|
||||
// console.log('minio putObject', res);
|
||||
})();
|
||||
|
||||
export const oss = new OssBase({
|
||||
client: minioClient,
|
||||
bucketName: bucketName,
|
||||
prefix: '',
|
||||
});
|
||||
|
||||
44
src/modules/s3.ts
Normal file
44
src/modules/s3.ts
Normal file
@@ -0,0 +1,44 @@
|
||||
import { CreateBucketCommand, HeadObjectCommand, S3Client, } from '@aws-sdk/client-s3';
|
||||
import { OssBase } from '@kevisual/oss/s3.ts';
|
||||
import { useConfig } from '@kevisual/use-config';
|
||||
const config = useConfig();
|
||||
|
||||
export const bucketName = config.S3_BUCKET_NAME || 'resources';
|
||||
|
||||
export const s3Client = new S3Client({
|
||||
credentials: {
|
||||
accessKeyId: config.S3_ACCESS_KEY_ID || '',
|
||||
secretAccessKey: config.S3_SECRET_ACCESS_KEY || '',
|
||||
},
|
||||
region: config.S3_REGION,
|
||||
endpoint: config.S3_ENDPOINT,
|
||||
// minio配置
|
||||
forcePathStyle: true,
|
||||
});
|
||||
|
||||
// 判断 bucketName 是否存在,不存在则创建
|
||||
(async () => {
|
||||
try {
|
||||
await s3Client.send(new HeadObjectCommand({ Bucket: bucketName, Key: '' }));
|
||||
console.log(`Bucket ${bucketName} exists.`);
|
||||
} catch (error) {
|
||||
console.log(`Bucket ${bucketName} does not exist. Creating...`);
|
||||
if (config.S3_ENDPOINT?.includes?.('9000')) {
|
||||
// 创建 bucket
|
||||
await s3Client.send(new CreateBucketCommand({ Bucket: bucketName }));
|
||||
console.log(`Bucket ${bucketName} created.`);
|
||||
}
|
||||
}
|
||||
})();
|
||||
|
||||
if (!s3Client) {
|
||||
throw new Error('S3 client not initialized');
|
||||
}
|
||||
|
||||
export const oss = new OssBase({
|
||||
client: s3Client,
|
||||
bucketName: bucketName,
|
||||
prefix: '',
|
||||
})
|
||||
|
||||
export const minioResources = `${config.S3_ENDPOINT}/${bucketName}`;
|
||||
@@ -7,7 +7,7 @@ import { nanoid } from 'nanoid';
|
||||
import { pipeline } from 'stream';
|
||||
import { promisify } from 'util';
|
||||
import { getAppLoadStatus, setAppLoadStatus } from './get-app-status.ts';
|
||||
import { minioResources } from '../minio.ts';
|
||||
import { minioResources } from '../s3.ts';
|
||||
import { downloadFileFromMinio, fetchApp, fetchDomain, fetchTest } from '@/modules/fm-manager/index.ts';
|
||||
import { logger } from '../logger.ts';
|
||||
export * from './get-app-status.ts';
|
||||
|
||||
Reference in New Issue
Block a user