feat: Implement LiveCode module with WebSocket and SSE support

- Added config management using `useConfig` for environment variables.
- Created `LiveCode` class to manage WebSocket connections and routing.
- Implemented `SSEManager` for Server-Sent Events handling.
- Developed `WSSManager` for managing WebSocket connections with heartbeat functionality.
- Introduced `ReconnectingWebSocket` class for robust WebSocket client with automatic reconnection.
- Added test files for live application demonstrating WebSocket and TCP server integration.
This commit is contained in:
2026-02-02 23:29:58 +08:00
parent 5774391bbe
commit a76c2235ea
19 changed files with 871 additions and 385 deletions

View File

@@ -107,11 +107,13 @@ export type AssistantConfigData = {
* 例子: { proxy: [ { type: 'router', api: 'https://localhost:50002/api/router' } ] }
* base: 是否使用 /api/router的基础路径默认false
* lightcode: 是否启用lightcode路由默认false
* livecode: 是否启用livecode路由实时的注册和销毁默认false
*/
router?: {
proxy: ProxyInfo[];
base?: boolean;
lightcode?: boolean;
livecode?: boolean;
}
routes?: AssistantRoutes[],
/**

View File

@@ -36,7 +36,23 @@ export class ModuleResolver {
// 相对路径 ./xxx 或 ../xxx
const localFullPath = path.resolve(this.root, routePath);
return this.fileIsExists(localFullPath) ? localFullPath : routePath;
if (!this.fileIsExists(localFullPath)) {
return routePath;
}
// 如果是目录,解析入口文件
if (fs.statSync(localFullPath).isDirectory()) {
const pkgJsonPath = path.join(localFullPath, 'package.json');
const pkg = this.readPackageJson(pkgJsonPath);
if (pkg) {
const entryPath = this.resolvePackageExport(pkg, '');
return path.join(localFullPath, entryPath);
}
// 没有 package.json默认使用 index.ts
return path.join(localFullPath, 'index.ts');
}
return localFullPath;
}
/** 解析 scoped 包 */

View File

@@ -222,7 +222,7 @@ export class AssistantApp extends Manager {
const routeStr = typeof route === 'string' ? route : route.path;
const resolvedPath = this.resolver.resolve(routeStr);
await import(resolvedPath);
console.log('路由已初始化', route);
console.log('[routes] 路由已初始化', route, resolvedPath);
} catch (err) {
console.error('初始化路由失败', route, err);
}

View File

@@ -0,0 +1,10 @@
import { useConfig } from '@kevisual/use-config';
import { HomeConfigDir } from './assistant/config/index.ts';
import path from 'node:path';
export const config = useConfig({
dotenvOpts: {
path: [path.join(HomeConfigDir, '.env'), '.env'],
}
})
// console.log('配置文件目录:', config, HomeConfigDir);

View File

@@ -165,7 +165,11 @@ export const initLightCode = async (opts: opts) => {
} else {
ctx.throw(runRes2.error || 'Lightcode 路由执行失败');
}
}).addTo(app);
}).addTo(app, {
override: false,
// @ts-ignore
overwrite: false
});// 不允许覆盖已存在的路由
}
}

View File

@@ -0,0 +1,131 @@
import { WSSManager } from './wss.ts';
import { App, Route } from '@kevisual/router'
import { WebSocketReq } from '@kevisual/router'
import { EventEmitter } from 'eventemitter3';
import { customAlphabet } from 'nanoid';
const letter = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
const customId = customAlphabet(letter, 16);
export class LiveCode {
wssManager: WSSManager;
app: App;
emitter: EventEmitter;
constructor(app: App) {
this.wssManager = new WSSManager({ heartbeatInterval: 5000 });
this.app = app;
this.emitter = new EventEmitter();
console.log('[LiveCode] 模块已初始化');
}
async conn(req: WebSocketReq) {
const { ws, emitter, id } = req;
const that = this;
// @ts-ignore
let wid = ws.data?.wid;
if (!wid) {
const _id = this.wssManager.addConnection(req, { userId: id });
// @ts-ignore
ws.data.wid = _id;
emitter.once('close--' + id, () => {
that.wssManager.closeConnection(_id);
this.deinitAppRoutes(_id);
});
console.log('[LiveCode]新的 WebSocket 连接已打开', _id);
const res = await that.init(_id);
if (res.code === 200) {
console.log('[LiveCode]初始化路由列表完成');
that.initAppRoutes(res.data?.list || [], _id);
} else {
console.error('[LiveCode]初始化路由列表失败:', res?.message);
}
return this;
}
that.onMessage(req);
return this;
}
getWss(id: string) {
return this.wssManager.getConnection(id)
}
async init(id: string): Promise<{ code: number, message?: string, data?: any }> {
return this.sendData({ path: 'router', key: 'list', }, id);
}
sendData(data: any, id: string): Promise<{ code: number, message?: string, data?: any }> {
const reqId = customId()
const wss = this.getWss(id);
if (!wss) {
return Promise.resolve({ code: 500, message: '连接不存在或已关闭' });
}
const emitter = this.emitter;
const wsReq = wss.wsReq;
try {
wsReq.ws.send(JSON.stringify({
type: 'router',
id: reqId,
data: data
}));
} catch (error) {
console.error('[LiveCode]发送数据失败:', error);
return Promise.resolve({ code: 500, message: '发送数据失败' });
}
return new Promise((resolve) => {
const timeout = setTimeout(() => {
resolve({ code: 500, message: '请求超时' });
emitter.off(reqId, listenOnce);
}, 5000);
const listenOnce = (resData: any) => {
clearTimeout(timeout);
resolve(resData);
emitter.off(reqId, listenOnce);
}
emitter.once(reqId, listenOnce);
});
}
onMessage(req: WebSocketReq) {
const { data } = req;
if (data?.id) {
// console.log('LiveCode 收到消息:', data);
this.emitter.emit(data.id, data.data);
} else {
console.warn('[LiveCode] 未知的消息格式', data);
}
}
initAppRoutes(list: Route[], wid: string) {
for (const route of list) {
const path = route.path || '';
const id = route.id || '';
if (path.startsWith('router') || path.startsWith('auth') || path.startsWith('admin-autu') || path.startsWith('call')) {
continue;
}
// console.log('注册路由:', route.path, route.description, route.metadata, route.id);
this.app.route({
path: route.id,
key: route.key,
description: route.description,
metadata: {
...route.metadata,
liveCodeId: wid
},
middleware: ['auth'],
}).define(async (ctx) => {
const { token, cookie, ...rest } = ctx.query;
const tokenUser = ctx.state.tokernUser;
const res = await this.sendData({
id: route.id,
tokenUser,
payload: rest,
}, wid);
// console.log('路由响应数据:', res);
ctx.forward(res)
}).addTo(this.app, {
// override: false,
// // @ts-ignore
// overwrite: false
});
}
}
deinitAppRoutes(wid: string) {
const routesToRemove = this.app.routes.filter(route => route.metadata?.liveCodeId === wid);
for (const route of routesToRemove) {
this.app.removeById(route.id);
}
}
}

View File

@@ -0,0 +1,134 @@
import { nanoid } from "nanoid";
type ConnectionInfo = {
id: string;
writer: WritableStreamDefaultWriter;
stream: ReadableStream<any>;
connectedAt: Date;
heartbeatInterval: NodeJS.Timeout | null;
userId?: string;
};
export class SSEManager {
private connections: Map<string, ConnectionInfo> = new Map();
private userConnections: Map<string, Set<string>> = new Map(); // userId -> connectionIds
constructor() {
// 初始化逻辑
}
createConnection(info?: { userId?: string }): ConnectionInfo {
const connectionId = nanoid(16);
const { readable, writable } = new TransformStream();
const writer = writable.getWriter();
// 存储连接信息
const connectionInfo = {
id: connectionId,
writer,
stream: readable,
connectedAt: new Date(),
heartbeatInterval: null,
userId: info?.userId
};
this.connections.set(connectionId, connectionInfo);
// 添加到用户索引
if (info?.userId) {
const userSet = this.userConnections.get(info.userId) || new Set();
userSet.add(connectionId);
this.userConnections.set(info.userId, userSet);
}
return connectionInfo;
}
sendToConnection(connectionId: string, data: any) {
const connection = this.connections.get(connectionId);
if (connection) {
const message = `data: ${JSON.stringify(data)}\n\n`;
return connection.writer.write(new TextEncoder().encode(message));
}
throw new Error(`Connection ${connectionId} not found`);
}
getConnection(connectionId: string) {
return this.connections.get(connectionId);
}
broadcast(data: any, opts?: { userId?: string }) {
const message = `data: ${JSON.stringify(data)}\n\n`;
const promises = [];
// 指定 userId只发送给目标用户通过索引快速查找
if (opts?.userId) {
const userConnIds = this.userConnections.get(opts.userId);
if (userConnIds) {
for (const connId of userConnIds) {
const conn = this.connections.get(connId);
if (conn) {
promises.push(
conn.writer.write(new TextEncoder().encode(message))
.catch(() => {
this.closeConnection(connId);
})
);
}
}
}
return Promise.all(promises);
}
// 未指定 userId广播给所有人
for (const [id, connection] of this.connections) {
promises.push(
connection.writer.write(new TextEncoder().encode(message))
.catch(() => {
this.closeConnection(id);
})
);
}
return Promise.all(promises);
}
closeConnection(connectionId: string) {
const connection = this.connections.get(connectionId);
if (connection) {
// 清理心跳定时器
if (connection.heartbeatInterval) {
clearInterval(connection.heartbeatInterval);
}
// 从用户索引中移除
if (connection.userId) {
const userSet = this.userConnections.get(connection.userId);
if (userSet) {
userSet.delete(connectionId);
if (userSet.size === 0) {
this.userConnections.delete(connection.userId);
}
}
}
// 关闭写入器
connection.writer.close().catch(console.error);
// 从管理器中移除
this.connections.delete(connectionId);
console.log(`Connection ${connectionId} closed`);
return true;
}
return false;
}
closeAllConnections() {
for (const [connectionId, connection] of this.connections) {
this.closeConnection(connectionId);
}
}
getActiveConnections() {
return Array.from(this.connections.keys());
}
}

View File

@@ -0,0 +1,213 @@
import { nanoid } from "nanoid";
import { WebSocketReq } from '@kevisual/router'
type ConnectionInfo = {
id: string;
wsReq: WebSocketReq;
connectedAt: Date;
heartbeatInterval: NodeJS.Timeout | null;
userId?: string;
lastHeartbeat: Date;
};
export class WSSManager {
private connections: Map<string, ConnectionInfo> = new Map();
private userConnections: Map<string, Set<string>> = new Map();
private heartbeatInterval: number = 30000; // 默认30秒
constructor(opts?: { heartbeatInterval?: number }) {
if (opts?.heartbeatInterval) {
this.heartbeatInterval = opts.heartbeatInterval;
}
}
/**
* 添加 WebSocket 连接
*/
addConnection(wsReq: WebSocketReq, info?: { userId?: string }): string {
const connectionId = nanoid(16);
const now = new Date();
const connectionInfo: ConnectionInfo = {
id: connectionId,
wsReq: wsReq,
connectedAt: now,
heartbeatInterval: null,
userId: info?.userId,
lastHeartbeat: now,
};
// 启动心跳
this.startHeartbeat(connectionInfo);
// 存储连接
this.connections.set(connectionId, connectionInfo);
// 添加到用户索引
if (info?.userId) {
const userSet = this.userConnections.get(info.userId) || new Set();
userSet.add(connectionId);
this.userConnections.set(info.userId, userSet);
}
return connectionId;
}
/**
* 启动心跳
*/
private startHeartbeat(connection: ConnectionInfo) {
connection.heartbeatInterval = setInterval(() => {
const ws = connection.wsReq.ws;
ws.send(JSON.stringify({ type: 'heartbeat', timestamp: new Date().toISOString() }));
connection.lastHeartbeat = new Date();
console.log(`[LiveCode] 发送心跳给连接 ${connection.id}`);
}, this.heartbeatInterval);
}
/**
* 发送消息到指定连接
*/
sendToConnection(connectionId: string, data: any): boolean {
const connection = this.connections.get(connectionId);
if (connection) {
// 发送消息
connection.wsReq.ws.send(JSON.stringify(data));
return true;
}
return false;
}
/**
* 发送消息到指定用户的所有连接
*/
sendToUser(userId: string, data: any): number {
const userConnIds = this.userConnections.get(userId);
if (!userConnIds) return 0;
let sentCount = 0;
for (const connId of userConnIds) {
if (this.sendToConnection(connId, data)) {
sentCount++;
}
}
return sentCount;
}
/**
* 广播消息到所有连接
*/
broadcast(data: any, opts?: { userId?: string; excludeConnectionId?: string }): number {
if (opts?.userId) {
// 发送给指定用户
return this.sendToUser(opts.userId, data);
}
let sentCount = 0;
for (const [connId, connection] of this.connections) {
// 跳过排除的连接
if (opts?.excludeConnectionId && connId === opts.excludeConnectionId) {
continue;
}
if (this.sendToConnection(connId, data)) {
sentCount++;
}
}
return sentCount;
}
/**
* 获取连接信息
*/
getConnection(connectionId: string): ConnectionInfo | undefined {
return this.connections.get(connectionId);
}
/**
* 获取用户的所有连接
*/
getUserConnections(userId: string): ConnectionInfo[] {
const userConnIds = this.userConnections.get(userId);
if (!userConnIds) return [];
return Array.from(userConnIds)
.map((id) => this.connections.get(id))
.filter((conn): conn is ConnectionInfo => conn !== undefined);
}
/**
* 检查连接是否活跃(基于心跳)
*/
isConnectionAlive(connectionId: string, timeout: number = 60000): boolean {
const connection = this.connections.get(connectionId);
if (!connection) return false;
const now = new Date();
const timeSinceLastHeartbeat = now.getTime() - connection.lastHeartbeat.getTime();
return timeSinceLastHeartbeat < timeout;
}
/**
* 关闭指定连接
*/
closeConnection(connectionId: string): boolean {
const connection = this.connections.get(connectionId);
if (connection) {
// 清理心跳定时器
if (connection.heartbeatInterval) {
clearInterval(connection.heartbeatInterval);
}
// 从用户索引中移除
if (connection.userId) {
const userSet = this.userConnections.get(connection.userId);
if (userSet) {
userSet.delete(connectionId);
if (userSet.size === 0) {
this.userConnections.delete(connection.userId);
}
}
}
try {
connection.wsReq.ws.close();
} catch (error) {
console.error(`Error closing WebSocket for connection ${connectionId}:`, error);
}
// 从管理器中移除
this.connections.delete(connectionId);
console.log(`WebSocket connection ${connectionId} closed`);
return true;
}
return false;
}
/**
* 关闭所有连接
*/
closeAllConnections(): void {
for (const [connectionId] of this.connections) {
this.closeConnection(connectionId);
}
}
/**
* 获取活跃连接列表
*/
getActiveConnections(): string[] {
return Array.from(this.connections.keys());
}
/**
* 获取连接数量
*/
getConnectionCount(): number {
return this.connections.size;
}
/**
* 获取用户连接数量
*/
getUserConnectionCount(userId: string): number {
return this.userConnections.get(userId)?.size || 0;
}
}

View File

@@ -32,7 +32,7 @@ export const proxyRoute = async (req: http.IncomingMessage, res: http.ServerResp
return fileProxy(req, res, {
path: localProxyProxy.path,
rootPath: localProxy.pagesDir,
indexPath: localProxyProxy.indexPath,
indexPath: localProxyProxy.file?.indexPath,
});
}
res.statusCode = 404;