更新依赖项,添加 flowme 插入触发器和监听器;重构数据库连接管理;优化用户路由和 SSE 处理

This commit is contained in:
2026-02-01 03:58:40 +08:00
parent 7c61bd3ac5
commit 82c9b834e9
16 changed files with 412 additions and 219 deletions

97
src/modules/v3/index.ts Normal file
View File

@@ -0,0 +1,97 @@
import { IncomingMessage, ServerResponse } from 'http';
import { App } from '@kevisual/router';
import { logger } from '../logger.ts';
// import { getLoginUser } from '@/modules/auth.ts';
import { SSEManager } from './sse/sse-manager.ts';
import { getLoginUser } from '../auth.ts';
import { emitter, flowme_insert } from '../../realtime/flowme/index.ts';
export const sseManager = new SSEManager();
emitter.on(flowme_insert, (data) => {
console.log('flowme_insert event received:', data);
const uid = data.uid;
if (uid) {
sseManager.broadcast({ type: 'flowme_insert', data }, { userId: uid });
}
});
type ProxyOptions = {
createNotFoundPage: (msg?: string) => any;
};
export const UserV3Proxy = async (req: IncomingMessage, res: ServerResponse, opts?: ProxyOptions) => {
const { url } = req;
const _url = new URL(url || '', `http://localhost`);
const { pathname, searchParams } = _url;
let [user, app, ...rest] = pathname.split('/').slice(1);
if (!user || !app) {
opts?.createNotFoundPage?.('应用未找到');
return false;
}
const last = rest.slice(-1)[0] || '';
const method = req.method || 'GET';
console.log('UserV3Proxy request: last', last, rest);
if (method === 'GET' && last === 'event') {
const info = await getLoginUser(req);
if (!info) {
opts?.createNotFoundPage?.('没有登录');
return false;
}
console.log('建立 SSE 连接, user=', info.tokenUser.uid);
addEventStream(req, res, info);
return true;
}
res.end(`UserV3Proxy: user=${user}, app=${app}, rest=${rest.join('/')}`);
console.log('UserV3Proxy:', { user, app, });
return true;
};
type Opts = {
tokenUser: any;
token: string;
}
const addEventStream = (req: IncomingMessage, res: ServerResponse, opts?: Opts) => {
res.writeHead(200, {
'Content-Type': 'text/event-stream; charset=utf-8',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'Access-Control-Allow-Origin': '*'
});
console.log('Client connected for SSE', opts?.tokenUser?.username || 'unknown');
const uid = opts?.tokenUser?.id || 'guest';
console.log('SSE for userId=', opts?.tokenUser);
const connectionInfo = sseManager.createConnection({ userId: uid });
const { stream, id: connectionId } = connectionInfo;
// 设置心跳
connectionInfo.heartbeatInterval = setInterval(() => {
sseManager.sendToConnection(connectionId, { type: "heartbeat", timestamp: Date.now() })
.catch(() => {
// 心跳失败时清理连接
sseManager.closeConnection(connectionId);
});
}, 30000); // 30秒心跳
const timer = setInterval(async () => {
sseManager.broadcast({ type: "time", timestamp: Date.now() });
const hasId = sseManager.getConnection(connectionId);
if (!hasId) {
clearInterval(timer);
console.log('清理广播定时器,连接已关闭');
}
}, 1000);
res.pipe(stream as any);
const bun = (req as any).bun
const request = bun?.request as Bun.BunRequest<string>
if (request) {
if (request.signal) {
// 当客户端断开时清理连接
request.signal.addEventListener("abort", () => {
console.log(`Client ${connectionId} disconnected`);
sseManager.closeConnection(connectionId);
});
}
}
// console.log('res', req)
// res.end('123');
}
const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));

View File

@@ -0,0 +1,134 @@
import { nanoid } from "nanoid";
type ConnectionInfo = {
id: string;
writer: WritableStreamDefaultWriter;
stream: ReadableStream<any>;
connectedAt: Date;
heartbeatInterval: NodeJS.Timeout | null;
userId?: string;
};
export class SSEManager {
private connections: Map<string, ConnectionInfo> = new Map();
private userConnections: Map<string, Set<string>> = new Map(); // userId -> connectionIds
constructor() {
// 初始化逻辑
}
createConnection(info?: { userId?: string }): ConnectionInfo {
const connectionId = nanoid(16);
const { readable, writable } = new TransformStream();
const writer = writable.getWriter();
// 存储连接信息
const connectionInfo = {
id: connectionId,
writer,
stream: readable,
connectedAt: new Date(),
heartbeatInterval: null,
userId: info?.userId
};
this.connections.set(connectionId, connectionInfo);
// 添加到用户索引
if (info?.userId) {
const userSet = this.userConnections.get(info.userId) || new Set();
userSet.add(connectionId);
this.userConnections.set(info.userId, userSet);
}
return connectionInfo;
}
sendToConnection(connectionId: string, data: any) {
const connection = this.connections.get(connectionId);
if (connection) {
const message = `data: ${JSON.stringify(data)}\n\n`;
return connection.writer.write(new TextEncoder().encode(message));
}
throw new Error(`Connection ${connectionId} not found`);
}
getConnection(connectionId: string) {
return this.connections.get(connectionId);
}
broadcast(data: any, opts?: { userId?: string }) {
const message = `data: ${JSON.stringify(data)}\n\n`;
const promises = [];
// 指定 userId只发送给目标用户通过索引快速查找
if (opts?.userId) {
const userConnIds = this.userConnections.get(opts.userId);
if (userConnIds) {
for (const connId of userConnIds) {
const conn = this.connections.get(connId);
if (conn) {
promises.push(
conn.writer.write(new TextEncoder().encode(message))
.catch(() => {
this.closeConnection(connId);
})
);
}
}
}
return Promise.all(promises);
}
// 未指定 userId广播给所有人
for (const [id, connection] of this.connections) {
promises.push(
connection.writer.write(new TextEncoder().encode(message))
.catch(() => {
this.closeConnection(id);
})
);
}
return Promise.all(promises);
}
closeConnection(connectionId: string) {
const connection = this.connections.get(connectionId);
if (connection) {
// 清理心跳定时器
if (connection.heartbeatInterval) {
clearInterval(connection.heartbeatInterval);
}
// 从用户索引中移除
if (connection.userId) {
const userSet = this.userConnections.get(connection.userId);
if (userSet) {
userSet.delete(connectionId);
if (userSet.size === 0) {
this.userConnections.delete(connection.userId);
}
}
}
// 关闭写入器
connection.writer.close().catch(console.error);
// 从管理器中移除
this.connections.delete(connectionId);
console.log(`Connection ${connectionId} closed`);
return true;
}
return false;
}
closeAllConnections() {
for (const [connectionId, connection] of this.connections) {
this.closeConnection(connectionId);
}
}
getActiveConnections() {
return Array.from(this.connections.keys());
}
}