diff --git a/src/modules/fm-manager/proxy/ai-proxy-chunk/post-proxy.ts b/src/modules/fm-manager/proxy/ai-proxy-chunk/post-proxy.ts index 22b3adb..6508247 100644 --- a/src/modules/fm-manager/proxy/ai-proxy-chunk/post-proxy.ts +++ b/src/modules/fm-manager/proxy/ai-proxy-chunk/post-proxy.ts @@ -1,9 +1,17 @@ import { IncomingMessage, ServerResponse } from 'http'; import busboy from 'busboy'; +import fs from 'fs'; +import path from 'path'; +import { createWriteStream } from 'fs'; import { parseSearchValue } from '@kevisual/router/src/server/parse-body.ts'; import { pipeBusboy } from '../../pipe-busboy.ts'; import { ProxyOptions, getMetadata, getObjectName } from '../ai-proxy.ts'; +import { fileIsExist, useFileStore } from '@kevisual/use-config'; +import { getContentType } from '../../get-content-type.ts'; + +const cacheFilePath = useFileStore('cache-file', { needExists: true }); + export const postProxy = async (req: IncomingMessage, res: ServerResponse, opts: ProxyOptions) => { const _u = new URL(req.url, 'http://localhost'); @@ -85,5 +93,147 @@ export const postProxy = async (req: IncomingMessage, res: ServerResponse, opts: end({ error: err }, '文件解析失败', 500); }); + pipeBusboy(req, res, bb); +}; + + +export const postChunkProxy = async (req: IncomingMessage, res: ServerResponse, opts: ProxyOptions) => { + const oss = opts.oss; + const { objectName, isOwner } = await getObjectName(req); + if (!isOwner) { + return opts?.createNotFoundPage?.('no permission'); + } + const _u = new URL(req.url, 'http://localhost'); + const params = _u.searchParams; + const meta = parseSearchValue(params.get('meta'), { decode: true }); + const end = (data: any, message?: string, code = 200) => { + res.writeHead(code, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ code: code, data: data, message: message || 'success' })); + }; + const bb = busboy({ + headers: req.headers, + limits: { + fileSize: 100 * 1024 * 1024, // 100MB + files: 1, + }, + defCharset: 'utf-8', + }); + const fields: any = {}; + let filePromise: Promise | null = null; + let tempPath = ''; + let file: any = null; + + bb.on('field', (fieldname, value) => { + fields[fieldname] = value; + }); + + bb.on('file', (_fieldname, fileStream, info) => { + const { filename, mimeType } = info; + const decodedFilename = typeof filename === 'string' ? Buffer.from(filename, 'latin1').toString('utf8') : filename; + tempPath = path.join(cacheFilePath, `${Date.now()}-${Math.random().toString(36).substring(7)}`); + const writeStream = createWriteStream(tempPath); + + filePromise = new Promise((resolve, reject) => { + fileStream.pipe(writeStream); + writeStream.on('finish', () => { + file = { + filepath: tempPath, + originalFilename: decodedFilename, + mimetype: mimeType, + }; + resolve(); + }); + writeStream.on('error', reject); + }); + }); + + bb.on('finish', async () => { + if (filePromise) { + try { + await filePromise; + } catch (err) { + console.error(`File write error: ${err.message}`); + return end({ error: err }, '文件写入失败', 500); + } + } + const clearFiles = () => { + if (tempPath && fs.existsSync(tempPath)) { + fs.unlinkSync(tempPath); + } + }; + + if (!file) { + clearFiles(); + return end({ success: false }, '没有接收到文件', 400); + } + + let { chunkIndex, totalChunks } = fields; + chunkIndex = parseInt(chunkIndex, 10); + totalChunks = parseInt(totalChunks, 10); + + if (isNaN(chunkIndex) || isNaN(totalChunks) || totalChunks <= 0) { + clearFiles(); + return end({ success: false }, 'chunkIndex 和 totalChunks 参数无效', 400); + } + + const finalFilePath = path.join(cacheFilePath, `chunk-${objectName.replace(/[\/\.]/g, '-')}`); + const relativePath = file.originalFilename; + const writeStream = fs.createWriteStream(finalFilePath, { flags: 'a' }); + const readStream = fs.createReadStream(tempPath); + readStream.pipe(writeStream); + + writeStream.on('finish', async () => { + fs.unlinkSync(tempPath); + if (chunkIndex + 1 === totalChunks) { + try { + const metadata = { + ...getMetadata(relativePath), + ...meta, + 'app-source': 'user-app', + }; + if (fileIsExist(finalFilePath)) { + console.log('上传到对象存储', { objectName, metadata }); + const content = fs.readFileSync(finalFilePath); + await oss.putObject(objectName, content, metadata); + console.log('上传到对象存储完成', { objectName }); + fs.unlinkSync(finalFilePath); + } + return end({ + success: true, + name: relativePath, + chunkIndex, + totalChunks, + objectName, + }, '分块上传完成', 200); + } catch (error) { + console.error('postChunkProxy upload error', error); + clearFiles(); + if (fs.existsSync(finalFilePath)) { + fs.unlinkSync(finalFilePath); + } + return end({ error }, '上传失败', 500); + } + } else { + return end({ + success: true, + chunkIndex, + totalChunks, + progress: ((chunkIndex + 1) / totalChunks) * 100, + }, '分块上传成功', 200); + } + }); + + writeStream.on('error', (err) => { + console.error('Write stream error', err); + clearFiles(); + return end({ error: err }, '文件写入失败', 500); + }); + }); + + bb.on('error', (err) => { + console.error('Busboy 错误:', err); + end({ error: err }, '文件解析失败', 500); + }); + pipeBusboy(req, res, bb); }; \ No newline at end of file diff --git a/src/modules/fm-manager/proxy/ai-proxy.ts b/src/modules/fm-manager/proxy/ai-proxy.ts index ce28c37..4cec84d 100644 --- a/src/modules/fm-manager/proxy/ai-proxy.ts +++ b/src/modules/fm-manager/proxy/ai-proxy.ts @@ -12,7 +12,7 @@ import { logger } from '@/modules/logger.ts'; import { pipeBusboy } from '../pipe-busboy.ts'; import { pipeMinioStream } from '../pipe.ts'; import { Readable } from 'stream'; -import { postProxy } from './ai-proxy-chunk/post-proxy.ts' +import { postChunkProxy, postProxy } from './ai-proxy-chunk/post-proxy.ts' type FileList = { name: string; prefix?: string; @@ -190,7 +190,7 @@ export const getObjectName = async (req: IncomingMessage, opts?: { checkOwner?: let loginUser: Awaited> = null; if (checkOwner) { loginUser = await getLoginUser(req); - logger.debug('getObjectName', loginUser, user, app); + logger.debug('getObjectName', user, app); isOwner = loginUser?.tokenUser?.username === owner; } return { @@ -304,6 +304,12 @@ export const aiProxy = async (req: IncomingMessage, res: ServerResponse, opts: P opts.oss = oss; } if (req.method === 'POST') { + const searchParams = new URL(req.url || '', 'http://localhost').searchParams; + const chunk = searchParams.get('chunk'); + const chunked = searchParams.get('chunked'); + if (chunk !== null || chunked !== null) { + return postChunkProxy(req, res, opts); + } return postProxy(req, res, opts); } if (req.method === 'DELETE') {