| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980 |
- import json
- import time
- import requests
- from typing import List
- from fastapi import Depends
- from app.schemas.troov import TroovRate
- from app.utils.france_slot_api import *
- from app.utils.proxy_utils import load_proxies_from_json
- def pop_redis_value_token(redis_client):
- lua_script = '''
- local keys = redis.call('keys', 'token:*')
- local max_ttl = -1
- local max_key = nil
- for _, key in ipairs(keys) do
- local ttl = redis.call('ttl', key)
- if ttl > 0 and ttl > max_ttl then
- max_ttl = ttl
- max_key = key
- end
- end
- if max_key then
- local value = redis.call('get', max_key)
- redis.call('del', max_key)
- return {max_key, value, max_ttl}
- else
- return nil
- end
- '''
- result = redis_client.eval(lua_script, 0)
- return result
- def fetch_rate(session_dic, date):
- url = f"https://api.consulat.gouv.fr/api/team/621540d353069dec25bd0045/reservations/availability?name=Visas&date={date}&places=-5&matching=&maxCapacity=-5&sessionId={session_dic['session_id']}"
- headers = {
- 'accept': 'application/json, text/plain, */*',
- 'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
- 'origin': 'https://consulat.gouv.fr',
- 'referer': 'https://consulat.gouv.fr/en/ambassade-de-france-en-irlande/appointment?name=Visas',
- 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36',
- 'x-gouv-app-id': session_dic['x_gouv_app_id'],
- 'x-gouv-web': 'fr.gouv.consulat'
- }
- try:
- response = requests.get(url, headers=headers)
- return response.text
- except Exception as e:
- return f'Exception info={e}'
- def get_rate_by_date(redis_client, date: str) -> List[TroovRate]:
- """
- 核心业务逻辑:根据日期返回 Troov 预约信息
- """
- proxy_pools = ['oxylabs']
- proxies = []
- for pp in proxy_pools:
- proxies = proxies + load_proxies_from_json("data/proxy_pool_config.json", pp)
-
- result = None
- while True:
- result = pop_redis_value_token(redis_client)
- if not result:
- time.sleep(1)
- continue
- break
- body_str = result[1]
- body = json.loads(body_str)
- captcha = body.get("token")
-
- session_dic = troov_create_session_old(random.choice(proxies), captcha)
- if not session_dic:
- return None
- logger.info(f'创建 session 成功: {session_dic}')
- res = fetch_rate(session_dic, date)
- return json.loads(res)
-
|