Files
code-center/src/modules/v3/sse/sse-manager.ts

135 lines
3.6 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import { nanoid } from "nanoid";
type ConnectionInfo = {
id: string;
writer: WritableStreamDefaultWriter;
stream: ReadableStream<any>;
connectedAt: Date;
heartbeatInterval: NodeJS.Timeout | null;
userId?: string;
};
export class SSEManager {
private connections: Map<string, ConnectionInfo> = new Map();
private userConnections: Map<string, Set<string>> = new Map(); // userId -> connectionIds
constructor() {
// 初始化逻辑
}
createConnection(info?: { userId?: string }): ConnectionInfo {
const connectionId = nanoid(16);
const { readable, writable } = new TransformStream();
const writer = writable.getWriter();
// 存储连接信息
const connectionInfo = {
id: connectionId,
writer,
stream: readable,
connectedAt: new Date(),
heartbeatInterval: null,
userId: info?.userId
};
this.connections.set(connectionId, connectionInfo);
// 添加到用户索引
if (info?.userId) {
const userSet = this.userConnections.get(info.userId) || new Set();
userSet.add(connectionId);
this.userConnections.set(info.userId, userSet);
}
return connectionInfo;
}
sendToConnection(connectionId: string, data: any) {
const connection = this.connections.get(connectionId);
if (connection) {
const message = `data: ${JSON.stringify(data)}\n\n`;
return connection.writer.write(new TextEncoder().encode(message));
}
throw new Error(`Connection ${connectionId} not found`);
}
getConnection(connectionId: string) {
return this.connections.get(connectionId);
}
broadcast(data: any, opts?: { userId?: string }) {
const message = `data: ${JSON.stringify(data)}\n\n`;
const promises = [];
// 指定 userId只发送给目标用户通过索引快速查找
if (opts?.userId) {
const userConnIds = this.userConnections.get(opts.userId);
if (userConnIds) {
for (const connId of userConnIds) {
const conn = this.connections.get(connId);
if (conn) {
promises.push(
conn.writer.write(new TextEncoder().encode(message))
.catch(() => {
this.closeConnection(connId);
})
);
}
}
}
return Promise.all(promises);
}
// 未指定 userId广播给所有人
for (const [id, connection] of this.connections) {
promises.push(
connection.writer.write(new TextEncoder().encode(message))
.catch(() => {
this.closeConnection(id);
})
);
}
return Promise.all(promises);
}
closeConnection(connectionId: string) {
const connection = this.connections.get(connectionId);
if (connection) {
// 清理心跳定时器
if (connection.heartbeatInterval) {
clearInterval(connection.heartbeatInterval);
}
// 从用户索引中移除
if (connection.userId) {
const userSet = this.userConnections.get(connection.userId);
if (userSet) {
userSet.delete(connectionId);
if (userSet.size === 0) {
this.userConnections.delete(connection.userId);
}
}
}
// 关闭写入器
connection.writer.close().catch(console.error);
// 从管理器中移除
this.connections.delete(connectionId);
console.log(`Connection ${connectionId} closed`);
return true;
}
return false;
}
closeAllConnections() {
for (const [connectionId, connection] of this.connections) {
this.closeConnection(connectionId);
}
}
getActiveConnections() {
return Array.from(this.connections.keys());
}
}