import json import time import random import asyncio import aiohttp from datetime import datetime, timedelta from typing import List, Optional, Tuple, Dict, Any from redis.asyncio import Redis from starlette.concurrency import run_in_threadpool from app.core.biz_exception import NotFoundError, BizLogicError from app.schemas.troov import TroovRate, TroovProb, TroovCheckForbiddenInput from app.utils.france_slot_api import troov_create_session_old from app.utils.proxy_utils import load_proxies_from_json from app.core.logger import logger class TroovService: """ Troov 业务逻辑工具类 (Pure Static) """ # Redis Lua Script for atomic token retrieval POP_TOKEN_LUA = """ local cursor = "0" local max_ttl = -1 local max_key = nil repeat local result = redis.call('SCAN', cursor, 'MATCH', 'token:*', 'COUNT', 50) cursor = result[1] local keys = result[2] for _, key in ipairs(keys) do local ttl = redis.call('TTL', key) if ttl > max_ttl then max_ttl = ttl; max_key = key end end until cursor == "0" if max_key then local value = redis.call('GET', max_key) redis.call('DEL', max_key) return {max_key, value, max_ttl} end return nil """ # Static configuration data for payload (Moved from function to class constant) _ZONE_DATA = { "name": 'Visas', "name_traduction": {"fr": 'Visas', "en": "", "zh": "", "ar": "", "ru": "", "it": "", "es": "", "de": "", "pt": ""}, "enable_external_url": False, "external_url": "", "has_paid_reservation": False, "openings": [ {"day": 1, "begin_h": 9, "begin_m": 0, "end_h": 13, "end_m": 0, "_id": "65b969289175b1f087bdf357"}, {"day": 2, "begin_h": 9, "begin_m": 0, "end_h": 13, "end_m": 0, "_id": "65b969289175b1f087bdf358"}, {"day": 3, "begin_h": 9, "begin_m": 0, "end_h": 13, "end_m": 0, "_id": "65b969289175b1f087bdf359"}, {"day": 4, "begin_h": 9, "begin_m": 0, "end_h": 13, "end_m": 0, "_id": "65b969289175b1f087bdf35a"}, {"day": 5, "begin_h": 9, "begin_m": 0, "end_h": 13, "end_m": 0, "_id": "65b969289175b1f087bdf35b"} ], "custom_openings": [], "breaktimes": [ [ {"day": 1, "begin_h": 12, "begin_m": 0, "end_h": 14, "end_m": 0, "_id": "65e1bb30ec8f214f6a5af678"}, {"day": 2, "begin_h": 12, "begin_m": 0, "end_h": 14, "end_m": 0, "_id": "65e1bb30ec8f214f6a5af679"}, {"day": 3, "begin_h": 12, "begin_m": 0, "end_h": 14, "end_m": 0, "_id": "65e1bb30ec8f214f6a5af67a"}, {"day": 4, "begin_h": 12, "begin_m": 0, "end_h": 14, "end_m": 0, "_id": "65e1bb30ec8f214f6a5af67b"}, {"day": 5, "begin_h": 12, "begin_m": 0, "end_h": 14, "end_m": 0, "_id": "65e1bb30ec8f214f6a5af67c"} ], [] ], "session_duration": 15, "session_type": "people", "session_reservation_max": 1000, "session_people_max": 1, "reservation_people_max": 1, "is_priority": True, "reservation_delay_hours": 0, "start_opening": "2022-03-31", "end_opening": "2025-12-31", "is_open": True, "is_open_internal": True, "stand_alone_calendar": False, "note": {"ar": "", "de": "", "en": "", "es": "", "fr": "", "it": "", "nl": "", "pt": "", "ru": "", "zh": ""}, "dynamic_calendar_enabled": True, "dynamic_calendar_ending": {"hour": "default", "minute": "default"}, "external_link_for_documents": "https://france-visas.gouv.fr/en/web/france-visas/online-application", "dynamic_calendar": {"begin": {"type": "days"}, "end": {"type": "days", "value": 7}}, "closed_days": [ "2024-01-01", "2024-02-05", "2024-03-18", "2024-03-22", "2024-03-23", "2024-03-24", "2024-03-25", "2024-03-26", "2024-03-27", "2024-03-28", "2024-03-29", "2024-04-01", "2024-05-01", "2024-05-06", "2024-05-09", "2024-06-03", "2024-08-05", "2024-10-28", "2024-12-25", "2024-12-26", "2025-01-01", "2025-02-03", "2025-03-17", "2025-04-21", "2025-05-05", "2025-06-02", "2025-08-04", "2025-08-15", "2025-10-27", "2025-12-25", "2025-12-26" ], "custom_fields": [], "service_color": "#e91e63", "enable_repeat_form": False, "enable_fullday_slots": False, "session_price": 0, "cancel_limit": {"value": 0, "type": "days"}, "activate_waiting_list": False, "deactivate_reservation_cancelation": True, "_id": '624317926863643fe83c8548' } _probability_model = 'probability_model' _time_slots_am = [ "09:30", "09:45", "10:00", "10:15", "10:30", "10:45", "11:00", "11:15", "11:30", "11:45" ] _time_slots_pm = ["14:00", "14:15", "14:30", "14:45", "15:00"] # ========================================================= # Public Methods # ========================================================= @staticmethod async def check_for_forbiddenusers(redis_client: Redis, payload: TroovCheckForbiddenInput) -> Dict[str, Any]: """检测用户是否被禁止""" current_proxy, session_dic = await TroovService._prepare_session_context(redis_client) booking_token = await TroovService._get_valid_token(redis_client) if not booking_token: raise NotFoundError(message="Failed to retrieve second captcha token for booking") # Inline logic: Calculate next Monday today = datetime.today() days_ahead = 7 - today.weekday() if days_ahead == 0: days_ahead = 7 date = (today + timedelta(days=days_ahead)).strftime("%Y-%m-%d") slot = {'time': '09:30', 'rate': '0.00', 'capacity': 1} book_uinfo = { "id": 0, "birth_date": payload.birthday.strftime("%m/%d/%Y"), "email": 'arket_zz@163.com', "phone": '+3530829394212', "first_name": payload.first_name, "last_name": payload.last_name, } # Build Payload inline book_body = TroovService._build_book_payload(session_dic['session_id'], date, slot, book_uinfo, booking_token) # Exec Request url = f"https://51.254.177.49/api/team/{session_dic['embassy']['teamId']}/reservations/family" headers = { 'accept': 'application/json, text/plain, */*', 'content-type': 'application/json', 'origin': 'https://consulat.gouv.fr', 'referer': session_dic['embassy']['website'], "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36", 'x-csrf-token': session_dic['x-csrf-token'], 'x-gouv-app-id': session_dic['x_gouv_app_id'], 'x-gouv-web': 'fr.gouv.consulat', } async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=15), connector=aiohttp.TCPConnector(ssl=False)) as session: async with session.post(url, headers=headers, data=json.dumps(book_body), proxy=current_proxy) as resp: return json.loads(await resp.text()) @staticmethod async def get_rate_by_date(redis_client: Redis, date: str) -> List[TroovRate]: """根据日期获取预约可用性""" current_proxy, session_dic = await TroovService._prepare_session_context(redis_client) url = "https://51.254.177.49/api/team/621540d353069dec25bd0045/reservations/availability" params = { "name": "Visas", "date": date, "places": "-5", "matching": "", "maxCapacity": "-5", "sessionId": session_dic.get("session_id") } headers = { "accept": "application/json, text/plain, */*", "accept-language": "zh-CN,zh;q=0.9,en;q=0.8", "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36", "x-gouv-app-id": session_dic.get("x_gouv_app_id"), "x-gouv-web": "fr.gouv.consulat", } async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=15), connector=aiohttp.TCPConnector(ssl=False)) as session: async with session.get(url, params=params, headers=headers, proxy=current_proxy) as resp: resp.raise_for_status() return json.loads(await resp.text()) # ========================================================= # Internal Logic # ========================================================= @staticmethod async def _prepare_session_context(redis_client: Redis) -> Tuple[str, Dict[str, Any]]: """获取代理 -> 获取Token -> 创建Session""" # Inline proxy loading proxies = [] for pool in ("oxylabs",): proxies.extend(load_proxies_from_json("data/proxy_pool_config.json", pool)) if not proxies: raise NotFoundError(message="Proxy pool is empty") current_proxy = random.choice(proxies) captcha_token = await TroovService._get_valid_token(redis_client) if not captcha_token: raise NotFoundError(message="Failed to retrieve captcha token") logger.info(f"Creating session with proxy: {current_proxy}...") session_dic = await run_in_threadpool(troov_create_session_old, current_proxy, captcha_token) if not session_dic: raise BizLogicError(message="Failed to create Troov session") logger.info(f"Troov session created: {session_dic.get('session_id')}") return current_proxy, session_dic @staticmethod async def get_all_probs(redis_client: Redis) -> List[TroovProb]: prob_map = await redis_client.hgetall(TroovService._probability_model) res = [] for k, v in prob_map.items(): # redis 返回 bytes key_str = k.decode() if isinstance(k, (bytes, bytearray)) else k val_str = v.decode() if isinstance(v, (bytes, bytearray)) else v res.append( TroovProb( prob_key=datetime.strptime(key_str, "%Y-%m-%d.%H:%M"), prob_val=float(val_str) if val_str is not None else None, ) ) # 按时间排序(强烈建议) res.sort(key=lambda x: x.prob_key) return res @staticmethod async def set_prob(redis_client: Redis, payload: TroovProb): """更新 / 新增概率""" slot_key = payload.prob_key.strftime("%Y-%m-%d.%H:%M") prob_val = payload.prob_val if prob_val is not None: prob_val = max(0.0, min(1.0, prob_val)) await redis_client.hset( TroovService._probability_model, slot_key, prob_val, ) return await TroovService.get_all_probs(redis_client) @staticmethod async def del_prob(redis_client: Redis, payload: TroovProb): """移除单个 slot 概率""" slot_key = payload.prob_key.strftime("%Y-%m-%d.%H:%M") await redis_client.hdel( TroovService._probability_model, slot_key, ) return await TroovService.get_all_probs(redis_client) @staticmethod async def reset_probs(redis_client: Redis, date: str): """ 重置某一天的所有时间槽概率 周六周日不生成 周五只有上午 """ await redis_client.delete(TroovService._probability_model) date_dt = datetime.strptime(date, "%Y-%m-%d") # 周六、周日 if date_dt.weekday() in (5, 6): return [] if date_dt.weekday() == 4: # 周五 timeslots = TroovService._time_slots_am else: timeslots = TroovService._time_slots_am + TroovService._time_slots_pm mapping = { f"{date}.{time_slot}": 0.5 for time_slot in timeslots } if mapping: await redis_client.hset( TroovService._probability_model, mapping=mapping, ) return await TroovService.get_all_probs(redis_client) @staticmethod async def _get_valid_token(redis_client: Redis, timeout: int = 30) -> Optional[str]: start_time = time.time() while time.time() - start_time < timeout: result = await redis_client.eval(TroovService.POP_TOKEN_LUA, 0) if result: try: return json.loads(result[1]).get("token") except: logger.warning("Invalid token format in Redis") await asyncio.sleep(1) return None @staticmethod def _build_book_payload(sid, date, slot, uinfo, token): """构造预约 Payload (原 troov_dublin_visas_book_data_builder,逻辑已内联)""" # Parse Dates Inline birth_parts = uinfo['birth_date'].split('/') dt_obj = datetime.strptime(date, "%Y-%m-%d") slot_h, slot_m = slot['time'].split(":") # Formatter logic inline fmt_date_full = dt_obj.strftime("%Y-%m-%dT%H:%M:%S.000Z") fmt_tz = datetime.strptime(slot['time'], "%H:%M").strftime("-%H:%M") formatted_slot_datetime = f"{fmt_date_full}{fmt_tz}" slot_value_str = f"slot-visas-{formatted_slot_datetime}" # Build Slot Object Inline slot_obj = { 'time': slot['time'], 'rate': slot['rate'], 'capacity': slot['capacity'], 'numberOfApplicants': 1, "date": f"{date}T{slot['time']}:00", "localDateString": dt_obj.strftime("%m/%d/%Y"), "dateObject": { "year": dt_obj.year, "month": dt_obj.month - 1, "day": dt_obj.day, "hour": slot_h, "minute": slot_m, }, "id": 0, "slotValue": slot_value_str } return { "reservations": { "mainUser": { "lastname": uinfo['last_name'], "firstname": uinfo['first_name'], "email": uinfo['email'], "mobile": uinfo['phone'], "birthdate": {"month": int(birth_parts[0]) - 1, "day": int(birth_parts[1]), "year": int(birth_parts[2])}, "slots": {}, "services": [ { "zone": TroovService._ZONE_DATA, "zone_id": '624317926863643fe83c8548', "external_link_for_documents": "https://france-visas.gouv.fr/en/web/france-visas/online-application", "label": 'Visas', "name": 'Visas', "numberOfSlots": 1, "maxSlots": 5, "checkboxesSlots": [slot_value_str], "customFields": [], "customFieldsAreValid": True, "slots": [slot_obj], "slotsToKeep": [slot_obj] } ] }, "secondaryUsers": [], "sessionId": sid, "team": "621540d353069dec25bd0045" }, "language": "en", "captcha": token, "sessionId": sid }