# troov_service.py (2.4 KB)
import json
import random
import time
from typing import List

import requests
from fastapi import Depends

from app.schemas.troov import TroovRate
from app.utils.france_slot_api import *
from app.utils.proxy_utils import load_proxies_from_json
  9. def pop_redis_value_token(redis_client):
  10. lua_script = '''
  11. local keys = redis.call('keys', 'token:*')
  12. local max_ttl = -1
  13. local max_key = nil
  14. for _, key in ipairs(keys) do
  15. local ttl = redis.call('ttl', key)
  16. if ttl > 0 and ttl > max_ttl then
  17. max_ttl = ttl
  18. max_key = key
  19. end
  20. end
  21. if max_key then
  22. local value = redis.call('get', max_key)
  23. redis.call('del', max_key)
  24. return {max_key, value, max_ttl}
  25. else
  26. return nil
  27. end
  28. '''
  29. result = redis_client.eval(lua_script, 0)
  30. return result
  31. def fetch_rate(session_dic, date):
  32. url = f"https://api.consulat.gouv.fr/api/team/621540d353069dec25bd0045/reservations/availability?name=Visas&date={date}&places=-5&matching=&maxCapacity=-5&sessionId={session_dic['session_id']}"
  33. headers = {
  34. 'accept': 'application/json, text/plain, */*',
  35. 'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
  36. 'origin': 'https://consulat.gouv.fr',
  37. 'referer': 'https://consulat.gouv.fr/en/ambassade-de-france-en-irlande/appointment?name=Visas',
  38. 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36',
  39. 'x-gouv-app-id': session_dic['x_gouv_app_id'],
  40. 'x-gouv-web': 'fr.gouv.consulat'
  41. }
  42. try:
  43. response = requests.get(url, headers=headers)
  44. return response.text
  45. except Exception as e:
  46. return f'Exception info={e}'
  47. def get_rate_by_date(redis_client, date: str) -> List[TroovRate]:
  48. """
  49. 核心业务逻辑:根据日期返回 Troov 预约信息
  50. """
  51. proxy_pools = ['oxylabs']
  52. proxies = []
  53. for pp in proxy_pools:
  54. proxies = proxies + load_proxies_from_json("data/proxy_pool_config.json", pp)
  55. result = None
  56. while True:
  57. result = pop_redis_value_token(redis_client)
  58. if not result:
  59. time.sleep(1)
  60. continue
  61. break
  62. body_str = result[1]
  63. body = json.loads(body_str)
  64. captcha = body.get("token")
  65. session_dic = troov_create_session_old(random.choice(proxies), captcha)
  66. if not session_dic:
  67. return None
  68. logger.info(f'创建 session 成功: {session_dic}')
  69. res = fetch_rate(session_dic, date)
  70. return json.loads(res)