import time import json import random import re import os from datetime import datetime, timezone, timedelta from typing import List, Dict, Any, Callable, Optional, Set, Tuple from curl_cffi import requests, const from vs_plg import IVSPlg from vs_types import VSPlgConfig, AppointmentType, VSQueryResult, VSBookResult, AvailabilityStatus, TimeSlot, DateAvailability, NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError MODERN_BROWSERS: List[str] = [ # Chrome (124+) "chrome124", "chrome131", "chrome133a", "chrome136", "chrome142", "chrome145", "chrome146", "chrome131_android", # Safari (18+) "safari180", "safari184", "safari260", "safari2601", "safari180_ios", "safari184_ios", "safari260_ios", # Firefox (133+) "firefox133", "firefox135", "firefox144", "firefox147", "tor145" ] class GrcPlugin(IVSPlg): """ https://www.supersaas.com/schedule/login/GreekEmbassyInDublin/Visas 签证预约插件 适配爱尔兰希腊签证 (GR) 流程 """ def __init__(self, group_id: str): self.group_id = group_id self.config: Optional[VSPlgConfig] = None self.free_config: Dict[str, Any] = {} self.is_healthy = True self.logger = None self.session: Optional[requests.Session] = None self.resource_id = '1123832' self.rp_id = None # 用于 AJAX 查询 (例如 778129) self.token = None # 用于 AJAX 查询 self.session_create_time: float = 0 def get_group_id(self) -> str: return self.group_id def set_log(self, logger: Callable[[str], None]) -> None: self.logger = logger def set_config(self, config: VSPlgConfig): self.config = config self.free_config = config.free_config or {} def keep_alive(self): pass def health_check(self) -> bool: if not self.is_healthy: return False if self.session is None: return False if self.config.session_max_life > 0: current_time = time.time() elapsed_time = current_time - self.session_create_time if elapsed_time > self.config.session_max_life: self._log(f"Session Life ({int(elapsed_time)}s) out of max life limit ({self.config.session_max_life * 60}s), mark as unhealth session") return False return True def create_session(self): # 1. 初始化 Session curlopt = { const.CurlOpt.MAXAGE_CONN: 1800, const.CurlOpt.MAXLIFETIME_CONN: 1800, const.CurlOpt.VERBOSE: self.config.debug, } chosen_browser = random.choice(MODERN_BROWSERS) self._log(f"Using browser fingerprint: {chosen_browser}") self.session = requests.Session( proxy=self._get_proxy_url(), impersonate=chosen_browser, curl_options=curlopt, use_thread_local_curl=False, http_version=const.CurlHttpVersion.V2TLS ) login_url = "https://www.supersaas.com/schedule/login/GreekEmbassyInDublin/Visas" self._perform_request("GET", login_url) headers = { "Referer": login_url, "Origin": "https://www.supersaas.com", "Content-Type": "application/x-www-form-urlencoded" } data = { "name": self.config.account.username, "password": self.config.account.password, "remember": "K", "cookie_fix": "1", "button": "" } resp = self._perform_request('POST', login_url, headers=headers, data=data) # 判断是否登录成功 if "Sign out" in resp.text or "Signed in as" in resp.text: # [新增修复点]: 检查账号是否已达最大预约数限制 if "reached the maximum number" in resp.text or "You cannot create new reservations" in resp.text: self.is_healthy = False self._save_debug_html(resp.text, prefix='login_quota_exceeded') self._log(f"Login failed: Account '{self.config.account.username}' has reached the maximum number of reservations.") raise BizLogicError(message='Login failed: Account has reached the maximum number of reservations.') self.session_create_time = time.time() self._log(f"Session created successfully. (User: {self.config.account.username})") # 如果登录失败,SuperSaaS 通常会留在当前页面并显示错误信息 elif "Invalid email or password" in resp.text: self._save_debug_html(resp.text, prefix='login_auth_fail') raise BizLogicError(message='Login failed: Invalid email or password') else: # 其他未知错误 self._save_debug_html(resp.text, prefix='login_unknown_fail') self._log(f"Login check failed. Current URL: {resp.url}") raise BizLogicError(message='Login failed: Unknown response') def _get_daily_schedule(self, open_times, date_obj): """根据 open_times 获取当天的开始和结束分钟数""" if not open_times: return None, None weekday_py = date_obj.weekday() if weekday_py >= 4: return None, None js_day_index = (date_obj.weekday() + 1) % 7 start_min = open_times[js_day_index] end_min = open_times[js_day_index + 7] return start_min, end_min def _is_blocked_by_ecache(self, ecache_blocked, timestamp): """检查某个时间点是否在临时关闭范围内 (如节假日)""" for block in ecache_blocked: if block[0] <= timestamp < block[1]: return True return False def _fetch_schedule_data(self, start_dt: datetime, days: int) -> Dict: """发送一次 AJAX 请求,获取指定时间范围内的所有数据""" afrom_str = start_dt.strftime("%Y-%m-%d") ato_dt = start_dt + timedelta(days=days) ato_str = ato_dt.strftime("%Y-%m-%d 00:00") url = f"https://www.supersaas.com/ajax/resource/{self.rp_id}" params = { "token": self.token, "afrom": afrom_str, "ato": ato_str, "ad": "r", "efrom": afrom_str, "eto": ato_dt.strftime("%Y-%m-%d"), } headers = { "Accept": "*/*", "Referer": "https://www.supersaas.com/schedule/GreekEmbassyInDublin/Visas", "X-Requested-With": "XMLHttpRequest" } self._log(f"Fetching data from {afrom_str} to {ato_dt.strftime('%Y-%m-%d')}...") resp = self._perform_request("GET", url, params=params, headers=headers) return resp.json() def query(self, apt_type: AppointmentType) -> VSQueryResult: res = VSQueryResult() res.success = False url = "https://www.supersaas.com/schedule/GreekEmbassyInDublin/Visas" headers = { "Referer": "https://www.supersaas.com/schedule/GreekEmbassyInDublin/Visas", "Origin": "https://www.supersaas.com", } resp = self._perform_request("GET", url, headers=headers) if 'Log into Visas schedule' in resp.text: self.is_healthy = False raise SessionExpiredOrInvalidError(message='Session expired.') # 提取核心参数 res_id_match = re.search(r'resource\[(\d+)\]\s*=', resp.text) if res_id_match: self.resource_id = res_id_match.group(1) rp_match = re.search(r'rp_id=(\d+)', resp.text) if rp_match: self.rp_id = rp_match.group(1) tok_match = re.search(r'token=(\d+)', resp.text) if tok_match: self.token = tok_match.group(1) if not getattr(self, 'rp_id', None) or not getattr(self, 'token', None): self._log("Failed to extract rp_id or token from HTML") raise NotFoundError(message='rp_id or token not found') # 默认一小时长度 (3600秒) default_length = 3600 len_match = re.search(r'default_length\s*=\s*(\d+)', resp.text) if len_match: default_length = int(len_match.group(1)) # 开放时间规则 open_times = None ot_match = re.search(r'open_times\s*=\s*\[(.*?)\]', resp.text) if ot_match: open_times =[int(x) for x in ot_match.group(1).split(',')] # 排期结束时间 season_end_ts = 9999999999 season_match = re.search(r'season\s*=\s*\[(\d+),(\d+)\]', resp.text) if season_match: season_end_ts = int(season_match.group(2)) # 提取全局页面屏蔽的假日例外期 (ecache) ecache_blocks =[] ecache_match = re.search(r'ecache\s*=\s*\{data:\s*\[(.*?)\]\}', resp.text) if ecache_match: # 匹配形如[1775433600,1775519970,0] 的数据 triplets = re.findall(r'\[(\d+),\s*(\d+),\s*\d+\]', ecache_match.group(1)) for t0, t1 in triplets: ecache_blocks.append((int(t0), int(t1))) scan_start_dt = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0) current_time_ts = int(time.time()) days_total_scan = 60 chunk_size = 30 valid_slots_map: dict[datetime.date, list[TimeSlot]] = {} for i in range(0, days_total_scan, chunk_size): chunk_start = scan_start_dt + timedelta(days=i) json_data = self._fetch_schedule_data(chunk_start, chunk_size) # 把已经预定的订单和节假日统一收集为“阻挡物实体墙” all_blocks: List[Tuple[int, int]] = ecache_blocks.copy() if 'app' in json_data: for item in json_data['app']: all_blocks.append((int(item[0]), int(item[1]))) if 'exc' in json_data: for item in json_data['exc']: all_blocks.append((int(item[0]), int(item[1]))) for day_offset in range(chunk_size): current_day = chunk_start + timedelta(days=day_offset) start_min, end_min = self._get_daily_schedule(open_times, current_day) if start_min is None or start_min >= end_min: continue start_ts = int(current_day.timestamp()) + start_min * 60 end_ts = int(current_day.timestamp()) + end_min * 60 curr_ts = start_ts # 开始执行官方引擎 1:1 的“碰撞与吸附算法” while curr_ts + default_length <= end_ts: if curr_ts < current_time_ts or curr_ts >= season_end_ts: curr_ts += 1800 # 安全步进 continue # 【核心机制 1】:强制网格吸附 (Snap to grid) # 官方代码 start=precalc_constraints('0') 意味着仅允许在整点 (0分) 建立预约 dt = datetime.fromtimestamp(curr_ts, tz=timezone.utc) if dt.minute != 0: # 发现不在整点 (如11:30),立刻强制向前吸附到下一个整点 (加上剩余的分钟数) minutes_to_add = 60 - dt.minute curr_ts += minutes_to_add * 60 continue # 时间已改变,重新循环执行检查 slot_end = curr_ts + default_length # 【核心机制 2】:贪婪碰撞检测 (Collision detection) overlapping_end = 0 for b_start, b_end in all_blocks: # 区间交集判断:起点早于障碍物终点 且 终点晚于障碍物起点 -> 发生碰撞 if curr_ts < b_end and slot_end > b_start: if b_end > overlapping_end: overlapping_end = b_end if overlapping_end > 0: # 碰撞触发:指针直接抛弃当前区间,跳跃到“阻挡物最晚结束的时间点” curr_ts = overlapping_end # 下一轮循环时,【机制1】会自动将其吸附回合理的网格位! else: # 完美过检:没有碰撞且处于正确网格,记录合法 Slot payload = { "resource_id": self.resource_id, "timestamp": curr_ts, "datetime": dt.strftime("%Y-%m-%d %H:%M:%S") } time_slot = TimeSlot( time=dt.strftime("%H:%M"), label=json.dumps(payload) ) date_key = dt.date() if date_key not in valid_slots_map: valid_slots_map[date_key] =[] valid_slots_map[date_key].append(time_slot) # 已放入一个格后,指针按订单长度向后推移 curr_ts += default_length if valid_slots_map: res.success = True res.availability_status = AvailabilityStatus.Available sorted_dates = sorted(valid_slots_map.keys()) res.earliest_date = datetime.combine(sorted_dates[0], datetime.min.time()) res.availability =[DateAvailability(date=datetime.combine(d, datetime.min.time()), times=valid_slots_map[d]) for d in sorted_dates] self._log(f"Found slots on {len(sorted_dates)} days.") else: self._log("No slots found.") return res def book(self, slot_info: VSQueryResult, user_inputs: Dict = None) -> VSBookResult: if user_inputs is None: user_inputs = {} res = VSBookResult() res.success = False # 重新登录一次,避免session expired self.create_session() exp_start = user_inputs.get('expected_start_date', '') exp_end = user_inputs.get('expected_end_date', '') date_map = {d.date.strftime("%Y-%m-%d"): d for d in slot_info.availability} all_dates = list(date_map.keys()) valid_dates = self._filter_dates(all_dates, exp_start, exp_end) if not valid_dates: self._log(f"No available slots within the expected range ({exp_start} to {exp_end}).") return res candidate_slots =[] for date_str in valid_dates: if date_str in date_map: candidate_slots.extend(date_map[date_str].times) random.shuffle(candidate_slots) if not candidate_slots: self._log("No slots found after filtering.") return res self._log(f"Found {len(candidate_slots)} candidate slots. Starting booking attempts...") # [修复点 1]:严谨处理输入参数里的空字符串,防止 dict.get() 返回空字符串触发后端空白报错 first_name = (user_inputs.get('first_name') or '').strip() last_name = (user_inputs.get('last_name') or '').strip() full_name = f"{first_name} {last_name}".strip() or "Traveler" phone_code = (user_inputs.get('phone_country_code') or '353').strip() phone_no = (user_inputs.get('phone') or '088888888').strip() mobile = f"{phone_code}{phone_no}" address = (user_inputs.get('address') or "Dublin, Ireland").strip() passport_no = (user_inputs.get('passport_no') or "P0000000").strip() nationality = (user_inputs.get('nationality') or 'Chinese').strip() visa_reason = (user_inputs.get('visa_reason') or 'tourism').strip() for target_slot in candidate_slots[0:2]: try: slot_data = json.loads(target_slot.label) start_time_str = slot_data.get('datetime') bk_res_id = slot_data.get('resource_id', self.resource_id) start_dt = datetime.strptime(start_time_str, "%Y-%m-%d %H:%M:%S") finish_dt = start_dt + timedelta(hours=1) # 处理抵离日期确保不为空 arrival_date = (user_inputs.get('arrival_date') or '').strip() if not arrival_date: arrival_date = (start_dt + timedelta(days=15)).strftime("%Y-%m-%d") request_url = f"https://www.supersaas.com/schedule/GreekEmbassyInDublin/Visas?view=day&day={start_dt.day}&month={start_dt.month}" fmt_start = f"{start_dt.day}/{start_dt.month}/{start_dt.year} {start_dt.strftime('%H:%M')}" fmt_finish = f"{finish_dt.day}/{finish_dt.month}/{finish_dt.year} {finish_dt.strftime('%H:%M')}" headers = { "Referer": request_url, "Origin": "https://www.supersaas.com", "Content-Type": "application/x-www-form-urlencoded", } # ========================== # Request step 1: 预检查/打开弹窗 # ========================== payload_stp_1 = { "reservation[start_time]": fmt_start, "reservation[finish_time]": fmt_finish, "reservation[full_name]": full_name, "reservation[mobile]": mobile, "reservation[address]": address, "reservation[resource_id]": bk_res_id, "button": "", "reservation[xpos]": "100", "reservation[ypos]": "200" } self._log(f"[Attempt] Try slot {start_time_str} - Req step 1") resp_stp_1 = self._perform_request('POST', request_url, data=payload_stp_1, headers=headers) if "This spot has already been taken" in resp_stp_1.text: self._log(f"Slot {start_time_str} is TAKEN. Trying next...") continue if 'id="reservation_error"' in resp_stp_1.text or 'class="dbox-error"' in resp_stp_1.text: err_match = re.search(r'