This commit is contained in:
2026-02-01 16:04:03 +08:00
parent 32167faf67
commit 337abd2bc3
2 changed files with 158 additions and 2 deletions

View File

@@ -1,9 +1,17 @@
import { IncomingMessage, ServerResponse } from 'http';
import busboy from 'busboy';
import fs from 'fs';
import path from 'path';
import { createWriteStream } from 'fs';
import { parseSearchValue } from '@kevisual/router/src/server/parse-body.ts';
import { pipeBusboy } from '../../pipe-busboy.ts';
import { ProxyOptions, getMetadata, getObjectName } from '../ai-proxy.ts';
import { fileIsExist, useFileStore } from '@kevisual/use-config';
import { getContentType } from '../../get-content-type.ts';
const cacheFilePath = useFileStore('cache-file', { needExists: true });
export const postProxy = async (req: IncomingMessage, res: ServerResponse, opts: ProxyOptions) => {
const _u = new URL(req.url, 'http://localhost');
@@ -87,3 +95,145 @@ export const postProxy = async (req: IncomingMessage, res: ServerResponse, opts:
pipeBusboy(req, res, bb);
};
export const postChunkProxy = async (req: IncomingMessage, res: ServerResponse, opts: ProxyOptions) => {
const oss = opts.oss;
const { objectName, isOwner } = await getObjectName(req);
if (!isOwner) {
return opts?.createNotFoundPage?.('no permission');
}
const _u = new URL(req.url, 'http://localhost');
const params = _u.searchParams;
const meta = parseSearchValue(params.get('meta'), { decode: true });
const end = (data: any, message?: string, code = 200) => {
res.writeHead(code, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ code: code, data: data, message: message || 'success' }));
};
const bb = busboy({
headers: req.headers,
limits: {
fileSize: 100 * 1024 * 1024, // 100MB
files: 1,
},
defCharset: 'utf-8',
});
const fields: any = {};
let filePromise: Promise<void> | null = null;
let tempPath = '';
let file: any = null;
bb.on('field', (fieldname, value) => {
fields[fieldname] = value;
});
bb.on('file', (_fieldname, fileStream, info) => {
const { filename, mimeType } = info;
const decodedFilename = typeof filename === 'string' ? Buffer.from(filename, 'latin1').toString('utf8') : filename;
tempPath = path.join(cacheFilePath, `${Date.now()}-${Math.random().toString(36).substring(7)}`);
const writeStream = createWriteStream(tempPath);
filePromise = new Promise<void>((resolve, reject) => {
fileStream.pipe(writeStream);
writeStream.on('finish', () => {
file = {
filepath: tempPath,
originalFilename: decodedFilename,
mimetype: mimeType,
};
resolve();
});
writeStream.on('error', reject);
});
});
bb.on('finish', async () => {
if (filePromise) {
try {
await filePromise;
} catch (err) {
console.error(`File write error: ${err.message}`);
return end({ error: err }, '文件写入失败', 500);
}
}
const clearFiles = () => {
if (tempPath && fs.existsSync(tempPath)) {
fs.unlinkSync(tempPath);
}
};
if (!file) {
clearFiles();
return end({ success: false }, '没有接收到文件', 400);
}
let { chunkIndex, totalChunks } = fields;
chunkIndex = parseInt(chunkIndex, 10);
totalChunks = parseInt(totalChunks, 10);
if (isNaN(chunkIndex) || isNaN(totalChunks) || totalChunks <= 0) {
clearFiles();
return end({ success: false }, 'chunkIndex 和 totalChunks 参数无效', 400);
}
const finalFilePath = path.join(cacheFilePath, `chunk-${objectName.replace(/[\/\.]/g, '-')}`);
const relativePath = file.originalFilename;
const writeStream = fs.createWriteStream(finalFilePath, { flags: 'a' });
const readStream = fs.createReadStream(tempPath);
readStream.pipe(writeStream);
writeStream.on('finish', async () => {
fs.unlinkSync(tempPath);
if (chunkIndex + 1 === totalChunks) {
try {
const metadata = {
...getMetadata(relativePath),
...meta,
'app-source': 'user-app',
};
if (fileIsExist(finalFilePath)) {
console.log('上传到对象存储', { objectName, metadata });
const content = fs.readFileSync(finalFilePath);
await oss.putObject(objectName, content, metadata);
console.log('上传到对象存储完成', { objectName });
fs.unlinkSync(finalFilePath);
}
return end({
success: true,
name: relativePath,
chunkIndex,
totalChunks,
objectName,
}, '分块上传完成', 200);
} catch (error) {
console.error('postChunkProxy upload error', error);
clearFiles();
if (fs.existsSync(finalFilePath)) {
fs.unlinkSync(finalFilePath);
}
return end({ error }, '上传失败', 500);
}
} else {
return end({
success: true,
chunkIndex,
totalChunks,
progress: ((chunkIndex + 1) / totalChunks) * 100,
}, '分块上传成功', 200);
}
});
writeStream.on('error', (err) => {
console.error('Write stream error', err);
clearFiles();
return end({ error: err }, '文件写入失败', 500);
});
});
bb.on('error', (err) => {
console.error('Busboy 错误:', err);
end({ error: err }, '文件解析失败', 500);
});
pipeBusboy(req, res, bb);
};

View File

@@ -12,7 +12,7 @@ import { logger } from '@/modules/logger.ts';
import { pipeBusboy } from '../pipe-busboy.ts';
import { pipeMinioStream } from '../pipe.ts';
import { Readable } from 'stream';
import { postProxy } from './ai-proxy-chunk/post-proxy.ts'
import { postChunkProxy, postProxy } from './ai-proxy-chunk/post-proxy.ts'
type FileList = {
name: string;
prefix?: string;
@@ -190,7 +190,7 @@ export const getObjectName = async (req: IncomingMessage, opts?: { checkOwner?:
let loginUser: Awaited<ReturnType<typeof getLoginUser>> = null;
if (checkOwner) {
loginUser = await getLoginUser(req);
logger.debug('getObjectName', loginUser, user, app);
logger.debug('getObjectName', user, app);
isOwner = loginUser?.tokenUser?.username === owner;
}
return {
@@ -304,6 +304,12 @@ export const aiProxy = async (req: IncomingMessage, res: ServerResponse, opts: P
opts.oss = oss;
}
if (req.method === 'POST') {
const searchParams = new URL(req.url || '', 'http://localhost').searchParams;
const chunk = searchParams.get('chunk');
const chunked = searchParams.get('chunked');
if (chunk !== null || chunked !== null) {
return postChunkProxy(req, res, opts);
}
return postProxy(req, res, opts);
}
if (req.method === 'DELETE') {