diff --git a/package.json b/package.json index e7b3b00..eebd071 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "$schema": "https://json.schemastore.org/package", "name": "@kevisual/router", - "version": "0.0.43", + "version": "0.0.47", "description": "", "type": "module", "main": "./dist/router.js", diff --git a/src/app.ts b/src/app.ts index 4a58137..363a320 100644 --- a/src/app.ts +++ b/src/app.ts @@ -134,7 +134,7 @@ export class App { } this.server.on({ id: 'app-request-listener', - fun: fn + fun: fn as any, }); } } diff --git a/src/index.ts b/src/index.ts index 15e9d95..562c8d2 100644 --- a/src/index.ts +++ b/src/index.ts @@ -20,4 +20,15 @@ export { App } from './app.ts'; export * from './router-define.ts'; -export { RouterReq, RouterRes } from './server/server-type.ts'; \ No newline at end of file +export { + RouterReq, + RouterRes, + OnWebSocketFn, + WS, + WebSocketReq, + WebSocketRes, + Listener, + WebSocketListenerFun, + HttpListenerFun, + OnListener, +} from './server/server-type.ts'; \ No newline at end of file diff --git a/src/server/server-base.ts b/src/server/server-base.ts index ff3269d..a70ad5e 100644 --- a/src/server/server-base.ts +++ b/src/server/server-base.ts @@ -1,9 +1,9 @@ import type { IncomingMessage, ServerResponse } from 'node:http'; import { handleServer } from './handle-server.ts'; import * as cookie from './cookie.ts'; -import { ServerType, Listener, OnListener, ServerOpts } from './server-type.ts'; +import { ServerType, Listener, OnListener, ServerOpts, OnWebSocketOptions, OnWebSocketFn, WebScoketListenerFun, ListenerFun, HttpListenerFun, WS } from './server-type.ts'; import { parseIfJson } from '../utils/parse.ts'; - +import { EventEmitter } from 'events'; type CookieFn = (name: string, value: string, options?: cookie.SerializeOptions, end?: boolean) => void; export type HandleCtx = { @@ -63,6 +63,7 @@ export class ServerBase implements ServerType { _callback: any; cors: Cors; listeners: Listener[] = []; + emitter = new EventEmitter(); constructor(opts?: ServerOpts) { this.path = opts?.path || '/api/router'; this.handle = opts?.handle; @@ -118,9 +119,9 @@ export class ServerBase implements ServerType { } const listeners = that.listeners || []; for (const item of listeners) { - const fun = item.fun; - if (typeof fun === 'function' && !item.io) { - await fun(req, res); + const func = item.func as any; + if (typeof func === 'function' && !item.io) { + await func(req, res); } } if (res.headersSent) { @@ -178,13 +179,13 @@ export class ServerBase implements ServerType { on(listener: OnListener) { this.listeners = []; if (typeof listener === 'function') { - this.listeners.push({ fun: listener }); + this.listeners.push({ func: listener }); return; } if (Array.isArray(listener)) { for (const item of listener) { if (typeof item === 'function') { - this.listeners.push({ fun: item }); + this.listeners.push({ func: item }); } else { this.listeners.push(item); } @@ -193,7 +194,7 @@ export class ServerBase implements ServerType { this.listeners.push(listener); } } - async onWebSocket({ ws, message, pathname, token, id }) { + async onWebSocket({ ws, message, pathname, token, id }: OnWebSocketOptions) { const listener = this.listeners.find((item) => item.path === pathname && item.io); const data: any = parseIfJson(message); @@ -201,7 +202,8 @@ export class ServerBase implements ServerType { const end = (data: any) => { ws.send(JSON.stringify(data)); } - listener.fun({ + (listener.func as WebScoketListenerFun)({ + emitter: this.emitter, data, token, id, @@ -262,4 +264,15 @@ export class ServerBase implements ServerType { end({ code: 500, message: `${type} server is error` }); } } + async onWsClose(ws: WS) { + const id = ws?.data?.id || ''; + if (id) { + this.emitter.emit('close--' + id, { type: 'close', ws, id }); + setTimeout(() => { + // 关闭后 5 秒清理监听器, 避免内存泄漏, 理论上原本的自己就应该被清理掉了,这里是保险起见 + this.emitter.removeAllListeners('close--' + id); + this.emitter.removeAllListeners(id); + }, 5000); + } + } } diff --git a/src/server/server-bun.ts b/src/server/server-bun.ts index 7e62384..dcf8b8f 100644 --- a/src/server/server-bun.ts +++ b/src/server/server-bun.ts @@ -223,7 +223,7 @@ export class BunServer extends ServerBase implements ServerType { }, websocket: { open: (ws: any) => { - ws.send('connected'); + ws.send(JSON.stringify({ type: 'connected' })); }, message: async (ws: any, message: string | Buffer) => { const pathname = ws.data.pathname || ''; @@ -233,6 +233,7 @@ export class BunServer extends ServerBase implements ServerType { }, close: (ws: any) => { // WebSocket 连接关闭 + this.onWsClose(ws); }, }, }); diff --git a/src/server/server-type.ts b/src/server/server-type.ts index 2e4cd8a..4a660de 100644 --- a/src/server/server-type.ts +++ b/src/server/server-type.ts @@ -1,13 +1,7 @@ -import * as http from 'http'; +import EventEmitter from 'node:events'; +import * as http from 'node:http'; + -export type Listener = { - id?: string; - io?: boolean; - path?: string; - fun: (...args: any[]) => Promise | void; -} -export type ListenerFun = (...args: any[]) => Promise | void; -export type OnListener = Listener | ListenerFun | (Listener | ListenerFun)[]; export type Cors = { /** * @default '*'' @@ -44,14 +38,49 @@ export interface ServerType { * @param listener */ on(listener: OnListener): void; - onWebSocket({ ws, message, pathname, token, id }: { ws: WS; message: string | Buffer; pathname: string, token?: string, id?: string }): void; + onWebSocket({ ws, message, pathname, token, id }: OnWebSocketOptions): void; + onWsClose(ws: WS): void; } -type WS = { +export type OnWebSocketOptions = { ws: WS; message: string | Buffer; pathname: string, token?: string, id?: string } +export type OnWebSocketFn = (options: OnWebSocketOptions) => Promise | void; +export type WS = { send: (data: any) => void; - close: () => void; + close: (code?: number, reason?: string) => void; + data?: { + url: URL; + pathname: string; + token?: string; + id?: string; + /** + * 鉴权后的获取的信息 + */ + userApp?: string; + } +} +export type Listener = { + id?: string; + io?: boolean; + path?: string; + func: WebSocketListenerFun | HttpListenerFun; } +export type WebSocketListenerFun = (req: WebSocketReq, res: WebSocketRes) => Promise | void; +export type HttpListenerFun = (req: RouterReq, res: RouterRes) => Promise | void; + +export type WebSocketReq = { + emitter?: EventEmitter; + ws: WS; + data: any; + pathname?: string; + token?: string; + id?: string; +} +export type WebSocketRes = { + end: (data: any) => void; +} +export type ListenerFun = WebSocketListenerFun | HttpListenerFun;; +export type OnListener = Listener | ListenerFun | (Listener | ListenerFun)[]; export type RouterReq = { url: string; method: string; diff --git a/src/server/ws-server.ts b/src/server/ws-server.ts index 4542404..be41f5e 100644 --- a/src/server/ws-server.ts +++ b/src/server/ws-server.ts @@ -24,7 +24,6 @@ export type Listener = { export class WsServerBase { wss: WebSocketServer | null; - listeners: Listener[] = []; listening: boolean = false; server: ServerType; @@ -51,11 +50,22 @@ export class WsServerBase { const pathname = url.pathname; const token = url.searchParams.get('token') || ''; const id = url.searchParams.get('id') || ''; + // @ts-ignore + ws.data = { + url: url, + pathname, + token, + id, + } ws.on('message', async (message: string | Buffer) => { await this.server.onWebSocket({ ws, message, pathname, token, id }); }); - ws.send('connected'); + ws.send(JSON.stringify({ type: 'connected' })); + this.wss.on('close', () => { + this.server.onWsClose(ws); + }); }); + } } // TODO: ws handle and path and routerContext