troov_service.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. import json
  2. import time
  3. import random
  4. import asyncio
  5. import aiohttp
  6. from typing import List, Optional
  7. from redis.asyncio import Redis
  8. from starlette.concurrency import run_in_threadpool
  9. from app.schemas.troov import TroovRate
  10. from app.utils.france_slot_api import troov_create_session_old
  11. from app.utils.proxy_utils import load_proxies_from_json
  12. from app.core.logger import logger
  13. # =========================================================
  14. # Redis 原子弹出 token(不使用 KEYS,避免阻塞)
  15. # =========================================================
  16. POP_TOKEN_LUA = """
  17. local cursor = "0"
  18. local max_ttl = -1
  19. local max_key = nil
  20. repeat
  21. local result = redis.call('SCAN', cursor, 'MATCH', 'token:*', 'COUNT', 50)
  22. cursor = result[1]
  23. local keys = result[2]
  24. for _, key in ipairs(keys) do
  25. local ttl = redis.call('TTL', key)
  26. if ttl > max_ttl then
  27. max_ttl = ttl
  28. max_key = key
  29. end
  30. end
  31. until cursor == "0"
  32. if max_key then
  33. local value = redis.call('GET', max_key)
  34. redis.call('DEL', max_key)
  35. return {max_key, value, max_ttl}
  36. end
  37. return nil
  38. """
  39. async def pop_redis_value_token(redis_client: Redis):
  40. """
  41. 原子性获取 TTL 最大的 token 并删除
  42. """
  43. return await redis_client.eval(POP_TOKEN_LUA, 0)
  44. # =========================================================
  45. # 请求法国 Troov 接口(async,不阻塞)
  46. # =========================================================
  47. async def fetch_rate(session_dic: dict, date: str) -> str:
  48. url = (
  49. "https://api.consulat.gouv.fr/api/team/"
  50. "621540d353069dec25bd0045/reservations/availability"
  51. f"?name=Visas&date={date}&places=-5&matching=&maxCapacity=-5"
  52. f"&sessionId={session_dic['session_id']}"
  53. )
  54. headers = {
  55. "accept": "application/json, text/plain, */*",
  56. "accept-language": "zh-CN,zh;q=0.9,en;q=0.8",
  57. "origin": "https://consulat.gouv.fr",
  58. "referer": "https://consulat.gouv.fr/en/ambassade-de-france-en-irlande/appointment?name=Visas",
  59. "user-agent": (
  60. "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
  61. "AppleWebKit/537.36 (KHTML, like Gecko) "
  62. "Chrome/141.0.0.0 Safari/537.36"
  63. ),
  64. "x-gouv-app-id": session_dic["x_gouv_app_id"],
  65. "x-gouv-web": "fr.gouv.consulat",
  66. }
  67. timeout = aiohttp.ClientTimeout(total=15)
  68. async with aiohttp.ClientSession(timeout=timeout) as session:
  69. async with session.get(url, headers=headers) as resp:
  70. return await resp.text()
  71. # =========================================================
  72. # 核心业务逻辑
  73. # =========================================================
  74. async def get_rate_by_date(
  75. redis_client: Redis,
  76. date: str
  77. ) -> Optional[List[TroovRate]]:
  78. """
  79. 根据日期获取 Troov 预约信息
  80. """
  81. # ---------- 1️⃣ 加载代理 ----------
  82. proxies = []
  83. for pool in ("oxylabs",):
  84. proxies.extend(
  85. load_proxies_from_json("data/proxy_pool_config.json", pool)
  86. )
  87. if not proxies:
  88. logger.error("Proxy pool is empty")
  89. return None
  90. # ---------- 2️⃣ 获取验证码 token(最多等待 30 秒) ----------
  91. token_data = None
  92. for _ in range(30):
  93. token_data = await pop_redis_value_token(redis_client)
  94. if token_data:
  95. break
  96. await asyncio.sleep(1)
  97. if not token_data:
  98. logger.warning("No captcha token available")
  99. return None
  100. _, body_str, ttl = token_data
  101. try:
  102. body = json.loads(body_str)
  103. captcha_token = body.get("token")
  104. except Exception:
  105. logger.exception("Invalid captcha token format")
  106. return None
  107. # ---------- 3️⃣ 创建 Troov session(同步函数放线程池) ----------
  108. proxy = random.choice(proxies)
  109. session_dic = await run_in_threadpool(troov_create_session_old, proxy, captcha_token)
  110. if not session_dic:
  111. logger.warning("Failed to create Troov session")
  112. return None
  113. logger.info(f"Troov session created: {session_dic}")
  114. # ---------- 4️⃣ 请求预约数据 ----------
  115. try:
  116. response_text = await fetch_rate(session_dic, date)
  117. return json.loads(response_text)
  118. except Exception as e:
  119. logger.error(f"Fetch rate failed: {e}")
  120. return None