import time import json import random import re import os from datetime import datetime, timezone, timedelta from typing import List, Dict, Any, Callable, Optional, Set, Tuple from curl_cffi import requests, const from vs_plg import IVSPlg from vs_types import VSPlgConfig, AppointmentType, VSQueryResult, VSBookResult, AvailabilityStatus, TimeSlot, DateAvailability, NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError class GrcPlugin(IVSPlg): """ https://www.supersaas.com/schedule/login/GreekEmbassyInDublin/Visas 签证预约插件 适配爱尔兰希腊签证 (GR) 流程 """ def __init__(self, group_id: str): self.group_id = group_id self.config: Optional[VSPlgConfig] = None self.free_config: Dict[str, Any] = {} self.is_healthy = True self.logger = None self.session: Optional[requests.Session] = None self.resource_id = '1123832' self.rp_id = None # 用于 AJAX 查询 (例如 778129) self.token = None # 用于 AJAX 查询 self.session_create_time: float = 0 def get_group_id(self) -> str: return self.group_id def set_log(self, logger: Callable[[str], None]) -> None: self.logger = logger def set_config(self, config: VSPlgConfig): self.config = config self.free_config = config.free_config or {} def keep_alive(self): pass def health_check(self) -> bool: if not self.is_healthy: return False if self.session is None: return False if self.config.session_max_life > 0: current_time = time.time() elapsed_time = current_time - self.session_create_time if elapsed_time > self.config.session_max_life * 60: self._log(f"Session Life ({int(elapsed_time)}s) out of max life limit ({self.config.session_max_life * 60}s), mark as unhealth session") return False return True def create_session(self): # 1. 初始化 Session curlopt = { const.CurlOpt.MAXAGE_CONN: 1800, const.CurlOpt.MAXLIFETIME_CONN: 1800, const.CurlOpt.VERBOSE: self.config.debug, } self.session = requests.Session( proxy=self._get_proxy_url(), impersonate="chrome124", curl_options=curlopt, use_thread_local_curl=False, http_version=const.CurlHttpVersion.V2TLS ) login_url = "https://www.supersaas.com/schedule/login/GreekEmbassyInDublin/Visas" self._perform_request("GET", login_url) headers = { "Referer": login_url, "Origin": "https://www.supersaas.com", "Content-Type": "application/x-www-form-urlencoded" } data = { "name": self.config.account.username, "password": self.config.account.password, "remember": "K", "cookie_fix": "1", "button": "" } resp = self._perform_request('POST', login_url, headers=headers, data=data) # 判断是否登录成功 if "Sign out" in resp.text or "Signed in as" in resp.text: # [新增修复点]: 检查账号是否已达最大预约数限制 if "reached the maximum number" in resp.text or "You cannot create new reservations" in resp.text: self.is_healthy = False self._save_debug_html(resp.text, prefix='login_quota_exceeded') self._log(f"Login failed: Account '{self.config.account.username}' has reached the maximum number of reservations.") raise BizLogicError(message='Login failed: Account has reached the maximum number of reservations.') self.session_create_time = time.time() self._log(f"Session created successfully. (User: {self.config.account.username})") # 如果登录失败,SuperSaaS 通常会留在当前页面并显示错误信息 elif "Invalid email or password" in resp.text: self._save_debug_html(resp.text, prefix='login_auth_fail') raise BizLogicError(message='Login failed: Invalid email or password') else: # 其他未知错误 self._save_debug_html(resp.text, prefix='login_unknown_fail') self._log(f"Login check failed. Current URL: {resp.url}") raise BizLogicError(message='Login failed: Unknown response') def _get_daily_schedule(self, open_times, date_obj): """根据 open_times 获取当天的开始和结束分钟数""" if not open_times: return None, None weekday_py = date_obj.weekday() if weekday_py >= 4: return None, None js_day_index = (date_obj.weekday() + 1) % 7 start_min = open_times[js_day_index] end_min = open_times[js_day_index + 7] return start_min, end_min def _is_blocked_by_ecache(self, ecache_blocked, timestamp): """检查某个时间点是否在临时关闭范围内 (如节假日)""" for block in ecache_blocked: if block[0] <= timestamp < block[1]: return True return False def _fetch_schedule_data(self, start_dt: datetime, days: int) -> Dict: """发送一次 AJAX 请求,获取指定时间范围内的所有数据""" afrom_str = start_dt.strftime("%Y-%m-%d") ato_dt = start_dt + timedelta(days=days) ato_str = ato_dt.strftime("%Y-%m-%d 00:00") url = f"https://www.supersaas.com/ajax/resource/{self.rp_id}" params = { "token": self.token, "afrom": afrom_str, "ato": ato_str, "ad": "r", "efrom": afrom_str, "eto": ato_dt.strftime("%Y-%m-%d"), } headers = { "Accept": "*/*", "Referer": "https://www.supersaas.com/schedule/GreekEmbassyInDublin/Visas", "X-Requested-With": "XMLHttpRequest" } self._log(f"Fetching data from {afrom_str} to {ato_dt.strftime('%Y-%m-%d')}...") resp = self._perform_request("GET", url, params=params, headers=headers) return resp.json() def query(self, apt_type: AppointmentType) -> VSQueryResult: res = VSQueryResult() res.success = False url = "https://www.supersaas.com/schedule/GreekEmbassyInDublin/Visas" headers = { "Referer": "https://www.supersaas.com/schedule/GreekEmbassyInDublin/Visas", "Origin": "https://www.supersaas.com", } resp = self._perform_request("GET", url, headers=headers) if 'Log into Visas schedule' in resp.text: self.is_healthy = False raise SessionExpiredOrInvalidError(message='Session expired.') res_id_match = re.search(r'resource\[(\d+)\]\s*=', resp.text) if res_id_match: self.resource_id = res_id_match.group(1) rp_match = re.search(r'rp_id=(\d+)', resp.text) if rp_match: self.rp_id = rp_match.group(1) tok_match = re.search(r'token=(\d+)', resp.text) if tok_match: self.token = tok_match.group(1) if not self.rp_id or not self.token: self._log("Failed to extract rp_id or token from HTML") raise NotFoundError(message='rp_id or token not found') default_length = None len_match = re.search(r'default_length\s*=\s*(\d+)', resp.text) if len_match: default_length = int(len_match.group(1)) open_times = None ot_match = re.search(r'open_times\s*=\s*\[(.*?)\]', resp.text) if ot_match: open_times = [int(x) for x in ot_match.group(1).split(',')] season_end_ts = 9999999999 season_match = re.search(r'season\s*=\s*\[(\d+),(\d+)\]', resp.text) if season_match: season_end_ts = int(season_match.group(2)) cursor_match = re.search(r'Date\.UTC\((\d+),(\d+),(\d+),(\d+)\)', resp.text) if cursor_match: y, m, d, h = map(int, cursor_match.groups()) start_date = datetime(y, m + 1, d, h, tzinfo=timezone.utc) else: start_date = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0) scan_start_dt = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0) days_total_scan = 60 chunk_size = 30 valid_slots_map: dict[datetime.date, list[TimeSlot]] = {} for i in range(0, days_total_scan, chunk_size): chunk_start = scan_start_dt + timedelta(days=i) json_data = self._fetch_schedule_data(chunk_start, chunk_size) booked_ts: Set[int] = set() blocked_ranges: List[Tuple[int, int]] =[] if 'app' in json_data: for item in json_data['app']: booked_ts.add(int(item[0])) if 'exc' in json_data: for item in json_data['exc']: blocked_ranges.append((int(item[0]), int(item[1]))) for day_offset in range(chunk_size): current_day = chunk_start + timedelta(days=day_offset) start_min, end_min = self._get_daily_schedule(open_times, current_day) if start_min is None or start_min >= end_min: continue curr_min = start_min while curr_min + (default_length / 60) <= end_min: slot_hour = int(curr_min // 60) slot_minute = int(curr_min % 60) slot_dt = current_day.replace(hour=slot_hour, minute=slot_minute, second=0, microsecond=0) slot_ts = int(slot_dt.timestamp()) curr_min += (default_length / 60) if slot_ts < time.time(): continue if slot_ts >= season_end_ts: continue if slot_ts in booked_ts: continue is_blocked = False check_ts = slot_ts + 1 for b_start, b_end in blocked_ranges: if b_start <= check_ts < b_end: is_blocked = True break if is_blocked: continue payload = { "resource_id": self.resource_id, "timestamp": slot_ts, "datetime": slot_dt.strftime("%Y-%m-%d %H:%M:%S") } time_slot = TimeSlot( time=slot_dt.strftime("%H:%M"), label=json.dumps(payload) ) date_key = slot_dt.date() if date_key not in valid_slots_map: valid_slots_map[date_key] = [] valid_slots_map[date_key].append(time_slot) if valid_slots_map: res.success = True res.availability_status = AvailabilityStatus.Available sorted_dates = sorted(valid_slots_map.keys()) res.earliest_date = datetime.combine(sorted_dates[0], datetime.min.time()) res.availability =[DateAvailability(date=datetime.combine(d, datetime.min.time()), times=valid_slots_map[d]) for d in sorted_dates] self._log(f"Found slots on {len(sorted_dates)} days.") else: self._log("No slots found.") return res def book(self, slot_info: VSQueryResult, user_inputs: Dict = None) -> VSBookResult: if user_inputs is None: user_inputs = {} res = VSBookResult() res.success = False # --- 1. 筛选并收集所有可用 Slot --- exp_start = user_inputs.get('expected_start_date', '') exp_end = user_inputs.get('expected_end_date', '') date_map = {d.date.strftime("%Y-%m-%d"): d for d in slot_info.availability} all_dates = list(date_map.keys()) valid_dates = self._filter_dates(all_dates, exp_start, exp_end) if not valid_dates: self._log(f"No available slots within the expected range ({exp_start} to {exp_end}).") return res candidate_slots =[] for date_str in valid_dates: if date_str in date_map: candidate_slots.extend(date_map[date_str].times) random.shuffle(candidate_slots) if not candidate_slots: self._log("No slots found after filtering.") return res self._log(f"Found {len(candidate_slots)} candidate slots. Starting booking attempts...") # [修复点 1]:严谨处理输入参数里的空字符串,防止 dict.get() 返回空字符串触发后端空白报错 first_name = (user_inputs.get('first_name') or '').strip() last_name = (user_inputs.get('last_name') or '').strip() full_name = f"{first_name} {last_name}".strip() or "Traveler" phone_code = (user_inputs.get('phone_country_code') or '353').strip() phone_no = (user_inputs.get('phone') or '088888888').strip() mobile = f"{phone_code}{phone_no}" address = (user_inputs.get('address') or "Dublin, Ireland").strip() passport_no = (user_inputs.get('passport_no') or "P0000000").strip() nationality = (user_inputs.get('nationality') or 'Chinese').strip() visa_reason = (user_inputs.get('visa_reason') or 'tourism').strip() for target_slot in candidate_slots[0:2]: try: slot_data = json.loads(target_slot.label) start_time_str = slot_data.get('datetime') bk_res_id = slot_data.get('resource_id', self.resource_id) start_dt = datetime.strptime(start_time_str, "%Y-%m-%d %H:%M:%S") finish_dt = start_dt + timedelta(hours=1) # 处理抵离日期确保不为空 arrival_date = (user_inputs.get('arrival_date') or '').strip() if not arrival_date: arrival_date = (start_dt + timedelta(days=15)).strftime("%Y-%m-%d") request_url = f"https://www.supersaas.com/schedule/GreekEmbassyInDublin/Visas?view=day&day={start_dt.day}&month={start_dt.month}" fmt_start = f"{start_dt.day}/{start_dt.month}/{start_dt.year} {start_dt.strftime('%H:%M')}" fmt_finish = f"{finish_dt.day}/{finish_dt.month}/{finish_dt.year} {finish_dt.strftime('%H:%M')}" headers = { "Referer": request_url, "Origin": "https://www.supersaas.com", "Content-Type": "application/x-www-form-urlencoded", } # ========================== # Request step 1: 预检查/打开弹窗 # ========================== payload_stp_1 = { "reservation[start_time]": fmt_start, "reservation[finish_time]": fmt_finish, "reservation[full_name]": full_name, "reservation[mobile]": mobile, "reservation[address]": address, "reservation[resource_id]": bk_res_id, "button": "", "reservation[xpos]": "100", "reservation[ypos]": "200" } self._log(f"[Attempt] Try slot {start_time_str} - Req step 1") resp_stp_1 = self._perform_request('POST', request_url, data=payload_stp_1, headers=headers) if "This spot has already been taken" in resp_stp_1.text: self._log(f"Slot {start_time_str} is TAKEN. Trying next...") continue if 'id="reservation_error"' in resp_stp_1.text or 'class="dbox-error"' in resp_stp_1.text: err_match = re.search(r'