优化文件流处理,添加对 Bun 环境的支持,重构管道传输函数,改进内容类型获取逻辑

This commit is contained in:
2025-12-21 14:53:22 +08:00
parent d1e619d04c
commit 18a7c15a76
8 changed files with 160 additions and 17 deletions

View File

@@ -1,5 +1,9 @@
import path from 'node:path'; import path from 'node:path';
export const getTextContentType = (ext: string) => { export const getTextContentType = (filePath: string, isFilePath = false) => {
let ext = filePath;
if (isFilePath) {
ext = path.extname(filePath).toLowerCase();
}
const textContentTypes = [ const textContentTypes = [
'.tsx', '.tsx',
'.jsx', // '.jsx', //
@@ -12,17 +16,21 @@ export const getTextContentType = (ext: string) => {
'.json5', '.json5',
'.pem', '.pem',
'.crt', '.crt',
'.yaml',
'.yml',
]; ];
const include = textContentTypes.includes(ext); const include = textContentTypes.includes(ext);
if (!include) { if (include) {
return {}; return {
'content-type': 'text/plain; charset=utf-8',
};
} }
const contentType = getContentTypeCore(ext); const contentType = getContentTypeCore(ext);
if (!contentType) { if (!contentType) {
return {}; return {};
} }
return { return {
'Content-Type': contentType, 'content-type': contentType,
}; };
}; };
// 获取文件的 content-type // 获取文件的 content-type
@@ -76,8 +84,8 @@ export const getContentTypeCore = (extname: string) => {
'.wasm': 'application/wasm', // WebAssembly 文件 '.wasm': 'application/wasm', // WebAssembly 文件
'.pem': 'application/x-pem-file', // PEM 证书文件 '.pem': 'application/x-pem-file', // PEM 证书文件
'.crt': 'application/x-x509-ca-cert', // CRT 证书文件 '.crt': 'application/x-x509-ca-cert', // CRT 证书文件
'.yaml': 'application/x-yaml; charset=utf-8', // YAML 文件 '.yaml': 'application/yaml; charset=utf-8', // YAML 文件
'.yml': 'application/x-yaml; charset=utf-8', // YAML 文件(别名) '.yml': 'application/yaml; charset=utf-8', // YAML 文件(别名)
'.zip': 'application/octet-stream', '.zip': 'application/octet-stream',
}; };
return contentType[extname]; return contentType[extname];

View File

@@ -9,6 +9,6 @@ export * from './get-content-type.ts'
export * from './utils.ts' export * from './utils.ts'
export { pipeFileStream, pipeStream } from './pipe.ts' export * from './pipe.ts'
export { pipeBusboy } from './pipe-busboy.ts' export { pipeBusboy } from './pipe-busboy.ts'

View File

@@ -1,19 +1,138 @@
import * as http from 'http'; import * as http from 'http';
import * as fs from 'fs'; import * as fs from 'fs';
import { isBun } from '../../utils/get-engine.ts'; import { isBun } from './utils.ts';
import Stream from 'stream';
/**
* 文件流管道传输函数
* 将指定文件的内容通过流的方式传输给客户端响应
* @param filePath 要传输的文件路径
* @param res HTTP服务器响应对象
*/
export const pipeFileStream = (filePath: string, res: http.ServerResponse) => { export const pipeFileStream = (filePath: string, res: http.ServerResponse) => {
const readStream = fs.createReadStream(filePath); const readStream = fs.createReadStream(filePath);
if (isBun) { if (isBun) {
// Bun环境下的流处理方式
res.pipe(readStream as any); res.pipe(readStream as any);
} else { } else {
// Node.js标准环境下的流处理方式end:true表示在流结束时自动关闭响应
readStream.pipe(res, { end: true }); readStream.pipe(res, { end: true });
} }
} }
/**
* 通用流管道传输函数
* 将可读流的数据传输给客户端响应
* @param readStream 可读流对象
* @param res HTTP服务器响应对象
*/
export const pipeStream = (readStream: fs.ReadStream, res: http.ServerResponse) => { export const pipeStream = (readStream: fs.ReadStream, res: http.ServerResponse) => {
if (isBun) { if (isBun) {
// Bun环境下的流处理方式
res.pipe(readStream as any); res.pipe(readStream as any);
} else { } else {
// Node.js标准环境下的流处理方式
readStream.pipe(res, { end: true }); readStream.pipe(res, { end: true });
} }
} }
export const pipeMinioStream = (minioStream: Stream.Readable, res: http.ServerResponse) => {
if (isBun) {
const chunks: Buffer[] = [];
// 监听数据到达事件,收集所有数据块
minioStream.on('data', (chunk: Buffer) => {
chunks.push(chunk);
});
// 监听数据结束事件,将收集的数据合并并发送给客户端
minioStream.on('end', () => {
const result = Buffer.concat(chunks);
res.end(result);
});
// 监听错误事件,处理代理响应过程中的错误
minioStream.on('error', (error) => {
res.writeHead(500);
res.end(JSON.stringify({ error: error.message }));
});
} else {
minioStream.pipe(res, { end: true });
}
}
/**
* 代理响应流传输函数
* 将代理服务器返回的响应数据传输给客户端
* 处理从目标服务器收到的响应流并转发给原始客户端
* @param proxyRes 代理服务器的响应对象
* @param res HTTP服务器响应对象
*/
export const pipeProxyRes = (proxyRes: http.IncomingMessage, res: http.ServerResponse) => {
if (isBun) {
// Bun环境下需要手动收集数据并end因为Bun的pipe机制与Node.js不同
const chunks: Buffer[] = [];
// 监听数据到达事件,收集所有数据块
proxyRes.on('data', (chunk: Buffer) => {
chunks.push(chunk);
});
if (proxyRes.url === '/api/router') {
console.log(proxyRes.url, proxyRes.statusCode);
}
// 监听数据结束事件,将收集的数据合并并发送给客户端
proxyRes.on('end', () => {
const result = Buffer.concat(chunks).toString();
res.end(result);
});
// 监听错误事件,处理代理响应过程中的错误
proxyRes.on('error', (error) => {
res.writeHead(500);
res.end(JSON.stringify({ error: error.message }));
});
} else {
// Node.js标准环境下直接使用pipe进行流传输
proxyRes.pipe(res, { end: true });
}
}
/**
* 代理请求流传输函数
* 将客户端的请求数据传输给代理服务器
* 处理来自客户端的请求流并转发给目标服务器
* @param req 客户端的请求对象
* @param proxyReq 代理服务器的请求对象
*/
export const pipeProxyReq = async (req: http.IncomingMessage, proxyReq: http.ClientRequest, res: any) => {
if (isBun) {
try {
// @ts-ignore
const bunRequest = req.bun.request;
const contentType = req.headers['content-type'] || '';
if (contentType.includes('multipart/form-data')) {
console.log('Processing multipart/form-data');
const arrayBuffer = await bunRequest.arrayBuffer();
// 设置请求头(在写入数据之前)
proxyReq.setHeader('content-type', contentType);
proxyReq.setHeader('content-length', arrayBuffer.byteLength.toString());
// 写入数据并结束请求
if (arrayBuffer.byteLength > 0) {
proxyReq.write(Buffer.from(arrayBuffer));
}
proxyReq.end();
return;
}
console.log('Bun pipeProxyReq content-type', contentType);
// @ts-ignore
const bodyString = req.body;
bodyString && proxyReq.write(bodyString);
proxyReq.end();
} catch (error) {
proxyReq.destroy(error);
}
} else {
// Node.js标准环境下直接使用pipe进行流传输
req.pipe(proxyReq, { end: true });
}
}

View File

@@ -5,10 +5,12 @@ import { getUserFromRequest } from '../utils.ts';
import { UserPermission, Permission } from '@kevisual/permission'; import { UserPermission, Permission } from '@kevisual/permission';
import { getLoginUser } from '@/modules/auth.ts'; import { getLoginUser } from '@/modules/auth.ts';
import busboy from 'busboy'; import busboy from 'busboy';
import { getContentType } from '../get-content-type.ts'; import { getContentType, getTextContentType } from '../get-content-type.ts';
import { OssBase } from '@kevisual/oss'; import { OssBase } from '@kevisual/oss';
import { parseSearchValue } from '@kevisual/router/browser'; import { parseSearchValue } from '@kevisual/router/browser';
import { logger } from '@/modules/logger.ts'; import { logger } from '@/modules/logger.ts';
import { pipeBusboy } from '../pipe-busboy.ts';
import { pipeMinioStream } from '../pipe.ts';
type FileList = { type FileList = {
name: string; name: string;
@@ -116,12 +118,14 @@ const getAiProxy = async (req: IncomingMessage, res: ServerResponse, opts: Proxy
etag, etag,
'last-modified': lastModified, 'last-modified': lastModified,
...filterMetaData, ...filterMetaData,
...getTextContentType(objectName, true),
}; };
res.writeHead(200, { res.writeHead(200, {
...headers, ...headers,
}); });
objectStream.pipe(res, { end: true }); // objectStream.pipe(res, { end: true });
// @ts-ignore
pipeMinioStream(objectStream, res);
return true; return true;
} catch (error) { } catch (error) {
console.error(`Proxy request error: ${error.message}`); console.error(`Proxy request error: ${error.message}`);
@@ -226,7 +230,7 @@ export const postProxy = async (req: IncomingMessage, res: ServerResponse, opts:
end({ error: err }, '文件解析失败', 500); end({ error: err }, '文件解析失败', 500);
}); });
req.pipe(bb); pipeBusboy(req, res, bb);
}; };
export const getObjectName = async (req: IncomingMessage, opts?: { checkOwner?: boolean }) => { export const getObjectName = async (req: IncomingMessage, opts?: { checkOwner?: boolean }) => {
const _u = new URL(req.url, 'http://localhost'); const _u = new URL(req.url, 'http://localhost');

View File

@@ -1,5 +1,6 @@
import http from 'node:http'; import http from 'node:http';
import { minioClient } from '@/modules/minio.ts'; import { minioClient } from '@/modules/minio.ts';
import { pipeMinioStream } from '../pipe.ts';
type ProxyInfo = { type ProxyInfo = {
path?: string; path?: string;
target: string; target: string;
@@ -15,7 +16,8 @@ export const minioProxyOrigin = async (req: http.IncomingMessage, res: http.Serv
objectName = objectName.slice(bucketName.length); objectName = objectName.slice(bucketName.length);
} }
const objectStream = await minioClient.getObject(bucketName, objectName); const objectStream = await minioClient.getObject(bucketName, objectName);
objectStream.pipe(res); // objectStream.pipe(res);
pipeMinioStream(objectStream, res);
} catch (error) { } catch (error) {
console.error('Error fetching object from MinIO:', error); console.error('Error fetching object from MinIO:', error);
res.statusCode = 500; res.statusCode = 500;

View File

@@ -1,6 +1,12 @@
import { IncomingMessage } from 'node:http'; import { IncomingMessage } from 'node:http';
import http from 'node:http'; import http from 'node:http';
import { logger } from '../logger.ts'; import { logger } from '../logger.ts';
export const isBun = typeof Bun !== 'undefined' && Bun?.version != null;
export const isNode = typeof process !== 'undefined' && process?.versions != null && process.versions?.node != null;
// @ts-ignore
export const isDeno = typeof Deno !== 'undefined' && Deno?.version != null && Deno?.version?.deno != null;
export const getUserFromRequest = (req: IncomingMessage) => { export const getUserFromRequest = (req: IncomingMessage) => {
const url = new URL(req.url, `http://${req.headers.host}`); const url = new URL(req.url, `http://${req.headers.host}`);

View File

@@ -8,6 +8,7 @@ import { bucketName } from '@/modules/minio.ts';
import { getLoginUser } from '../middleware/auth.ts'; import { getLoginUser } from '../middleware/auth.ts';
import { BucketItemStat } from 'minio'; import { BucketItemStat } from 'minio';
import { UserPermission, Permission } from '@kevisual/permission'; import { UserPermission, Permission } from '@kevisual/permission';
import { pipeMinioStream } from '@/modules/fm-manager/index.ts';
/** /**
* 过滤 metaData 中的 key, 去除 password, accesskey, secretkey * 过滤 metaData 中的 key, 去除 password, accesskey, secretkey
@@ -101,5 +102,6 @@ export const authMinio = async (req: IncomingMessage, res: ServerResponse, objec
}); });
const objectStream = await minioClient.getObject(bucketName, objectName); const objectStream = await minioClient.getObject(bucketName, objectName);
objectStream.pipe(res, { end: true }); // objectStream.pipe(res, { end: true });
pipeMinioStream(objectStream, res);
}; };

View File

@@ -1,4 +1,4 @@
import { getDNS, isIpv4OrIpv6, isLocalhost, pipeFileStream } from '../modules/fm-manager/index.ts'; import { getDNS, isIpv4OrIpv6, isLocalhost, pipeFileStream, pipeProxyReq, pipeProxyRes } from '../modules/fm-manager/index.ts';
import http from 'node:http'; import http from 'node:http';
import https from 'node:https'; import https from 'node:https';
import { UserApp } from '../modules/user-app/index.ts'; import { UserApp } from '../modules/user-app/index.ts';
@@ -110,7 +110,8 @@ export const handleRequest = async (req: http.IncomingMessage, res: http.ServerR
// 将代理服务器的响应头和状态码返回给客户端 // 将代理服务器的响应头和状态码返回给客户端
res.writeHead(proxyRes.statusCode, proxyRes.headers); res.writeHead(proxyRes.statusCode, proxyRes.headers);
// 将代理响应流写入客户端响应 // 将代理响应流写入客户端响应
proxyRes.pipe(res, { end: true }); // proxyRes.pipe(res, { end: true });
pipeProxyRes(proxyRes, res);
}); });
// 处理代理请求的错误事件 // 处理代理请求的错误事件
proxyReq.on('error', (err) => { proxyReq.on('error', (err) => {
@@ -119,7 +120,8 @@ export const handleRequest = async (req: http.IncomingMessage, res: http.ServerR
res.write(`Proxy request error: ${err.message}`); res.write(`Proxy request error: ${err.message}`);
}); });
// 处理 POST 请求的请求体(传递数据到目标服务器) // 处理 POST 请求的请求体(传递数据到目标服务器)
req.pipe(proxyReq, { end: true }); // req.pipe(proxyReq, { end: true });
pipeProxyReq(req, proxyReq, res);
return; return;
} }
if (req.url.startsWith('/api') || req.url.startsWith('/v1')) { if (req.url.startsWith('/api') || req.url.startsWith('/v1')) {