feat: enhance WebSocket proxy with user connection management and status reporting

- Updated StudioOpts type to include infoList for user connection status.
- Added rendering of connection info in createStudioAppListHtml.
- Modified self-restart logic to use a specific app path.
- Improved WebSocket connection handling in wsProxyManager, including user registration and ID management.
- Implemented connection status checks and responses in UserV1Proxy.
- Introduced renderServerHtml function to inject server data into HTML responses.
- Refactored page-proxy request handling for better URL management.
This commit is contained in:
2026-03-05 03:58:46 +08:00
parent aaedcb881b
commit bbdf9f087d
9 changed files with 451 additions and 270 deletions

View File

@@ -0,0 +1,6 @@
export const renderServerHtml = (data: any, html: string) => {
if (html.includes('<body>')) {
return html.replace('<body>', `<body><script>window.__SERVER_DATA__ = ${JSON.stringify(data)}</script>`);
}
return html;
}

View File

@@ -1,8 +1,18 @@
type StudioOpts = { user: string, userAppKey?: string; appIds: string[] }
type StudioOpts = {
user: string,
userAppKey?: string;
appIds: string[]
infoList?: {
user: string;
id: string;
status: 'waiting' | 'connected' | 'closed';
}[]
}
export const createStudioAppListHtml = (opts: StudioOpts) => {
const user = opts.user!;
const userAppKey = opts?.userAppKey;
let showUserAppKey = userAppKey;
const infos = opts.infoList || [];
if (showUserAppKey && showUserAppKey.startsWith(user + '--')) {
showUserAppKey = showUserAppKey.replace(user + '--', '');
}
@@ -333,7 +343,7 @@ export const createStudioAppListHtml = (opts: StudioOpts) => {
</div>
` : ''}
${appListContent}
<pre>${JSON.stringify(infos, null, 2)}</pre>
<div class="footer">
© ${new Date().getFullYear()} Studio - 应用管理
</div>

View File

@@ -1,7 +1,7 @@
import childProcess from 'child_process';
export const selfRestart = async () => {
const appName = 'code-center';
const appName = 'root/code-center';
// 检测 pm2 是否安装和是否有 appName 这个应用
try {
const res = childProcess.execSync(`pm2 list`);

View File

@@ -3,46 +3,40 @@ import { getLoginUserByToken } from '@/modules/auth.ts';
import { logger } from '../logger.ts';
export const wsProxyManager = new WsProxyManager();
import { WebSocketListenerFun } from '@kevisual/router/src/server/server-type.ts'
// 生成一个随机六位字符串作为注册 ID
const generateRegistryId = () => {
return Math.random().toString(36).substring(2, 8);
}
export const wssFun: WebSocketListenerFun = async (req, res) => {
// do nothing, just to enable ws upgrade event
const { id, ws, token, data, emitter } = req;
// console.log('req', req)
const { type } = data || {};
if (type === 'registryClient') {
const loginUser = await getLoginUserByToken(token);
if (!loginUser?.tokenUser) {
logger.debug('未登录,断开连接');
ws.send(JSON.stringify({ code: 401, message: '未登录' }));
setTimeout(() => {
ws.close(401, 'Unauthorized');
}, 1000);
let isLogin = false;
let user = '';
if (loginUser?.tokenUser) {
isLogin = true;
user = loginUser?.tokenUser?.username;
} else {
logger.debug('未登录,请求等待用户验证', data);
user = data?.username || '';
}
if (!user) {
logger.debug('未提供用户名,无法注册 ws 连接');
ws.close();
return;
}
const user = loginUser?.tokenUser?.username;
const userApp = user + '--' + id;
logger.debug('注册 ws 连接', userApp);
const wsMessage = wsProxyManager.get(userApp);
if (wsMessage) {
logger.debug('ws 连接已存在,关闭旧连接', userApp);
wsMessage.ws.close();
wsProxyManager.unregister(userApp);
await new Promise((resolve) => setTimeout(resolve, 200));
let userApp = user + '--' + id;
// TODO: 如果存在, 而且之前的那个关闭了,不需要验证,直接覆盖和复用.
let wsConnect = await wsProxyManager.createNewConnection({ ws, user, userApp, isLogin });
if (wsConnect.isNew) {
logger.debug('新连接注册成功', userApp);
}
// @ts-ignore
wsProxyManager.register(userApp, { user, ws });
ws.send(
JSON.stringify({
type: 'connected',
user: user,
id,
}),
);
emitter.once('close--' + id, () => {
logger.debug('ws emitter closed');
wsProxyManager.unregister(userApp);
});
// @ts-ignore
ws.data.userApp = userApp;
ws.data.userApp = wsConnect.id;
return;
}
// @ts-ignore

View File

@@ -1,7 +1,9 @@
import { nanoid } from 'nanoid';
import { customAlphabet } from 'nanoid';
import { WebSocket } from 'ws';
import { logger } from '../logger.ts';
import { EventEmitter } from 'eventemitter3';
const letters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
const nanoid = customAlphabet(letters, 10);
class WsMessage {
ws: WebSocket;
@@ -9,11 +11,16 @@ class WsMessage {
emitter: EventEmitter;
private pingTimer?: NodeJS.Timeout;
private readonly PING_INTERVAL = 30000; // 30 秒发送一次 ping
constructor({ ws, user }: WssMessageOptions) {
id?: string;
status?: 'waiting' | 'connected' | 'closed';
manager: WsProxyManager;
constructor({ ws, user, id, isLogin, manager }: WssMessageOptions) {
this.ws = ws;
this.user = user;
this.id = id;
this.emitter = new EventEmitter();
this.manager = manager;
this.status = isLogin ? 'connected' : 'waiting';
this.startPing();
}
@@ -39,12 +46,44 @@ class WsMessage {
this.stopPing();
this.emitter.removeAllListeners();
}
isClosed() {
return this.ws.readyState === WebSocket.CLOSED;
}
async sendResponse(data: any) {
if (data.id) {
this.emitter.emit(data.id, data?.data);
}
}
async sendConnected() {
const id = this.id;
const user = this.user;
const data = { type: 'verified', user, id };
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(data));
this.status = 'connected';
}
if (id.includes('-registry-')) {
const newId = id.split('-registry-')[0];
this.manager.changeId(id, newId);
const ws = this.ws;
// @ts-ignore
if (this.ws?.data) {
// @ts-ignore
this.ws.data.userApp = newId;
}
}
}
getInfo() {
const shortAppId = this.id ? this.id.split('--')[1] : '';
return {
user: this.user,
id: this.id,
status: this.status,
shortAppId,
pathname: this.id ? `/${this.user}/v1/${shortAppId}` : '',
};
}
async sendData(data: any, context?: any, opts?: { timeout?: number }) {
if (this.ws.readyState !== WebSocket.OPEN) {
return { code: 500, message: 'WebSocket is not open' };
@@ -64,6 +103,7 @@ class WsMessage {
const msg = { path: data?.path, key: data?.key, id: data?.id };
return new Promise((resolve) => {
const timer = setTimeout(() => {
console.log('ws-proxy sendData timeout', msg);
resolve({
code: 500,
message: `运行超时执行的id: ${id},参数是${JSON.stringify(msg)}`,
@@ -79,7 +119,12 @@ class WsMessage {
type WssMessageOptions = {
ws: WebSocket;
user?: string;
id?: string;
realId?: string;
isLogin?: boolean;
manager: WsProxyManager;
};
export class WsProxyManager {
wssMap: Map<string, WsMessage> = new Map();
PING_INTERVAL = 30000; // 30 秒检查一次连接状态
@@ -89,7 +134,7 @@ export class WsProxyManager {
}
this.checkConnceted();
}
register(id: string, opts?: { ws: WebSocket; user: string }) {
register(id: string, opts?: { ws: WebSocket; user: string, id?: string }) {
if (this.wssMap.has(id)) {
const value = this.wssMap.get(id);
if (value) {
@@ -100,8 +145,24 @@ export class WsProxyManager {
const [username, appId] = id.split('--');
const url = new URL(`/${username}/v1/${appId}`, 'https://kevisual.cn/');
console.log('WsProxyManager register', id, '访问地址', url.toString());
const value = new WsMessage({ ws: opts?.ws, user: opts?.user });
const value = new WsMessage({ ...opts, manager: this } as WssMessageOptions);
this.wssMap.set(id, value);
return value;
}
changeId(oldId: string, newId: string) {
const value = this.wssMap.get(oldId);
const originalValue = this.wssMap.get(newId);
if (originalValue) {
logger.debug(`WsProxyManager changeId: ${newId} already exists, close old connection`);
originalValue.ws.close();
originalValue.destroy();
}
if (value) {
this.wssMap.delete(oldId);
this.wssMap.set(newId, value);
value.id = newId;
logger.debug(`WsProxyManager changeId: ${oldId} -> ${newId}`);
}
}
unregister(id: string) {
const value = this.wssMap.get(id);
@@ -117,9 +178,33 @@ export class WsProxyManager {
}
return Array.from(this.wssMap.keys());
}
getIdsInfo(beginWith?: string) {
const ids = this.getIds(beginWith);
const infoList = this.getInfoList(ids);
return {
ids,
infoList,
};
}
getInfoList(ids: string[]) {
return ids.map(id => {
const value = this.wssMap.get(id);
if (value) {
return value.getInfo();
}
return null;
}).filter(Boolean);
}
get(id: string) {
return this.wssMap.get(id);
}
createId(id: string) {
if (!this.wssMap.has(id)) {
return id;
}
const newId = id + '-' + nanoid(6);
return newId;
}
checkConnceted() {
const that = this;
setTimeout(() => {
@@ -132,4 +217,40 @@ export class WsProxyManager {
that.checkConnceted();
}, this.PING_INTERVAL);
}
async createNewConnection(opts: { ws: any; user: string, userApp: string, isLogin?: boolean }) {
const id = opts.userApp;
let realId: string = id;
const isLogin = opts.isLogin || false;
const has = this.wssMap.has(id);
const registryId = '-registry-' + generateRegistryId(); // 生成一个随机六位字符串作为注册 ID
let isNeedVerify = !isLogin;
if (has) {
const value = this.wssMap.get(id);
if (value) {
if (value.isClosed()) {
// 短时间内还在, 等于简单重启了一下应用,不需要重新注册.
logger.debug('之前的连接已关闭,复用注册 ID 连接 ws', id);
this.unregister(id);
await new Promise(resolve => setTimeout(resolve, 100));
const wsMessage = this.register(id, { ws: opts.ws, user: opts.user, id });
wsMessage.sendConnected();
return { wsMessage, isNew: false, id: id };
} else {
// 没有关闭,需要重新注册鉴权一下, 生成新的 id 连接.
isNeedVerify = true;
}
}
}
// 没有连接, 直接注册新的连接.
if (isNeedVerify) {
realId = id + registryId;
logger.debug('未登录用户,使用临时注册 ID 连接 ws', realId);
}
const wsMessage = this.register(realId, { ws: opts.ws, user: opts.user, id: realId });
return { wsMessage, isNew: true, id: realId };
}
}
// 生成一个随机六位字符串作为注册 ID
const generateRegistryId = () => {
return Math.random().toString(36).substring(2, 8);
}

View File

@@ -7,14 +7,17 @@ import { getLoginUser } from '@/modules/auth.ts';
import { createStudioAppListHtml } from '../html/studio-app-list/index.ts';
import { omit } from 'es-toolkit';
import { baseProxyUrl, proxyDomain } from '../domain.ts';
import { renderServerHtml } from '../html/render-server-html.ts';
type ProxyOptions = {
createNotFoundPage: (msg?: string) => any;
};
export const UserV1Proxy = async (req: IncomingMessage, res: ServerResponse, opts?: ProxyOptions) => {
const { url } = req;
const { url, method } = req;
const _url = new URL(url || '', `http://localhost`);
const { pathname, searchParams } = _url;
const isGet = method === 'GET';
let [user, app, userAppKey] = pathname.split('/').slice(1);
if (!user || !app) {
opts?.createNotFoundPage?.('应用未找到');
@@ -32,14 +35,9 @@ export const UserV1Proxy = async (req: IncomingMessage, res: ServerResponse, opt
if (!userAppKey) {
if (isAdmin) {
// 获取所有的管理员的应用列表
const ids = wsProxyManager.getIds(user + '--');
const html = createStudioAppListHtml({ user, appIds: ids, userAppKey });
res.writeHead(200, { 'Content-Type': 'text/html' });
res.end(html);
return;
return handleRequest(req, res, { user, app, userAppKey, isAdmin });
} else {
opts?.createNotFoundPage?.('没有访问应用权限');
opts?.createNotFoundPage?.('应用访问失败');
return false;
}
}
@@ -57,14 +55,19 @@ export const UserV1Proxy = async (req: IncomingMessage, res: ServerResponse, opt
}
logger.debug('data', data);
const client = wsProxyManager.get(userAppKey);
const ids = wsProxyManager.getIds(user + '--');
const { ids, infoList } = wsProxyManager.getIdsInfo(user + '--');
if (!client) {
if (isAdmin) {
const html = createStudioAppListHtml({ user, appIds: ids, userAppKey });
res.writeHead(200, { 'Content-Type': 'text/html' });
res.end(html);
if (isGet) {
if (isAdmin) {
const html = createStudioAppListHtml({ user, appIds: ids, userAppKey, infoList });
res.writeHead(200, { 'Content-Type': 'text/html' });
res.end(html);
} else {
opts?.createNotFoundPage?.('应用访问失败');
}
} else {
opts?.createNotFoundPage?.('应用访问失败');
res.writeHead(404, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: '应用访问失败' }));
}
return false;
}
@@ -80,6 +83,11 @@ export const UserV1Proxy = async (req: IncomingMessage, res: ServerResponse, opt
if (!isAdmin) {
message = omit(data, ['token', 'cookies']);
}
if (client.status === 'waiting') {
res.writeHead(603, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ message: '应用没有鉴权' }));
return true;
}
const value = await client.sendData(message, {
state: { tokenUser: omit(loginUser.tokenUser, ['oauthExpand']) },
});
@@ -91,3 +99,38 @@ export const UserV1Proxy = async (req: IncomingMessage, res: ServerResponse, opt
opts?.createNotFoundPage?.('应用未启动');
return true;
};
const handleRequest = async (req: IncomingMessage, res: ServerResponse, opts?: { user?: string, app?: string, userAppKey?: string, isAdmin?: boolean }) => {
const { user, userAppKey } = opts || {};
const isGet = req.method === 'GET';
// 获取所有的管理员的应用列表
const { ids, infoList } = wsProxyManager.getIdsInfo(user + '--');
if (isGet) {
const html = createStudioAppListHtml({ user, appIds: ids, userAppKey, infoList });
res.writeHead(200, { 'Content-Type': 'text/html' });
res.end(html);
return;
} else {
const url = new URL(req.url || '', 'http://localhost');
const path = url.searchParams.get('path');
if (path) {
const appId = url.searchParams.get('appId') || '';
const client = wsProxyManager.get(appId!)!;
if (!client) {
res.writeHead(404, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ message: '应用未找到' }));
return;
}
if (path === 'connected') {
client.sendConnected();
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ code: 200, message: '应用已连接' }));
return;
}
}
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ code: 200, data: { ids, infoList } }));
return;
}
}

View File

@@ -184,7 +184,7 @@ export const handleRequest = async (req: http.IncomingMessage, res: http.ServerR
/**
* url是pathname的路径
*/
const url = pathname;
const url = pathname || '';
if (!domainApp && noProxyUrl.includes(url)) {
if (url === '/') {
rediretHome(req, res);
@@ -312,13 +312,14 @@ export const handleRequest = async (req: http.IncomingMessage, res: http.ServerR
const indexFile = isExist.indexFilePath; // 已经必定存在了
try {
let appFileUrl: string;
if (domainApp) {
appFileUrl = (url + '').replace(`/`, '');
} else {
appFileUrl = (url + '').replace(`/${user}/${app}/`, '');
}
appFileUrl = url.replace(`/${user}/${app}/`, '');
appFileUrl = decodeURIComponent(appFileUrl); // Decode URL components
let appFile = await userApp.getFile(appFileUrl);
if (!appFile && domainApp) {
const domainAppFileUrl = url.replace(`/`, '');
appFile = await userApp.getFile(domainAppFileUrl);
}
if (!appFile && url.endsWith('/')) {
appFile = await userApp.getFile(appFileUrl + 'index.html');
} else if (!appFile && !url.endsWith('/')) {