feat: add subscribe

This commit is contained in:
2024-09-23 20:51:40 +08:00
parent e828a4c084
commit 9a378275e1
4 changed files with 83 additions and 18 deletions

View File

@@ -1,21 +1,82 @@
import { App } from '@abearxiong/router';
import { useConfig } from '@abearxiong/use-config';
import { dynamicImport } from './lib/dynamic-import.ts';
import { redisPublisher } from './modules/redis.ts';
import { redisPublisher, redisSubscriber } from './modules/redis.ts';
useConfig();
export const emit = (channel: string, message: string) => {
redisPublisher.publish(channel, message);
export const emit = (channel: string, message?: any) => {
redisPublisher.publish(channel, JSON.stringify(message));
};
export const app = new App<{ import: any, emit: typeof emit }>({
export const app = new App<{ import: any; emit: typeof emit }>({
serverOptions: {
cors: {
origin: '*',
},
},
io: true,
routerContext: {
import: dynamicImport,
emit,
},
});
const clients = [];
// 订阅频道 pageEdit container 单个页面预览 container 整个页面预览
type ClientData = {
cid?: string; // container id
pid?: string[]; // page id
cids?: string[]; // container id
type: 'page' | 'container' | 'flow';
};
type MessageData = {
source: 'container';
data: any;
operation?: 'edit';
};
redisSubscriber.subscribe('pageEdit', () => {
console.log('Subscribed to Redis data-updates channel');
});
redisSubscriber.on('message', (channel, message) => {
if (channel !== 'pageEdit') return;
const m = JSON.parse(message) as MessageData;
clients.forEach((client) => {
const data = client.data as any;
const { cid, cids = [] } = data || {};
const wrapper = (data: any) => {
const res = {
type: 'pageEdit',
source: 'container',
data,
};
return JSON.stringify(res);
};
const { source, data: mData } = m; // 拆包
if (source === 'container') {
if (cid === mData?.id) {
client.ws.send(wrapper(mData));
} else if (cids.includes(mData?.id)) {
client.ws.send(wrapper(mData));
}
}
// 其他操作 暂时 不处理
// TODO
});
});
app.io.addListener('subscribe', async ({ data, end, ws }) => {
const { type } = data || {};
if (type === 'pageEdit') {
clients.push({ ws, data: data.data }); // 拆包里面包含的type信息去掉
end({ code: 200, data: 'subscribe success' });
} else if (type === 'unsubscribe') {
const index = clients.findIndex((client) => client.ws === ws);
clients.splice(index, 1);
end({ code: 200, data: 'unsubscribe success' });
} else {
end({ code: 404, data: 'subscribe fail' });
}
ws.on('close', () => {
const index = clients.findIndex((client) => client.ws === ws);
clients.splice(index, 1);
});
});