import path from 'path';
import { redis, subscriber } from './redis/redis.ts';
import { config, fileStore } from '../module/config.ts';
import fs from 'fs';
import crypto from 'crypto';
import { nanoid } from 'nanoid';
import { pipeline } from 'stream';
import { promisify } from 'util';
import { fetchApp, fetchDomain, fetchTest } from './query/get-router.ts';
import { getAppLoadStatus, setAppLoadStatus } from './redis/get-app-status.ts';
import { minioResources } from './minio.ts';
import { downloadFileFromMinio } from './proxy/http-proxy.ts';
import { logger } from './logger.ts';

const pipelineAsync = promisify(pipeline);

/** Lifetime (in seconds) of the Redis cache entries written by this module: 7 days. */
const CACHE_TTL_SECONDS = 60 * 60 * 24 * 7;

/**
 * Base URL under which app resources are served.
 * Falls back to the default when `config.proxy` is missing OR when it exists
 * but carries no `resources` value (the latter used to yield `undefined`).
 */
const resources: string = config?.proxy?.resources || 'https://minio.xiongxiao.me/resources';

/**
 * Resolve a file path against the resources base URL.
 *
 * @param resources base URL to prefix relative paths with
 * @param urlpath either an absolute http(s) URL or a path relative to `resources`
 * @returns `urlpath` unchanged when it is already an absolute http(s) URL,
 *          otherwise `${resources}/${urlpath}`
 */
const wrapperResources = (resources: string, urlpath: string) => {
  // Match the scheme explicitly so relative paths that merely begin with
  // "http" (e.g. "http-demo/index.html") are not mistaken for URLs.
  if (/^https?:\/\//i.test(urlpath)) {
    return urlpath;
  }
  return `${resources}/${urlpath}`;
};

/** Sample app record; only used for its shape (`typeof demoData`). */
const demoData = {
  user: 'root',
  key: 'codeflow',
  appType: 'web-single',
  // version: '1.0.0',
  domain: null,
  type: 'oss', // 'oss' is the default type
  data: {
    files: [
      {
        name: 'index.html',
        path: 'codeflow/index.html',
      },
      {
        name: 'assets/index-14y4J8dP.js',
        path: 'codeflow/assets/index-14y4J8dP.js',
      },
      {
        name: 'assets/index-C-libw4a.css',
        path: 'codeflow/assets/index-C-libw4a.css',
      },
    ],
  },
};

type UserAppOptions = {
  user: string;
  app: string;
};

/**
 * Cache manager for one (user, app) pair.
 *
 * Redis keys written/read by this class:
 * - `user:app:<app>:<user>`            JSON of the fetched app record
 * - `user:app:exist:<app>:<user>`      `indexPath||etag||proxyFlag`
 * - `user:app:permission:<app>:<user>` JSON of the permission object
 * - `user:app:set:<app>:<user>`        hash: file name -> file path / URL
 * - `user:domain:app:<user>:<app>`     JSON array of domains bound to the app
 * - `domain:<domain>`                  `<user>:<app>`
 */
export class UserApp {
  user: string;
  app: string;
  isTest: boolean;

  constructor(options: UserAppOptions) {
    this.user = options.user;
    this.app = options.app;
    // Always assign the flag so it is a real boolean instead of `undefined`.
    this.isTest = this.user === 'test';
  }

  /**
   * Whether the app has already been loaded (its "exist" key is present).
   *
   * @returns `false` when not loaded or when the stored permission cannot be
   *          parsed (the cache is cleared in that case); otherwise the index
   *          file path, etag, proxy flag and permission object.
   */
  async getExist() {
    const app = this.app;
    const user = this.user;
    const key = 'user:app:exist:' + app + ':' + user;
    const permissionKey = 'user:app:permission:' + app + ':' + user;
    const value = await redis.get(key);
    if (!value) {
      return false;
    }
    const permission = await redis.get(permissionKey);
    const [indexFilePath, etag, proxy] = value.split('||');
    try {
      return {
        indexFilePath,
        etag,
        proxy: proxy === 'true',
        permission: permission ? JSON.parse(permission) : { share: 'private' },
      };
    } catch (e) {
      console.error('getExist error parse', e);
      await this.clearCacheData();
      return false;
    }
  }

  /**
   * Read the cached app record. Does not trigger a load when it is missing.
   *
   * @returns the parsed record, or `null` when nothing is cached
   * @throws SyntaxError when the cached value is not valid JSON
   */
  async getCache() {
    const app = this.app;
    const user = this.user;
    const key = 'user:app:' + app + ':' + user;
    const value = await redis.get(key);
    if (!value) {
      return null;
    }
    return JSON.parse(value);
  }

  /**
   * Look up the stored path/URL of a single app file.
   *
   * @param appFileUrl file name as stored in the `user:app:set` hash
   * @returns the stored value, or `null` when the field does not exist
   */
  async getFile(appFileUrl: string) {
    const app = this.app;
    const user = this.user;
    const key = 'user:app:set:' + app + ':' + user;
    const value = await redis.hget(key, appFileUrl);
    return value;
  }

  /**
   * Resolve a custom domain to its (user, app) pair, consulting Redis first
   * and falling back to `fetchDomain`.
   *
   * @param domain domain name to resolve
   * @returns `{ user, app }`, or `null` when the lookup fails or the app is
   *          not in `running` status
   */
  static async getDomainApp(domain: string) {
    const key = 'domain:' + domain;
    const value = await redis.get(key);
    if (value) {
      const [_user, _app] = value.split(':');
      return {
        user: _user,
        app: _app,
      };
    }
    // Not cached: ask the backend which user/app owns this domain.
    const fetchRes = await fetchDomain(domain).catch((err) => {
      return {
        code: 500,
        message: err,
      };
    });
    if (fetchRes?.code !== 200) {
      console.log('fetchRes is error', fetchRes);
      return null;
    }
    const fetchData = fetchRes.data;
    if (fetchData.status !== 'running') {
      console.error('fetchData status is not running', fetchData.user, fetchData.key);
      return null;
    }
    const data = {
      user: fetchData.user,
      app: fetchData.key,
    };
    // Awaited so a Redis failure surfaces here instead of as an unhandled rejection.
    await redis.set(key, data.user + ':' + data.app, 'EX', CACHE_TTL_SECONDS);

    // Track every domain bound to this app so clearCacheData can drop them.
    const userDomainApp = 'user:domain:app:' + data.user + ':' + data.app;
    const domainKeys = await redis.get(userDomainApp);
    const knownDomains: string[] = domainKeys ? JSON.parse(domainKeys) : [];
    const domainKeysList = [...new Set([...knownDomains, domain])];
    await redis.set(userDomainApp, JSON.stringify(domainKeysList), 'EX', CACHE_TTL_SECONDS);
    return data;
  }

  /**
   * Record the load status of this app.
   *
   * @param status new status
   * @param msg optional human-readable message stored with the status
   */
  async setLoaded(status: 'running' | 'error' | 'loading', msg?: string) {
    const app = this.app;
    const user = this.user;
    await setAppLoadStatus(user, app, {
      status,
      message: msg,
    });
  }

  /**
   * Read the load status of this app.
   *
   * @returns whatever `getAppLoadStatus` returns for this (user, app)
   */
  async getLoaded() {
    const app = this.app;
    const user = this.user;
    const value = await getAppLoadStatus(user, app);
    return value;
  }

  /**
   * Fetch the app record and populate the cache. Call again to reload after
   * a failure (status `error` or `running` both allow a reload).
   *
   * @returns
   * - `{ code: 500, message }` when the fetch fails or the app is not running
   * - `{ loading: true }` when another load is already in progress
   * - `{ code: 200, data: 'loaded' }` when a proxied app was registered
   * - `{ code: 20000, data: 'loading' }` when the file download was started
   *   in the background (or the proxied registration failed and the status
   *   was set to `error`)
   */
  async setCacheData() {
    const app = this.app;
    const user = this.user;
    const isTest = this.isTest;
    const key = 'user:app:' + app + ':' + user;
    const fetchRes = isTest ? await fetchTest(app) : await fetchApp({ user, app });
    if (fetchRes?.code !== 200) {
      console.log('fetchRes is error', fetchRes, 'user', user, 'app', app);
      return { code: 500, message: 'fetchRes is error' };
    }
    const loadStatus = await getAppLoadStatus(user, app);
    logger.debug('loadStatus', loadStatus);
    if (loadStatus.status === 'loading') {
      // Any other status (error / running) is allowed to reload.
      return {
        loading: true,
      };
    }
    const fetchData = fetchRes.data;
    if (!fetchData.type) {
      fetchData.type = 'oss';
    }
    if (fetchData.status !== 'running') {
      console.error('fetchData status is not running', fetchData.user, fetchData.key);
      return { code: 500, message: 'app status is not running' };
    }
    // Publish the "loading" state before any work starts so concurrent
    // callers see it as early as possible.
    await this.setLoaded('loading', 'loading');

    /** Log a load failure and flag the app as errored; never rejects. */
    const markError = async (e: unknown) => {
      console.error('loadFilesFn error', e);
      try {
        await this.setLoaded('error', 'loadFilesFn error');
      } catch (statusErr) {
        console.error('setLoaded error', statusErr);
      }
    };

    /** Register a proxied app: files stay remote, only URLs are cached. */
    const loadProxy = async () => {
      const value = fetchData;
      await redis.set(key, JSON.stringify(value));
      const version = value.version;
      let indexHtml = resources + '/' + user + '/' + app + '/' + version + '/index.html';
      const files = value?.data?.files || [];
      const permission = value?.data?.permission || { share: 'private' };
      const data: Record<string, string> = {};
      // Map every file name to its resolved URL.
      files.forEach((file: { name: string; path: string }) => {
        if (file.name === 'index.html') {
          indexHtml = wrapperResources(resources, file.path);
        }
        data[file.name] = wrapperResources(resources, file.path);
      });
      await redis.set('user:app:exist:' + app + ':' + user, indexHtml + '||etag||true', 'EX', CACHE_TTL_SECONDS);
      await redis.set('user:app:permission:' + app + ':' + user, JSON.stringify(permission), 'EX', CACHE_TTL_SECONDS);
      // HSET with no field/value pairs is rejected by Redis, so skip it when
      // the app declares no files.
      if (Object.keys(data).length > 0) {
        await redis.hset('user:app:set:' + app + ':' + user, data);
      }
      await this.setLoaded('running', 'loaded');
    };

    /** Download the app files to local storage and cache their paths. */
    const loadFilesFn = async () => {
      const value = await downloadUserAppFiles(user, app, fetchData);
      if (value.data.files.length === 0) {
        console.error('root files length is zero', user, app);
        await this.setLoaded('running', 'root files length is zero');
        const appDir = path.join(fileStore, user, app);
        const mockPath = path.join(appDir, 'index.html');
        value.data.files = [
          {
            name: 'index.html', // mapped name
            path: mockPath.replace(fileStore, ''), // actual location relative to fileStore
          },
        ];
        if (!checkFileExistsSync(appDir)) {
          fs.mkdirSync(appDir, { recursive: true });
        }
        // Create a placeholder index.html ourselves.
        fs.writeFileSync(mockPath, 'not has any app info', {
          encoding: 'utf-8',
        });
      }
      await redis.set(key, JSON.stringify(value));
      const files = value.data.files;
      const permission = fetchData?.data?.permission || { share: 'private' };
      const data: Record<string, string> = {};
      let indexHtml = path.join(fileStore, user, app, 'index.html') + '||etag||false';
      // Map every file name to its stored path.
      files.forEach((file: { name: string; path: string }) => {
        data[file.name] = file.path;
        if (file.name === 'index.html') {
          indexHtml = file.path;
        }
      });
      await redis.set('user:app:exist:' + app + ':' + user, indexHtml, 'EX', CACHE_TTL_SECONDS);
      await redis.set('user:app:permission:' + app + ':' + user, JSON.stringify(permission), 'EX', CACHE_TTL_SECONDS);
      await redis.hset('user:app:set:' + app + ':' + user, data);
      await this.setLoaded('running', 'loaded');
    };

    logger.debug('loadFilesFn', fetchData.proxy);
    if (fetchData.proxy === true) {
      try {
        await loadProxy();
        return {
          code: 200,
          data: 'loaded',
        };
      } catch (e) {
        await markError(e);
      }
    } else {
      // Intentionally not awaited: the download runs in the background while
      // the caller is told the app is loading. The rejection handler is
      // attached here because a surrounding try/catch cannot observe it.
      void loadFilesFn().catch(markError);
    }
    return {
      code: 20000,
      data: 'loading',
    };
  }

  /**
   * Remove every Redis key belonging to this app, the `domain:*` keys of the
   * domains bound to it, and its downloaded files.
   */
  async clearCacheData() {
    const app = this.app;
    const user = this.user;
    const key = 'user:app:' + app + ':' + user;
    await redis.del(key);
    await redis.del('user:app:exist:' + app + ':' + user);
    await redis.del('user:app:set:' + app + ':' + user);
    await redis.del('user:app:status:' + app + ':' + user);
    await redis.del('user:app:permission:' + app + ':' + user);
    const userDomainApp = 'user:domain:app:' + user + ':' + app;
    const domainKeys = await redis.get(userDomainApp);
    if (domainKeys) {
      let domainKeysList: unknown = [];
      try {
        domainKeysList = JSON.parse(domainKeys);
      } catch (e) {
        // A corrupt list must not prevent the rest of the cleanup.
        console.error('clearCacheData parse domain keys error', e);
      }
      if (Array.isArray(domainKeysList)) {
        // `forEach(async ...)` would leave these deletes un-awaited.
        await Promise.all(domainKeysList.map((domain: string) => redis.del('domain:' + domain)));
      }
    }
    await redis.del(userDomainApp);
    // Remove all downloaded files.
    await deleteUserAppFiles(user, app);
  }

  /**
   * @param file path to test
   * @returns `true` when the path exists on disk
   */
  fileCheck(file: string) {
    return checkFileExistsSync(file);
  }

  /** Close the shared Redis connection. */
  async close() {
    await redis.quit();
  }
}

/**
 * Download all files of an app into `<fileStore>/<user>/<app>`.
 *
 * Only records whose `type` is `'oss'` are downloaded; for any other type the
 * returned file list is empty. When the record contains no `index.html`, one
 * is generated whose content is the JSON of the original file list.
 *
 * @param user owner of the app
 * @param app app key
 * @param data app record as returned by the backend
 * @returns the record with `data.files` replaced by the local file entries
 *          (`path` is `<path relative to fileStore>||<etag>`)
 * @throws when a download fails
 */
export const downloadUserAppFiles = async (user: string, app: string, data: typeof demoData) => {
  const {
    data: { files, ...dataRest },
    ...rest
  } = data;
  const uploadFiles = path.join(fileStore, user, app);
  if (!checkFileExistsSync(uploadFiles)) {
    fs.mkdirSync(uploadFiles, { recursive: true });
  }
  const newFiles: { name: string; path: string }[] = [];

  if (data.type === 'oss') {
    const serverPath = new URL(resources).href;
    let hasIndexHtml = false;
    // Download the files one by one.
    for (const file of files) {
      const destFile = path.join(uploadFiles, file.name);
      const destDir = path.dirname(destFile); // directory that will hold the file
      if (file.name === 'index.html') {
        hasIndexHtml = true;
      }
      // Create the target directory (recursively) when it does not exist yet.
      if (!checkFileExistsSync(destDir)) {
        fs.mkdirSync(destDir, { recursive: true });
      }
      const downloadURL = wrapperResources(serverPath, file.path);
      await downloadFile(downloadURL, destFile);
      const etag = nanoid();
      newFiles.push({
        name: file.name,
        path: destFile.replace(fileStore, '') + '||' + etag,
      });
    }
    if (!hasIndexHtml) {
      // NOTE(review): unlike the downloaded entries above, this path keeps the
      // fileStore prefix and has no etag suffix — confirm consumers expect that.
      newFiles.push({
        name: 'index.html',
        path: path.join(uploadFiles, 'index.html'),
      });
      fs.writeFileSync(path.join(uploadFiles, 'index.html'), JSON.stringify(files), {
        encoding: 'utf-8',
      });
    }
  }

  return {
    ...rest,
    data: {
      ...dataRest,
      files: newFiles,
    },
  };
};

/**
 * Synchronously test whether a file or directory exists.
 *
 * @param filePath path to test
 * @returns `true` when `fs.accessSync` succeeds with `F_OK`, `false` otherwise
 */
export const checkFileExistsSync = (filePath: string) => {
  try {
    fs.accessSync(filePath, fs.constants.F_OK);
    return true;
  } catch (err) {
    return false;
  }
};

/**
 * Recursively delete `<fileStore>/<user>/<app>`. A missing directory is
 * ignored; any other error is logged and swallowed.
 *
 * @param user owner of the app
 * @param app app key
 */
export const deleteUserAppFiles = async (user: string, app: string) => {
  const uploadFiles = path.join(fileStore, user, app);
  try {
    fs.rmSync(uploadFiles, { recursive: true });
  } catch (err) {
    // ENOENT only means there was nothing to delete.
    if ((err as NodeJS.ErrnoException)?.code !== 'ENOENT') {
      console.error('deleteUserAppFiles', err);
    }
  }
};

/**
 * Download one file to disk. URLs under `minioResources` are delegated to
 * `downloadFileFromMinio`; everything else is fetched over HTTP.
 *
 * @param fileUrl absolute URL of the file
 * @param destFile destination path on disk
 * @throws when the HTTP response is not OK or has no body
 */
async function downloadFile(fileUrl: string, destFile: string) {
  if (fileUrl.startsWith(minioResources)) {
    await downloadFileFromMinio(fileUrl, destFile);
    return;
  }
  console.log('destFile', destFile, 'fileUrl', fileUrl);
  const res = await fetch(fileUrl);
  if (!res.ok) {
    throw new Error(`Failed to fetch ${fileUrl}: ${res.statusText}`);
  }
  if (!res.body) {
    // Fail with a clear message before an empty file is created.
    throw new Error(`Failed to fetch ${fileUrl}: empty response body`);
  }
  const destStream = fs.createWriteStream(destFile);
  // Stream the response body into the destination file.
  await pipelineAsync(res.body, destStream);
  console.log(`File downloaded to ${destFile}`);
}

/** Delete every Redis key matching `user:app:*` using a single pipeline. */
export const clearAllUserApp = async () => {
  const keys = await redis.keys('user:app:*');
  console.log('clearAllUserApp', keys);
  if (keys.length > 0) {
    const pipeline = redis.pipeline();
    keys.forEach((key) => pipeline.del(key)); // queue one DEL per key
    await pipeline.exec(); // run all queued commands
    console.log('All keys deleted successfully using pipeline');
  }
};

/**
 * Compute an etag for file content.
 *
 * @param fileContent content to hash
 * @returns hex-encoded MD5 digest of `fileContent`
 */
export const setEtag = async (fileContent: string) => {
  const eTag = crypto.createHash('md5').update(fileContent).digest('hex');
  return eTag;
};

// Watch for expiry of `user:app:exist:*` keys.
subscriber.on('ready', () => {
  console.log('Subscriber is ready and connected.');
});

// Subscribe to the key-expired event channel of database 0.
subscriber.subscribe('__keyevent@0__:expired', (err, count) => {
  if (err) {
    console.error('Failed to subscribe: ', err);
  } else {
    console.log(`Subscribed to ${count} channel(s). Waiting for expired events...`);
  }
});

// When an "exist" key expires, drop the rest of that app's cache and files.
subscriber.on('message', (channel, message) => {
  if (message.startsWith('user:app:exist:')) {
    // Key layout: user:app:exist:<app>:<user>
    const [, , , app, user] = message.split(':');
    if (!app || !user) {
      return;
    }
    console.log('User app exist key expired:', app, user);
    const userApp = new UserApp({ user, app });
    // Attach a handler so a cleanup failure is logged, not an unhandled rejection.
    userApp.clearCacheData().catch((e) => {
      console.error('clearCacheData error', e);
    });
  }
});