diff --git a/package.json b/package.json index 067fd8a..02eedfa 100644 --- a/package.json +++ b/package.json @@ -72,14 +72,14 @@ }, "devDependencies": { "@aws-sdk/client-s3": "^3.980.0", - "@kevisual/oss": "0.0.18", "@kevisual/code-center-module": "0.0.24", "@kevisual/context": "^0.0.4", "@kevisual/file-listener": "^0.0.2", "@kevisual/local-app-manager": "0.1.32", "@kevisual/logger": "^0.0.4", + "@kevisual/oss": "0.0.18", "@kevisual/permission": "^0.0.3", - "@kevisual/router": "0.0.64", + "@kevisual/router": "0.0.65", "@kevisual/types": "^0.0.12", "@kevisual/use-config": "^1.0.28", "@types/archiver": "^7.0.0", @@ -87,6 +87,7 @@ "@types/crypto-js": "^4.2.2", "@types/jsonwebtoken": "^9.0.10", "@types/node": "^25.1.0", + "@types/pg": "^8.16.0", "@types/semver": "^7.7.1", "@types/xml2js": "^0.4.14", "archiver": "^7.0.1", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 3ce68e6..ceb311f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -37,17 +37,17 @@ importers: specifier: ^1.6.0 version: 1.6.0 commander: - specifier: ^14.0.2 - version: 14.0.2 + specifier: ^14.0.3 + version: 14.0.3 drizzle-kit: specifier: ^0.31.8 version: 0.31.8 drizzle-orm: specifier: ^0.45.1 - version: 0.45.1(better-sqlite3@12.6.2)(bun-types@1.3.8)(pg@8.18.0) + version: 0.45.1(@types/pg@8.16.0)(better-sqlite3@12.6.2)(bun-types@1.3.8)(pg@8.18.0) drizzle-zod: specifier: ^0.8.3 - version: 0.8.3(drizzle-orm@0.45.1(better-sqlite3@12.6.2)(bun-types@1.3.8)(pg@8.18.0))(zod@4.3.6) + version: 0.8.3(drizzle-orm@0.45.1(@types/pg@8.16.0)(better-sqlite3@12.6.2)(bun-types@1.3.8)(pg@8.18.0))(zod@4.3.6) eventemitter3: specifier: ^5.0.4 version: 5.0.4 @@ -104,8 +104,8 @@ importers: specifier: ^0.0.3 version: 0.0.3 '@kevisual/router': - specifier: 0.0.64 - version: 0.0.64(typescript@5.9.3) + specifier: 0.0.65 + version: 0.0.65(typescript@5.9.3) '@kevisual/types': specifier: ^0.0.12 version: 0.0.12 @@ -127,6 +127,9 @@ importers: '@types/node': specifier: ^25.1.0 version: 25.1.0 + '@types/pg': + specifier: ^8.16.0 + version: 8.16.0 '@types/semver': specifier: ^7.7.1 version: 7.7.1 @@ -894,8 +897,8 @@ packages: '@kevisual/router@0.0.60': resolution: {integrity: sha512-2v/ZzUstsaq+Uqo+tZX9ys5E+/2erPggCtljv9jTb3NA88ZdHsYUAsd5wUFvLtf9QucpJCzyWEt+InDV/98FKw==} - '@kevisual/router@0.0.64': - resolution: {integrity: sha512-EYz1MZxrltgySUL0Y+/MtZf2FEmqC5U8GmFAqvHNjgtS5FJdHpxRjo6zab4+0wSUlVyCxCpZXFY5vHB/g+nQBw==} + '@kevisual/router@0.0.65': + resolution: {integrity: sha512-UiGqjLWheDbWOhEBBOSggCnafYFz3tCjLZYDp44ahiyeC2APwFRozz7UYbEq7+amH4Ex1wdqk1AlKmuP7w04og==} '@kevisual/types@0.0.12': resolution: {integrity: sha512-zJXH2dosir3jVrQ6QG4i0+iLQeT9gJ3H+cKXs8ReWboxBSYzUZO78XssVeVrFPsJ33iaAqo4q3DWbSS1dWGn7Q==} @@ -1408,6 +1411,9 @@ packages: '@types/node@25.1.0': resolution: {integrity: sha512-t7frlewr6+cbx+9Ohpl0NOTKXZNV9xHRmNOvql47BFJKcEG1CxtxlPEEe+gR9uhVWM4DwhnvTF110mIL4yP9RA==} + '@types/pg@8.16.0': + resolution: {integrity: sha512-RmhMd/wD+CF8Dfo+cVIy3RR5cl8CyfXQ0tGgW6XBL8L4LM/UTEbNXYRbLwU6w+CgrKBNbrQWt4FUtTfaU5jSYQ==} + '@types/readdir-glob@1.1.5': resolution: {integrity: sha512-raiuEPUYqXu+nvtY2Pe8s8FEmZ3x5yAH4VkLdihcPdalvsHltomrRC9BzuStrJ9yk06470hS0Crw0f1pXqD+Hg==} @@ -1621,8 +1627,8 @@ packages: color-name@1.1.4: resolution: {integrity: sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==} - commander@14.0.2: - resolution: {integrity: sha512-TywoWNNRbhoD0BXs1P3ZEScW8W5iKrnbithIl0YH+uCmBd0QpPOA8yc82DS3BIE5Ma6FnBVUsJ7wVUDz4dvOWQ==} + commander@14.0.3: + resolution: {integrity: sha512-H+y0Jo/T1RZ9qPP4Eh1pkcQcLRglraJaSLoyOtHxu6AapkjWVCy2Sit1QQ4x3Dng8qDlSsZEet7g5Pq06MvTgw==} engines: {node: '>=20'} commander@2.15.1: @@ -3995,7 +4001,7 @@ snapshots: dependencies: hono: 4.11.5 - '@kevisual/router@0.0.64(typescript@5.9.3)': + '@kevisual/router@0.0.65(typescript@5.9.3)': dependencies: '@kevisual/dts': 0.0.3(typescript@5.9.3) hono: 4.11.7 @@ -4591,6 +4597,12 @@ snapshots: dependencies: undici-types: 7.16.0 + '@types/pg@8.16.0': + dependencies: + '@types/node': 25.1.0 + pg-protocol: 1.11.0 + pg-types: 2.2.0 + '@types/readdir-glob@1.1.5': dependencies: '@types/node': 25.1.0 @@ -4824,7 +4836,7 @@ snapshots: color-name@1.1.4: {} - commander@14.0.2: {} + commander@14.0.3: {} commander@2.15.1: {} @@ -4951,15 +4963,16 @@ snapshots: transitivePeerDependencies: - supports-color - drizzle-orm@0.45.1(better-sqlite3@12.6.2)(bun-types@1.3.8)(pg@8.18.0): + drizzle-orm@0.45.1(@types/pg@8.16.0)(better-sqlite3@12.6.2)(bun-types@1.3.8)(pg@8.18.0): optionalDependencies: + '@types/pg': 8.16.0 better-sqlite3: 12.6.2 bun-types: 1.3.8 pg: 8.18.0 - drizzle-zod@0.8.3(drizzle-orm@0.45.1(better-sqlite3@12.6.2)(bun-types@1.3.8)(pg@8.18.0))(zod@4.3.6): + drizzle-zod@0.8.3(drizzle-orm@0.45.1(@types/pg@8.16.0)(better-sqlite3@12.6.2)(bun-types@1.3.8)(pg@8.18.0))(zod@4.3.6): dependencies: - drizzle-orm: 0.45.1(better-sqlite3@12.6.2)(bun-types@1.3.8)(pg@8.18.0) + drizzle-orm: 0.45.1(@types/pg@8.16.0)(better-sqlite3@12.6.2)(bun-types@1.3.8)(pg@8.18.0) zod: 4.3.6 eastasianwidth@0.2.0: {} diff --git a/src/app.ts b/src/app.ts index 2043359..e9d32a3 100644 --- a/src/app.ts +++ b/src/app.ts @@ -6,8 +6,8 @@ import { SimpleRouter } from '@kevisual/router/simple'; import { s3Client, oss as s3Oss } from './modules/s3.ts'; import { BailianProvider } from '@kevisual/ai'; import * as schema from './db/schema.ts'; -import { drizzle } from 'drizzle-orm/node-postgres'; import { config } from './modules/config.ts' +import { db } from './modules/db.ts' export const router = useContextKey('router', () => new SimpleRouter()); export const runtime = useContextKey('runtime', () => { return { @@ -23,10 +23,7 @@ export { s3Client } export const redis = useContextKey('redis', () => redisLib.redis); export const subscriber = useContextKey('subscriber', () => redisLib.subscriber); export const sequelize = useContextKey('sequelize', () => sequelizeLib.sequelize); -export const db = useContextKey('db', () => { - const db = drizzle(config.DATABASE_URL || ''); - return db; -}) +export { db }; const init = () => { return new App({ serverOptions: { diff --git a/src/auth/models/user.ts b/src/auth/models/user.ts index 1304f23..388f4bd 100644 --- a/src/auth/models/user.ts +++ b/src/auth/models/user.ts @@ -183,7 +183,7 @@ export class User extends Model { avatar: this.avatar, orgs, }; - if(this.data?.canChangeUsername) { + if (this.data?.canChangeUsername) { info.canChangeUsername = this.data.canChangeUsername; } const tokenUser = this.tokenUser; @@ -232,6 +232,17 @@ export class User extends Model { async expireOrgs() { await redis.del(`user:${this.id}:orgs`); } + static async getUserNameById(id: string) { + const redisName = await redis.get(`user:id:${id}:name`); + if (redisName) { + return redisName; + } + const user = await User.findByPk(id); + if (user?.username) { + await redis.set(`user:id:${id}:name`, user.username, 'EX', 60 * 60); // 1 hour + } + return user?.username; + } } export type SyncOpts = { alter?: boolean; diff --git a/src/db/drizzle/schema.ts b/src/db/drizzle/schema.ts index f713829..9425aca 100644 --- a/src/db/drizzle/schema.ts +++ b/src/db/drizzle/schema.ts @@ -523,6 +523,7 @@ export const flowmeChannels = pgTable("flowme_channels", { id: uuid().primaryKey().notNull().defaultRandom(), uid: uuid(), title: text('title').default(''), + key: text('key').default(''), description: text('description').default(''), tags: jsonb().default([]), link: text('link').default(''), @@ -532,5 +533,6 @@ export const flowmeChannels = pgTable("flowme_channels", { updatedAt: timestamp('updatedAt').notNull().defaultNow(), }, (table) => [ index('flowme_channels_uid_idx').using('btree', table.uid.asc().nullsLast()), + index('flowme_channels_key_idx').using('btree', table.key.asc().nullsLast()), index('flowme_channels_title_idx').using('btree', table.title.asc().nullsLast()), ]); \ No newline at end of file diff --git a/src/modules/db.ts b/src/modules/db.ts new file mode 100644 index 0000000..fda7bdf --- /dev/null +++ b/src/modules/db.ts @@ -0,0 +1,8 @@ +import { useContextKey } from '@kevisual/context'; +import { drizzle } from 'drizzle-orm/node-postgres'; +import { config } from './config.ts' + +export const db = useContextKey('db', () => { + const db = drizzle(config.DATABASE_URL || ''); + return db; +}) \ No newline at end of file diff --git a/src/modules/v3/index.ts b/src/modules/v3/index.ts new file mode 100644 index 0000000..82a90d8 --- /dev/null +++ b/src/modules/v3/index.ts @@ -0,0 +1,97 @@ +import { IncomingMessage, ServerResponse } from 'http'; +import { App } from '@kevisual/router'; +import { logger } from '../logger.ts'; +// import { getLoginUser } from '@/modules/auth.ts'; +import { SSEManager } from './sse/sse-manager.ts'; +import { getLoginUser } from '../auth.ts'; +import { emitter, flowme_insert } from '../../realtime/flowme/index.ts'; +export const sseManager = new SSEManager(); +emitter.on(flowme_insert, (data) => { + console.log('flowme_insert event received:', data); + const uid = data.uid; + if (uid) { + sseManager.broadcast({ type: 'flowme_insert', data }, { userId: uid }); + } +}); +type ProxyOptions = { + createNotFoundPage: (msg?: string) => any; +}; +export const UserV3Proxy = async (req: IncomingMessage, res: ServerResponse, opts?: ProxyOptions) => { + const { url } = req; + const _url = new URL(url || '', `http://localhost`); + const { pathname, searchParams } = _url; + let [user, app, ...rest] = pathname.split('/').slice(1); + if (!user || !app) { + opts?.createNotFoundPage?.('应用未找到'); + return false; + } + const last = rest.slice(-1)[0] || ''; + const method = req.method || 'GET'; + console.log('UserV3Proxy request: last', last, rest); + if (method === 'GET' && last === 'event') { + const info = await getLoginUser(req); + if (!info) { + opts?.createNotFoundPage?.('没有登录'); + return false; + } + console.log('建立 SSE 连接, user=', info.tokenUser.uid); + addEventStream(req, res, info); + return true; + } + res.end(`UserV3Proxy: user=${user}, app=${app}, rest=${rest.join('/')}`); + console.log('UserV3Proxy:', { user, app, }); + return true; +}; + +type Opts = { + tokenUser: any; + token: string; +} +const addEventStream = (req: IncomingMessage, res: ServerResponse, opts?: Opts) => { + res.writeHead(200, { + 'Content-Type': 'text/event-stream; charset=utf-8', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + 'Access-Control-Allow-Origin': '*' + }); + console.log('Client connected for SSE', opts?.tokenUser?.username || 'unknown'); + const uid = opts?.tokenUser?.id || 'guest'; + console.log('SSE for userId=', opts?.tokenUser); + const connectionInfo = sseManager.createConnection({ userId: uid }); + const { stream, id: connectionId } = connectionInfo; + // 设置心跳 + connectionInfo.heartbeatInterval = setInterval(() => { + sseManager.sendToConnection(connectionId, { type: "heartbeat", timestamp: Date.now() }) + .catch(() => { + // 心跳失败时清理连接 + sseManager.closeConnection(connectionId); + }); + }, 30000); // 30秒心跳 + + const timer = setInterval(async () => { + sseManager.broadcast({ type: "time", timestamp: Date.now() }); + const hasId = sseManager.getConnection(connectionId); + if (!hasId) { + clearInterval(timer); + console.log('清理广播定时器,连接已关闭'); + } + }, 1000); + + res.pipe(stream as any); + const bun = (req as any).bun + const request = bun?.request as Bun.BunRequest + if (request) { + if (request.signal) { + // 当客户端断开时清理连接 + request.signal.addEventListener("abort", () => { + console.log(`Client ${connectionId} disconnected`); + sseManager.closeConnection(connectionId); + }); + } + } + + // console.log('res', req) + // res.end('123'); +} + +const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); \ No newline at end of file diff --git a/src/modules/v3/sse/sse-manager.ts b/src/modules/v3/sse/sse-manager.ts new file mode 100644 index 0000000..24ceda6 --- /dev/null +++ b/src/modules/v3/sse/sse-manager.ts @@ -0,0 +1,134 @@ +import { nanoid } from "nanoid"; +type ConnectionInfo = { + id: string; + writer: WritableStreamDefaultWriter; + stream: ReadableStream; + connectedAt: Date; + heartbeatInterval: NodeJS.Timeout | null; + userId?: string; +}; +export class SSEManager { + private connections: Map = new Map(); + private userConnections: Map> = new Map(); // userId -> connectionIds + + constructor() { + // 初始化逻辑 + } + createConnection(info?: { userId?: string }): ConnectionInfo { + const connectionId = nanoid(16); + const { readable, writable } = new TransformStream(); + const writer = writable.getWriter(); + + // 存储连接信息 + const connectionInfo = { + id: connectionId, + writer, + stream: readable, + connectedAt: new Date(), + heartbeatInterval: null, + userId: info?.userId + }; + + this.connections.set(connectionId, connectionInfo); + + // 添加到用户索引 + if (info?.userId) { + const userSet = this.userConnections.get(info.userId) || new Set(); + userSet.add(connectionId); + this.userConnections.set(info.userId, userSet); + } + + return connectionInfo; + } + + sendToConnection(connectionId: string, data: any) { + const connection = this.connections.get(connectionId); + if (connection) { + const message = `data: ${JSON.stringify(data)}\n\n`; + return connection.writer.write(new TextEncoder().encode(message)); + } + throw new Error(`Connection ${connectionId} not found`); + } + + getConnection(connectionId: string) { + return this.connections.get(connectionId); + } + + broadcast(data: any, opts?: { userId?: string }) { + const message = `data: ${JSON.stringify(data)}\n\n`; + const promises = []; + + // 指定 userId:只发送给目标用户(通过索引快速查找) + if (opts?.userId) { + const userConnIds = this.userConnections.get(opts.userId); + if (userConnIds) { + for (const connId of userConnIds) { + const conn = this.connections.get(connId); + if (conn) { + promises.push( + conn.writer.write(new TextEncoder().encode(message)) + .catch(() => { + this.closeConnection(connId); + }) + ); + } + } + } + return Promise.all(promises); + } + + // 未指定 userId:广播给所有人 + for (const [id, connection] of this.connections) { + promises.push( + connection.writer.write(new TextEncoder().encode(message)) + .catch(() => { + this.closeConnection(id); + }) + ); + } + + return Promise.all(promises); + } + + closeConnection(connectionId: string) { + const connection = this.connections.get(connectionId); + if (connection) { + // 清理心跳定时器 + if (connection.heartbeatInterval) { + clearInterval(connection.heartbeatInterval); + } + + // 从用户索引中移除 + if (connection.userId) { + const userSet = this.userConnections.get(connection.userId); + if (userSet) { + userSet.delete(connectionId); + if (userSet.size === 0) { + this.userConnections.delete(connection.userId); + } + } + } + + // 关闭写入器 + connection.writer.close().catch(console.error); + + // 从管理器中移除 + this.connections.delete(connectionId); + + console.log(`Connection ${connectionId} closed`); + return true; + } + return false; + } + + closeAllConnections() { + for (const [connectionId, connection] of this.connections) { + this.closeConnection(connectionId); + } + } + + getActiveConnections() { + return Array.from(this.connections.keys()); + } +} + diff --git a/src/realtime/flowme/common.ts b/src/realtime/flowme/common.ts new file mode 100644 index 0000000..80ec0ec --- /dev/null +++ b/src/realtime/flowme/common.ts @@ -0,0 +1 @@ +export const flowme_insert = 'flowme_insert' \ No newline at end of file diff --git a/src/realtime/flowme/create.ts b/src/realtime/flowme/create.ts new file mode 100644 index 0000000..1852e9e --- /dev/null +++ b/src/realtime/flowme/create.ts @@ -0,0 +1,26 @@ +import { db } from '@/modules/db.ts' + +// 创建触发器函数和触发器,用于在 flowme 表插入新记录时发送通知 +const sql = `CREATE OR REPLACE FUNCTION notify_flowme_insert() +RETURNS TRIGGER AS $$ +BEGIN + PERFORM pg_notify('flowme_insert', row_to_json(NEW)::text); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER flowme_after_insert + AFTER INSERT ON flowme + FOR EACH ROW + EXECUTE FUNCTION notify_flowme_insert(); +`; + +if (import.meta.main) { + const result = await db.execute(sql) + console.log('✅ flowme 插入触发器已创建或更新:', result) +} + +// const listFunSql = `SELECT proname FROM pg_proc WHERE proname = 'flowme_after_insert';` + +// const funExists = await db.execute(listFunSql) +// console.log('函数是否存在:', funExists) \ No newline at end of file diff --git a/src/realtime/flowme/index.ts b/src/realtime/flowme/index.ts new file mode 100644 index 0000000..c0ca554 --- /dev/null +++ b/src/realtime/flowme/index.ts @@ -0,0 +1,3 @@ +export * from './listener.ts' + +export * from './common.ts' \ No newline at end of file diff --git a/src/realtime/flowme/listener.ts b/src/realtime/flowme/listener.ts new file mode 100644 index 0000000..41e1d29 --- /dev/null +++ b/src/realtime/flowme/listener.ts @@ -0,0 +1,33 @@ +import { Client } from 'pg' +import { useConfig } from '@kevisual/use-config' +import { EventEmitter } from 'eventemitter3' +const config = useConfig() +let pgClient: Client | null = null +export const emitter = new EventEmitter() + +async function startFlowmeListener() { + // 使用独立的数据库连接来监听 + pgClient = new Client({ + connectionString: config.DATABASE_URL || 'postgresql://postgres:postgres@localhost:5432/code_center', + }) + console.log('config.DATABASE_URL =', config.DATABASE_URL) + + await pgClient.connect() + console.log('🔌 已连接到 PostgreSQL 监听器') + // 订阅通知事件 + pgClient.on('notification', (data) => { + if (!data.payload) return + try { + const parsed = JSON.parse(data.payload) + emitter.emit('flowme_insert', parsed) + } catch (err) { + console.error('❌ 解析 flowme 通知失败:', err) + } + }) + + // 执行 LISTEN 命令订阅通道 + await pgClient.query('LISTEN flowme_insert') + + console.log('👂 开始监听 flowme_insert 通道...') +} +startFlowmeListener(); \ No newline at end of file diff --git a/src/routes-simple/handle-request.ts b/src/routes-simple/handle-request.ts index 9bd7bd4..4e851bb 100644 --- a/src/routes-simple/handle-request.ts +++ b/src/routes-simple/handle-request.ts @@ -9,205 +9,13 @@ import { User } from '@/models/user.ts'; import { router, error, checkAuth, writeEvents } from './router.ts'; import './index.ts'; import { handleRequest as PageProxy } from './page-proxy.ts'; -import path from 'path'; -import { createWriteStream } from 'fs'; -import { pipeBusboy } from '@/modules/fm-manager/pipe-busboy.ts'; -const cacheFilePath = useFileStore('cache-file', { needExists: true }); -router.get('/api/app/upload', async (req, res) => { - res.writeHead(200, { 'Content-Type': 'text/plain' }); - res.end('Upload API is ready'); -}); - -router.post('/api/app/upload', async (req, res) => { - if (res.headersSent) return; // 如果响应已发送,不再处理 - res.writeHead(200, { 'Content-Type': 'application/json' }); - const { tokenUser, token } = await checkAuth(req, res); - if (!tokenUser) return; - - // 使用 busboy 解析 multipart/form-data - const busboy = Busboy({ headers: req.headers, preservePath: true, defCharset: 'utf-8' }); - const fields: any = {}; - const files: any = []; - const filePromises: Promise[] = []; - let bytesReceived = 0; - let bytesExpected = parseInt(req.headers['content-length'] || '0'); - - busboy.on('field', (fieldname, value) => { - fields[fieldname] = value; - }); - - busboy.on('file', (fieldname, fileStream, info) => { - const { filename, encoding, mimeType } = info; - // 处理 UTF-8 文件名编码 - const decodedFilename = typeof filename === 'string' ? Buffer.from(filename, 'latin1').toString('utf8') : filename; - const tempPath = path.join(cacheFilePath, `${Date.now()}-${Math.random().toString(36).substring(7)}`); - const writeStream = createWriteStream(tempPath); - - const filePromise = new Promise((resolve, reject) => { - fileStream.on('data', (chunk) => { - bytesReceived += chunk.length; - if (bytesExpected > 0) { - const progress = (bytesReceived / bytesExpected) * 100; - console.log(`Upload progress: ${progress.toFixed(2)}%`); - const data = { - progress: progress.toFixed(2), - message: `Upload progress: ${progress.toFixed(2)}%`, - }; - writeEvents(req, data); - } - }); - - fileStream.pipe(writeStream); - - writeStream.on('finish', () => { - files.push({ - filepath: tempPath, - originalFilename: decodedFilename, - mimetype: mimeType, - }); - resolve(); - }); - - writeStream.on('error', (err) => { - reject(err); - }); - }); - - filePromises.push(filePromise); - }); - - busboy.on('finish', async () => { - // 等待所有文件写入完成 - try { - await Promise.all(filePromises); - } catch (err) { - console.error(`File write error: ${err.message}`); - res.end(error(`File write error: ${err.message}`)); - return; - } - const clearFiles = () => { - files.forEach((file: any) => { - if (file?.filepath && fs.existsSync(file.filepath)) { - fs.unlinkSync(file.filepath); - } - }); - }; - - // 检查是否有文件上传 - if (files.length === 0) { - res.end(error('files is required')); - return; - } - - let appKey, - version, - username = ''; - const { appKey: _appKey, version: _version, username: _username } = fields; - if (Array.isArray(_appKey)) { - appKey = _appKey?.[0]; - } else { - appKey = _appKey; - } - if (Array.isArray(_version)) { - version = _version?.[0]; - } else { - version = _version; - } - if (Array.isArray(_username)) { - username = _username?.[0]; - } else if (_username) { - username = _username; - } - if (username) { - const user = await User.getUserByToken(token); - const has = await user.hasUser(username, true); - if (!has) { - res.end(error('username is not found')); - clearFiles(); - return; - } - } - if (!appKey) { - res.end(error('appKey is required')); - clearFiles(); - return; - } - if (!version) { - res.end(error('version is required')); - clearFiles(); - return; - } - console.log('Appkey', appKey, version); - - // 逐个处理每个上传的文件 - const uploadResults = []; - for (let i = 0; i < files.length; i++) { - const file = files[i]; - const tempPath = file.filepath; // 文件上传时的临时路径 - const relativePath = file.originalFilename; // 保留表单中上传的文件名 (包含文件夹结构) - // 比如 child2/b.txt - const minioPath = `${username || tokenUser.username}/${appKey}/${version}/${relativePath}`; - // 上传到 MinIO 并保留文件夹结构 - const isHTML = relativePath.endsWith('.html'); - await oss.fPutObject(minioPath, tempPath, { - 'Content-Type': getContentType(relativePath), - 'app-source': 'user-app', - 'Cache-Control': isHTML ? 'no-cache' : 'max-age=31536000, immutable', // 缓存一年 - }); - uploadResults.push({ - name: relativePath, - path: minioPath, - }); - fs.unlinkSync(tempPath); // 删除临时文件 - } - // 受控 - const r = await app.call({ - path: 'app', - key: 'uploadFiles', - payload: { - token: token, - data: { - appKey, - version, - username, - files: uploadResults, - }, - }, - }); - const data: any = { - code: r.code, - data: r.body, - }; - if (r.message) { - data.message = r.message; - } - res.end(JSON.stringify(data)); - }); - - pipeBusboy(req, res, busboy); -}); - -router.all('/api/nocodb-test/router', async (req, res) => { - res.writeHead(200, { 'Content-Type': 'application/json' }); - - const param = await router.getSearch(req); - const body = await router.getBody(req); - - const contentType = req.headers['content-type'] || ''; - console.log('Content-Type:', contentType); - console.log('NocoDB test router called.', req.method, param, JSON.stringify(body, null)); - res.end(JSON.stringify({ message: 'NocoDB test router is working' })); -}); const simpleAppsPrefixs = [ - "/api/app/", "/api/micro-app/", "/api/events", "/api/s1/", - "/api/container/", "/api/resource/", - "/api/wxmsg", - "/api/nocodb-test/" + "/api/wxmsg" ]; diff --git a/src/routes-simple/page-proxy.ts b/src/routes-simple/page-proxy.ts index 91e9c39..0922cf3 100644 --- a/src/routes-simple/page-proxy.ts +++ b/src/routes-simple/page-proxy.ts @@ -13,6 +13,7 @@ import { getLoginUser } from '../modules/auth.ts'; import { rediretHome } from '../modules/user-app/index.ts'; import { logger } from '../modules/logger.ts'; import { UserV1Proxy } from '../modules/ws-proxy/proxy.ts'; +import { UserV3Proxy } from '@/modules/v3/index.ts'; import { hasBadUser, userIsBanned, appIsBanned, userPathIsBanned } from '@/modules/off/index.ts'; import { robotsTxt } from '@/modules/html/index.ts'; import { isBun } from '@/utils/get-engine.ts'; @@ -194,8 +195,8 @@ export const handleRequest = async (req: http.IncomingMessage, res: http.ServerR if (!domainApp) { // 原始url地址 const urls = url.split('/'); + const [_, _user, _app] = urls; if (urls.length < 3) { - const [_, _user] = urls; if (_user === 'robots.txt') { res.writeHead(200, { 'Content-Type': 'text/plain' }); res.end(robotsTxt); @@ -212,8 +213,12 @@ export const handleRequest = async (req: http.IncomingMessage, res: http.ServerR forBadUser(req, res); } return res.end(); + } else { + if (userPathIsBanned(_user) || userPathIsBanned(_app)) { + logger.warn(`Bad user access from IP: ${dns.ip}, Host: ${dns.hostName}, URL: ${req.url}`); + return forBadUser(req, res); + } } - const [_, _user, _app] = urls; if (_app && urls.length === 3) { // 重定向到 res.writeHead(302, { Location: `${url}/` }); @@ -250,6 +255,11 @@ export const handleRequest = async (req: http.IncomingMessage, res: http.ServerR createNotFoundPage, }); } + if (user !== 'api' && app === 'v3') { + return UserV3Proxy(req, res, { + createNotFoundPage, + }); + } const userApp = new UserApp({ user, app }); let isExist = await userApp.getExist(); diff --git a/src/routes/flowme/listener/index.ts b/src/routes/flowme/listener/index.ts new file mode 100644 index 0000000..500b4f7 --- /dev/null +++ b/src/routes/flowme/listener/index.ts @@ -0,0 +1,47 @@ +import { schema, app, db } from '@/app.ts' +import { Client } from 'pg' + +let pgClient: Client | null = null + +async function startFlowmeListener() { + // 使用独立的数据库连接来监听 + pgClient = new Client({ + connectionString: process.env.DATABASE_URL || 'postgresql://postgres:postgres@localhost:5432/code_center', + }) + + await pgClient.connect() + console.log('🔌 已连接到 PostgreSQL 监听器') + + // 订阅通知事件 + pgClient.on('notification', (data) => { + if (!data.payload) return + try { + const parsed = JSON.parse(data.payload) + console.log('📥 收到新 flowme 创建通知:', parsed) + + // 在这里处理你的业务逻辑 + handleNewFlowme(parsed) + } catch (err) { + console.error('❌ 解析 flowme 通知失败:', err) + } + }) + + // 执行 LISTEN 命令订阅通道 + await pgClient.query('LISTEN flowme_insert') + + console.log('👂 开始监听 flowme_insert 通道...') +} + +function handleNewFlowme(data: any) { + // 根据新创建的 flowme 数据执行相应操作 + console.log('处理新 flowme:', data.id, data.title) + + // 示例:可以通过 WebSocket 推送给前端 + // wsServer.emit('flowme:created', data) +} + +// 启动监听器(只启动一次) +if (!global.__flowmeListenerStarted) { + global.__flowmeListenerStarted = true + startFlowmeListener().catch(console.error) +} diff --git a/src/routes/index.ts b/src/routes/index.ts index 14076f8..6ae719f 100644 --- a/src/routes/index.ts +++ b/src/routes/index.ts @@ -18,4 +18,6 @@ import './prompts/index.ts' import './views/index.ts'; -import './query-views/index.ts'; \ No newline at end of file +import './query-views/index.ts'; + +import './flowme/index.ts' \ No newline at end of file