refactor: migrate WebSocket proxy to v1-ws-proxy module
- Updated import paths to use the new v1-ws-proxy module. - Removed the old ws-proxy module and its associated files. - Implemented new WebSocket proxy logic in the v1-ws-proxy module. - Adjusted UserV1Proxy to utilize the new WebSocket proxy manager and methods.
This commit is contained in:
135
src/modules/v1-ws-proxy/manager.ts
Normal file
135
src/modules/v1-ws-proxy/manager.ts
Normal file
@@ -0,0 +1,135 @@
|
||||
import { nanoid } from 'nanoid';
|
||||
import { WebSocket } from 'ws';
|
||||
import { logger } from '../logger.ts';
|
||||
import { EventEmitter } from 'eventemitter3';
|
||||
|
||||
class WsMessage {
|
||||
ws: WebSocket;
|
||||
user?: string;
|
||||
emitter: EventEmitter;
|
||||
private pingTimer?: NodeJS.Timeout;
|
||||
private readonly PING_INTERVAL = 30000; // 30 秒发送一次 ping
|
||||
|
||||
constructor({ ws, user }: WssMessageOptions) {
|
||||
this.ws = ws;
|
||||
this.user = user;
|
||||
this.emitter = new EventEmitter();
|
||||
this.startPing();
|
||||
}
|
||||
|
||||
private startPing() {
|
||||
this.stopPing();
|
||||
this.pingTimer = setInterval(() => {
|
||||
if (this.ws.readyState === WebSocket.OPEN) {
|
||||
this.ws.ping();
|
||||
} else {
|
||||
this.stopPing();
|
||||
}
|
||||
}, this.PING_INTERVAL);
|
||||
}
|
||||
|
||||
private stopPing() {
|
||||
if (this.pingTimer) {
|
||||
clearInterval(this.pingTimer);
|
||||
this.pingTimer = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
destroy() {
|
||||
this.stopPing();
|
||||
this.emitter.removeAllListeners();
|
||||
}
|
||||
|
||||
async sendResponse(data: any) {
|
||||
if (data.id) {
|
||||
this.emitter.emit(data.id, data?.data);
|
||||
}
|
||||
}
|
||||
async sendData(data: any, context?: any, opts?: { timeout?: number }) {
|
||||
if (this.ws.readyState !== WebSocket.OPEN) {
|
||||
return { code: 500, message: 'WebSocket is not open' };
|
||||
}
|
||||
const timeout = opts?.timeout || 10 * 6 * 1000; // 10 minutes
|
||||
const id = nanoid();
|
||||
const message = JSON.stringify({
|
||||
id,
|
||||
type: 'proxy',
|
||||
data: {
|
||||
message: data,
|
||||
context: context || {},
|
||||
},
|
||||
});
|
||||
logger.info('ws-proxy sendData', message);
|
||||
this.ws.send(message);
|
||||
const msg = { path: data?.path, key: data?.key, id: data?.id };
|
||||
return new Promise((resolve) => {
|
||||
const timer = setTimeout(() => {
|
||||
resolve({
|
||||
code: 500,
|
||||
message: `运行超时,执行的id: ${id},参数是${JSON.stringify(msg)}`,
|
||||
});
|
||||
}, timeout);
|
||||
this.emitter.once(id, (data: any) => {
|
||||
resolve(data);
|
||||
clearTimeout(timer);
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
type WssMessageOptions = {
|
||||
ws: WebSocket;
|
||||
user?: string;
|
||||
};
|
||||
export class WsProxyManager {
|
||||
wssMap: Map<string, WsMessage> = new Map();
|
||||
PING_INTERVAL = 30000; // 30 秒检查一次连接状态
|
||||
constructor(opts?: { pingInterval?: number }) {
|
||||
if (opts?.pingInterval) {
|
||||
this.PING_INTERVAL = opts.pingInterval;
|
||||
}
|
||||
this.checkConnceted();
|
||||
}
|
||||
register(id: string, opts?: { ws: WebSocket; user: string }) {
|
||||
if (this.wssMap.has(id)) {
|
||||
const value = this.wssMap.get(id);
|
||||
if (value) {
|
||||
value.ws.close();
|
||||
value.destroy();
|
||||
}
|
||||
}
|
||||
const [username, appId] = id.split('--');
|
||||
const url = new URL(`/${username}/v1/${appId}`, 'https://kevisual.cn/');
|
||||
console.log('WsProxyManager register', id, '访问地址', url.toString());
|
||||
const value = new WsMessage({ ws: opts?.ws, user: opts?.user });
|
||||
this.wssMap.set(id, value);
|
||||
}
|
||||
unregister(id: string) {
|
||||
const value = this.wssMap.get(id);
|
||||
if (value) {
|
||||
value.ws.close();
|
||||
value.destroy();
|
||||
}
|
||||
this.wssMap.delete(id);
|
||||
}
|
||||
getIds(beginWith?: string) {
|
||||
if (beginWith) {
|
||||
return Array.from(this.wssMap.keys()).filter(key => key.startsWith(beginWith));
|
||||
}
|
||||
return Array.from(this.wssMap.keys());
|
||||
}
|
||||
get(id: string) {
|
||||
return this.wssMap.get(id);
|
||||
}
|
||||
checkConnceted() {
|
||||
const that = this;
|
||||
setTimeout(() => {
|
||||
that.wssMap.forEach((value, key) => {
|
||||
if (value.ws.readyState !== WebSocket.OPEN) {
|
||||
logger.debug('ws not connected, unregister', key);
|
||||
that.unregister(key);
|
||||
}
|
||||
});
|
||||
that.checkConnceted();
|
||||
}, this.PING_INTERVAL);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user