import asyncio
import json
import random
import time
from typing import List, Optional, Tuple, Dict, Any

import aiohttp
from redis.asyncio import Redis
from starlette.concurrency import run_in_threadpool

from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
from app.schemas.troov import TroovRate
from app.utils.france_slot_api import troov_create_session_old
from app.utils.proxy_utils import load_proxies_from_json
from app.core.logger import logger


# =========================================================
# 1. Captcha token retrieval
# =========================================================

# Lua script that atomically pops the `token:*` key with the largest remaining TTL.
# It SCANs the whole keyspace (until the cursor returns to "0"), calls TTL on every
# matching key, then GETs and DELs the winner and returns {key, value, ttl}.
# It returns nil when no candidate exists.
# Keys without an expiry (TTL == -1) never beat the initial max_ttl of -1, so they
# are never popped.
# NOTE(review): the full SCAN runs inside a single script call and blocks Redis for
# its duration; the cost grows with the size of the keyspace.
POP_TOKEN_LUA = """
local cursor = "0"
local max_ttl = -1
local max_key = nil
repeat
    local result = redis.call('SCAN', cursor, 'MATCH', 'token:*', 'COUNT', 50)
    cursor = result[1]
    local keys = result[2]
    for _, key in ipairs(keys) do
        local ttl = redis.call('TTL', key)
        if ttl > max_ttl then
            max_ttl = ttl
            max_key = key
        end
    end
until cursor == "0"
if max_key then
    local value = redis.call('GET', max_key)
    redis.call('DEL', max_key)
    return {max_key, value, max_ttl}
end
return nil
"""


async def get_valid_token_from_redis(redis_client: Redis, timeout: int = 30) -> Optional[str]:
    """Pop a captcha token from Redis, polling until one appears or *timeout* elapses.

    Each attempt runs ``POP_TOKEN_LUA``, which atomically removes the ``token:*``
    key with the largest TTL and returns ``[key, value, ttl]``. The value is
    parsed as JSON and its ``"token"`` field is returned. A popped entry that
    fails to parse is discarded (it has already been deleted) and polling
    continues.

    Args:
        redis_client: Async Redis client used to run the Lua script.
        timeout: Maximum number of seconds to keep polling. A value <= 0 makes
            no attempt at all.

    Returns:
        The token, or ``None`` if no usable token was obtained in time.
    """
    # Monotonic clock: immune to wall-clock adjustments during the wait.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        result = await redis_client.eval(POP_TOKEN_LUA, 0)
        if result:
            try:
                # result layout: [key, value_str, ttl]
                body = json.loads(result[1])
                token = body.get("token")
            # ValueError covers JSONDecodeError and UnicodeDecodeError (bad bytes);
            # TypeError covers a nil/None value.
            except (ValueError, TypeError, IndexError, AttributeError):
                logger.warning("Redis retrieved invalid token format")
            else:
                if token:
                    return token
        # Nothing usable yet: wait briefly, but never sleep past the deadline.
        remaining = deadline - time.monotonic()
        if remaining > 0:
            await asyncio.sleep(min(1.0, remaining))
    return None
# =========================================================
# 2. Network request module
# =========================================================


async def fetch_troov_availability(
    session_data: Dict[str, Any], date: str, proxy_url: str
) -> str:
    """Query the Troov reservation-availability endpoint through a given proxy.

    Args:
        session_data: Session info produced by ``troov_create_session_old``; its
            ``"session_id"`` (query param) and ``"x_gouv_app_id"`` (header)
            entries are sent with the request.
        date: Value of the ``date`` query parameter (format is whatever the
            upstream API expects — TODO confirm against callers).
        proxy_url: Proxy URL the request is forced through.

    Returns:
        The raw response body as text.

    Raises:
        Errors propagated from aiohttp: ``aiohttp.ClientResponseError`` for a
        4xx/5xx status, other ``aiohttp.ClientError`` subclasses on connection
        failures, and a timeout error once the 15 s total timeout elapses.
    """
    url = (
        "https://51.254.177.49/api/team/"
        "621540d353069dec25bd0045/reservations/availability"
    )

    # Query-string parameters.
    params = {
        "name": "Visas",
        "date": date,
        "places": "-5",
        "matching": "",
        "maxCapacity": "-5",
        "sessionId": session_data.get("session_id"),
    }

    headers = {
        "accept": "application/json, text/plain, */*",
        "accept-language": "zh-CN,zh;q=0.9,en;q=0.8",
        # "origin": "https://consulat.gouv.fr",
        # "referer": "https://consulat.gouv.fr/en/ambassade-de-france-en-irlande/appointment?name=Visas",
        "user-agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/141.0.0.0 Safari/537.36"
        ),
        "x-gouv-app-id": session_data.get("x_gouv_app_id"),
        "x-gouv-web": "fr.gouv.consulat",
    }

    timeout = aiohttp.ClientTimeout(total=15)
    # NOTE(review): the host is a bare IP, presumably why certificate verification
    # is disabled here. With ssl=False the TLS peer is not authenticated, so the
    # connection is open to interception — confirm this is acceptable.
    connector = aiohttp.TCPConnector(ssl=False)

    # The session owns the connector and closes it on exit; the caller-supplied
    # proxy is used explicitly for this request.
    async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
        async with session.get(
            url, params=params, headers=headers, proxy=proxy_url
        ) as resp:
            # Raises ClientResponseError for any 4xx/5xx status.
            resp.raise_for_status()
            return await resp.text()


# =========================================================
# 3. Core business flow
# =========================================================


def _get_proxy_pool() -> List[str]:
    """Load the proxy pool from ``data/proxy_pool_config.json``.

    Add names to the tuple below to pull in additional pool types.
    """
    proxies: List[str] = []
    for pool in ("oxylabs",):
        proxies.extend(load_proxies_from_json("data/proxy_pool_config.json", pool))
    return proxies


def _mask_proxy(proxy_url: str) -> str:
    """Return *proxy_url* with any ``user:password@`` part redacted, for logging."""
    text = str(proxy_url)
    scheme, sep, rest = text.partition("://")
    if not sep:
        # No scheme present: the whole string is authority (+ optional path).
        scheme, rest = "", text
    if "@" in rest:
        # rsplit so that an '@' inside the password is also hidden.
        rest = "***@" + rest.rsplit("@", 1)[1]
    return f"{scheme}{sep}{rest}"


async def get_rate_by_date(
    redis_client: Redis, date: str
) -> Optional[List[TroovRate]]:
    """Main entry point: fetch Troov appointment availability for *date*.

    Flow: pick a proxy -> pop a captcha token from Redis -> create a Troov
    session (synchronous helper, run in a worker thread) -> query availability
    (async). The same proxy is used for both the session and the query.

    Args:
        redis_client: Async Redis client holding the captcha tokens.
        date: Date forwarded to the availability endpoint.

    Returns:
        The decoded JSON body of the availability response. It is returned
        as-is; NOTE(review): it is not validated against ``TroovRate``.

    Raises:
        NotFoundError: The proxy pool is empty, or no captcha token was
            obtained within the timeout.
        BizLogicError: Session creation returned nothing or no ``session_id``,
            or the availability response is not valid JSON.
        Errors from ``fetch_troov_availability`` (aiohttp) are propagated.
    """
    # 1. Prepare the proxy.
    proxies = _get_proxy_pool()
    if not proxies:
        raise NotFoundError(message="Proxy pool is empty")

    # Choose one proxy at random and keep it for the whole flow.
    current_proxy = random.choice(proxies)

    # 2. Obtain a captcha token.
    captcha_token = await get_valid_token_from_redis(redis_client)
    if not captcha_token:
        raise NotFoundError(message="Failed to retrieve captcha token within timeout")

    # Proxy URLs may embed credentials, so never log them verbatim.
    logger.info(f"Creating session with proxy: {_mask_proxy(current_proxy)}...")
    # troov_create_session_old is synchronous; run it off the event loop.
    session_dic = await run_in_threadpool(
        troov_create_session_old, current_proxy, captcha_token
    )

    if not session_dic:
        raise BizLogicError(message="Failed to create Troov session (session_dic is empty)")

    # Without a session id the availability request cannot be built
    # (aiohttp rejects None query values), so fail with a clear error here.
    session_id = session_dic.get("session_id")
    if not session_id:
        raise BizLogicError(message="Troov session has no session_id")

    logger.info(f"Troov session created successfully: {session_id}")

    # 3. Query availability through the same proxy.
    response_text = await fetch_troov_availability(session_dic, date, current_proxy)

    # 4. Decode the payload.
    try:
        data = json.loads(response_text)
    except ValueError as err:
        raise BizLogicError(
            message="Troov availability response is not valid JSON"
        ) from err

    return data