This commit is contained in:
2025-12-25 11:48:21 +08:00
parent 3c6c9f9fbf
commit 145dff7a7e
6 changed files with 488 additions and 42 deletions

57
app.py
View File

@@ -6,6 +6,7 @@ from dotenv import load_dotenv
import os import os
from typing import Optional, Dict, Any from typing import Optional, Dict, Any
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from playwright_sign import sign_with_playwright
# 加载环境变量 # 加载环境变量
load_dotenv() load_dotenv()
@@ -26,7 +27,6 @@ browser_context = None
context_page = None context_page = None
playwright_instance = None playwright_instance = None
async def get_context_page(instance, stealth_js_path): async def get_context_page(instance, stealth_js_path):
chromium = instance.chromium chromium = instance.chromium
browser = await chromium.launch(headless=True) browser = await chromium.launch(headless=True)
@@ -125,7 +125,7 @@ async def reload_browser():
except Exception as e: except Exception as e:
print(f"Error during reload_browser operation: {e}") print(f"Error during reload_browser operation: {e}")
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
async def sign(uri: str, data: Dict[str, Any], a1: str, web_session: str) -> Dict[str, Any]: async def sign(uri: str, data: Optional[Dict[str, Any]], a1: str, method: str = "POST") -> Dict[str, Any]:
global browser_context, context_page # 声明全局变量 global browser_context, context_page # 声明全局变量
global global_a1 global global_a1
try: try:
@@ -137,41 +137,26 @@ async def sign(uri: str, data: Dict[str, Any], a1: str, web_session: str) -> Dic
await context_page.reload() await context_page.reload()
await asyncio.sleep(1) await asyncio.sleep(1)
# 执行 JavaScript 函数 # 使用 playwright_sign 模块的 sign_with_playwright 函数
b1 = await context_page.evaluate("() => localStorage.getItem('b1')") signs = await sign_with_playwright(context_page, uri, data, a1, method)
b1b1 = await context_page.evaluate("() => localStorage.getItem('b1b1')")
encrypt_params = await context_page.evaluate("([url, data]) => window._webmsxyw(url, data)", [uri, data])
if not encrypt_params or not isinstance(encrypt_params, dict):
raise HTTPException(status_code=500, detail="Failed to retrieve encryption parameters")
return { return {
"x-s": encrypt_params["X-s"], "x-s": signs["x-s"],
"x-t": str(encrypt_params["X-t"]), "x-t": signs["x-t"],
"b1": b1, "x-s-common": signs["x-s-common"],
"x-b3-traceid": signs["x-b3-traceid"],
"a1": global_a1, "a1": global_a1,
"b1b1": b1b1,
} }
except Exception as e: except Exception as e:
print(f"Error during sign operation: {e}")
# 检测页面崩溃错误并重新初始化 # 检测页面崩溃错误并重新初始化
if "Target crashed" in str(e): try:
print("页面崩溃,正在重新初始化浏览器上下文和页面...") await reload_browser()
try: # 重试签名操作
await reload_browser() return await sign(uri, data, a1, method)
# 重试签名操作 except Exception as reinit_error:
return await sign(uri, data, a1, web_session) print(f"重新初始化失败: {reinit_error}")
except Exception as reinit_error: raise HTTPException(status_code=500, detail="Failed to recover from error")
print(f"重新初始化失败: {reinit_error}")
raise HTTPException(status_code=500, detail="Failed to recover from page crash")
else:
print(f"Error during sign operation: {e}")
try:
await reload_browser()
# 重试签名操作
return await sign(uri, data, a1, web_session)
except Exception as reinit_error:
print(f"重新初始化失败: {reinit_error}")
raise HTTPException(status_code=500, detail="Failed to recover from page crash")
@app.post("/sign") @app.post("/sign")
@@ -180,10 +165,10 @@ async def sign_endpoint(request: Request):
uri = json_data.get("uri") uri = json_data.get("uri")
data = json_data.get("data") data = json_data.get("data")
a1 = json_data.get("a1") a1 = json_data.get("a1")
web_session = json_data.get("web_session") method = json_data.get("method", "POST")
if not uri or not a1: if not uri or not a1:
raise HTTPException(status_code=400, detail="Missing required parameters") raise HTTPException(status_code=400, detail="Missing required parameters")
me = await sign(uri, data, a1, web_session) me = await sign(uri, data, a1, method)
return { return {
"a1": a1, "a1": a1,
"sign": me, "sign": me,

35
main.py Normal file
View File

@@ -0,0 +1,35 @@
import asyncio
from playwright.async_api import async_playwright
from playwright_sign import sign_with_playwright
async def main():
    """Smoke-test signature generation against a live xiaohongshu page."""
    # Sample request that will be signed.
    uri = '/api/sns/web/v1/search/notes'
    data = {'keyword': 'test', 'page': 1}
    a1_value = 'example_a1_cookie_value'
    method = 'POST'

    async with async_playwright() as p:
        # Launch a headless browser and open a fresh page.
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context()
        page = await context.new_page()

        # The site must be loaded so its JavaScript environment is available
        # to the signing helper; wait until network activity settles.
        await page.goto('https://www.xiaohongshu.com')
        await page.wait_for_load_state('networkidle')

        signs = await sign_with_playwright(page, uri, data, a1_value, method)
        print(f"Generated signs: {signs}")

        await browser.close()


if __name__ == '__main__':
    asyncio.run(main())

244
playwright_sign.py Normal file
View File

@@ -0,0 +1,244 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2025 relakkes@gmail.com
#
# This file is part of MediaCrawler project.
# Repository: https://github.com/NanmiCoder/MediaCrawler/blob/main/media_platform/xhs/playwright_sign.py
# GitHub: https://github.com/NanmiCoder
# Licensed under NON-COMMERCIAL LEARNING LICENSE 1.1
#
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# 通过 Playwright 注入调用 window.mnsv2 生成小红书签名
import hashlib
import json
import time
from typing import Any, Dict, Optional, Union
from urllib.parse import urlparse, quote
from playwright.async_api import Page
from xhs_sign import b64_encode, encode_utf8, get_trace_id, mrc
def _build_sign_string(uri: str, data: Optional[Union[Dict, str]] = None, method: str = "POST") -> str:
    """Assemble the plain-text string that is handed to the signer.

    Args:
        uri: API path.
        data: Request data, either a dict or an already-serialised string;
            may be None.
        method: HTTP method. "POST" (case-insensitive) selects the JSON
            form; any other value selects the query-string form.

    Returns:
        For POST: ``uri`` followed by the compact JSON of a dict body, or by
        the raw string body; ``uri`` alone for any other data.
        Otherwise: ``uri`` followed by ``?`` and the query string, or
        ``uri`` alone when there is no usable data.
    """
    if method.upper() == "POST":
        # POST bodies are appended directly after the path.
        if isinstance(data, dict):
            return uri + json.dumps(data, separators=(",", ":"), ensure_ascii=False)
        if isinstance(data, str):
            return uri + data
        return uri

    # Non-POST requests: query-string form.
    if not data:
        return uri
    if isinstance(data, str):
        return f"{uri}?{data}"
    if not isinstance(data, dict):
        return uri

    def _render(value) -> str:
        # Lists are comma-joined, None becomes empty; everything is then
        # percent-encoded with no characters left as "safe".
        if isinstance(value, list):
            text = ",".join(str(item) for item in value)
        elif value is None:
            text = ""
        else:
            text = str(value)
        return quote(text, safe='')

    query = "&".join(f"{key}={_render(value)}" for key, value in data.items())
    return f"{uri}?{query}"
def _md5_hex(s: str) -> str:
    """Return the hexadecimal MD5 digest of ``s`` encoded as UTF-8."""
    hasher = hashlib.md5()
    hasher.update(s.encode("utf-8"))
    return hasher.hexdigest()
def _build_xs_payload(x3_value: str, data_type: str = "object") -> str:
    """Wrap a raw signature into the ``XYS_``-prefixed x-s header value.

    Args:
        x3_value: Signature string placed in the ``x3`` field.
        data_type: Value placed in the ``x4`` field.

    Returns:
        ``"XYS_"`` followed by the custom-Base64 encoding of the compact
        JSON envelope.
    """
    envelope = dict(
        x0="4.2.1",
        x1="xhs-pc-web",
        x2="Mac OS",
        x3=x3_value,
        x4=data_type,
    )
    compact_json = json.dumps(envelope, separators=(",", ":"))
    encoded = b64_encode(encode_utf8(compact_json))
    return "XYS_" + encoded
def _build_xs_common(a1: str, b1: str, x_s: str, x_t: str) -> str:
    """Build the x-s-common request header value.

    Args:
        a1: Value stored in field ``x5``.
        b1: Value stored in field ``x8``.
        x_s: x-s signature, stored in field ``x7``.
        x_t: Timestamp string, stored in field ``x6``.

    Returns:
        Custom-Base64 encoding of the compact JSON payload.
    """
    # x9 is the mrc checksum over timestamp + signature + b1.
    checksum = mrc(x_t + x_s + b1)
    fields = [
        ("s0", 3),
        ("s1", ""),
        ("x0", "1"),
        ("x1", "4.2.2"),
        ("x2", "Mac OS"),
        ("x3", "xhs-pc-web"),
        ("x4", "4.74.0"),
        ("x5", a1),
        ("x6", x_t),
        ("x7", x_s),
        ("x8", b1),
        ("x9", checksum),
        ("x10", 154),
        ("x11", "normal"),
    ]
    serialized = json.dumps(dict(fields), separators=(",", ":"))
    return b64_encode(encode_utf8(serialized))
async def get_b1_from_localstorage(page: Page) -> str:
    """Read the ``b1`` entry from the page's localStorage.

    Args:
        page: Playwright page to query.

    Returns:
        The stored ``b1`` value, or an empty string when the key is absent
        or the lookup fails for any reason.
    """
    b1_value = ""
    try:
        storage_snapshot = await page.evaluate("() => window.localStorage")
        b1_value = storage_snapshot.get("b1", "")
    except Exception:
        # Best-effort lookup: any failure falls back to the empty default.
        pass
    return b1_value
async def call_mnsv2(page: Page, sign_str: str, md5_str: str) -> str:
    """Call ``window.mnsv2`` inside the page and return its result.

    The two strings are passed to the page through the ``arg`` parameter of
    ``page.evaluate`` rather than being spliced into JavaScript source, so
    no manual escaping is needed and characters such as quotes, backslashes
    or line breaks cannot corrupt the evaluated expression.

    Args:
        page: Playwright page whose JavaScript context exposes
            ``window.mnsv2``.
        sign_str: String to sign.
        md5_str: MD5 hex digest of ``sign_str``.

    Returns:
        The value returned by ``window.mnsv2``, or an empty string when the
        call fails or yields a falsy result.
    """
    try:
        result = await page.evaluate(
            "([signStr, md5Str]) => window.mnsv2(signStr, md5Str)",
            [sign_str, md5_str],
        )
    except Exception:
        # Best-effort contract kept from the original: failures yield "".
        return ""
    return result if result else ""
async def sign_xs_with_playwright(
    page: Page,
    uri: str,
    data: Optional[Union[Dict, str]] = None,
    method: str = "POST",
) -> str:
    """Generate the x-s signature by calling into the page.

    Args:
        page: Playwright page used to run the in-page signer.
        uri: API path, e.g. "/api/sns/web/v1/search/notes".
        data: Request data (GET params or POST payload).
        method: HTTP method (GET or POST).

    Returns:
        The x-s signature string.
    """
    plain_text = _build_sign_string(uri, data, method)
    digest = _md5_hex(plain_text)
    raw_signature = await call_mnsv2(page, plain_text, digest)

    # dict/list payloads are labelled "object"; everything else "string".
    if isinstance(data, (dict, list)):
        data_type = "object"
    else:
        data_type = "string"
    return _build_xs_payload(raw_signature, data_type)
async def sign_with_playwright(
    page: Page,
    uri: str,
    data: Optional[Union[Dict, str]] = None,
    a1: str = "",
    method: str = "POST",
) -> Dict[str, Any]:
    """Produce the full set of signature headers for one request.

    Args:
        page: Playwright page used to read ``b1`` and run the signer.
        uri: API path.
        data: Request data.
        a1: Value of the ``a1`` cookie.
        method: HTTP method (GET or POST).

    Returns:
        Dict with the keys ``x-s``, ``x-t``, ``x-s-common`` and
        ``x-b3-traceid``.
    """
    b1 = await get_b1_from_localstorage(page)
    x_s = await sign_xs_with_playwright(page, uri, data, method)
    # Millisecond Unix timestamp, as a string.
    x_t = str(int(time.time() * 1000))

    headers: Dict[str, Any] = {"x-s": x_s, "x-t": x_t}
    headers["x-s-common"] = _build_xs_common(a1, b1, x_s, x_t)
    headers["x-b3-traceid"] = get_trace_id()
    return headers
async def pre_headers_with_playwright(
    page: Page,
    url: str,
    cookie_dict: Dict[str, str],
    params: Optional[Dict] = None,
    payload: Optional[Dict] = None,
) -> Dict[str, str]:
    """Generate signed request headers for a URL via the page signer.

    Args:
        page: Playwright page used for signing.
        url: Request URL; only its path component is signed.
        cookie_dict: Cookies; the ``a1`` entry is used (empty if missing).
        params: GET parameters. Takes precedence over ``payload`` when both
            are given.
        payload: POST body.

    Returns:
        Dict with the headers ``X-S``, ``X-T``, ``x-S-Common`` and
        ``X-B3-Traceid``.

    Raises:
        ValueError: If both ``params`` and ``payload`` are None.
    """
    a1_value = cookie_dict.get("a1", "")
    uri = urlparse(url).path

    # Pick the request data and the matching HTTP method.
    if params is not None:
        data, method = params, "GET"
    elif payload is not None:
        data, method = payload, "POST"
    else:
        raise ValueError("params or payload is required")

    signs = await sign_with_playwright(page, uri, data, a1_value, method)

    header_to_sign_key = (
        ("X-S", "x-s"),
        ("X-T", "x-t"),
        ("x-S-Common", "x-s-common"),
        ("X-B3-Traceid", "x-b3-traceid"),
    )
    return {header: signs[sign_key] for header, sign_key in header_to_sign_key}

30
readme.md Normal file
View File

@@ -0,0 +1,30 @@
# 初始化 uv
```sh
# 创建虚拟环境
uv venv
# 激活虚拟环境
source .venv/bin/activate # Linux/Mac
# 或
.venv\Scripts\activate # Windows
# 安装依赖
uv pip install -r requirements.txt
```
# 安装 Playwright 浏览器
```sh
playwright install chromium
```
# 运行项目
```sh
# 开发环境
uvicorn app:app --reload
# 生产环境
pm2 start ecosystem.config.cjs
```

12
stealth.min.js vendored

File diff suppressed because one or more lines are too long

152
xhs_sign.py Normal file
View File

@@ -0,0 +1,152 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2025 relakkes@gmail.com
#
# This file is part of MediaCrawler project.
# Repository: https://github.com/NanmiCoder/MediaCrawler/blob/main/media_platform/xhs/xhs_sign.py
# GitHub: https://github.com/NanmiCoder
# Licensed under NON-COMMERCIAL LEARNING LICENSE 1.1
#
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# 小红书签名算法核心函数
# 用于 playwright 注入方式生成签名
import ctypes
import random
from urllib.parse import quote
# Custom Base64 alphabet.
# Standard Base64: ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/
# xiaohongshu shuffles the order for obfuscation.
BASE64_CHARS = list("ZmserbBoHQtNP+wOcza/LpngG8yJq42KWYj0DSfdikx3VT16IlUAFM97hECvuRX5")
# CRC32 lookup table, indexed by mrc() with a byte-sized index.
CRC32_TABLE = [
0, 1996959894, 3993919788, 2567524794, 124634137, 1886057615, 3915621685,
2657392035, 249268274, 2044508324, 3772115230, 2547177864, 162941995,
2125561021, 3887607047, 2428444049, 498536548, 1789927666, 4089016648,
2227061214, 450548861, 1843258603, 4107580753, 2211677639, 325883990,
1684777152, 4251122042, 2321926636, 335633487, 1661365465, 4195302755,
2366115317, 997073096, 1281953886, 3579855332, 2724688242, 1006888145,
1258607687, 3524101629, 2768942443, 901097722, 1119000684, 3686517206,
2898065728, 853044451, 1172266101, 3705015759, 2882616665, 651767980,
1373503546, 3369554304, 3218104598, 565507253, 1454621731, 3485111705,
3099436303, 671266974, 1594198024, 3322730930, 2970347812, 795835527,
1483230225, 3244367275, 3060149565, 1994146192, 31158534, 2563907772,
4023717930, 1907459465, 112637215, 2680153253, 3904427059, 2013776290,
251722036, 2517215374, 3775830040, 2137656763, 141376813, 2439277719,
3865271297, 1802195444, 476864866, 2238001368, 4066508878, 1812370925,
453092731, 2181625025, 4111451223, 1706088902, 314042704, 2344532202,
4240017532, 1658658271, 366619977, 2362670323, 4224994405, 1303535960,
984961486, 2747007092, 3569037538, 1256170817, 1037604311, 2765210733,
3554079995, 1131014506, 879679996, 2909243462, 3663771856, 1141124467,
855842277, 2852801631, 3708648649, 1342533948, 654459306, 3188396048,
3373015174, 1466479909, 544179635, 3110523913, 3462522015, 1591671054,
702138776, 2966460450, 3352799412, 1504918807, 783551873, 3082640443,
3233442989, 3988292384, 2596254646, 62317068, 1957810842, 3939845945,
2647816111, 81470997, 1943803523, 3814918930, 2489596804, 225274430,
2053790376, 3826175755, 2466906013, 167816743, 2097651377, 4027552580,
2265490386, 503444072, 1762050814, 4150417245, 2154129355, 426522225,
1852507879, 4275313526, 2312317920, 282753626, 1742555852, 4189708143,
2394877945, 397917763, 1622183637, 3604390888, 2714866558, 953729732,
1340076626, 3518719985, 2797360999, 1068828381, 1219638859, 3624741850,
2936675148, 906185462, 1090812512, 3747672003, 2825379669, 829329135,
1181335161, 3412177804, 3160834842, 628085408, 1382605366, 3423369109,
3138078467, 570562233, 1426400815, 3317316542, 2998733608, 733239954,
1555261956, 3268935591, 3050360625, 752459403, 1541320221, 2607071920,
3965973030, 1969922972, 40735498, 2617837225, 3943577151, 1913087877,
83908371, 2512341634, 3803740692, 2075208622, 213261112, 2463272603,
3855990285, 2094854071, 198958881, 2262029012, 4057260610, 1759359992,
534414190, 2176718541, 4139329115, 1873836001, 414664567, 2282248934,
4279200368, 1711684554, 285281116, 2405801727, 4167216745, 1634467795,
376229701, 2685067896, 3608007406, 1308918612, 956543938, 2808555105,
3495958263, 1231636301, 1047427035, 2932959818, 3654703836, 1088359270,
936918000, 2847714899, 3736837829, 1202900863, 817233897, 3183342108,
3401237130, 1404277552, 615818150, 3134207493, 3453421203, 1423857449,
601450431, 3009837614, 3294710456, 1567103746, 711928724, 3020668471,
3272380065, 1510334235, 755167117,
]
def _right_shift_unsigned(num: int, bit: int = 0) -> int:
    """Emulate JavaScript's unsigned right shift (``>>>``).

    Args:
        num: Integer to shift; only its low 32 bits are used.
        bit: Number of bit positions to shift right.

    Returns:
        The low 32 bits of ``num``, read as an unsigned value, shifted
        right by ``bit``.
    """
    # Reinterpret as an unsigned 32-bit quantity, then shift.
    unsigned_value = num & 0xFFFFFFFF
    return unsigned_value >> bit
def mrc(e: str) -> int:
    """Compute the CRC32-variant checksum used for the x9 field of x-s-common.

    Only the first 57 characters of ``e`` take part in the computation.

    Args:
        e: Input string.

    Returns:
        The checksum as a Python int.
    """
    crc = -1
    for ch in e[:57]:
        table_index = (crc & 255) ^ ord(ch)
        crc = CRC32_TABLE[table_index] ^ _right_shift_unsigned(crc, 8)
    return crc ^ -1 ^ 3988292384
def _triplet_to_base64(e: int) -> str:
    """Convert a 24-bit integer into four characters of the custom alphabet.

    Args:
        e: Integer whose low 24 bits are split into four 6-bit groups.

    Returns:
        Four-character string, most significant group first.
    """
    return "".join(BASE64_CHARS[(e >> shift) & 63] for shift in (18, 12, 6, 0))
def _encode_chunk(data: list, start: int, end: int) -> str:
    """Encode ``data[start:end]`` three bytes at a time.

    Args:
        data: List of byte values.
        start: Index of the first byte to encode.
        end: Index one past the last byte to encode.

    Returns:
        The concatenated 4-character groups for every 3-byte triplet.
    """
    def _pack(offset: int) -> int:
        # Combine three consecutive bytes into one 24-bit integer.
        high = (data[offset] << 16) & 0xFF0000
        mid = (data[offset + 1] << 8) & 0xFF00
        low = data[offset + 2] & 0xFF
        return high + mid + low

    return "".join(_triplet_to_base64(_pack(offset)) for offset in range(start, end, 3))
def encode_utf8(s: str) -> list:
    """Encode a string as a list of UTF-8 byte values.

    Encodes directly with ``str.encode`` instead of percent-encoding the
    string and parsing the ``%XX`` escapes back by hand; both approaches
    yield the same UTF-8 bytes.

    Args:
        s: Text to encode.

    Returns:
        List of integers in the range 0-255, one per UTF-8 byte.

    Raises:
        UnicodeEncodeError: If ``s`` cannot be encoded as UTF-8 (for
            example, it contains a lone surrogate).
    """
    return list(s.encode("utf-8"))
def b64_encode(data: list) -> str:
    """Base64-encode a list of byte values using the custom alphabet.

    Args:
        data: List of byte values.

    Returns:
        The encoded string, padded with ``=`` to a multiple of four
        characters.
    """
    total = len(data)
    tail_size = total % 3
    body_end = total - tail_size

    # Complete triplets are encoded in slices of at most 16383 bytes.
    pieces = [
        _encode_chunk(data, pos, min(pos + 16383, body_end))
        for pos in range(0, body_end, 16383)
    ]

    if tail_size == 1:
        # One leftover byte -> two characters plus "==".
        last = data[total - 1]
        pieces.append(BASE64_CHARS[last >> 2] + BASE64_CHARS[(last << 4) & 63] + "==")
    elif tail_size == 2:
        # Two leftover bytes -> three characters plus "=".
        pair = (data[total - 2] << 8) + data[total - 1]
        pieces.append(
            BASE64_CHARS[pair >> 10]
            + BASE64_CHARS[(pair >> 4) & 63]
            + BASE64_CHARS[(pair << 2) & 63]
            + "="
        )
    return "".join(pieces)
def get_trace_id() -> str:
    """Generate a trace id: 16 random lowercase hexadecimal characters."""
    alphabet = "abcdef0123456789"
    picked = []
    for _ in range(16):
        picked.append(random.choice(alphabet))
    return "".join(picked)