import json import time import random import asyncio import aiohttp from typing import List, Optional from redis.asyncio import Redis from starlette.concurrency import run_in_threadpool from app.schemas.troov import TroovRate from app.utils.france_slot_api import troov_create_session_old from app.utils.proxy_utils import load_proxies_from_json from app.core.logger import logger # ========================================================= # Redis 原子弹出 token(不使用 KEYS,避免阻塞) # ========================================================= POP_TOKEN_LUA = """ local cursor = "0" local max_ttl = -1 local max_key = nil repeat local result = redis.call('SCAN', cursor, 'MATCH', 'token:*', 'COUNT', 50) cursor = result[1] local keys = result[2] for _, key in ipairs(keys) do local ttl = redis.call('TTL', key) if ttl > max_ttl then max_ttl = ttl max_key = key end end until cursor == "0" if max_key then local value = redis.call('GET', max_key) redis.call('DEL', max_key) return {max_key, value, max_ttl} end return nil """ async def pop_redis_value_token(redis_client: Redis): """ 原子性获取 TTL 最大的 token 并删除 """ return await redis_client.eval(POP_TOKEN_LUA, 0) # ========================================================= # 请求法国 Troov 接口(async,不阻塞) # ========================================================= async def fetch_rate(session_dic: dict, date: str) -> str: url = ( "https://api.consulat.gouv.fr/api/team/" "621540d353069dec25bd0045/reservations/availability" f"?name=Visas&date={date}&places=-5&matching=&maxCapacity=-5" f"&sessionId={session_dic['session_id']}" ) headers = { "accept": "application/json, text/plain, */*", "accept-language": "zh-CN,zh;q=0.9,en;q=0.8", "origin": "https://consulat.gouv.fr", "referer": "https://consulat.gouv.fr/en/ambassade-de-france-en-irlande/appointment?name=Visas", "user-agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/141.0.0.0 Safari/537.36" ), "x-gouv-app-id": session_dic["x_gouv_app_id"], "x-gouv-web": "fr.gouv.consulat", } timeout = aiohttp.ClientTimeout(total=15) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(url, headers=headers) as resp: return await resp.text() # ========================================================= # 核心业务逻辑 # ========================================================= async def get_rate_by_date( redis_client: Redis, date: str ) -> Optional[List[TroovRate]]: """ 根据日期获取 Troov 预约信息 """ # ---------- 1️⃣ 加载代理 ---------- proxies = [] for pool in ("oxylabs",): proxies.extend( load_proxies_from_json("data/proxy_pool_config.json", pool) ) if not proxies: logger.error("Proxy pool is empty") return None # ---------- 2️⃣ 获取验证码 token(最多等待 30 秒) ---------- token_data = None for _ in range(30): token_data = await pop_redis_value_token(redis_client) if token_data: break await asyncio.sleep(1) if not token_data: logger.warning("No captcha token available") return None _, body_str, ttl = token_data try: body = json.loads(body_str) captcha_token = body.get("token") except Exception: logger.exception("Invalid captcha token format") return None # ---------- 3️⃣ 创建 Troov session(同步函数放线程池) ---------- proxy = random.choice(proxies) session_dic = await run_in_threadpool(troov_create_session_old, proxy, captcha_token) if not session_dic: logger.warning("Failed to create Troov session") return None logger.info(f"Troov session created: {session_dic}") # ---------- 4️⃣ 请求预约数据 ---------- try: response_text = await fetch_rate(session_dic, date) return json.loads(response_text) except Exception as e: logger.error(f"Fetch rate failed: {e}") return None