| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- import json
- import time
- import random
- import asyncio
- import aiohttp
- from typing import List, Optional, Tuple, Dict, Any
- from redis.asyncio import Redis
- from starlette.concurrency import run_in_threadpool
- from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
- from app.schemas.troov import TroovRate
- from app.utils.france_slot_api import troov_create_session_old
- from app.utils.proxy_utils import load_proxies_from_json
- from app.core.logger import logger
# Lua script evaluated server-side (with zero KEYS arguments) to pop one token.
#
# It walks the whole keyspace with SCAN, matching ``token:*`` in batches of 50,
# reads the TTL of every matching key and remembers the key with the largest
# TTL. The running maximum starts at -1 and the comparison is strict, so a key
# whose TTL is reported as -1 or lower is never selected (Redis reports -1 for
# a key that has no expiry).
#
# When a key was selected, its value is read with GET, the key is deleted, and
# the script returns ``{key, value, ttl}``; otherwise it returns nil.
POP_TOKEN_LUA = """
local cursor = "0"
local max_ttl = -1
local max_key = nil
repeat
local result = redis.call('SCAN', cursor, 'MATCH', 'token:*', 'COUNT', 50)
cursor = result[1]
local keys = result[2]
for _, key in ipairs(keys) do
local ttl = redis.call('TTL', key)
if ttl > max_ttl then
max_ttl = ttl
max_key = key
end
end
until cursor == "0"
if max_key then
local value = redis.call('GET', max_key)
redis.call('DEL', max_key)
return {max_key, value, max_ttl}
end
return nil
"""
async def get_valid_token_from_redis(redis_client: Redis, timeout: int = 30) -> Optional[str]:
    """Pop a captcha token from Redis, polling until one is found or time runs out.

    Each attempt evaluates ``POP_TOKEN_LUA``, whose reply is expected to be
    ``[key, value_str, ttl]`` with ``value_str`` holding a JSON object that
    has a ``"token"`` field. A reply that cannot be decoded is logged and
    skipped, and polling continues after a short pause.

    Args:
        redis_client: Async Redis client used to evaluate the script.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The value of the ``"token"`` field, or ``None`` when no usable token
        was obtained before the timeout expired.
    """
    poll_interval = 1.0
    # Use the monotonic clock: a wall-clock adjustment (NTP step, manual
    # change) must neither stretch nor cut the polling window.
    deadline = time.monotonic() + timeout

    while time.monotonic() < deadline:
        # The script selects and deletes the key in a single server-side call.
        result = await redis_client.eval(POP_TOKEN_LUA, 0)

        if result:
            try:
                # result layout: [key, value_str, ttl]
                body = json.loads(result[1])
                token = body.get("token")
            except (
                json.JSONDecodeError,
                UnicodeDecodeError,
                IndexError,
                AttributeError,
                TypeError,
            ):
                # TypeError / UnicodeDecodeError cover payloads that are not
                # text at all; one bad entry must not abort the whole wait.
                logger.warning("Redis retrieved invalid token format")
            else:
                if token:
                    return token

        # Nothing usable yet: pause briefly, but never past the deadline.
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        await asyncio.sleep(min(poll_interval, remaining))

    return None
# =========================================================
# 2. Network request module
# =========================================================
async def fetch_troov_availability(
    session_data: Dict[str, Any],
    date: str,
    proxy_url: str
) -> str:
    """Query the Troov reservation-availability endpoint through a proxy.

    Args:
        session_data: Session details. ``session_id`` is sent as the
            ``sessionId`` query parameter and ``x_gouv_app_id`` as the
            ``x-gouv-app-id`` header.
        date: Value forwarded as the ``date`` query parameter.
        proxy_url: Proxy the request is sent through.

    Returns:
        The response body as text.

    Raises:
        aiohttp.ClientResponseError: When the response status is 400 or
            higher (raised by ``raise_for_status``).
    """
    team_id = "621540d353069dec25bd0045"
    endpoint = f"https://51.254.177.49/api/team/{team_id}/reservations/availability"

    # Query-string parameters.
    query = dict(
        name="Visas",
        date=date,
        places="-5",
        matching="",
        maxCapacity="-5",
        sessionId=session_data.get("session_id"),
    )

    user_agent = " ".join(
        [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
            "AppleWebKit/537.36 (KHTML, like Gecko)",
            "Chrome/141.0.0.0 Safari/537.36",
        ]
    )
    request_headers = {
        "accept": "application/json, text/plain, */*",
        "accept-language": "zh-CN,zh;q=0.9,en;q=0.8",
        # Left disabled, as in the original:
        # "origin": "https://consulat.gouv.fr",
        # "referer": "https://consulat.gouv.fr/en/ambassade-de-france-en-irlande/appointment?name=Visas",
        "user-agent": user_agent,
        "x-gouv-app-id": session_data.get("x_gouv_app_id"),
        "x-gouv-web": "fr.gouv.consulat",
    }

    # A fresh client session per call: 15 s total timeout, TLS certificate
    # verification switched off on the connector, and the caller's proxy
    # passed explicitly on the request.
    async with aiohttp.ClientSession(
        timeout=aiohttp.ClientTimeout(total=15),
        connector=aiohttp.TCPConnector(ssl=False),
    ) as client, client.get(
        endpoint,
        params=query,
        headers=request_headers,
        proxy=proxy_url,
    ) as response:
        response.raise_for_status()
        return await response.text()
# =========================================================
# 3. Core business flow
# =========================================================
def _get_proxy_pool() -> List[str]:
    """Load the proxies of every configured pool from the proxy config file."""
    config_path = "data/proxy_pool_config.json"
    # Add further pool names here to widen the pool.
    pool_names = ("oxylabs",)
    return [
        proxy
        for pool_name in pool_names
        for proxy in load_proxies_from_json(config_path, pool_name)
    ]
async def get_rate_by_date(
    redis_client: Redis,
    date: str
) -> Optional[List[TroovRate]]:
    """Main entry point: fetch Troov appointment availability for a date.

    Flow: pick a proxy -> pop a captcha token from Redis -> create a Troov
    session (synchronous helper run in a thread pool) -> request the
    availability data asynchronously through the same proxy.

    Args:
        redis_client: Async Redis client the captcha token is taken from.
        date: Date forwarded to the availability request.

    Returns:
        The JSON-decoded response body. It is returned exactly as parsed; no
        validation or conversion to ``TroovRate`` is performed here.

    Raises:
        NotFoundError: The proxy pool is empty, or no captcha token could be
            obtained.
        BizLogicError: The session-creation helper returned an empty result.
    """
    # Step 1: choose a proxy before consuming a token.
    available_proxies = _get_proxy_pool()
    if not available_proxies:
        raise NotFoundError(message="Proxy pool is empty")

    # One randomly chosen proxy is reused for both the session creation and
    # the availability request.
    current_proxy = random.choice(available_proxies)

    # Step 2: obtain a captcha token.
    token = await get_valid_token_from_redis(redis_client)
    if not token:
        raise NotFoundError(message="Failed to retrieve captcha token within timeout")

    # NOTE(review): the proxy URL is written to the log as-is; if proxy URLs
    # embed credentials this line exposes them - confirm.
    logger.info(f"Creating session with proxy: {current_proxy}...")

    # Step 3: the session helper is synchronous, so it runs in the thread pool.
    session_dic = await run_in_threadpool(troov_create_session_old, current_proxy, token)
    if not session_dic:
        raise BizLogicError(message="Failed to create Troov session (session_dic is empty)")

    logger.info(f"Troov session created successfully: {session_dic.get('session_id')}")

    # Step 4: fetch through the same proxy and decode the JSON body.
    raw_body = await fetch_troov_availability(session_dic, date, current_proxy)
    return json.loads(raw_body)
|