| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- import json
- import time
- import random
- import asyncio
- import aiohttp
- from typing import List, Optional
- from redis.asyncio import Redis
- from starlette.concurrency import run_in_threadpool
- from app.schemas.troov import TroovRate
- from app.utils.france_slot_api import troov_create_session_old
- from app.utils.proxy_utils import load_proxies_from_json
- from app.core.logger import logger
- # =========================================================
- # Redis 原子弹出 token(不使用 KEYS,避免阻塞)
- # =========================================================
- POP_TOKEN_LUA = """
- local cursor = "0"
- local max_ttl = -1
- local max_key = nil
- repeat
- local result = redis.call('SCAN', cursor, 'MATCH', 'token:*', 'COUNT', 50)
- cursor = result[1]
- local keys = result[2]
- for _, key in ipairs(keys) do
- local ttl = redis.call('TTL', key)
- if ttl > max_ttl then
- max_ttl = ttl
- max_key = key
- end
- end
- until cursor == "0"
- if max_key then
- local value = redis.call('GET', max_key)
- redis.call('DEL', max_key)
- return {max_key, value, max_ttl}
- end
- return nil
- """
- async def pop_redis_value_token(redis_client: Redis):
- """
- 原子性获取 TTL 最大的 token 并删除
- """
- return await redis_client.eval(POP_TOKEN_LUA, 0)
- # =========================================================
- # 请求法国 Troov 接口(async,不阻塞)
- # =========================================================
- async def fetch_rate(session_dic: dict, date: str) -> str:
- url = (
- "https://api.consulat.gouv.fr/api/team/"
- "621540d353069dec25bd0045/reservations/availability"
- f"?name=Visas&date={date}&places=-5&matching=&maxCapacity=-5"
- f"&sessionId={session_dic['session_id']}"
- )
- headers = {
- "accept": "application/json, text/plain, */*",
- "accept-language": "zh-CN,zh;q=0.9,en;q=0.8",
- "origin": "https://consulat.gouv.fr",
- "referer": "https://consulat.gouv.fr/en/ambassade-de-france-en-irlande/appointment?name=Visas",
- "user-agent": (
- "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
- "AppleWebKit/537.36 (KHTML, like Gecko) "
- "Chrome/141.0.0.0 Safari/537.36"
- ),
- "x-gouv-app-id": session_dic["x_gouv_app_id"],
- "x-gouv-web": "fr.gouv.consulat",
- }
- timeout = aiohttp.ClientTimeout(total=15)
- async with aiohttp.ClientSession(timeout=timeout) as session:
- async with session.get(url, headers=headers) as resp:
- return await resp.text()
- # =========================================================
- # 核心业务逻辑
- # =========================================================
- async def get_rate_by_date(
- redis_client: Redis,
- date: str
- ) -> Optional[List[TroovRate]]:
- """
- 根据日期获取 Troov 预约信息
- """
- # ---------- 1️⃣ 加载代理 ----------
- proxies = []
- for pool in ("oxylabs",):
- proxies.extend(
- load_proxies_from_json("data/proxy_pool_config.json", pool)
- )
- if not proxies:
- logger.error("Proxy pool is empty")
- return None
- # ---------- 2️⃣ 获取验证码 token(最多等待 30 秒) ----------
- token_data = None
- for _ in range(30):
- token_data = await pop_redis_value_token(redis_client)
- if token_data:
- break
- await asyncio.sleep(1)
- if not token_data:
- logger.warning("No captcha token available")
- return None
- _, body_str, ttl = token_data
- try:
- body = json.loads(body_str)
- captcha_token = body.get("token")
- except Exception:
- logger.exception("Invalid captcha token format")
- return None
- # ---------- 3️⃣ 创建 Troov session(同步函数放线程池) ----------
- proxy = random.choice(proxies)
- session_dic = await run_in_threadpool(troov_create_session_old, proxy, captcha_token)
- if not session_dic:
- logger.warning("Failed to create Troov session")
- return None
- logger.info(f"Troov session created: {session_dic}")
- # ---------- 4️⃣ 请求预约数据 ----------
- try:
- response_text = await fetch_rate(session_dic, date)
- return json.loads(response_text)
- except Exception as e:
- logger.error(f"Fetch rate failed: {e}")
- return None
|