Files
cli/assistant/src/module/remote-app/remote-app.ts

289 lines
8.5 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import type { App, ListenProcessParams } from '@kevisual/router';
import { EventEmitter } from 'eventemitter3';
/**
 * Construction options for RemoteApp.
 * Every field is optional at the type level; the defaults noted below are the
 * ones applied by the RemoteApp constructor.
 */
type RemoteAppOptions = {
/** Local app; RemoteApp.listenProxy forwards proxied messages to its `run` method. */
app?: App;
/** Remote endpoint URL; a non-ws scheme is rewritten to ws:/wss: before connecting. */
url?: string;
/** Auth token; when present it is appended to the URL as the `token` query parameter. */
token?: string;
/** User name sent in the `registryClient` registration message. */
username?: string;
/** Event emitter to publish connection events on; a new one is created when omitted. */
emitter?: EventEmitter;
/** Client id; appended to the URL as the `id` query parameter and sent on registration. */
id?: string;
/** Whether automatic reconnection is enabled. Defaults to true. */
autoReconnect?: boolean;
/** Maximum number of reconnect attempts. Defaults to Infinity. */
maxReconnectAttempts?: number;
/** Initial reconnect delay in milliseconds. Defaults to 1000. */
reconnectDelay?: number;
/** Upper bound for the reconnect delay in milliseconds. Defaults to 30000. */
maxReconnectDelay?: number;
/** Whether exponential backoff is applied to the reconnect delay. Defaults to true. */
enableBackoff?: boolean;
};
/**
 * WebSocket client that registers a local app with a remote proxy endpoint and
 * answers proxied requests through it, with optional automatic reconnection.
 *
 * The remote share address looks like https://kevisual.cn/ws/proxy.
 *
 * Events published on `emitter`: `open`, `close`, `message`, `error`,
 * `maxReconnectAttemptsReached` and `reconnectFailed`.
 */
export class RemoteApp {
  /** `readyState` value of a fully closed socket (`CLOSED` in the WebSocket spec). */
  private static readonly SOCKET_CLOSED = 3;

  mainApp: App;
  url: string;
  id: string;
  username: string;
  emitter: EventEmitter;
  isConnected: boolean = false;
  ws: WebSocket;
  remoteIsConnected: boolean = false;
  isError: boolean = false;
  // Reconnection settings and state.
  autoReconnect: boolean;
  maxReconnectAttempts: number;
  reconnectDelay: number;
  maxReconnectDelay: number;
  enableBackoff: boolean;
  reconnectAttempts: number = 0;
  reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  isManuallyClosed: boolean = false;

  /**
   * Builds the connection URL (adding `token` and `id` query parameters) and
   * immediately starts connecting.
   *
   * @param opts Connection and reconnection options; `url` and `id` are required.
   * @throws Error when `url` or `id` is missing.
   * @throws TypeError when `url` cannot be parsed by `URL`.
   */
  constructor(opts?: RemoteAppOptions) {
    const options: RemoteAppOptions = opts ?? {};
    const { token, url, id, username } = options;
    // Fail synchronously with a clear message instead of crashing on an
    // undefined `opts` or rejecting later inside the un-awaited init().
    if (!url) {
      throw new Error('No url provided for remote app');
    }
    if (!id) {
      throw new Error('No id provided for remote app');
    }
    this.mainApp = options.app;
    this.username = username;
    this.emitter = options.emitter || new EventEmitter();
    const target = new URL(url);
    if (token) {
      target.searchParams.set('token', token);
    }
    target.searchParams.set('id', id);
    if (!token && !username) {
      console.error(`[remote-app] 不存在用户名和token ${id}. 权限认证会失败。`);
    }
    this.url = target.toString();
    this.id = id;
    // Reconnection configuration.
    this.autoReconnect = options.autoReconnect ?? true;
    this.maxReconnectAttempts = options.maxReconnectAttempts ?? Infinity;
    this.reconnectDelay = options.reconnectDelay ?? 1000;
    this.maxReconnectDelay = options.maxReconnectDelay ?? 30000;
    this.enableBackoff = options.enableBackoff ?? true;
    this.connect();
  }

  /**
   * Resolves to true once the socket is open, or false if no `open` event
   * arrives in time. Covers both the initial connection and reconnects.
   *
   * @param timeout Maximum wait in milliseconds (default 5000).
   */
  async isConnect(timeout: number = 5000): Promise<boolean> {
    if (this.isConnected) {
      return true;
    }
    if (this.reconnectTimer !== null) {
      console.log(`远程应用 ${this.id} 正在重连中...`);
    }
    return new Promise<boolean>((resolve) => {
      const handleOpen = () => {
        clearTimeout(timer);
        this.isConnected = true;
        this.remoteIsConnected = true;
        resolve(true);
      };
      const timer = setTimeout(() => {
        // Drop the pending listener so repeated calls do not accumulate handlers.
        this.emitter.off('open', handleOpen);
        resolve(false);
      }, timeout);
      this.emitter.once('open', handleOpen);
    });
  }

  /**
   * Converts an http(s) URL to its ws(s) equivalent; ws/wss URLs are returned as-is.
   *
   * @param url Absolute URL string.
   * @returns The URL with a WebSocket scheme (`wss:` for `https:`, otherwise `ws:`).
   */
  getWsURL(url: string) {
    const { protocol } = new URL(url);
    if (protocol.startsWith('ws')) {
      return url.toString();
    }
    const wsProtocol = protocol === 'https:' ? 'wss:' : 'ws:';
    return url.toString().replace(protocol, wsProtocol);
  }

  /**
   * Opens a new WebSocket, replacing any existing one.
   * The body contains no `await`, so the socket is created synchronously when
   * the method is called; failures surface as a rejected promise.
   *
   * @throws Error (as a rejection) when `url` or `id` is empty.
   */
  async init() {
    if (!this.url) {
      throw new Error('No url provided for remote app');
    }
    if (!this.id) {
      throw new Error('No id provided for remote app');
    }
    this.isError = false;
    this.releaseSocket();
    const ws = new WebSocket(this.getWsURL(this.url));
    // Each handler ignores events from a socket that is no longer current.
    ws.onopen = () => {
      if (this.ws !== ws) return;
      this.isConnected = true;
      this.onOpen();
      console.log('[remote-app] WebSocket connection opened');
    };
    ws.onclose = () => {
      if (this.ws !== ws) return;
      this.isConnected = false;
      this.onClose();
    };
    ws.onmessage = (event) => {
      if (this.ws !== ws) return;
      this.onMessage(event.data);
    };
    ws.onerror = (error) => {
      if (this.ws !== ws) return;
      this.onError(error);
    };
    this.ws = ws;
  }

  /** Resets error and retry state, cancels any pending reconnect, and emits `open`. */
  onOpen() {
    this.isError = false;
    this.reconnectAttempts = 0;
    this.clearReconnectTimer();
    this.emitter.emit('open', this.id);
  }

  /** Reports the closed connection and schedules a reconnect when allowed. */
  onClose() {
    this.markClosed();
    if (this.autoReconnect && !this.isManuallyClosed) {
      this.scheduleReconnect();
    }
  }

  /**
   * Computes the delay before the next reconnect attempt.
   * With backoff enabled this is `reconnectDelay * 2^reconnectAttempts`
   * (attempts counted before the upcoming one), capped at `maxReconnectDelay`.
   */
  calculateReconnectDelay(): number {
    if (!this.enableBackoff) {
      return this.reconnectDelay;
    }
    const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts);
    return Math.min(delay, this.maxReconnectDelay);
  }

  /**
   * Schedules the next reconnect attempt, or emits
   * `maxReconnectAttemptsReached` when the attempt budget is exhausted.
   */
  scheduleReconnect() {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error(`远程应用 ${this.id} 已达到最大重连次数 ${this.maxReconnectAttempts},停止重连`);
      this.emitter.emit('maxReconnectAttemptsReached', this.id);
      return;
    }
    // Only one reconnect timer may be pending at a time.
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
    }
    const delay = this.calculateReconnectDelay();
    this.reconnectAttempts++;
    console.log(`远程应用 ${this.id} 将在 ${delay}ms 后尝试第 ${this.reconnectAttempts} 次重连`);
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      // init() is async, so its failure is a rejection: a surrounding
      // try/catch would never see it.
      void this.init().catch((error: unknown) => {
        console.error(`远程应用 ${this.id} 重连失败:`, error);
        this.emitter.emit('reconnectFailed', { id: this.id, attempt: this.reconnectAttempts, error });
        // Keep retrying unless reconnection was turned off in the meantime.
        if (this.autoReconnect && !this.isManuallyClosed) {
          this.scheduleReconnect();
        }
      });
    }, delay);
  }

  /**
   * Closes the connection and suppresses automatic reconnection until
   * `reconnect()` is called. The configured `autoReconnect` value is left
   * untouched so that a later `reconnect()` restores the original behavior.
   */
  disconnect() {
    this.isManuallyClosed = true;
    this.clearReconnectTimer();
    if (this.ws) {
      this.ws.close();
    }
  }

  /** Reconnects immediately, resetting the retry counter and the manual-close flag. */
  reconnect() {
    this.isManuallyClosed = false;
    this.reconnectAttempts = 0;
    this.clearReconnectTimer();
    this.connect();
  }

  /** Forwards a raw socket payload to `message` listeners. */
  onMessage(data: any) {
    this.emitter.emit('message', data);
  }

  /** Logs a socket error, marks the instance as errored, and emits `error`. */
  onError(error: any) {
    console.error(`[remote-app] 远程应用错误: ${this.id}`, error);
    this.isError = true;
    this.emitter.emit('error', error);
  }

  /**
   * Subscribes to a connection event.
   *
   * @returns A function that removes the listener again.
   */
  on(event: 'open' | 'close' | 'message' | 'error' | 'maxReconnectAttemptsReached' | 'reconnectFailed', listener: (data: any) => void) {
    this.emitter.on(event, listener);
    return () => {
      this.emitter.off(event, listener);
    };
  }

  /** Serializes `data` as JSON and sends it over the current socket. */
  json(data: any) {
    this.ws.send(JSON.stringify(data));
  }

  /**
   * Registers this client with the remote side and starts answering `proxy`
   * messages by running them through `mainApp`.
   * The message listener is removed automatically on the next `close` event.
   *
   * @returns A disposer that stops listening and releases every listener it added.
   */
  listenProxy() {
    const remoteApp = this;
    const app = this.mainApp;
    const username = this.username;
    const handleMessage = async (event: any) => {
      try {
        const body = JSON.parse(event.toString());
        const bodyData = body?.data as ListenProcessParams;
        const message = bodyData?.message || {};
        const context = bodyData?.context || {};
        if (body?.code === 401) {
          console.error('远程应用认证失败,请检查 token 是否正确');
          this.isError = true;
          return;
        }
        if (body?.type !== 'proxy') return;
        if (!body.id) {
          remoteApp.json({
            id: body.id,
            data: {
              code: 400,
              message: 'id is required',
            },
          });
          return;
        }
        const res = await app.run(message, context);
        remoteApp.json({
          id: body.id,
          data: {
            code: res.code,
            data: res.data,
            message: res.message,
          },
        });
      } catch (error) {
        console.error('处理远程代理请求出错:', error);
      }
    };
    remoteApp.json({
      id: this.id,
      type: 'registryClient',
      username: username,
    });
    console.log(`远程应用 ${this.id} (${username}) 已注册到主应用,等待消息...`);
    remoteApp.emitter.on('message', handleMessage);
    const stopListening = () => {
      remoteApp.emitter.off('message', handleMessage);
    };
    const handleClose = () => {
      stopListening();
    };
    remoteApp.emitter.once('close', handleClose);
    return () => {
      stopListening();
      // Also drop the one-shot close hook so disposing leaves nothing behind.
      remoteApp.emitter.off('close', handleClose);
    };
  }

  /** Starts a connection attempt and routes an init() failure to `onError`. */
  private connect(): void {
    void this.init().catch((error: unknown) => this.onError(error));
  }

  /** Cancels the pending reconnect timer, if any. */
  private clearReconnectTimer(): void {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  /** Logs the closure, clears the connected flag, and emits `close`. */
  private markClosed(): void {
    console.log('远程应用关闭:', this.id);
    this.isConnected = false;
    this.emitter.emit('close', this.id);
  }

  /**
   * Detaches and closes the current socket before it is replaced.
   * Its handlers are removed first so that a late `close` event cannot flip
   * `isConnected` or schedule a reconnect against the replacement socket.
   * If the socket had not yet reached CLOSED, `close` is reported here once,
   * because the detached socket can no longer report it.
   */
  private releaseSocket(): void {
    const previous = this.ws;
    if (!previous) {
      return;
    }
    const closeAlreadyReported = previous.readyState === RemoteApp.SOCKET_CLOSED;
    previous.onopen = null;
    previous.onclose = null;
    previous.onmessage = null;
    previous.onerror = null;
    previous.close();
    if (closeAlreadyReported) {
      this.isConnected = false;
    } else {
      this.markClosed();
    }
  }
}