diff --git a/src/modules/fm-manager/get-content-type.ts b/src/modules/fm-manager/get-content-type.ts index 8d794b5..03c2e5b 100644 --- a/src/modules/fm-manager/get-content-type.ts +++ b/src/modules/fm-manager/get-content-type.ts @@ -1,5 +1,9 @@ import path from 'node:path'; -export const getTextContentType = (ext: string) => { +export const getTextContentType = (filePath: string, isFilePath = false) => { + let ext = filePath; + if (isFilePath) { + ext = path.extname(filePath).toLowerCase(); + } const textContentTypes = [ '.tsx', '.jsx', // @@ -12,17 +16,21 @@ export const getTextContentType = (ext: string) => { '.json5', '.pem', '.crt', + '.yaml', + '.yml', ]; const include = textContentTypes.includes(ext); - if (!include) { - return {}; + if (include) { + return { + 'content-type': 'text/plain; charset=utf-8', + }; } const contentType = getContentTypeCore(ext); if (!contentType) { return {}; } return { - 'Content-Type': contentType, + 'content-type': contentType, }; }; // 获取文件的 content-type @@ -76,8 +84,8 @@ export const getContentTypeCore = (extname: string) => { '.wasm': 'application/wasm', // WebAssembly 文件 '.pem': 'application/x-pem-file', // PEM 证书文件 '.crt': 'application/x-x509-ca-cert', // CRT 证书文件 - '.yaml': 'application/x-yaml; charset=utf-8', // YAML 文件 - '.yml': 'application/x-yaml; charset=utf-8', // YAML 文件(别名) + '.yaml': 'application/yaml; charset=utf-8', // YAML 文件 + '.yml': 'application/yaml; charset=utf-8', // YAML 文件(别名) '.zip': 'application/octet-stream', }; return contentType[extname]; diff --git a/src/modules/fm-manager/index.ts b/src/modules/fm-manager/index.ts index 8900245..d4601fa 100644 --- a/src/modules/fm-manager/index.ts +++ b/src/modules/fm-manager/index.ts @@ -9,6 +9,6 @@ export * from './get-content-type.ts' export * from './utils.ts' -export { pipeFileStream, pipeStream } from './pipe.ts' +export * from './pipe.ts' export { pipeBusboy } from './pipe-busboy.ts' \ No newline at end of file diff --git a/src/modules/fm-manager/pipe.ts b/src/modules/fm-manager/pipe.ts index 2afda41..bdbf65f 100644 --- a/src/modules/fm-manager/pipe.ts +++ b/src/modules/fm-manager/pipe.ts @@ -1,19 +1,138 @@ import * as http from 'http'; import * as fs from 'fs'; -import { isBun } from '../../utils/get-engine.ts'; +import { isBun } from './utils.ts'; +import Stream from 'stream'; +/** + * 文件流管道传输函数 + * 将指定文件的内容通过流的方式传输给客户端响应 + * @param filePath 要传输的文件路径 + * @param res HTTP服务器响应对象 + */ export const pipeFileStream = (filePath: string, res: http.ServerResponse) => { const readStream = fs.createReadStream(filePath); if (isBun) { + // Bun环境下的流处理方式 res.pipe(readStream as any); } else { + // Node.js标准环境下的流处理方式,end:true表示在流结束时自动关闭响应 readStream.pipe(res, { end: true }); } } + +/** + * 通用流管道传输函数 + * 将可读流的数据传输给客户端响应 + * @param readStream 可读流对象 + * @param res HTTP服务器响应对象 + */ export const pipeStream = (readStream: fs.ReadStream, res: http.ServerResponse) => { if (isBun) { + // Bun环境下的流处理方式 res.pipe(readStream as any); } else { + // Node.js标准环境下的流处理方式 readStream.pipe(res, { end: true }); } +} + +export const pipeMinioStream = (minioStream: Stream.Readable, res: http.ServerResponse) => { + if (isBun) { + const chunks: Buffer[] = []; + // 监听数据到达事件,收集所有数据块 + minioStream.on('data', (chunk: Buffer) => { + chunks.push(chunk); + }); + // 监听数据结束事件,将收集的数据合并并发送给客户端 + minioStream.on('end', () => { + const result = Buffer.concat(chunks); + res.end(result); + }); + + // 监听错误事件,处理代理响应过程中的错误 + minioStream.on('error', (error) => { + res.writeHead(500); + res.end(JSON.stringify({ error: error.message })); + }); + } else { + minioStream.pipe(res, { end: true }); + } +} + +/** + * 代理响应流传输函数 + * 将代理服务器返回的响应数据传输给客户端 + * 处理从目标服务器收到的响应流并转发给原始客户端 + * @param proxyRes 代理服务器的响应对象 + * @param res HTTP服务器响应对象 + */ +export const pipeProxyRes = (proxyRes: http.IncomingMessage, res: http.ServerResponse) => { + if (isBun) { + // Bun环境下需要手动收集数据并end,因为Bun的pipe机制与Node.js不同 + const chunks: Buffer[] = []; + // 监听数据到达事件,收集所有数据块 + proxyRes.on('data', (chunk: Buffer) => { + chunks.push(chunk); + }); + if (proxyRes.url === '/api/router') { + console.log(proxyRes.url, proxyRes.statusCode); + } + // 监听数据结束事件,将收集的数据合并并发送给客户端 + proxyRes.on('end', () => { + const result = Buffer.concat(chunks).toString(); + res.end(result); + }); + + // 监听错误事件,处理代理响应过程中的错误 + proxyRes.on('error', (error) => { + res.writeHead(500); + res.end(JSON.stringify({ error: error.message })); + }); + + } else { + // Node.js标准环境下直接使用pipe进行流传输 + proxyRes.pipe(res, { end: true }); + } +} + +/** + * 代理请求流传输函数 + * 将客户端的请求数据传输给代理服务器 + * 处理来自客户端的请求流并转发给目标服务器 + * @param req 客户端的请求对象 + * @param proxyReq 代理服务器的请求对象 + */ +export const pipeProxyReq = async (req: http.IncomingMessage, proxyReq: http.ClientRequest, res: any) => { + if (isBun) { + try { + // @ts-ignore + const bunRequest = req.bun.request; + const contentType = req.headers['content-type'] || ''; + if (contentType.includes('multipart/form-data')) { + console.log('Processing multipart/form-data'); + const arrayBuffer = await bunRequest.arrayBuffer(); + + // 设置请求头(在写入数据之前) + proxyReq.setHeader('content-type', contentType); + proxyReq.setHeader('content-length', arrayBuffer.byteLength.toString()); + + // 写入数据并结束请求 + if (arrayBuffer.byteLength > 0) { + proxyReq.write(Buffer.from(arrayBuffer)); + } + proxyReq.end(); + return; + } + console.log('Bun pipeProxyReq content-type', contentType); + // @ts-ignore + const bodyString = req.body; + bodyString && proxyReq.write(bodyString); + proxyReq.end(); + } catch (error) { + proxyReq.destroy(error); + } + } else { + // Node.js标准环境下直接使用pipe进行流传输 + req.pipe(proxyReq, { end: true }); + } } \ No newline at end of file diff --git a/src/modules/fm-manager/proxy/ai-proxy.ts b/src/modules/fm-manager/proxy/ai-proxy.ts index 5c514dc..8e40a53 100644 --- a/src/modules/fm-manager/proxy/ai-proxy.ts +++ b/src/modules/fm-manager/proxy/ai-proxy.ts @@ -5,10 +5,12 @@ import { getUserFromRequest } from '../utils.ts'; import { UserPermission, Permission } from '@kevisual/permission'; import { getLoginUser } from '@/modules/auth.ts'; import busboy from 'busboy'; -import { getContentType } from '../get-content-type.ts'; +import { getContentType, getTextContentType } from '../get-content-type.ts'; import { OssBase } from '@kevisual/oss'; import { parseSearchValue } from '@kevisual/router/browser'; import { logger } from '@/modules/logger.ts'; +import { pipeBusboy } from '../pipe-busboy.ts'; +import { pipeMinioStream } from '../pipe.ts'; type FileList = { name: string; @@ -116,12 +118,14 @@ const getAiProxy = async (req: IncomingMessage, res: ServerResponse, opts: Proxy etag, 'last-modified': lastModified, ...filterMetaData, + ...getTextContentType(objectName, true), }; - res.writeHead(200, { ...headers, }); - objectStream.pipe(res, { end: true }); + // objectStream.pipe(res, { end: true }); + // @ts-ignore + pipeMinioStream(objectStream, res); return true; } catch (error) { console.error(`Proxy request error: ${error.message}`); @@ -226,7 +230,7 @@ export const postProxy = async (req: IncomingMessage, res: ServerResponse, opts: end({ error: err }, '文件解析失败', 500); }); - req.pipe(bb); + pipeBusboy(req, res, bb); }; export const getObjectName = async (req: IncomingMessage, opts?: { checkOwner?: boolean }) => { const _u = new URL(req.url, 'http://localhost'); diff --git a/src/modules/fm-manager/proxy/minio-proxy.ts b/src/modules/fm-manager/proxy/minio-proxy.ts index bcca6a7..015a894 100644 --- a/src/modules/fm-manager/proxy/minio-proxy.ts +++ b/src/modules/fm-manager/proxy/minio-proxy.ts @@ -1,5 +1,6 @@ import http from 'node:http'; import { minioClient } from '@/modules/minio.ts'; +import { pipeMinioStream } from '../pipe.ts'; type ProxyInfo = { path?: string; target: string; @@ -15,7 +16,8 @@ export const minioProxyOrigin = async (req: http.IncomingMessage, res: http.Serv objectName = objectName.slice(bucketName.length); } const objectStream = await minioClient.getObject(bucketName, objectName); - objectStream.pipe(res); + // objectStream.pipe(res); + pipeMinioStream(objectStream, res); } catch (error) { console.error('Error fetching object from MinIO:', error); res.statusCode = 500; diff --git a/src/modules/fm-manager/utils.ts b/src/modules/fm-manager/utils.ts index f40b524..1d13628 100644 --- a/src/modules/fm-manager/utils.ts +++ b/src/modules/fm-manager/utils.ts @@ -1,6 +1,12 @@ import { IncomingMessage } from 'node:http'; import http from 'node:http'; import { logger } from '../logger.ts'; +export const isBun = typeof Bun !== 'undefined' && Bun?.version != null; + +export const isNode = typeof process !== 'undefined' && process?.versions != null && process.versions?.node != null; + +// @ts-ignore +export const isDeno = typeof Deno !== 'undefined' && Deno?.version != null && Deno?.version?.deno != null; export const getUserFromRequest = (req: IncomingMessage) => { const url = new URL(req.url, `http://${req.headers.host}`); diff --git a/src/routes-simple/minio/get-minio-resource.ts b/src/routes-simple/minio/get-minio-resource.ts index b82ca16..8ba855b 100644 --- a/src/routes-simple/minio/get-minio-resource.ts +++ b/src/routes-simple/minio/get-minio-resource.ts @@ -8,6 +8,7 @@ import { bucketName } from '@/modules/minio.ts'; import { getLoginUser } from '../middleware/auth.ts'; import { BucketItemStat } from 'minio'; import { UserPermission, Permission } from '@kevisual/permission'; +import { pipeMinioStream } from '@/modules/fm-manager/index.ts'; /** * 过滤 metaData 中的 key, 去除 password, accesskey, secretkey, @@ -101,5 +102,6 @@ export const authMinio = async (req: IncomingMessage, res: ServerResponse, objec }); const objectStream = await minioClient.getObject(bucketName, objectName); - objectStream.pipe(res, { end: true }); + // objectStream.pipe(res, { end: true }); + pipeMinioStream(objectStream, res); }; diff --git a/src/routes-simple/page-proxy.ts b/src/routes-simple/page-proxy.ts index 42d2de4..21a4ab6 100644 --- a/src/routes-simple/page-proxy.ts +++ b/src/routes-simple/page-proxy.ts @@ -1,4 +1,4 @@ -import { getDNS, isIpv4OrIpv6, isLocalhost, pipeFileStream } from '../modules/fm-manager/index.ts'; +import { getDNS, isIpv4OrIpv6, isLocalhost, pipeFileStream, pipeProxyReq, pipeProxyRes } from '../modules/fm-manager/index.ts'; import http from 'node:http'; import https from 'node:https'; import { UserApp } from '../modules/user-app/index.ts'; @@ -110,7 +110,8 @@ export const handleRequest = async (req: http.IncomingMessage, res: http.ServerR // 将代理服务器的响应头和状态码返回给客户端 res.writeHead(proxyRes.statusCode, proxyRes.headers); // 将代理响应流写入客户端响应 - proxyRes.pipe(res, { end: true }); + // proxyRes.pipe(res, { end: true }); + pipeProxyRes(proxyRes, res); }); // 处理代理请求的错误事件 proxyReq.on('error', (err) => { @@ -119,7 +120,8 @@ export const handleRequest = async (req: http.IncomingMessage, res: http.ServerR res.write(`Proxy request error: ${err.message}`); }); // 处理 POST 请求的请求体(传递数据到目标服务器) - req.pipe(proxyReq, { end: true }); + // req.pipe(proxyReq, { end: true }); + pipeProxyReq(req, proxyReq, res); return; } if (req.url.startsWith('/api') || req.url.startsWith('/v1')) {