This commit is contained in:
2026-01-17 14:48:49 +08:00
parent b9b4c993f4
commit 0b5a0557ee
14 changed files with 613 additions and 233 deletions

View File

@@ -0,0 +1,32 @@
import { simpleRouter, clients, getTaskId, writeEvents, deleteOldClients, error } from './router.ts';
simpleRouter.get('/client/events', async (req, res) => {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
});
const taskId = getTaskId(req);
if (!taskId) {
res.end(error('task-id is required'));
return;
}
// 将客户端连接推送到 clients 数组
clients.set(taskId, { client: res, createTime: Date.now() });
writeEvents(req, { progress: 0, message: 'start' })
// 移除客户端连接
req.on('close', () => {
clients.delete(taskId);
});
});
simpleRouter.get('/client/events/close', async (req, res) => {
const taskId = getTaskId(req);
if (!taskId) {
res.end(error('task-id is required'));
return;
}
deleteOldClients();
clients.delete(taskId);
res.end('ok');
});

View File

@@ -1,216 +1 @@
// import Busboy from 'busboy';
// import { simpleRouter } from '../app.ts'
// import http from 'http';
// import path from 'path';
// import { createWriteStream } from 'fs';
// import { checkAuth } from '@/routes/index.ts';
// import { pipeBusboy } from '@/module/assistant/proxy/pipe.ts';
// simpleRouter.get('/client/upload', async (req, res) => {
// if (res.headersSent) return; // 如果响应已发送,不再处理
// res.writeHead(200, { 'Content-Type': 'application/json' });
// res.end(JSON.stringify({ message: 'Upload endpoint reached' }));
// })
// export const error = (msg: string, code = 500) => {
// return JSON.stringify({ code, message: msg });
// };
// export const parseIfJson = (data = '{}') => {
// try {
// const _data = JSON.parse(data);
// if (typeof _data === 'object') return _data;
// return {};
// } catch (error) {
// return {};
// }
// };
// const uploadResources = async (req: http.IncomingMessage, res: http.ServerResponse) => {
// const { tokenUser, token } = await checkAuth(req, res);
// if (!tokenUser) {
// res.end(error('Token is invalid.'));
// return;
// }
// const url = new URL(req.url || '', 'http://localhost');
// const share = !!url.searchParams.get('public');
// const meta = parseIfJson(url.searchParams.get('meta'));
// const noCheckAppFiles = !!url.searchParams.get('noCheckAppFiles');
// // 使用 busboy 解析 multipart/form-data
// const busboy = Busboy({ headers: req.headers, preservePath: true });
// const fields: any = {};
// const files: any[] = [];
// const filePromises: Promise<void>[] = [];
// let bytesReceived = 0;
// let bytesExpected = parseInt(req.headers['content-length'] || '0');
// busboy.on('field', (fieldname, value) => {
// fields[fieldname] = value;
// });
// busboy.on('file', (fieldname, fileStream, info) => {
// const { filename, encoding, mimeType } = info;
// const tempPath = path.join(cacheFilePath, `${Date.now()}-${Math.random().toString(36).substring(7)}`);
// const writeStream = createWriteStream(tempPath);
// const filePromise = new Promise<void>((resolve, reject) => {
// fileStream.on('data', (chunk) => {
// bytesReceived += chunk.length;
// if (bytesExpected > 0) {
// const progress = (bytesReceived / bytesExpected) * 100;
// const data = {
// progress: progress.toFixed(2),
// message: `Upload progress: ${progress.toFixed(2)}%`,
// };
// console.log('progress-upload', data);
// writeEvents(req, data);
// }
// });
// fileStream.pipe(writeStream);
// writeStream.on('finish', () => {
// files.push({
// filepath: tempPath,
// originalFilename: filename,
// mimetype: mimeType,
// });
// resolve();
// });
// writeStream.on('error', (err) => {
// reject(err);
// });
// });
// filePromises.push(filePromise);
// });
// busboy.on('finish', async () => {
// // 等待所有文件写入完成
// try {
// await Promise.all(filePromises);
// } catch (err) {
// logger.error(`File write error: ${err.message}`);
// res.end(error(`File write error: ${err.message}`));
// return;
// }
// const clearFiles = () => {
// files.forEach((file) => {
// if (file?.filepath && fs.existsSync(file.filepath)) {
// fs.unlinkSync(file.filepath);
// }
// });
// };
// // 检查是否有文件上传
// if (files.length === 0) {
// res.end(error('files is required'));
// return;
// }
// let { appKey, version, username, directory, description } = getKey(fields, ['appKey', 'version', 'username', 'directory', 'description']);
// let uid = tokenUser.id;
// if (username) {
// const user = await User.getUserByToken(token);
// const has = await user.hasUser(username, true);
// if (!has) {
// res.end(error('username is not found'));
// clearFiles();
// return;
// }
// const _user = await User.findOne({ where: { username } });
// uid = _user?.id || '';
// }
// if (!appKey || !version) {
// const config = await ConfigModel.getUploadConfig({ uid });
// if (config) {
// appKey = config.config?.data?.key || '';
// version = config.config?.data?.version || '';
// }
// }
// if (!appKey || !version) {
// res.end(error('appKey or version is not found, please check the upload config.'));
// clearFiles();
// return;
// }
// const { code, message } = validateDirectory(directory);
// if (code !== 200) {
// res.end(error(message));
// clearFiles();
// return;
// }
// // 逐个处理每个上传的文件
// const uploadedFiles = files;
// logger.info(
// 'upload files',
// uploadedFiles.map((item) => {
// return pick(item, ['filepath', 'originalFilename']);
// }),
// );
// const uploadResults = [];
// for (let i = 0; i < uploadedFiles.length; i++) {
// const file = uploadedFiles[i];
// // @ts-ignore
// const tempPath = file.filepath; // 文件上传时的临时路径
// const relativePath = file.originalFilename; // 保留表单中上传的文件名 (包含文件夹结构)
// // 比如 child2/b.txt
// const minioPath = `${username || tokenUser.username}/${appKey}/${version}${directory ? `/${directory}` : ''}/${relativePath}`;
// // 上传到 MinIO 并保留文件夹结构
// const isHTML = relativePath.endsWith('.html');
// const metadata: any = {};
// if (share) {
// metadata.share = 'public';
// }
// Object.assign(metadata, meta);
// await minioClient.fPutObject(bucketName, minioPath, tempPath, {
// 'Content-Type': getContentType(relativePath),
// 'app-source': 'user-app',
// 'Cache-Control': isHTML ? 'no-cache' : 'max-age=31536000, immutable', // 缓存一年
// ...metadata,
// });
// uploadResults.push({
// name: relativePath,
// path: minioPath,
// });
// fs.unlinkSync(tempPath); // 删除临时文件
// }
// if (!noCheckAppFiles) {
// const _data = { appKey, version, username, files: uploadResults, description, }
// if (_data.description) {
// delete _data.description;
// }
// // 受控
// const r = await app.call({
// path: 'app',
// key: 'uploadFiles',
// payload: {
// token: token,
// data: _data,
// },
// });
// const data: any = {
// code: r.code,
// data: {
// app: r.body,
// upload: uploadResults,
// },
// };
// if (r.message) {
// data.message = r.message;
// }
// console.log('upload data', data);
// res.writeHead(200, { 'Content-Type': 'application/json' });
// res.end(JSON.stringify(data));
// } else {
// res.writeHead(200, { 'Content-Type': 'application/json' });
// res.end(
// JSON.stringify({
// code: 200,
// data: {
// detect: [],
// upload: uploadResults,
// },
// }),
// );
// }
// });
// pipeBusboy(req, res, busboy);
// }
export * from './upload.ts'

View File

@@ -0,0 +1,87 @@
import { simpleRouter } from '@/app.ts';
import http from 'http';
import { useContextKey } from '@kevisual/context';
import { useFileStore } from '@kevisual/use-config';
export { simpleRouter };
export const cacheFilePath = useFileStore('cache-file', { needExists: true });
/**
* 事件客户端
*/
const eventClientsInit = () => {
const clients = new Map<string, { client?: http.ServerResponse; createTime?: number;[key: string]: any }>();
return clients;
};
export const clients = useContextKey('event-clients', () => eventClientsInit());
/**
* 获取 task-id
* @param req
* @returns
*/
export const getTaskId = (req: http.IncomingMessage) => {
const url = new URL(req.url || '', 'http://localhost');
const taskId = url.searchParams.get('taskId');
if (taskId) {
return taskId;
}
return req.headers['task-id'] as string;
};
type EventData = {
progress: number | string;
message: string;
};
/**
* 写入事件
* @param req
* @param data
*/
export const writeEvents = (req: http.IncomingMessage, data: EventData) => {
const taskId = getTaskId(req);
if (taskId) {
const client = clients.get(taskId)?.client;
if (client) {
client.write(`data: ${JSON.stringify(data)}\n\n`);
}
if (Number(data.progress) === 100) {
clients.delete(taskId);
}
} else {
console.log('taskId is remove.', taskId);
}
};
/**
* 查找超出2个小时的clients都删除了
*/
export const deleteOldClients = () => {
const now = Date.now();
for (const [taskId, client] of clients) {
// 如果创建时间超过2个小时则删除
if (now - client.createTime > 1000 * 60 * 60 * 2) {
clients.delete(taskId);
}
}
};
/**
* 解析表单数据, 如果表单数据是数组, 则取第一个appKey, version, username 等
* @param fields 表单数据
* @param parseKeys 需要解析的键
* @returns 解析后的数据
*/
export const getKey = (fields: Record<string, any>, parseKeys: string[]) => {
let value: Record<string, any> = {};
for (const key of parseKeys) {
const v = fields[key];
if (Array.isArray(v)) {
value[key] = v[0];
} else {
value[key] = v;
}
}
return value;
};
export const error = (msg: string, code = 500) => {
return JSON.stringify({ code, message: msg });
};

View File

@@ -0,0 +1,179 @@
import Busboy from 'busboy';
import { assistantConfig, simpleRouter } from '../app.ts'
import http from 'http';
import path from 'path';
import fs from 'fs';
import { checkAuth } from '@/routes/index.ts';
import { getTokenFromRequest } from '@/module/get-header-token.ts';
import { pipeBusboy } from '@/module/assistant/proxy/pipe.ts';
import { logger } from '@/module/logger.ts';
import { cacheFilePath, getKey, writeEvents } from './router.ts';
import { getContentType } from '@kevisual/oss/services';
import { validateDirectory } from './utils.ts';
import { UploadManager } from '@/module/upload/mv.ts';
simpleRouter.get('/client/upload', async (req, res) => {
if (res.headersSent) return; // 如果响应已发送,不再处理
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ message: 'Upload endpoint reached' }));
})
export const error = (msg: string, code = 500) => {
return JSON.stringify({ code, message: msg });
};
export const parseIfJson = (data = '{}') => {
try {
const _data = JSON.parse(data);
if (typeof _data === 'object') return _data;
return {};
} catch (error) {
return {};
}
};
const uploadManager = new UploadManager(assistantConfig)
export const uploadResources = async (req: http.IncomingMessage, res: http.ServerResponse) => {
// const { tokenUser, token } = await checkAuth(req, res);
const token = getTokenFromRequest(req);
let tokenUser: any = null;
const authResult = await checkAuth({ query: { token } });
if (authResult.code === 200) {
tokenUser = authResult.data?.tokenUser;
} else {
res.end(error(authResult.message, authResult.code));
return;
}
if (!tokenUser) {
res.end(error('Token is invalid.'));
return;
}
const url = new URL(req.url || '', 'http://localhost');
const share = !!url.searchParams.get('public');
const meta = parseIfJson(url.searchParams.get('meta'));
// 使用 busboy 解析 multipart/form-data
const busboy = Busboy({ headers: req.headers, preservePath: true });
const fields: any = {};
const files: any[] = [];
const filePromises: Promise<void>[] = [];
let bytesReceived = 0;
let bytesExpected = parseInt(req.headers['content-length'] || '0');
busboy.on('field', (fieldname, value) => {
fields[fieldname] = value;
});
busboy.on('file', (fieldname, fileStream, info) => {
const { filename, encoding, mimeType } = info;
const tempPath = path.join(cacheFilePath, `${Date.now()}-${Math.random().toString(36).substring(7)}`);
const writeStream = fs.createWriteStream(tempPath);
const filePromise = new Promise<void>((resolve, reject) => {
fileStream.on('data', (chunk) => {
bytesReceived += chunk.length;
if (bytesExpected > 0) {
const progress = (bytesReceived / bytesExpected) * 100;
const data = {
progress: progress.toFixed(2),
message: `Upload progress: ${progress.toFixed(2)}%`,
};
console.log('progress-upload', JSON.stringify(data, null, 2));
writeEvents(req, data);
}
});
fileStream.pipe(writeStream);
writeStream.on('finish', () => {
files.push({
filepath: tempPath,
originalFilename: filename,
mimetype: mimeType,
});
resolve();
});
writeStream.on('error', (err) => {
reject(err);
});
});
filePromises.push(filePromise);
});
busboy.on('finish', async () => {
// 等待所有文件写入完成
try {
await Promise.all(filePromises);
} catch (err) {
logger.error(`File write error: ${err.message}`);
res.end(error(`File write error: ${err.message}`));
return;
}
const clearFiles = () => {
files.forEach((file) => {
if (file?.filepath && fs.existsSync(file.filepath)) {
fs.unlinkSync(file.filepath);
}
});
};
// 检查是否有文件上传
if (files.length === 0) {
res.end(error('files is required'));
return;
}
console.log('fields', fields);
let { appKey, version, username, directory } = getKey(fields, ['appKey', 'version', 'username', 'directory']);
if (!appKey || !version) {
res.end(error('appKey or version is not found, please check the upload config.'));
clearFiles();
return;
}
const { code, message } = validateDirectory(directory);
if (code !== 200) {
res.end(error(message));
clearFiles();
return;
}
// 逐个处理每个上传的文件
const uploadedFiles = files;
const uploadResults = [];
for (let i = 0; i < uploadedFiles.length; i++) {
const file = uploadedFiles[i];
// @ts-ignore
const tempPath = file.filepath; // 文件上传时的临时路径
const relativePath = file.originalFilename; // 保留表单中上传的文件名 (包含文件夹结构)
// 比如 child2/b.txt
const showVersion = false;
const _version = showVersion ? `${version ? '/' + version : ''}` : '';
const _directory = directory ? `/${directory}` : '';
const minioPath = `${username || tokenUser.username || 'unknown'}/${appKey}${_version}${_directory}/${relativePath}`;
uploadResults.push({
name: relativePath,
path: minioPath,
});
const type = 'file';
uploadManager.mvFile({
type: 'file',
temppath: tempPath,
targetPath: minioPath,
})
if (type !== 'file') {
fs.unlinkSync(tempPath); // 删除临时文件
}
}
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(
JSON.stringify({
code: 200,
data: {
detect: [],
upload: uploadResults,
},
}),
);
});
pipeBusboy(req, res, busboy);
}
simpleRouter.post('/client/upload', uploadResources);

View File

@@ -0,0 +1,30 @@
/**
* 校验directory是否合法, 合法返回200, 不合法返回500
*
* directory 不能以/开头,不能以/结尾。不能以.开头,不能以.结尾。
* 把directory的/替换掉后,只能包含数字、字母、下划线、中划线
* @param directory 目录
* @returns
*/
export const validateDirectory = (directory?: string) => {
// 对directory进行校验不能以/开头,不能以/结尾。不能以.开头,不能以.结尾。
if (directory && (directory.startsWith('/') || directory.endsWith('/') || directory.startsWith('..') || directory.endsWith('..'))) {
return {
code: 500,
message: 'directory is invalid',
};
}
// 把directory的/替换掉后,只能包含数字、字母、下划线、中划线
// 可以包含.
let _directory = directory?.replace(/\//g, '');
if (_directory && !/^[a-zA-Z0-9_.-]+$/.test(_directory)) {
return {
code: 500,
message: 'directory is invalid, only number, letter, underline and hyphen are allowed',
};
}
return {
code: 200,
message: 'directory is valid',
};
};