troov_service.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. import json
  2. import time
  3. import random
  4. import asyncio
  5. import aiohttp
  6. from typing import List, Optional, Tuple, Dict, Any
  7. from redis.asyncio import Redis
  8. from starlette.concurrency import run_in_threadpool
  9. from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
  10. from app.schemas.troov import TroovRate
  11. from app.utils.france_slot_api import troov_create_session_old
  12. from app.utils.proxy_utils import load_proxies_from_json
  13. from app.core.logger import logger
# Lua script that atomically pops one captcha token from Redis.
#
# It SCANs every key matching "token:*" (COUNT 50 per step, looping until the
# cursor returns to "0"), remembers the key with the largest TTL, then GETs and
# DELetes that key. It returns {key, value, ttl}, or nil when nothing qualified.
#
# Notes:
# - Selection requires TTL > -1 (max_ttl starts at -1 and the comparison is
#   strict), so keys without an expiry (TTL == -1) are never popped.
#   NOTE(review): confirm that token producers always set an expiry.
# - The whole keyspace is scanned inside one script call, and Redis runs
#   scripts atomically, so other clients wait while it runs. The cost grows
#   with the total number of keys in the database, not just token keys.
# - Keys are accessed without being passed via KEYS (numkeys is 0 at the call
#   site), so this script is not Redis Cluster compatible.
POP_TOKEN_LUA = """
local cursor = "0"
local max_ttl = -1
local max_key = nil
repeat
local result = redis.call('SCAN', cursor, 'MATCH', 'token:*', 'COUNT', 50)
cursor = result[1]
local keys = result[2]
for _, key in ipairs(keys) do
local ttl = redis.call('TTL', key)
if ttl > max_ttl then
max_ttl = ttl
max_key = key
end
end
until cursor == "0"
if max_key then
local value = redis.call('GET', max_key)
redis.call('DEL', max_key)
return {max_key, value, max_ttl}
end
return nil
"""
  37. async def get_valid_token_from_redis(redis_client: Redis, timeout: int = 30) -> Optional[str]:
  38. """
  39. 尝试从 Redis 获取有效的验证码 Token。
  40. 包含重试机制。
  41. """
  42. start_time = time.time()
  43. while time.time() - start_time < timeout:
  44. # 执行 Lua 脚本原子获取
  45. result = await redis_client.eval(POP_TOKEN_LUA, 0)
  46. if result:
  47. try:
  48. # result 结构: [key, value_str, ttl]
  49. body_str = result[1]
  50. body = json.loads(body_str)
  51. token = body.get("token")
  52. if token:
  53. return token
  54. except (json.JSONDecodeError, IndexError, AttributeError):
  55. logger.warning("Redis retrieved invalid token format")
  56. # 没拿到或格式不对,稍作等待
  57. await asyncio.sleep(1)
  58. return None
  59. # =========================================================
  60. # 2. 网络请求模块
  61. # =========================================================
  62. async def fetch_troov_availability(
  63. session_data: Dict[str, Any],
  64. date: str,
  65. proxy_url: str
  66. ) -> str:
  67. """
  68. 请求 Troov 预约可用性接口。
  69. 强制使用指定的代理。
  70. """
  71. url = (
  72. "https://51.254.177.49/api/team/"
  73. "621540d353069dec25bd0045/reservations/availability"
  74. )
  75. # URL 参数
  76. params = {
  77. "name": "Visas",
  78. "date": date,
  79. "places": "-5",
  80. "matching": "",
  81. "maxCapacity": "-5",
  82. "sessionId": session_data.get("session_id")
  83. }
  84. headers = {
  85. "accept": "application/json, text/plain, */*",
  86. "accept-language": "zh-CN,zh;q=0.9,en;q=0.8",
  87. # "origin": "https://consulat.gouv.fr",
  88. # "referer": "https://consulat.gouv.fr/en/ambassade-de-france-en-irlande/appointment?name=Visas",
  89. "user-agent": (
  90. "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
  91. "AppleWebKit/537.36 (KHTML, like Gecko) "
  92. "Chrome/141.0.0.0 Safari/537.36"
  93. ),
  94. "x-gouv-app-id": session_data.get("x_gouv_app_id"),
  95. "x-gouv-web": "fr.gouv.consulat",
  96. }
  97. timeout = aiohttp.ClientTimeout(total=15)
  98. connector = aiohttp.TCPConnector(ssl=False)
  99. # 显式使用传入的 proxy_url
  100. async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
  101. async with session.get(
  102. url,
  103. params=params,
  104. headers=headers,
  105. proxy=proxy_url
  106. ) as resp:
  107. resp.raise_for_status() # 如果状态码不是 200,抛出异常
  108. return await resp.text()
  109. # =========================================================
  110. # 3. 核心业务流程
  111. # =========================================================
  112. def _get_proxy_pool() -> List[str]:
  113. """加载代理池配置"""
  114. proxies = []
  115. # 可以在此处扩展更多 pool 类型
  116. for pool in ("oxylabs",):
  117. proxies.extend(load_proxies_from_json("data/proxy_pool_config.json", pool))
  118. return proxies
  119. async def get_rate_by_date(
  120. redis_client: Redis,
  121. date: str
  122. ) -> Optional[List[TroovRate]]:
  123. """
  124. 主入口:根据日期获取 Troov 预约信息
  125. 流程:获取代理 -> 获取 Token -> 创建会话(Sync) -> 获取数据(Async)
  126. """
  127. # 1. 准备代理
  128. proxies = _get_proxy_pool()
  129. if not proxies:
  130. raise NotFoundError(message="Proxy pool is empty")
  131. # 随机选择一个代理,并在整个流程中保持一致
  132. current_proxy = random.choice(proxies)
  133. # 2. 获取验证码 Token
  134. captcha_token = await get_valid_token_from_redis(redis_client)
  135. if not captcha_token:
  136. raise NotFoundError(message="Failed to retrieve captcha token within timeout")
  137. logger.info(f"Creating session with proxy: {current_proxy}...")
  138. session_dic = await run_in_threadpool(
  139. troov_create_session_old,
  140. current_proxy,
  141. captcha_token
  142. )
  143. if not session_dic:
  144. raise BizLogicError(message="Failed to create Troov session (session_dic is empty)")
  145. logger.info(f"Troov session created successfully: {session_dic.get('session_id')}")
  146. # 确保这里传入了 current_proxy
  147. response_text = await fetch_troov_availability(session_dic, date, current_proxy)
  148. # 解析数据
  149. data = json.loads(response_text)
  150. # 这里可以加一步数据校验,确保 data 是 List[TroovRate] 格式
  151. return data