@@ -1,106 +1,112 @@
import time
from fl ask import Fl ask , r equest
from gevent import monkey
from playwright . sync_api import sync_playwright
import asyncio
from fastapi import FastAPI , R equest, HTTPException
from playwright . async_api import async_playwright # 改用异步 API
from dotenv import load_dotenv
import os
from typing import Optional , Dict , Any
# 加载环境变量
load_dotenv ( )
monkey . patch_all ( )
app = Fl ask ( __name__ )
app = FastAPI ( )
global_a1 = " "
# 确保在模块级别声明全局变量
browser_context = None
context_page = None
playwright_instance = None
def get_context_page ( instance , stealth_js_path ) :
async def get_context_page ( instance , stealth_js_path ) :
chromium = instance . chromium
browser = chromium . launch ( headless = True )
# browser = chromium.launch(headless=False)
context = browser . new_context ( )
context . add_init_script ( path = stealth_js_path )
page = context . new_page ( )
browser = await chromium . launch ( headless = True )
# browser = await chromium.launch(headless=False)
context = await browser . new_context ( )
await context . add_init_script ( path = stealth_js_path )
page = await context . new_page ( )
return context , page
# 如下更改为 stealth.min.js 文件路径地址
stealth_js_path = " stealth.min.js "
print ( " 正在启动 playwright " )
playwright = sync_playwright ( ) . start ( )
browser_context , context _page = get_context_page ( playwright , stealth_js_path )
context_page . goto ( " https://www.xiaohongshu.com " )
print ( " 正在跳转至小红书首页 " )
time . sleep ( 5 )
context_page . reload ( )
time . sleep ( 1 )
cookies = browser_context . cookies ( )
for cookie in cookies :
if cookie [ " name " ] == " a1 " :
global_a1 = cookie [ " value " ]
print ( " 当前浏览器 cookie 中 a1 值为: " + cookie [ " value " ] + " ,请将需要使用的 a1 设置成一样方可签名成功 " )
# 初始化 playwright 的异步函数
async def initialize_playwright ( ) :
global browser_context , context_page , playwright_instance , global_a1
# 如下更改为 stealth.min.js 文件路径地址
stealth_js _path = " stealth.min.js "
print ( " 正在启动 playwright " )
playwright_instance = await async_playwright ( ) . start ( )
browser_context , context_page = await get_context_page ( playwright_instance , stealth_js_path )
await context_page . goto ( " https://www.xiaohongshu.com " )
print ( " 正在跳转至小红书首页 " )
await asyncio . sleep ( 5 )
await context_page . reload ( )
await asyncio . sleep ( 1 )
cookies = await browser_context . cookies ( )
for cookie in cookies :
if cookie [ " name " ] == " a1 " :
global_a1 = cookie [ " value " ]
print ( " 当前浏览器 cookie 中 a1 值为: " + cookie [ " value " ] + " ,请将需要使用的 a1 设置成一样方可签名成功 " )
loginModal = context_page . query_selector ( " .reds-mask " )
if loginModal is not None :
loginModal . evaluate ( " el => el.click() " )
print ( " 登录弹窗已关闭 " )
print ( " 跳转小红书首页成功,等待调用 " )
loginModal = await context_page . query_selector ( " .reds-mask " )
if loginModal is not None :
await loginModal . evaluate ( " el => el.click() " )
print ( " 登录弹窗已关闭 " )
print ( " 跳转小红书首页成功,等待调用 " )
def setCookie ( a1 ) :
async def setCookie ( a1 : str ) - > Dict [ str , Any ] :
global browser_context , context_page # 声明全局变量
global global_a1
try :
# 确保页面仍然有效,如果页面已关闭则重新初始化
if context_page is None or context_page . is_closed ( ) :
browser_context , context_page = get_context_page ( playwright, stealth_js_path )
context_page . goto ( " https://www.xiaohongshu.com " )
time . sleep ( 5 )
context_page . reload ( )
time . sleep ( 1 )
if context_page is None or await context_page . is_closed ( ) :
browser_context , context_page = await get_context_page ( playwright_instance , " stealth.min.js " )
await context_page . goto ( " https://www.xiaohongshu.com " )
await asyncio . sleep ( 5 )
await context_page . reload ( )
await asyncio . sleep ( 1 )
if a1 != global_a1 :
# 删除 a1 cookie
browser_context . add_cookies ( [
await browser_context . add_cookies ( [
{ ' name ' : ' a1 ' , ' value ' : ' ' , ' domain ' : " .xiaohongshu.com " , ' path ' : " / " }
] )
# 等待一段时间以确保 cookie 被删除
time . sleep ( 1 )
await asyncio . sleep ( 1 )
# 设置新的 a1 cookie
browser_context . add_cookies ( [
await browser_context . add_cookies ( [
{ ' name ' : ' a1 ' , ' value ' : a1 , ' domain ' : " .xiaohongshu.com " , ' path ' : " / " }
] )
context_page . reload ( )
time . sleep ( 1 )
cookies = browser_context . cookies ( )
await context_page . reload ( )
await asyncio . sleep ( 1 )
cookies = await browser_context . cookies ( )
print ( " 页面加载后的 Cookie: " , cookies )
global_a1 = a1
print ( " 设置 cookie 成功 " , a1 )
return { " status " : " success " , " message " : " Cookie set successfully " }
except Exception as e :
print ( f " Error during setCookie operation: { e } " )
return { " error " : str ( e ) }
raise HTTPException ( status_code = 500 , detail = str ( e ) )
def sign ( uri , data , a1 , web_session ) :
async def sign ( uri : str , data : Dict [ str , Any ] , a1 : str , web_session : str ) - > Dict [ str , Any ] :
global browser_context , context_page # 声明全局变量
global global_a1
try :
# 确保页面仍然有效,如果页面已关闭则重新初始化
if context_page is None or context_page . is_closed ( ) :
browser_context , context_page = get_context_page ( playwright, stealth_js_path )
context_page . goto ( " https://www.xiaohongshu.com " )
time . sleep ( 5 )
context_page . reload ( )
time . sleep ( 1 )
if context_page is None or await context_page . is_closed ( ) :
browser_context , context_page = await get_context_page ( playwright_instance , " stealth.min.js " )
await context_page . goto ( " https://www.xiaohongshu.com " )
await asyncio . sleep ( 5 )
await context_page . reload ( )
await asyncio . sleep ( 1 )
if a1 != global_a1 :
setCookie ( a1 )
await setCookie ( a1 )
# 执行 JavaScript 函数
# localStorage.getItem("b1")
b1 = context_page . evaluate ( " () => localStorage.getItem( ' b1 ' ) " )
b1b1 = context_page . evaluate ( " () => localStorage.getItem( ' b1b1 ' ) " )
encrypt_params = context_page . evaluate ( " ([url, data]) => window._webmsxyw(url, data) " , [ uri , data ] )
b1 = await context_page . evaluate ( " () => localStorage.getItem( ' b1 ' ) " )
b1b1 = await context_page . evaluate ( " () => localStorage.getItem( ' b1b1 ' ) " )
encrypt_params = await context_page . evaluate ( " ([url, data]) => window._webmsxyw(url, data) " , [ uri , data ] )
return {
" x-s " : encrypt_params [ " X-s " ] ,
" x-t " : str ( encrypt_params [ " X-t " ] ) ,
@@ -109,38 +115,57 @@ def sign(uri, data, a1, web_session):
}
except Exception as e :
print ( f " Error during sign operation: { e } " )
return { " error " : str ( e ) }
raise HTTPException ( status_code = 500 , detail = str ( e ) )
@app.route ( " /sign " , methods = [ " POST " ] )
def hello_world ( ) :
json = request . json
uri = json [ " uri " ]
data = json [ " data " ]
a1 = json [ " a1 " ]
web_session = json [ " web_session " ]
me = sign ( uri , data , a1 , web_session )
@app.post ( " /sign " )
async def sign_endpoint ( request : Request ) :
json_data = await request . json ( )
uri = json_data . get ( " uri " )
data = json_data . get ( " data " )
a1 = json_data . get ( " a1 " )
web_session = json_data . get ( " web_session " )
if not all ( [ uri , data , a1 , web_session ] ) :
raise HTTPException ( status_code = 400 , detail = " Missing required parameters " )
me = await sign ( uri , data , a1 , web_session )
return {
" a1 " : a1 ,
" sign " : me ,
}
@app.route ( " /a1 " , methods = [ " GET " ] )
def get_a1 ( ) :
@app.get ( " /a1 " )
async def get_a1 ( a1 : Optional [ str ] = None ) :
global global_a1
# 获取 params? a1的参数
a1 = request . args . get ( ' a1 ' )
if a1 and a1 != global_a1 :
# 如果提供了 a1 参数且与当前的 global_a1 不同,则调用 setCookie 函数
# 调用 setCookie 函数
result = setCookie ( a1 )
result = await setCookie ( a1 )
return { ' a1 ' : a1 , ' result ' : result }
else :
# 如果没有提供 a1 参数,则返回当前的 global_a1 值
return { ' a1 ' : global_a1 }
# 在应用启动时初始化 Playwright
@app.on_event ( " startup " )
async def startup_event ( ) :
await initialize_playwright ( )
# 在应用关闭时清理资源
@app.on_event ( " shutdown " )
async def shutdown_event ( ) :
global playwright_instance , browser_context , context_page
if context_page :
await context_page . close ( )
if browser_context :
await browser_context . close ( )
if playwright_instance :
await playwright_instance . stop ( )
if __name__ == ' __main__ ' :
port = os . getenv ( ' XHS_API_PORT ' , 5005 )
app . run ( host = " 0.0.0.0 " , port = port )
im port uvicorn
uvicorn . run ( app , host = " 0.0.0.0 " , port = 5005 )