import { IncomingMessage, ServerResponse } from 'http';
import { App } from '@kevisual/router';
import { logger } from '../logger.ts';
// import { getLoginUser } from '@/modules/auth.ts';
import { SSEManager } from './sse/sse-manager.ts';
import { getLoginUser } from '../auth.ts';
import { emitter, flowme_insert } from '../../realtime/flowme/index.ts';

/** Module-wide registry of the SSE connections opened by `UserV3Proxy`. */
export const sseManager = new SSEManager();

// Forward each flowme insert event to the SSE connections registered under the
// event's `uid`. Events that carry no `uid` are logged and otherwise ignored.
emitter.on(flowme_insert, (data) => {
  console.log('flowme_insert event received:', data);
  const uid = data.uid;
  if (uid) {
    sseManager.broadcast({ type: 'flowme_insert', data }, { userId: uid });
  }
});

/** Optional hooks the caller can pass to `UserV3Proxy`. */
type ProxyOptions = {
  /** Invoked (with a short message) on every path where the proxy declines the request. */
  createNotFoundPage: (msg?: string) => any;
};

/**
 * Handles requests whose path has the shape `/<user>/<app>/...`.
 *
 * - `GET` requests whose last path segment is `event` open an SSE stream for
 *   the logged-in user.
 * - Any other request with both `<user>` and `<app>` present is answered with
 *   a plain-text summary of the parsed path.
 *
 * @param req  Incoming request; only `url` and `method` are read here.
 * @param res  Response that is either ended with a summary or handed to the SSE stream.
 * @param opts Optional hooks; `createNotFoundPage` is called before every `false` return.
 * @returns `true` when the response was written to or taken over as an SSE
 *          stream; `false` when the URL is malformed, `<user>`/`<app>` is
 *          missing, or no logged-in user was found for an SSE request.
 */
export const UserV3Proxy = async (
  req: IncomingMessage,
  res: ServerResponse,
  opts?: ProxyOptions,
): Promise<boolean> => {
  let pathname: string;
  try {
    ({ pathname } = new URL(req.url || '', `http://localhost`));
  } catch {
    // `new URL` throws on malformed request targets (for example "//");
    // answer those like any other unknown path instead of rejecting.
    opts?.createNotFoundPage?.('应用未找到');
    return false;
  }

  const [user, app, ...rest] = pathname.split('/').slice(1);
  if (!user || !app) {
    opts?.createNotFoundPage?.('应用未找到');
    return false;
  }

  const last = rest.slice(-1)[0] || '';
  const method = req.method || 'GET';
  console.log('UserV3Proxy request: last', last, rest);

  if (method === 'GET' && last === 'event') {
    const info = await getLoginUser(req);
    if (!info) {
      opts?.createNotFoundPage?.('没有登录');
      return false;
    }
    // NOTE(review): this logs `tokenUser.uid`, while addEventStream keys the
    // connection by `tokenUser.id` — confirm which field is the user id.
    console.log('建立 SSE 连接, user=', info.tokenUser.uid);
    addEventStream(req, res, info);
    return true;
  }

  res.end(`UserV3Proxy: user=${user}, app=${app}, rest=${rest.join('/')}`);
  console.log('UserV3Proxy:', {
    user,
    app,
  });
  return true;
};

/** Login information passed to `addEventStream`. */
type Opts = {
  tokenUser: any;
  token: string;
};

/**
 * Turns `res` into a Server-Sent Events stream and registers it with `sseManager`.
 *
 * Steps performed:
 * 1. Writes the SSE response headers.
 * 2. Creates a connection keyed by `opts.tokenUser.id` (falls back to `'guest'`).
 * 3. Starts a 30-second heartbeat to this connection; a failed send closes the connection.
 * 4. Starts a 1-second timer that broadcasts a `time` message until the
 *    connection is no longer known to `sseManager`.
 * 5. Closes the connection when the Bun request's abort signal fires, if one is present.
 *
 * @param req  Incoming request; only the optional `bun.request.signal` is read.
 * @param res  Response that receives the headers and the stream.
 * @param opts Login info of the connected user.
 */
const addEventStream = (req: IncomingMessage, res: ServerResponse, opts?: Opts) => {
  res.writeHead(200, {
    'Content-Type': 'text/event-stream; charset=utf-8',
    'Cache-Control': 'no-cache',
    Connection: 'keep-alive',
    'Access-Control-Allow-Origin': '*',
  });
  console.log('Client connected for SSE', opts?.tokenUser?.username || 'unknown');

  const uid = opts?.tokenUser?.id || 'guest';
  console.log('SSE for userId=', opts?.tokenUser);

  const connectionInfo = sseManager.createConnection({ userId: uid });
  const { stream, id: connectionId } = connectionInfo;

  // Heartbeat every 30 seconds; a failed send closes the connection.
  // NOTE(review): presumably closeConnection clears `heartbeatInterval` — verify in SSEManager.
  connectionInfo.heartbeatInterval = setInterval(() => {
    sseManager
      .sendToConnection(connectionId, { type: "heartbeat", timestamp: Date.now() })
      .catch(() => {
        sseManager.closeConnection(connectionId);
      });
  }, 30000);

  // NOTE(review): this broadcast has no filter, so every open connection's
  // timer sends a `time` message to all connections — confirm this is intended.
  const timer = setInterval(() => {
    // Check liveness first so a closed connection's timer stops without
    // emitting one more broadcast.
    if (!sseManager.getConnection(connectionId)) {
      clearInterval(timer);
      console.log('清理广播定时器,连接已关闭');
      return;
    }
    sseManager.broadcast({ type: "time", timestamp: Date.now() });
  }, 1000);

  // NOTE(review): relies on the runtime's response object accepting the
  // connection stream via `pipe` — confirm against the server adapter in use.
  res.pipe(stream as any);

  // Close the connection when the client disconnects.
  // NOTE(review): without a Bun abort signal, only a failed heartbeat closes it.
  const bun = (req as any).bun;
  const request = bun?.request as Bun.BunRequest | undefined;
  request?.signal?.addEventListener("abort", () => {
    console.log(`Client ${connectionId} disconnected`);
    sseManager.closeConnection(connectionId);
  });
  // console.log('res', req)
  // res.end('123');
};

/** Resolves after `ms` milliseconds. */
const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));