Files
code-center/src/modules/v1-ws-proxy/manager.ts

263 lines
7.7 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import { customAlphabet } from 'nanoid';
import { WebSocket } from 'ws';
import { logger } from '../logger.ts';
import { EventEmitter } from 'eventemitter3';
// Alphabet for generated ids: ASCII letters only (a-z followed by A-Z), no digits or symbols.
const letters = 'abcdefghijklmnopqrstuvwxyz' + 'ABCDEFGHIJKLMNOPQRSTUVWXYZ';
// Id generator restricted to `letters`; 10 is the size passed to customAlphabet as the default length.
const nanoid = customAlphabet(letters, 10);
/**
 * Wraps a single proxied WebSocket connection.
 *
 * Responsibilities visible in this class:
 * - keeps the socket alive by sending a ping every `PING_INTERVAL` ms;
 * - sends `proxy` requests over the socket and matches the replies by a generated
 *   message id (replies are fed in through `sendResponse`);
 * - tracks the connection `status` and notifies the peer once it is verified.
 */
class WsMessage {
  ws: WebSocket;
  user?: string;
  /** Dispatches replies: the event name is the request id created in `sendData`. */
  emitter: EventEmitter;
  private pingTimer?: NodeJS.Timeout;
  private readonly PING_INTERVAL = 30000; // send a ping every 30 seconds
  id?: string;
  status?: 'waiting' | 'connected' | 'closed';
  manager: WsProxyManager;

  /**
   * @param options.ws      socket to wrap
   * @param options.user    user name associated with the connection
   * @param options.id      id of this connection
   * @param options.isLogin when truthy the connection starts as 'connected', otherwise 'waiting'
   * @param options.manager manager used to rename this connection in `sendConnected`
   */
  constructor({ ws, user, id, isLogin, manager }: WssMessageOptions) {
    this.ws = ws;
    this.user = user;
    this.id = id;
    this.emitter = new EventEmitter();
    this.manager = manager;
    this.status = isLogin ? 'connected' : 'waiting';
    this.startPing();
  }

  /** (Re)starts the keep-alive timer; the timer cancels itself once the socket is no longer open. */
  private startPing() {
    this.stopPing();
    this.pingTimer = setInterval(() => {
      if (this.ws.readyState === WebSocket.OPEN) {
        this.ws.ping();
      } else {
        this.stopPing();
      }
    }, this.PING_INTERVAL);
  }

  /** Cancels the keep-alive timer if one is running. */
  private stopPing() {
    if (this.pingTimer) {
      clearInterval(this.pingTimer);
      this.pingTimer = undefined;
    }
  }

  /**
   * Releases the resources owned by this wrapper: stops the ping timer and drops all
   * reply listeners. The socket itself is not closed here.
   */
  destroy() {
    this.stopPing();
    this.emitter.removeAllListeners();
  }

  /** @returns true only when the socket is in the CLOSED state. */
  isClosed() {
    return this.ws.readyState === WebSocket.CLOSED;
  }

  /**
   * Delivers a reply to the `sendData` call that is waiting for it.
   *
   * @param data reply envelope; `data.id` selects the pending request and `data.data`
   *             is the value that request resolves with. Envelopes without an id
   *             (or a null/undefined envelope) are ignored.
   */
  async sendResponse(data: any) {
    if (data?.id) {
      this.emitter.emit(data.id, data.data);
    }
  }

  /**
   * Tells the peer that it has been verified and, for temporary registration ids,
   * renames this connection to its final id.
   *
   * The `verified` message is only sent (and `status` only becomes 'connected') while
   * the socket is open. An id containing '-registry-' is renamed to the part before
   * that marker via the manager.
   */
  async sendConnected() {
    const id = this.id;
    const user = this.user;
    if (this.ws.readyState === WebSocket.OPEN) {
      // The message carries the id as it is before any rename below.
      this.ws.send(JSON.stringify({ type: 'verified', user, id }));
      this.status = 'connected';
    }
    // `id` is optional, so guard before inspecting it.
    if (id?.includes('-registry-')) {
      const newId = id.split('-registry-')[0];
      this.manager.changeId(id, newId);
      // @ts-ignore
      if (this.ws?.data) {
        // @ts-ignore
        this.ws.data.userApp = newId;
      }
    }
  }

  /**
   * @returns a summary of this connection. `shortAppId` is the part of the id after the
   *          first '--'; `pathname` is `/${user}/v1/${shortAppId}` (empty when there is no id).
   */
  getInfo() {
    const shortAppId = this.id ? this.id.split('--')[1] : '';
    return {
      user: this.user,
      id: this.id,
      status: this.status,
      shortAppId,
      pathname: this.id ? `/${this.user}/v1/${shortAppId}` : '',
    };
  }

  /**
   * Sends a `proxy` request over the socket and waits for the matching reply.
   *
   * @param data    payload forwarded as `data.message`
   * @param context forwarded as `data.context` (defaults to `{}`)
   * @param opts.timeout milliseconds to wait for the reply before giving up
   * @returns the reply passed to `sendResponse`, or a `{ code: 500, message }` object when
   *          the socket is not open or no reply arrives in time. The promise never rejects.
   */
  async sendData(data: any, context?: any, opts?: { timeout?: number }) {
    if (this.ws.readyState !== WebSocket.OPEN) {
      return { code: 500, message: 'WebSocket is not open' };
    }
    // Default is 60 000 ms, i.e. one minute.
    // NOTE(review): an earlier comment claimed "10 minutes"; confirm which one is intended.
    const timeout = opts?.timeout || 10 * 6 * 1000;
    const id = nanoid();
    const message = JSON.stringify({
      id,
      type: 'proxy',
      data: {
        message: data,
        context: context || {},
      },
    });
    logger.info('ws-proxy sendData', message);
    this.ws.send(message);
    // Short description of the request, used only in the timeout diagnostics.
    const msg = { path: data?.path, key: data?.key, id: data?.id };
    return new Promise((resolve) => {
      const onResponse = (payload: any) => {
        clearTimeout(timer);
        resolve(payload);
      };
      const timer = setTimeout(() => {
        // Drop the pending listener so timed-out requests do not pile up on the emitter.
        this.emitter.removeListener(id, onResponse);
        console.log('ws-proxy sendData timeout', msg);
        resolve({
          code: 500,
          message: `运行超时执行的id: ${id},参数是${JSON.stringify(msg)}`,
        });
      }, timeout);
      this.emitter.once(id, onResponse);
    });
  }
}
/**
 * Options accepted by the WsMessage constructor.
 */
type WssMessageOptions = {
  /** Socket wrapped by the WsMessage instance. */
  ws: WebSocket;
  /** Manager that owns the connection; stored on the WsMessage instance. */
  manager: WsProxyManager;
  /** Id stored on the WsMessage instance. */
  id?: string;
  /** Declared here but not read by the WsMessage constructor. */
  realId?: string;
  /** User name stored on the WsMessage instance. */
  user?: string;
  /** Truthy: initial status is 'connected'; otherwise 'waiting'. */
  isLogin?: boolean;
};
/**
 * Registry of proxied WebSocket connections, keyed by connection id.
 *
 * Besides plain lookup/registration it runs a periodic sweep that unregisters every
 * connection whose socket is no longer open.
 */
export class WsProxyManager {
  wssMap: Map<string, WsMessage> = new Map();
  PING_INTERVAL = 30000; // check connection state every 30 seconds
  /** Handle of the pending sweep, kept so the loop can be restarted or stopped. */
  private checkTimer?: ReturnType<typeof setTimeout>;

  /**
   * @param opts.pingInterval sweep interval in milliseconds (falsy values keep the default)
   */
  constructor(opts?: { pingInterval?: number }) {
    if (opts?.pingInterval) {
      this.PING_INTERVAL = opts.pingInterval;
    }
    this.checkConnceted();
  }

  /**
   * Registers a connection under `id`, replacing (closing and destroying) any
   * connection already stored under that id.
   *
   * @param id   map key; expected to look like `<username>--<appId>` (used for the log line only)
   * @param opts options forwarded to the WsMessage constructor
   * @returns the newly created WsMessage
   */
  register(id: string, opts?: { ws: WebSocket; user: string, id?: string, isLogin: boolean }) {
    const previous = this.wssMap.get(id);
    if (previous) {
      previous.ws.close();
      previous.destroy();
    }
    const [username, appId] = id.split('--');
    const url = new URL(`/${username}/v1/${appId}`, 'https://kevisual.cn/');
    console.log('WsProxyManager register', id, '访问地址', url.toString());
    const value = new WsMessage({ ...opts, manager: this } as WssMessageOptions);
    this.wssMap.set(id, value);
    return value;
  }

  /**
   * Moves the connection stored under `oldId` to `newId`.
   *
   * A different connection already stored under `newId` is closed and destroyed first.
   * Nothing happens when `oldId` is not registered or when both ids are equal, so a
   * rename that cannot take place never evicts a live connection.
   */
  changeId(oldId: string, newId: string) {
    const value = this.wssMap.get(oldId);
    if (!value || oldId === newId) {
      return;
    }
    const originalValue = this.wssMap.get(newId);
    if (originalValue) {
      logger.debug(`WsProxyManager changeId: ${newId} already exists, close old connection`);
      originalValue.ws.close();
      originalValue.destroy();
    }
    this.wssMap.delete(oldId);
    this.wssMap.set(newId, value);
    value.id = newId;
    // @ts-ignore
    if (value.ws?.data) {
      // @ts-ignore
      value.ws.data.userApp = newId;
    }
    logger.debug(`WsProxyManager changeId: ${oldId} -> ${newId}`);
  }

  /** Closes and destroys the connection stored under `id` (if any) and removes the entry. */
  unregister(id: string) {
    const value = this.wssMap.get(id);
    if (value) {
      value.ws.close();
      value.destroy();
    }
    this.wssMap.delete(id);
  }

  /**
   * @param beginWith optional prefix filter
   * @returns all registered ids, or only those starting with `beginWith`
   */
  getIds(beginWith?: string) {
    const ids = Array.from(this.wssMap.keys());
    return beginWith ? ids.filter(key => key.startsWith(beginWith)) : ids;
  }

  /** @returns the ids selected by `getIds` together with their connection summaries. */
  getIdsInfo(beginWith?: string) {
    const ids = this.getIds(beginWith);
    const infoList = this.getInfoList(ids);
    return {
      ids,
      infoList,
    };
  }

  /** @returns the `getInfo()` summary of every id that is currently registered; unknown ids are skipped. */
  getInfoList(ids: string[]) {
    return ids.map(id => {
      const value = this.wssMap.get(id);
      if (value) {
        return value.getInfo();
      }
      return null;
    }).filter(Boolean);
  }

  /** @returns the connection stored under `id`, or undefined. */
  get(id: string) {
    return this.wssMap.get(id);
  }

  /**
   * @returns `id` itself when it is free, otherwise `id` followed by '-' and a
   *          6-character random suffix.
   */
  createId(id: string) {
    if (!this.wssMap.has(id)) {
      return id;
    }
    return id + '-' + nanoid(6);
  }

  /**
   * Schedules the periodic sweep that unregisters connections whose socket is not open.
   *
   * Calling it again replaces the pending sweep instead of starting a second loop.
   */
  checkConnceted() {
    if (this.checkTimer) {
      clearTimeout(this.checkTimer);
    }
    this.checkTimer = setTimeout(() => {
      try {
        this.wssMap.forEach((value, key) => {
          if (value.ws.readyState !== WebSocket.OPEN) {
            logger.debug('ws not connected, unregister', key);
            this.unregister(key);
          }
        });
      } finally {
        // Reschedule even if a sweep throws, so one failure does not end the loop for good.
        this.checkConnceted();
      }
    }, this.PING_INTERVAL);
  }

  /** Stops the periodic sweep started by `checkConnceted`. */
  stopCheck() {
    if (this.checkTimer) {
      clearTimeout(this.checkTimer);
      this.checkTimer = undefined;
    }
  }

  /**
   * Registers an incoming socket and decides which id it gets.
   *
   * - `userApp` is registered and its socket is CLOSED: the id is reused after a short
   *   pause and the peer is told it is verified (`isNew: false`).
   * - `userApp` is registered and still alive: a `-mult-<random>` suffix is appended.
   * - The caller is not logged in: a temporary `-registry-<random>` suffix is appended.
   *
   * @returns the created WsMessage, whether a new id was registered, and the id used
   */
  async createNewConnection(opts: { ws: any; user: string, userApp: string, isLogin?: boolean }) {
    let id = opts.userApp;
    const isLogin = opts.isLogin ?? false;
    const existing = this.wssMap.get(id);
    if (existing) {
      if (existing.isClosed()) {
        // The previous entry is still around, so this is treated as a simple app restart:
        // no new registration is required.
        logger.debug('之前的连接已关闭,复用注册 ID 连接 ws', id);
        this.unregister(id);
        await new Promise(resolve => setTimeout(resolve, 100));
        const wsMessage = this.register(id, { ws: opts.ws, user: opts.user, id, isLogin });
        // Awaited so a failure rejects this call instead of becoming an unhandled rejection.
        await wsMessage.sendConnected();
        return { wsMessage, isNew: false, id: id };
      }
      // The previous connection is still alive: connect under a fresh id.
      id = id + '-mult-' + generateRegistryId(4);
      logger.debug('之前的连接未关闭,使用新的 ID 连接 ws', id);
    }
    // No reusable connection: register a new one.
    let realId: string = id;
    if (!isLogin) {
      // Unverified callers get a temporary id ending in a random registry suffix.
      realId = id + '-registry-' + generateRegistryId();
      logger.debug('未登录用户,使用临时注册 ID 连接 ws', realId);
    }
    const wsMessage = this.register(realId, { ws: opts.ws, user: opts.user, id: realId, isLogin });
    return { wsMessage, isNew: true, id: realId };
  }
}
/**
 * Generates a random lowercase base-36 string ([0-9a-z]) of exactly `len` characters.
 *
 * A single `Math.random().toString(36)` may produce fewer characters than requested
 * (e.g. 0.5 becomes "0.i"), so chunks are appended until the requested length is reached.
 * Not cryptographically secure.
 *
 * @param len number of characters (default 6); non-finite or non-positive values yield ''
 * @returns the random string
 */
const generateRegistryId = (len = 6) => {
  const size = Number.isFinite(len) ? Math.max(0, Math.floor(len)) : 0;
  let result = '';
  while (result.length < size) {
    // substring(2) drops the leading "0." of the base-36 representation.
    result += Math.random().toString(36).substring(2);
  }
  return result.substring(0, size);
};