Files
code-center/src/modules/fm-manager/pipe.ts
abearxiong 66a19139b7 feat: implement AI agent for flowme-life interactions
- Add agent-run module to handle AI interactions with tools and messages.
- Create routes for proxying requests to OpenAI and Anthropic APIs.
- Implement flowme-life chat route for user queries and task management.
- Add services for retrieving and updating life records in the database.
- Implement logic for fetching today's tasks and marking tasks as done with next execution time calculation.
- Introduce tests for flowme-life functionalities.
2026-03-11 01:44:29 +08:00

137 lines
4.4 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import * as http from 'http';
import * as fs from 'fs';
import { isBun } from './utils.ts';
import Stream from 'stream';
/**
* 文件流管道传输函数
* 将指定文件的内容通过流的方式传输给客户端响应
* @param filePath 要传输的文件路径
* @param res HTTP服务器响应对象
*/
export const pipeFileStream = (filePath: string, res: http.ServerResponse) => {
const readStream = fs.createReadStream(filePath);
if (isBun) {
// Bun环境下的流处理方式
res.pipe(readStream as any);
} else {
// Node.js标准环境下的流处理方式end:true表示在流结束时自动关闭响应
readStream.pipe(res, { end: true });
}
}
/**
* 通用流管道传输函数
* 将可读流的数据传输给客户端响应
* @param readStream 可读流对象
* @param res HTTP服务器响应对象
*/
export const pipeStream = (readStream: fs.ReadStream | Stream.Readable, res: http.ServerResponse) => {
if (isBun) {
// Bun环境下的流处理方式
res.pipe(readStream as any);
} else {
// Node.js标准环境下的流处理方式
readStream.pipe(res, { end: true });
}
}
export const pipeMinioStream = (minioStream: Stream.Readable, res: http.ServerResponse) => {
if (isBun) {
const chunks: Buffer[] = [];
// 监听数据到达事件,收集所有数据块
minioStream.on('data', (chunk: Buffer) => {
chunks.push(chunk);
});
// 监听数据结束事件,将收集的数据合并并发送给客户端
minioStream.on('end', () => {
const result = Buffer.concat(chunks);
res.end(result);
});
// 监听错误事件,处理代理响应过程中的错误
minioStream.on('error', (error) => {
res.writeHead(500);
res.end(JSON.stringify({ error: error.message }));
});
} else {
minioStream.pipe(res, { end: true });
}
}
/**
* 代理响应流传输函数
* 将代理服务器返回的响应数据传输给客户端
* 处理从目标服务器收到的响应流并转发给原始客户端
* @param proxyRes 代理服务器的响应对象
* @param res HTTP服务器响应对象
*/
export const pipeProxyRes = (proxyRes: http.IncomingMessage, res: http.ServerResponse) => {
if (isBun) {
// Bun环境下需要手动收集数据并end因为Bun的pipe机制与Node.js不同
const chunks: Buffer[] = [];
// 监听数据到达事件,收集所有数据块
proxyRes.on('data', (chunk: Buffer) => {
chunks.push(chunk);
});
if (proxyRes.url === '/api/router') {
console.log(proxyRes.url, proxyRes.statusCode);
}
// 监听数据结束事件,将收集的数据合并并发送给客户端
proxyRes.on('end', () => {
const result = Buffer.concat(chunks).toString();
res.end(result);
});
// 监听错误事件,处理代理响应过程中的错误
proxyRes.on('error', (error) => {
res.writeHead(500);
res.end(JSON.stringify({ error: error.message }));
});
} else {
// Node.js标准环境下直接使用pipe进行流传输
proxyRes.pipe(res, { end: true });
}
}
/**
* 代理请求流传输函数
* 将客户端的请求数据传输给代理服务器
* 处理来自客户端的请求流并转发给目标服务器
* @param req 客户端的请求对象
* @param proxyReq 代理服务器的请求对象
*/
export const pipeProxyReq = async (req: http.IncomingMessage, proxyReq: http.ClientRequest, res: any) => {
if (isBun) {
try {
// @ts-ignore
const bunRequest = req.bun.request;
const contentType = req.headers['content-type'] || '';
if (contentType.includes('multipart/form-data')) {
// console.log('Processing multipart/form-data');
const arrayBuffer = await bunRequest.arrayBuffer();
// 设置请求头(在写入数据之前)
proxyReq.setHeader('content-type', contentType);
proxyReq.setHeader('content-length', arrayBuffer.byteLength.toString());
// 写入数据并结束请求
if (arrayBuffer.byteLength > 0) {
proxyReq.write(Buffer.from(arrayBuffer));
}
proxyReq.end();
return;
}
// @ts-ignore
const bodyString = req.body;
bodyString && proxyReq.write(bodyString);
proxyReq.end();
} catch (error) {
proxyReq.destroy(error);
}
} else {
// Node.js标准环境下直接使用pipe进行流传输
req.pipe(proxyReq, { end: true });
}
}