179 lines
6.4 KiB
Python
179 lines
6.4 KiB
Python
import time
|
|
import asyncio
|
|
from fastapi import FastAPI, Request, HTTPException
|
|
from playwright.async_api import async_playwright # 改用异步 API
|
|
from dotenv import load_dotenv
|
|
import os
|
|
from typing import Optional, Dict, Any
|
|
from contextlib import asynccontextmanager
|
|
|
|
# Load environment variables before the rest of the module reads them.
load_dotenv()
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Own the shared Playwright browser for the lifetime of the application.

    Startup runs ``initialize_playwright()``. Shutdown runs
    ``shutdown_event()`` so the page, browser context and Playwright driver
    are released instead of being left behind when the server stops.

    Args:
        app: The FastAPI application this lifespan handler is attached to.
    """
    try:
        await initialize_playwright()
        yield
    finally:
        # Also reached when startup fails half-way: shutdown_event() skips
        # handles that were never created, so partial state is cleaned up too.
        try:
            await shutdown_event()
        except Exception as e:
            # Teardown is best-effort; a failure here must not mask the
            # original reason the application is stopping.
            print(f"Error during shutdown: {e}")
        print("关闭")
|
|
|
|
# ASGI application; `lifespan` runs the startup work before requests are served.
app = FastAPI(lifespan=lifespan)

# Value of the "a1" cookie the browser context is currently using.
global_a1 = ""

# Shared Playwright handles. They are declared at module level so the
# functions below can rebind them with `global`; all start out unset.
browser_context = context_page = playwright_instance = None
|
|
|
|
|
|
async def get_context_page(instance, stealth_js_path):
    """Launch headless Chromium and open a page in a fresh browser context.

    Args:
        instance: Started Playwright object exposing a ``chromium`` launcher.
        stealth_js_path: Path of the script registered on the context with
            ``add_init_script``.

    Returns:
        A ``(context, page)`` tuple: the new browser context and the page
        opened in it.
    """
    # Use headless=False here to watch the browser while debugging.
    browser = await instance.chromium.launch(headless=True)
    new_context = await browser.new_context()
    # The init script is registered before the page is created.
    await new_context.add_init_script(path=stealth_js_path)
    new_page = await new_context.new_page()
    return new_context, new_page
|
|
|
|
|
|
async def initialize_playwright():
    """Start Playwright and prepare the shared Xiaohongshu page.

    Populates the module-level ``playwright_instance``, ``browser_context``
    and ``context_page`` handles, loads the Xiaohongshu home page, stores the
    browser's ``a1`` cookie value in ``global_a1`` and clicks the
    ``.reds-mask`` element if the page contains one.
    """
    global browser_context, context_page, playwright_instance, global_a1

    # Change this to the path of the stealth.min.js file if it lives elsewhere.
    stealth_js_path = "stealth.min.js"

    print("正在启动 playwright")
    playwright_instance = await async_playwright().start()
    browser_context, context_page = await get_context_page(
        playwright_instance, stealth_js_path
    )

    await context_page.goto("https://www.xiaohongshu.com")
    print("正在跳转至小红书首页")
    # Fixed pauses around the reload; presumably they give the site's scripts
    # time to set cookies -- TODO confirm.
    await asyncio.sleep(5)
    await context_page.reload()
    await asyncio.sleep(1)

    for cookie in await browser_context.cookies():
        if cookie["name"] != "a1":
            continue
        a1_value = cookie["value"]
        global_a1 = a1_value
        print("当前浏览器 cookie 中 a1 值为:" + a1_value + ",请将需要使用的 a1 设置成一样方可签名成功")

    # Click the login overlay, if any, through an in-page script.
    login_modal = await context_page.query_selector(".reds-mask")
    if login_modal is not None:
        await login_modal.evaluate("el => el.click()")
        print("登录弹窗已关闭")

    print("跳转小红书首页成功,等待调用")
|
|
|
|
|
|
async def setCookie(a1: str) -> Dict[str, Any]:
    """Make the shared browser context use ``a1`` as its ``a1`` cookie.

    If the shared page is missing or closed it is re-created first. When
    ``a1`` differs from ``global_a1`` the cookie is blanked, set to the new
    value, the page is reloaded and ``global_a1`` is updated.

    Args:
        a1: Cookie value to install for ``.xiaohongshu.com``.

    Returns:
        ``{"status": "success", "message": "Cookie set successfully"}``.

    Raises:
        HTTPException: Status 500 with the underlying error text as detail
            when any step fails.
    """
    global browser_context, context_page, global_a1
    try:
        # Page.is_closed() is a plain synchronous method in Playwright's async
        # API. Awaiting its bool result raised TypeError, which made every
        # call with a live page end in a 500 -- so it is called without await.
        if context_page is None or context_page.is_closed():
            browser_context, context_page = await get_context_page(
                playwright_instance, "stealth.min.js"
            )
            await context_page.goto("https://www.xiaohongshu.com")
            await asyncio.sleep(5)
            await context_page.reload()
            await asyncio.sleep(1)

        if a1 != global_a1:
            # Overwrite the existing a1 cookie with an empty value first.
            await browser_context.add_cookies([
                {'name': 'a1', 'value': '', 'domain': ".xiaohongshu.com", 'path': "/"}
            ])
            # Pause so the overwrite has taken effect before the new value is set.
            await asyncio.sleep(1)
            # Install the requested a1 value.
            await browser_context.add_cookies([
                {'name': 'a1', 'value': a1, 'domain': ".xiaohongshu.com", 'path': "/"}
            ])
            await context_page.reload()
            await asyncio.sleep(1)
            cookies = await browser_context.cookies()
            print("页面加载后的 Cookie:", cookies)
            global_a1 = a1
            print("设置 cookie 成功", a1)
        return {"status": "success", "message": "Cookie set successfully"}
    except Exception as e:
        print(f"Error during setCookie operation: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
async def sign(uri: str, data: Dict[str, Any], a1: str, web_session: str) -> Dict[str, Any]:
    """Compute request-signing values by calling the page's ``window._webmsxyw``.

    If the shared page is missing or closed it is re-created first, and the
    browser's ``a1`` cookie is switched via ``setCookie`` when it differs from
    the requested one.

    Args:
        uri: First argument passed to ``window._webmsxyw``.
        data: Second argument passed to ``window._webmsxyw``.
        a1: ``a1`` cookie value the browser must be using while signing.
        web_session: Accepted for interface compatibility; not used here.

    Returns:
        A dict with ``"x-s"`` and ``"x-t"`` (stringified) taken from the
        script's result, plus ``"b1"`` and ``"b1b1"`` read from the page's
        ``localStorage``.

    Raises:
        HTTPException: Status 500 with the underlying error text as detail
            when any step fails.
    """
    global browser_context, context_page
    try:
        # Page.is_closed() is a plain synchronous method in Playwright's async
        # API. Awaiting its bool result raised TypeError, which made every
        # call with a live page end in a 500 -- so it is called without await.
        if context_page is None or context_page.is_closed():
            browser_context, context_page = await get_context_page(
                playwright_instance, "stealth.min.js"
            )
            await context_page.goto("https://www.xiaohongshu.com")
            await asyncio.sleep(5)
            await context_page.reload()
            await asyncio.sleep(1)

        if a1 != global_a1:
            await setCookie(a1)

        # Run the in-page JavaScript: read both localStorage entries, then sign.
        b1 = await context_page.evaluate("() => localStorage.getItem('b1')")
        b1b1 = await context_page.evaluate("() => localStorage.getItem('b1b1')")
        encrypt_params = await context_page.evaluate(
            "([url, data]) => window._webmsxyw(url, data)", [uri, data]
        )
        return {
            "x-s": encrypt_params["X-s"],
            "x-t": str(encrypt_params["X-t"]),
            "b1": b1,
            "b1b1": b1b1,
        }
    except HTTPException:
        # setCookie() already raised a complete HTTP error; wrapping it again
        # in the handler below would replace its detail with str(exception).
        raise
    except Exception as e:
        print(f"Error during sign operation: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.post("/sign")
async def sign_endpoint(request: Request):
    """Handle ``POST /sign``: sign the request described by the JSON body.

    Reads ``uri``, ``data``, ``a1`` and ``web_session`` from the body.

    Returns:
        ``{"a1": <a1>, "sign": <result of sign()>}``.

    Raises:
        HTTPException: Status 400 when ``uri`` or ``a1`` is missing or empty.
    """
    payload = await request.json()
    uri, data, a1, web_session = (
        payload.get(key) for key in ("uri", "data", "a1", "web_session")
    )
    if not (uri and a1):
        raise HTTPException(status_code=400, detail="Missing required parameters")

    signature = await sign(uri, data, a1, web_session)
    return {"a1": a1, "sign": signature}
|
|
|
|
|
|
@app.get("/a1")
async def get_a1(a1: Optional[str] = None):
    """Handle ``GET /a1``: report, and optionally change, the browser's ``a1``.

    Args:
        a1: Optional new cookie value.

    Returns:
        ``{'a1': global_a1}`` when ``a1`` is omitted, empty, or already the
        current value; otherwise ``{'a1': a1, 'result': <setCookie result>}``
        after switching the browser to the new value.
    """
    # Nothing to change: just report the value currently in use.
    if not a1 or a1 == global_a1:
        return {'a1': global_a1}

    # A different a1 was supplied, so install it in the browser first.
    outcome = await setCookie(a1)
    return {'a1': a1, 'result': outcome}
|
|
|
|
|
|
|
|
async def shutdown_event():
    """Close the shared page and browser context, then stop Playwright.

    Each handle is skipped when it is falsy (for example, never initialised).
    """
    page = context_page
    if page:
        await page.close()

    context = browser_context
    if context:
        await context.close()

    driver = playwright_instance
    if driver:
        await driver.stop()
|
|
|
|
|
|
if __name__ == '__main__':
    import uvicorn

    # XHS_API_PORT comes from the environment as a string; the integer 5005 is
    # used when it is unset. Either way the value is normalised to an int.
    raw_port = os.getenv("XHS_API_PORT", 5005)
    try:
        port = int(raw_port)
    except ValueError:
        print(f"Invalid port number: {raw_port}. Using default port 5005.")
        port = 5005

    # Start the FastAPI application.
    uvicorn.run(app, host="0.0.0.0", port=port)