Compare commits
2 Commits
82c9b834e9
...
337abd2bc3
| Author | SHA1 | Date | |
|---|---|---|---|
| 337abd2bc3 | |||
| 32167faf67 |
239
src/modules/fm-manager/proxy/ai-proxy-chunk/post-proxy.ts
Normal file
239
src/modules/fm-manager/proxy/ai-proxy-chunk/post-proxy.ts
Normal file
@@ -0,0 +1,239 @@
|
||||
|
||||
import { IncomingMessage, ServerResponse } from 'http';
|
||||
import busboy from 'busboy';
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
import { createWriteStream } from 'fs';
|
||||
import { parseSearchValue } from '@kevisual/router/src/server/parse-body.ts';
|
||||
import { pipeBusboy } from '../../pipe-busboy.ts';
|
||||
import { ProxyOptions, getMetadata, getObjectName } from '../ai-proxy.ts';
|
||||
import { fileIsExist, useFileStore } from '@kevisual/use-config';
|
||||
import { getContentType } from '../../get-content-type.ts';
|
||||
|
||||
const cacheFilePath = useFileStore('cache-file', { needExists: true });
|
||||
|
||||
export const postProxy = async (req: IncomingMessage, res: ServerResponse, opts: ProxyOptions) => {
|
||||
const _u = new URL(req.url, 'http://localhost');
|
||||
|
||||
const pathname = _u.pathname;
|
||||
const oss = opts.oss;
|
||||
const params = _u.searchParams;
|
||||
const force = !!params.get('force');
|
||||
const hash = params.get('hash');
|
||||
const _fileSize: string = params.get('size');
|
||||
let fileSize: number | undefined = undefined;
|
||||
if (_fileSize) {
|
||||
fileSize = parseInt(_fileSize, 10);
|
||||
}
|
||||
console.log('postProxy', { hash, force, fileSize });
|
||||
let meta = parseSearchValue(params.get('meta'), { decode: true });
|
||||
if (!hash && !force) {
|
||||
return opts?.createNotFoundPage?.('no hash');
|
||||
}
|
||||
const { objectName, isOwner } = await getObjectName(req);
|
||||
if (!isOwner) {
|
||||
return opts?.createNotFoundPage?.('no permission');
|
||||
}
|
||||
const end = (data: any, message?: string, code = 200) => {
|
||||
res.writeHead(code, { 'Content-Type': 'application/json' });
|
||||
res.end(JSON.stringify({ code: code, data: data, message: message || 'success' }));
|
||||
};
|
||||
let statMeta: any = {};
|
||||
if (!force) {
|
||||
const check = await oss.checkObjectHash(objectName, hash, meta);
|
||||
statMeta = check?.metaData || {};
|
||||
let isNewMeta = false;
|
||||
if (check.success && JSON.stringify(meta) !== '{}' && !check.equalMeta) {
|
||||
meta = { ...statMeta, ...getMetadata(pathname), ...meta };
|
||||
isNewMeta = true;
|
||||
await oss.replaceObject(objectName, { ...meta });
|
||||
}
|
||||
if (check.success) {
|
||||
return end({ success: true, hash, meta, isNewMeta, equalMeta: check.equalMeta }, '文件已存在');
|
||||
}
|
||||
}
|
||||
const bb = busboy({
|
||||
headers: req.headers,
|
||||
limits: {
|
||||
fileSize: 100 * 1024 * 1024, // 100MB
|
||||
files: 1,
|
||||
},
|
||||
defCharset: 'utf-8',
|
||||
});
|
||||
let fileProcessed = false;
|
||||
bb.on('file', async (name, file, info) => {
|
||||
fileProcessed = true;
|
||||
try {
|
||||
await oss.putObject(
|
||||
objectName,
|
||||
file,
|
||||
{
|
||||
...statMeta,
|
||||
...getMetadata(pathname),
|
||||
...meta,
|
||||
},
|
||||
{ check: false, isStream: true },
|
||||
);
|
||||
end({ success: true, name, info, isNew: true, hash, meta: meta?.metaData, statMeta }, '上传成功', 200);
|
||||
|
||||
} catch (error) {
|
||||
console.log('postProxy upload error', error);
|
||||
end({ error: error }, '上传失败', 500);
|
||||
}
|
||||
});
|
||||
|
||||
bb.on('finish', () => {
|
||||
// 只有当没有文件被处理时才执行end
|
||||
if (!fileProcessed) {
|
||||
end({ success: false }, '没有接收到文件', 400);
|
||||
}
|
||||
});
|
||||
bb.on('error', (err) => {
|
||||
console.error('Busboy 错误:', err);
|
||||
end({ error: err }, '文件解析失败', 500);
|
||||
});
|
||||
|
||||
pipeBusboy(req, res, bb);
|
||||
};
|
||||
|
||||
|
||||
export const postChunkProxy = async (req: IncomingMessage, res: ServerResponse, opts: ProxyOptions) => {
|
||||
const oss = opts.oss;
|
||||
const { objectName, isOwner } = await getObjectName(req);
|
||||
if (!isOwner) {
|
||||
return opts?.createNotFoundPage?.('no permission');
|
||||
}
|
||||
const _u = new URL(req.url, 'http://localhost');
|
||||
const params = _u.searchParams;
|
||||
const meta = parseSearchValue(params.get('meta'), { decode: true });
|
||||
const end = (data: any, message?: string, code = 200) => {
|
||||
res.writeHead(code, { 'Content-Type': 'application/json' });
|
||||
res.end(JSON.stringify({ code: code, data: data, message: message || 'success' }));
|
||||
};
|
||||
const bb = busboy({
|
||||
headers: req.headers,
|
||||
limits: {
|
||||
fileSize: 100 * 1024 * 1024, // 100MB
|
||||
files: 1,
|
||||
},
|
||||
defCharset: 'utf-8',
|
||||
});
|
||||
const fields: any = {};
|
||||
let filePromise: Promise<void> | null = null;
|
||||
let tempPath = '';
|
||||
let file: any = null;
|
||||
|
||||
bb.on('field', (fieldname, value) => {
|
||||
fields[fieldname] = value;
|
||||
});
|
||||
|
||||
bb.on('file', (_fieldname, fileStream, info) => {
|
||||
const { filename, mimeType } = info;
|
||||
const decodedFilename = typeof filename === 'string' ? Buffer.from(filename, 'latin1').toString('utf8') : filename;
|
||||
tempPath = path.join(cacheFilePath, `${Date.now()}-${Math.random().toString(36).substring(7)}`);
|
||||
const writeStream = createWriteStream(tempPath);
|
||||
|
||||
filePromise = new Promise<void>((resolve, reject) => {
|
||||
fileStream.pipe(writeStream);
|
||||
writeStream.on('finish', () => {
|
||||
file = {
|
||||
filepath: tempPath,
|
||||
originalFilename: decodedFilename,
|
||||
mimetype: mimeType,
|
||||
};
|
||||
resolve();
|
||||
});
|
||||
writeStream.on('error', reject);
|
||||
});
|
||||
});
|
||||
|
||||
bb.on('finish', async () => {
|
||||
if (filePromise) {
|
||||
try {
|
||||
await filePromise;
|
||||
} catch (err) {
|
||||
console.error(`File write error: ${err.message}`);
|
||||
return end({ error: err }, '文件写入失败', 500);
|
||||
}
|
||||
}
|
||||
const clearFiles = () => {
|
||||
if (tempPath && fs.existsSync(tempPath)) {
|
||||
fs.unlinkSync(tempPath);
|
||||
}
|
||||
};
|
||||
|
||||
if (!file) {
|
||||
clearFiles();
|
||||
return end({ success: false }, '没有接收到文件', 400);
|
||||
}
|
||||
|
||||
let { chunkIndex, totalChunks } = fields;
|
||||
chunkIndex = parseInt(chunkIndex, 10);
|
||||
totalChunks = parseInt(totalChunks, 10);
|
||||
|
||||
if (isNaN(chunkIndex) || isNaN(totalChunks) || totalChunks <= 0) {
|
||||
clearFiles();
|
||||
return end({ success: false }, 'chunkIndex 和 totalChunks 参数无效', 400);
|
||||
}
|
||||
|
||||
const finalFilePath = path.join(cacheFilePath, `chunk-${objectName.replace(/[\/\.]/g, '-')}`);
|
||||
const relativePath = file.originalFilename;
|
||||
const writeStream = fs.createWriteStream(finalFilePath, { flags: 'a' });
|
||||
const readStream = fs.createReadStream(tempPath);
|
||||
readStream.pipe(writeStream);
|
||||
|
||||
writeStream.on('finish', async () => {
|
||||
fs.unlinkSync(tempPath);
|
||||
if (chunkIndex + 1 === totalChunks) {
|
||||
try {
|
||||
const metadata = {
|
||||
...getMetadata(relativePath),
|
||||
...meta,
|
||||
'app-source': 'user-app',
|
||||
};
|
||||
if (fileIsExist(finalFilePath)) {
|
||||
console.log('上传到对象存储', { objectName, metadata });
|
||||
const content = fs.readFileSync(finalFilePath);
|
||||
await oss.putObject(objectName, content, metadata);
|
||||
console.log('上传到对象存储完成', { objectName });
|
||||
fs.unlinkSync(finalFilePath);
|
||||
}
|
||||
return end({
|
||||
success: true,
|
||||
name: relativePath,
|
||||
chunkIndex,
|
||||
totalChunks,
|
||||
objectName,
|
||||
}, '分块上传完成', 200);
|
||||
} catch (error) {
|
||||
console.error('postChunkProxy upload error', error);
|
||||
clearFiles();
|
||||
if (fs.existsSync(finalFilePath)) {
|
||||
fs.unlinkSync(finalFilePath);
|
||||
}
|
||||
return end({ error }, '上传失败', 500);
|
||||
}
|
||||
} else {
|
||||
return end({
|
||||
success: true,
|
||||
chunkIndex,
|
||||
totalChunks,
|
||||
progress: ((chunkIndex + 1) / totalChunks) * 100,
|
||||
}, '分块上传成功', 200);
|
||||
}
|
||||
});
|
||||
|
||||
writeStream.on('error', (err) => {
|
||||
console.error('Write stream error', err);
|
||||
clearFiles();
|
||||
return end({ error: err }, '文件写入失败', 500);
|
||||
});
|
||||
});
|
||||
|
||||
bb.on('error', (err) => {
|
||||
console.error('Busboy 错误:', err);
|
||||
end({ error: err }, '文件解析失败', 500);
|
||||
});
|
||||
|
||||
pipeBusboy(req, res, bb);
|
||||
};
|
||||
@@ -12,7 +12,7 @@ import { logger } from '@/modules/logger.ts';
|
||||
import { pipeBusboy } from '../pipe-busboy.ts';
|
||||
import { pipeMinioStream } from '../pipe.ts';
|
||||
import { Readable } from 'stream';
|
||||
|
||||
import { postChunkProxy, postProxy } from './ai-proxy-chunk/post-proxy.ts'
|
||||
type FileList = {
|
||||
name: string;
|
||||
prefix?: string;
|
||||
@@ -153,88 +153,6 @@ export const getMetadata = (pathname: string) => {
|
||||
return meta;
|
||||
};
|
||||
|
||||
export const postProxy = async (req: IncomingMessage, res: ServerResponse, opts: ProxyOptions) => {
|
||||
const _u = new URL(req.url, 'http://localhost');
|
||||
|
||||
const pathname = _u.pathname;
|
||||
const oss = opts.oss;
|
||||
const params = _u.searchParams;
|
||||
const force = !!params.get('force');
|
||||
const hash = params.get('hash');
|
||||
const _fileSize: string = params.get('size');
|
||||
let fileSize: number | undefined = undefined;
|
||||
if (_fileSize) {
|
||||
fileSize = parseInt(_fileSize, 10);
|
||||
}
|
||||
let meta = parseSearchValue(params.get('meta'), { decode: true });
|
||||
if (!hash && !force) {
|
||||
return opts?.createNotFoundPage?.('no hash');
|
||||
}
|
||||
const { objectName, isOwner } = await getObjectName(req);
|
||||
if (!isOwner) {
|
||||
return opts?.createNotFoundPage?.('no permission');
|
||||
}
|
||||
const end = (data: any, message?: string, code = 200) => {
|
||||
res.writeHead(code, { 'Content-Type': 'application/json' });
|
||||
res.end(JSON.stringify({ code: code, data: data, message: message || 'success' }));
|
||||
};
|
||||
let statMeta: any = {};
|
||||
if (!force) {
|
||||
const check = await oss.checkObjectHash(objectName, hash, meta);
|
||||
statMeta = check?.metaData || {};
|
||||
let isNewMeta = false;
|
||||
if (check.success && JSON.stringify(meta) !== '{}' && !check.equalMeta) {
|
||||
meta = { ...statMeta, ...getMetadata(pathname), ...meta };
|
||||
isNewMeta = true;
|
||||
await oss.replaceObject(objectName, { ...meta });
|
||||
}
|
||||
if (check.success) {
|
||||
return end({ success: true, hash, meta, isNewMeta, equalMeta: check.equalMeta }, '文件已存在');
|
||||
}
|
||||
}
|
||||
const bb = busboy({
|
||||
headers: req.headers,
|
||||
limits: {
|
||||
fileSize: 100 * 1024 * 1024, // 100MB
|
||||
files: 1,
|
||||
},
|
||||
defCharset: 'utf-8',
|
||||
});
|
||||
let fileProcessed = false;
|
||||
bb.on('file', async (name, file, info) => {
|
||||
fileProcessed = true;
|
||||
try {
|
||||
await oss.putObject(
|
||||
objectName,
|
||||
file,
|
||||
{
|
||||
...statMeta,
|
||||
...getMetadata(pathname),
|
||||
...meta,
|
||||
},
|
||||
{ check: false, isStream: true },
|
||||
);
|
||||
end({ success: true, name, info, isNew: true, hash, meta: meta?.metaData, statMeta }, '上传成功', 200);
|
||||
|
||||
} catch (error) {
|
||||
console.log('postProxy upload error', error);
|
||||
end({ error: error }, '上传失败', 500);
|
||||
}
|
||||
});
|
||||
|
||||
bb.on('finish', () => {
|
||||
// 只有当没有文件被处理时才执行end
|
||||
if (!fileProcessed) {
|
||||
end({ success: false }, '没有接收到文件', 400);
|
||||
}
|
||||
});
|
||||
bb.on('error', (err) => {
|
||||
console.error('Busboy 错误:', err);
|
||||
end({ error: err }, '文件解析失败', 500);
|
||||
});
|
||||
|
||||
pipeBusboy(req, res, bb);
|
||||
};
|
||||
export const getObjectByPathname = (opts: {
|
||||
pathname: string,
|
||||
version?: string,
|
||||
@@ -272,7 +190,7 @@ export const getObjectName = async (req: IncomingMessage, opts?: { checkOwner?:
|
||||
let loginUser: Awaited<ReturnType<typeof getLoginUser>> = null;
|
||||
if (checkOwner) {
|
||||
loginUser = await getLoginUser(req);
|
||||
logger.debug('getObjectName', loginUser, user, app);
|
||||
logger.debug('getObjectName', user, app);
|
||||
isOwner = loginUser?.tokenUser?.username === owner;
|
||||
}
|
||||
return {
|
||||
@@ -377,7 +295,7 @@ export const renameProxy = async (req: IncomingMessage, res: ServerResponse, opt
|
||||
}
|
||||
};
|
||||
|
||||
type ProxyOptions = {
|
||||
export type ProxyOptions = {
|
||||
createNotFoundPage: (msg?: string) => any;
|
||||
oss?: OssBase;
|
||||
};
|
||||
@@ -386,6 +304,12 @@ export const aiProxy = async (req: IncomingMessage, res: ServerResponse, opts: P
|
||||
opts.oss = oss;
|
||||
}
|
||||
if (req.method === 'POST') {
|
||||
const searchParams = new URL(req.url || '', 'http://localhost').searchParams;
|
||||
const chunk = searchParams.get('chunk');
|
||||
const chunked = searchParams.get('chunked');
|
||||
if (chunk !== null || chunked !== null) {
|
||||
return postChunkProxy(req, res, opts);
|
||||
}
|
||||
return postProxy(req, res, opts);
|
||||
}
|
||||
if (req.method === 'DELETE') {
|
||||
|
||||
Reference in New Issue
Block a user