Compare commits

...

2 Commits

Author SHA1 Message Date
4201d9c65a change to flstapi 2025-05-03 01:15:20 +08:00
e7b645d795 temp 2025-05-03 00:57:07 +08:00
7 changed files with 201 additions and 229 deletions

23
Dockerfile Normal file
View File

@ -0,0 +1,23 @@
FROM --platform=$TARGETPLATFORM mcr.microsoft.com/playwright/python:v1.38.0-jammy
LABEL authors="abearxiong"
LABEL mail="xiongxiao@xiongxiao.me"
WORKDIR /app
COPY server.py .
RUN set -ex \
&& apt-get update \
&& apt-get install -y --no-install-recommends curl
# reference -> https://playwright.dev/python/docs/ci#via-containers
RUN python -m pip install --upgrade pip \
&& pip install fastapi uvicorn xhs playwright \
&& rm -rf /var/lib/apt/lists/*ç
RUN curl --insecure -L -o stealth.min.js https://cdn.jsdelivr.net/gh/requireCool/stealth.min.js/stealth.min.js
EXPOSE 5005
CMD [ "python", "-m", "uvicorn", "app:app", "--host", "0.0.0.0", "--port", "5005"]

171
app.py Normal file
View File

@ -0,0 +1,171 @@
import time
import asyncio
from fastapi import FastAPI, Request, HTTPException
from playwright.async_api import async_playwright # 改用异步 API
from dotenv import load_dotenv
import os
from typing import Optional, Dict, Any
# 加载环境变量
load_dotenv()
app = FastAPI()
global_a1 = ""
# 确保在模块级别声明全局变量
browser_context = None
context_page = None
playwright_instance = None
async def get_context_page(instance, stealth_js_path):
chromium = instance.chromium
browser = await chromium.launch(headless=True)
# browser = await chromium.launch(headless=False)
context = await browser.new_context()
await context.add_init_script(path=stealth_js_path)
page = await context.new_page()
return context, page
# 初始化 playwright 的异步函数
async def initialize_playwright():
global browser_context, context_page, playwright_instance, global_a1
# 如下更改为 stealth.min.js 文件路径地址
stealth_js_path = "stealth.min.js"
print("正在启动 playwright")
playwright_instance = await async_playwright().start()
browser_context, context_page = await get_context_page(playwright_instance, stealth_js_path)
await context_page.goto("https://www.xiaohongshu.com")
print("正在跳转至小红书首页")
await asyncio.sleep(5)
await context_page.reload()
await asyncio.sleep(1)
cookies = await browser_context.cookies()
for cookie in cookies:
if cookie["name"] == "a1":
global_a1 = cookie["value"]
print("当前浏览器 cookie 中 a1 值为:" + cookie["value"] + ",请将需要使用的 a1 设置成一样方可签名成功")
loginModal = await context_page.query_selector(".reds-mask")
if loginModal is not None:
await loginModal.evaluate("el => el.click()")
print("登录弹窗已关闭")
print("跳转小红书首页成功,等待调用")
async def setCookie(a1: str) -> Dict[str, Any]:
global browser_context, context_page # 声明全局变量
global global_a1
try:
# 确保页面仍然有效,如果页面已关闭则重新初始化
if context_page is None or await context_page.is_closed():
browser_context, context_page = await get_context_page(playwright_instance, "stealth.min.js")
await context_page.goto("https://www.xiaohongshu.com")
await asyncio.sleep(5)
await context_page.reload()
await asyncio.sleep(1)
if a1 != global_a1:
# 删除 a1 cookie
await browser_context.add_cookies([
{'name': 'a1', 'value': '', 'domain': ".xiaohongshu.com", 'path': "/"}
])
# 等待一段时间以确保 cookie 被删除
await asyncio.sleep(1)
# 设置新的 a1 cookie
await browser_context.add_cookies([
{'name': 'a1', 'value': a1, 'domain': ".xiaohongshu.com", 'path': "/"}
])
await context_page.reload()
await asyncio.sleep(1)
cookies = await browser_context.cookies()
print("页面加载后的 Cookie:", cookies)
global_a1 = a1
print("设置 cookie 成功", a1)
return {"status": "success", "message": "Cookie set successfully"}
except Exception as e:
print(f"Error during setCookie operation: {e}")
raise HTTPException(status_code=500, detail=str(e))
async def sign(uri: str, data: Dict[str, Any], a1: str, web_session: str) -> Dict[str, Any]:
global browser_context, context_page # 声明全局变量
global global_a1
try:
# 确保页面仍然有效,如果页面已关闭则重新初始化
if context_page is None or await context_page.is_closed():
browser_context, context_page = await get_context_page(playwright_instance, "stealth.min.js")
await context_page.goto("https://www.xiaohongshu.com")
await asyncio.sleep(5)
await context_page.reload()
await asyncio.sleep(1)
if a1 != global_a1:
await setCookie(a1)
# 执行 JavaScript 函数
# localStorage.getItem("b1")
b1 = await context_page.evaluate("() => localStorage.getItem('b1')")
b1b1 = await context_page.evaluate("() => localStorage.getItem('b1b1')")
encrypt_params = await context_page.evaluate("([url, data]) => window._webmsxyw(url, data)", [uri, data])
return {
"x-s": encrypt_params["X-s"],
"x-t": str(encrypt_params["X-t"]),
"b1": b1,
"b1b1": b1b1,
}
except Exception as e:
print(f"Error during sign operation: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/sign")
async def sign_endpoint(request: Request):
json_data = await request.json()
uri = json_data.get("uri")
data = json_data.get("data")
a1 = json_data.get("a1")
web_session = json_data.get("web_session")
if not all([uri, data, a1, web_session]):
raise HTTPException(status_code=400, detail="Missing required parameters")
me = await sign(uri, data, a1, web_session)
return {
"a1": a1,
"sign": me,
}
@app.get("/a1")
async def get_a1(a1: Optional[str] = None):
global global_a1
if a1 and a1 != global_a1:
# 如果提供了 a1 参数且与当前的 global_a1 不同,则调用 setCookie 函数
result = await setCookie(a1)
return {'a1': a1, 'result': result}
else:
# 如果没有提供 a1 参数,则返回当前的 global_a1 值
return {'a1': global_a1}
# 在应用启动时初始化 Playwright
@app.on_event("startup")
async def startup_event():
await initialize_playwright()
# 在应用关闭时清理资源
@app.on_event("shutdown")
async def shutdown_event():
global playwright_instance, browser_context, context_page
if context_page:
await context_page.close()
if browser_context:
await browser_context.close()
if playwright_instance:
await playwright_instance.stop()
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=5005)

View File

@ -1,13 +1,11 @@
const PORT = 5006; const PORT = 5006;
// python -m uvicorn app:app --host 0.0.0.0 --port 5006
module.exports = { module.exports = {
apps: [ apps: [
{ {
name: 'xhs-api-server', name: 'xhs-api-server',
script: 'server.py', // 替换为您的Python脚本路径
interpreter: 'python', // 替换为您的Python解释器路径 interpreter: 'python', // 替换为您的Python解释器路径
env: { args: ['-m uvicorn app:app --host 0.0.0.0 --port', PORT], // 传递端口参数
XHS_API_PORT: PORT, // 从环境变量获取端口不存在则使用5005
},
}, },
], ],
}; };

View File

@ -2,8 +2,7 @@
"name": "@kevisual/social-xhs-api-server", "name": "@kevisual/social-xhs-api-server",
"type": "module", "type": "module",
"scripts": { "scripts": {
"pm2": "pm2 start ecosystem.config.mjs", "pm2": "pm2 start ecosystem.config.cjs",
"start": "pm2 start --name xhs-api-server --interpreter python3 --env XHS_API_PORT=5006 -- server.py ", "py": " python -m uvicorn app:app --host 0.0.0.0 --port 5006"
"py": "XHS_API_PORT=5006 python server.py"
} }
} }

View File

@ -1,6 +1,5 @@
playwright playwright
xhs xhs
gevent fastapi
requests dotenv
flask uvicorn
dotenv

146
server.py
View File

@ -1,146 +0,0 @@
import time
from flask import Flask, request
from gevent import monkey
from playwright.sync_api import sync_playwright
from dotenv import load_dotenv
import os
# 加载环境变量
load_dotenv()
monkey.patch_all()
app = Flask(__name__)
global_a1 = ""
# 确保在模块级别声明全局变量
browser_context = None
context_page = None
def get_context_page(instance, stealth_js_path):
chromium = instance.chromium
browser = chromium.launch(headless=True)
# browser = chromium.launch(headless=False)
context = browser.new_context()
context.add_init_script(path=stealth_js_path)
page = context.new_page()
return context, page
# 如下更改为 stealth.min.js 文件路径地址
stealth_js_path = "stealth.min.js"
print("正在启动 playwright")
playwright = sync_playwright().start()
browser_context, context_page = get_context_page(playwright, stealth_js_path)
context_page.goto("https://www.xiaohongshu.com")
print("正在跳转至小红书首页")
time.sleep(5)
context_page.reload()
time.sleep(1)
cookies = browser_context.cookies()
for cookie in cookies:
if cookie["name"] == "a1":
global_a1 = cookie["value"]
print("当前浏览器 cookie 中 a1 值为:" + cookie["value"] + ",请将需要使用的 a1 设置成一样方可签名成功")
loginModal = context_page.query_selector(".reds-mask")
if loginModal is not None:
loginModal.evaluate("el => el.click()")
print("登录弹窗已关闭")
print("跳转小红书首页成功,等待调用")
def setCookie(a1):
global browser_context, context_page # 声明全局变量
global global_a1
try:
# 确保页面仍然有效,如果页面已关闭则重新初始化
if context_page is None or context_page.is_closed():
browser_context, context_page = get_context_page(playwright, stealth_js_path)
context_page.goto("https://www.xiaohongshu.com")
time.sleep(5)
context_page.reload()
time.sleep(1)
if a1 != global_a1:
# 删除 a1 cookie
browser_context.add_cookies([
{'name': 'a1', 'value': '', 'domain': ".xiaohongshu.com", 'path': "/"}
])
# 等待一段时间以确保 cookie 被删除
time.sleep(1)
# 设置新的 a1 cookie
browser_context.add_cookies([
{'name': 'a1', 'value': a1, 'domain': ".xiaohongshu.com", 'path': "/"}
])
context_page.reload()
time.sleep(1)
cookies = browser_context.cookies()
print("页面加载后的 Cookie:", cookies)
global_a1 = a1
print("设置 cookie 成功", a1)
return {"status": "success", "message": "Cookie set successfully"}
except Exception as e:
print(f"Error during setCookie operation: {e}")
return {"error": str(e)}
def sign(uri, data, a1, web_session):
global browser_context, context_page # 声明全局变量
global global_a1
try:
# 确保页面仍然有效,如果页面已关闭则重新初始化
if context_page is None or context_page.is_closed():
browser_context, context_page = get_context_page(playwright, stealth_js_path)
context_page.goto("https://www.xiaohongshu.com")
time.sleep(5)
context_page.reload()
time.sleep(1)
if a1 != global_a1:
setCookie(a1)
# 执行 JavaScript 函数
# localStorage.getItem("b1")
b1 = context_page.evaluate("() => localStorage.getItem('b1')")
b1b1 = context_page.evaluate("() => localStorage.getItem('b1b1')")
encrypt_params = context_page.evaluate("([url, data]) => window._webmsxyw(url, data)", [uri, data])
return {
"x-s": encrypt_params["X-s"],
"x-t": str(encrypt_params["X-t"]),
"b1": b1,
"b1b1": b1b1,
}
except Exception as e:
print(f"Error during sign operation: {e}")
return {"error": str(e)}
@app.route("/sign", methods=["POST"])
def hello_world():
json = request.json
uri = json["uri"]
data = json["data"]
a1 = json["a1"]
web_session = json["web_session"]
me = sign(uri, data, a1, web_session)
return {
"a1": a1,
"sign": me,
}
@app.route("/a1", methods=["GET"])
def get_a1():
global global_a1
# 获取 paramsa1的参数
a1 = request.args.get('a1')
if a1 and a1 != global_a1:
# 如果提供了 a1 参数且与当前的 global_a1 不同,则调用 setCookie 函数
# 调用 setCookie 函数
result = setCookie(a1)
return {'a1': a1, 'result': result}
else:
# 如果没有提供 a1 参数,则返回当前的 global_a1 值
return {'a1': global_a1}
if __name__ == '__main__':
port = os.getenv('XHS_API_PORT', 5005)
app.run(host="0.0.0.0", port=port)

View File

@ -1,72 +0,0 @@
import time
from flask import Flask, request
from gevent import monkey
from playwright.sync_api import sync_playwright
monkey.patch_all()
app = Flask(__name__)
global_a1 = ""
def get_context_page(instance, stealth_js_path):
chromium = instance.chromium
browser = chromium.launch(headless=True)
context = browser.new_context()
context.add_init_script(path=stealth_js_path)
page = context.new_page()
return context, page
stealth_js_path = "stealth.min.js"
print("正在启动 playwright")
playwright = sync_playwright().start()
browser_context, context_page = get_context_page(playwright, stealth_js_path)
context_page.goto("https://www.xiaohongshu.com")
print("正在跳转至小红书首页")
time.sleep(5)
context_page.reload()
time.sleep(1)
cookies = browser_context.cookies()
for cookie in cookies:
if cookie["name"] == "a1":
global_a1 = cookie["value"]
print("当前浏览器中 a1 值为:" + global_a1 + ",请将您的 cookie 中的 a1 也设置成一样,方可签名成功")
print("跳转小红书首页成功,等待调用")
def sign(uri, data, a1, web_session):
global global_a1
if a1 != global_a1:
browser_context.add_cookies([
{'name': 'a1', 'value': a1, 'domain': ".xiaohongshu.com", 'path': "/"}
])
context_page.reload()
time.sleep(1)
global_a1 = a1
encrypt_params = context_page.evaluate("([url, data]) => window._webmsxyw(url, data)", [uri, data])
return {
"x-s": encrypt_params["X-s"],
"x-t": str(encrypt_params["X-t"])
}
@app.route("/sign", methods=["POST"])
def hello_world():
json = request.json
uri = json["uri"]
data = json["data"]
a1 = json["a1"]
web_session = json["web_session"]
return sign(uri, data, a1, web_session)
@app.route("/a1", methods=["GET"])
def get_a1():
return {'a1': global_a1}
if __name__ == '__main__':
app.run(host="0.0.0.0", port=5005)