"""HTTP signing service backed by a headless Playwright browser page.

At import time the module launches Chromium, opens the Xiaohongshu home page
and records the ``a1`` cookie the site assigned.  The Flask routes then use
that page to evaluate ``window._webmsxyw`` and return the resulting
``X-s`` / ``X-t`` values together with the ``b1`` / ``b1b1`` localStorage
entries.
"""
import os
import time

from dotenv import load_dotenv
from flask import Flask, request
from gevent import monkey
from playwright.sync_api import sync_playwright

# Load environment variables (e.g. XHS_API_PORT) from a .env file if present.
load_dotenv()

# NOTE(review): gevent recommends patching as early as possible; the original
# call order (after the imports above) is kept to avoid changing behavior.
monkey.patch_all()

app = Flask(__name__)

# Module-level state shared by the request handlers.
global_a1 = ""  # a1 cookie value currently installed in the browser context
browser_context = None
context_page = None

_HOME_URL = "https://www.xiaohongshu.com"


def get_context_page(instance, stealth_js_path):
    """Launch headless Chromium and return a ``(context, page)`` pair.

    Args:
        instance: A started Playwright object (exposes ``.chromium``).
        stealth_js_path: Path of the script registered as an init script on
            the new browser context.

    Returns:
        Tuple of the new browser context and a fresh page opened in it.
    """
    # Pass headless=False here to watch the browser while debugging.
    browser = instance.chromium.launch(headless=True)
    context = browser.new_context()
    context.add_init_script(path=stealth_js_path)
    page = context.new_page()
    return context, page


def _ensure_page():
    """Re-create the browser context and page if the page is missing or closed.

    After a re-creation the cached ``global_a1`` is resynchronized with the
    new context's cookies; otherwise a caller passing the previously cached
    a1 would skip the cookie update and sign with a mismatching cookie.
    """
    global browser_context, context_page, global_a1
    if context_page is not None and not context_page.is_closed():
        return
    browser_context, context_page = get_context_page(playwright, stealth_js_path)
    context_page.goto(_HOME_URL)
    time.sleep(5)
    context_page.reload()
    time.sleep(1)
    global_a1 = next(
        (c["value"] for c in browser_context.cookies() if c["name"] == "a1"),
        "",
    )


# Change this to the path of your stealth.min.js file.
stealth_js_path = "stealth.min.js"
print("正在启动 playwright")
playwright = sync_playwright().start()
browser_context, context_page = get_context_page(playwright, stealth_js_path)
context_page.goto(_HOME_URL)
print("正在跳转至小红书首页")
time.sleep(5)
context_page.reload()
time.sleep(1)
cookies = browser_context.cookies()
for cookie in cookies:
    if cookie["name"] == "a1":
        global_a1 = cookie["value"]
        print("当前浏览器 cookie 中 a1 值为:" + cookie["value"] + ",请将需要使用的 a1 设置成一样方可签名成功")
# Dismiss the login modal if the page shows one.
loginModal = context_page.query_selector(".reds-mask")
if loginModal is not None:
    loginModal.evaluate("el => el.click()")
    print("登录弹窗已关闭")
print("跳转小红书首页成功,等待调用")


def setCookie(a1):
    """Install ``a1`` as the browser context's a1 cookie and reload the page.

    Nothing is changed when ``a1`` already equals the cached ``global_a1``.

    Args:
        a1: Cookie value to install.

    Returns:
        ``{"status": "success", "message": ...}`` on success, or
        ``{"error": <message>}`` if any browser operation raised.
    """
    global global_a1
    try:
        _ensure_page()
        if a1 != global_a1:
            # Overwrite the existing a1 cookie with an empty value first.
            browser_context.add_cookies([
                {'name': 'a1', 'value': '', 'domain': ".xiaohongshu.com", 'path': "/"}
            ])
            # Give the browser a moment to apply the change.
            time.sleep(1)
            # Install the requested a1 cookie.
            browser_context.add_cookies([
                {'name': 'a1', 'value': a1, 'domain': ".xiaohongshu.com", 'path': "/"}
            ])
            context_page.reload()
            time.sleep(1)
            cookies = browser_context.cookies()
            print("页面加载后的 Cookie:", cookies)
            global_a1 = a1
            print("设置 cookie 成功", a1)
        return {"status": "success", "message": "Cookie set successfully"}
    except Exception as e:
        print(f"Error during setCookie operation: {e}")
        return {"error": str(e)}


def sign(uri, data, a1, web_session):
    """Compute the signature values for a request using the browser page.

    Args:
        uri: Request URI passed to ``window._webmsxyw``.
        data: Request payload passed to ``window._webmsxyw``.
        a1: a1 cookie value the caller uses; installed in the browser first
            if it differs from the cached one.
        web_session: Accepted for interface compatibility; not used here.

    Returns:
        Dict with ``x-s``, ``x-t``, ``b1`` and ``b1b1`` on success, or
        ``{"error": <message>}`` if the cookie could not be set or a browser
        operation raised.
    """
    try:
        _ensure_page()
        if a1 != global_a1:
            cookie_result = setCookie(a1)
            # Signing under a different a1 would yield an unusable signature,
            # so surface the failure instead of continuing.
            if cookie_result and "error" in cookie_result:
                return {"error": cookie_result["error"]}
        b1 = context_page.evaluate("() => localStorage.getItem('b1')")
        b1b1 = context_page.evaluate("() => localStorage.getItem('b1b1')")
        encrypt_params = context_page.evaluate("([url, data]) => window._webmsxyw(url, data)", [uri, data])
        return {
            "x-s": encrypt_params["X-s"],
            "x-t": str(encrypt_params["X-t"]),
            "b1": b1,
            "b1b1": b1b1,
        }
    except Exception as e:
        print(f"Error during sign operation: {e}")
        return {"error": str(e)}


@app.route("/sign", methods=["POST"])
def hello_world():
    """POST /sign: sign the request described by the JSON body.

    The body must be a JSON object with ``uri`` and ``a1``; ``data`` and
    ``web_session`` are optional.  Responds with ``{"a1": ..., "sign": ...}``
    or a 400 JSON error when the body is malformed.
    """
    payload = request.json
    if not isinstance(payload, dict):
        return {"error": "request body must be a JSON object"}, 400
    missing = [key for key in ("uri", "a1") if key not in payload]
    if missing:
        return {"error": "missing required field(s): " + ", ".join(missing)}, 400
    a1 = payload["a1"]
    me = sign(payload["uri"], payload.get("data"), a1, payload.get("web_session"))
    return {
        "a1": a1,
        "sign": me,
    }


@app.route("/a1", methods=["GET"])
def get_a1():
    """GET /a1: report the current a1, or install the one given as ``?a1=``."""
    a1 = request.args.get('a1')
    if a1 and a1 != global_a1:
        # A different a1 was supplied: install it in the browser context.
        result = setCookie(a1)
        return {'a1': a1, 'result': result}
    # No (or an identical) a1 supplied: report the cached value.
    return {'a1': global_a1}


if __name__ == '__main__':
    # os.getenv returns a string when the variable is set; normalize to int.
    port = int(os.getenv('XHS_API_PORT', 5005))
    app.run(host="0.0.0.0", port=port)