test bun stream sse and http-stream
This commit is contained in:
@@ -50,7 +50,7 @@ export class BunServer extends ServerBase implements ServerType {
|
||||
port,
|
||||
hostname,
|
||||
idleTimeout: 0, // 4 minutes idle timeout (max 255 seconds)
|
||||
fetch: async (request: Bun.BunRequest, server: any) => {
|
||||
fetch: async (request: Bun.BunRequest, server: Bun.Server<{}>) => {
|
||||
const host = request.headers.get('host') || 'localhost';
|
||||
const clientInfo = server.requestIP(request); // 返回 { address: string, port: number } 或 null
|
||||
const url = new URL(request.url, `http://${host}`);
|
||||
@@ -72,6 +72,7 @@ export class BunServer extends ServerBase implements ServerType {
|
||||
|
||||
// 将 Bun 的 Request 转换为 Node.js 风格的 req/res
|
||||
return new Promise(async (resolve) => {
|
||||
const reqListener: { event: string; listener: Function }[] = [];
|
||||
const req: RouterReq = {
|
||||
url: url.pathname + url.search,
|
||||
method: request.method,
|
||||
@@ -81,12 +82,29 @@ export class BunServer extends ServerBase implements ServerType {
|
||||
remoteAddress: request?.remoteAddress || request?.ip || clientInfo?.address || '',
|
||||
remotePort: clientInfo?.port || 0,
|
||||
},
|
||||
// @ts-ignore
|
||||
on: (event: string, listener: Function) => {
|
||||
reqListener.push({ event, listener });
|
||||
},
|
||||
bun: {
|
||||
request, // 原始请求对象
|
||||
server, // 原始服务器对象
|
||||
resolve
|
||||
}
|
||||
};
|
||||
const onClose = () => {
|
||||
reqListener.forEach(item => {
|
||||
if (item.event === 'close') {
|
||||
item.listener();
|
||||
}
|
||||
});
|
||||
reqListener.length = 0;
|
||||
}
|
||||
// 监听请求的取消事件
|
||||
if (request.signal) {
|
||||
request.signal.addEventListener('abort', () => {
|
||||
onClose();
|
||||
});
|
||||
}
|
||||
|
||||
const res: RouterRes = {
|
||||
statusCode: 200,
|
||||
@@ -143,7 +161,7 @@ export class BunServer extends ServerBase implements ServerType {
|
||||
if (callback) callback();
|
||||
return true;
|
||||
},
|
||||
pipe(stream: any) {
|
||||
pipe(stream: ReadableStream | NodeJS.ReadableStream) {
|
||||
this.writableEnded = true;
|
||||
|
||||
// 如果是 ReadableStream,直接使用
|
||||
@@ -164,6 +182,7 @@ export class BunServer extends ServerBase implements ServerType {
|
||||
controller.enqueue(chunk);
|
||||
});
|
||||
stream.on('end', () => {
|
||||
onClose();
|
||||
controller.close();
|
||||
});
|
||||
stream.on('error', (err: any) => {
|
||||
@@ -171,9 +190,9 @@ export class BunServer extends ServerBase implements ServerType {
|
||||
});
|
||||
},
|
||||
cancel() {
|
||||
if (stream.destroy) {
|
||||
stream.destroy();
|
||||
}
|
||||
// 只有NODE流才有destroy方法
|
||||
// @ts-ignore
|
||||
stream?.destroy?.();
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
@@ -104,6 +104,12 @@ export type RouterReq<T = {}> = {
|
||||
};
|
||||
body?: string;
|
||||
cookies?: Record<string, string>;
|
||||
bun?: {
|
||||
request: Bun.BunRequest;
|
||||
server: Bun.Server<{}>;
|
||||
resolve: (response: Response) => void;
|
||||
}
|
||||
on: (event: 'close', listener: Function) => void;
|
||||
} & T;
|
||||
|
||||
export type RouterRes<T = {}> = {
|
||||
@@ -116,6 +122,6 @@ export type RouterRes<T = {}> = {
|
||||
setHeader: (name: string, value: string | string[]) => void;
|
||||
cookie: (name: string, value: string, options?: any) => void;
|
||||
write: (chunk: any) => void;
|
||||
pipe: (stream: any) => void;
|
||||
pipe: (stream: ReadableStream) => void;
|
||||
end: (data?: any) => void;
|
||||
} & T;
|
||||
Reference in New Issue
Block a user