import json import time import requests from typing import List from fastapi import Depends from app.schemas.troov import TroovRate def pop_redis_value_session(redis_client): lua_script = ''' local keys = redis.call('keys', 'session:*') local max_ttl = -1 local max_key = nil for _, key in ipairs(keys) do local ttl = redis.call('ttl', key) if ttl > max_ttl then max_ttl = ttl max_key = key end end if max_key then local value = redis.call('get', max_key) redis.call('del', max_key) return {max_key, value, max_ttl} else return nil end ''' result = redis_client.eval(lua_script, 0) return result def fetch_rate(session_dic, date): url = f"https://api.consulat.gouv.fr/api/team/621540d353069dec25bd0045/reservations/availability?name=Visas&date={date}&places=-5&matching=&maxCapacity=-5&sessionId={session_dic['session_id']}" headers = { 'accept': 'application/json, text/plain, */*', 'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8', 'origin': 'https://consulat.gouv.fr', 'referer': 'https://consulat.gouv.fr/en/ambassade-de-france-en-irlande/appointment?name=Visas', 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36', 'x-gouv-app-id': session_dic['x_gouv_app_id'], 'x-gouv-web': 'fr.gouv.consulat' } try: response = requests.get(url, headers=headers) return response.text except Exception as e: return f'Exception info={e}' def get_rate_by_date(redis_client, date: str) -> List[TroovRate]: """ 核心业务逻辑:根据日期返回 Troov 预约信息 """ result = None while True: result = pop_redis_value_session(redis_client) if not result: time.sleep(1) continue break session_data = result[1] session_dic = json.loads(session_data) res = fetch_rate(session_dic, date) return json.loads(res)