import time import json import random import re import os from datetime import datetime, timezone, timedelta from typing import List, Dict, Any, Callable, Optional from curl_cffi import requests, const from vs_plg import IVSPlg from vs_types import VSPlgConfig, AppointmentType, VSQueryResult, VSBookResult, AvailabilityStatus, TimeSlot, DateAvailability, NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError class GrcPlugin(IVSPlg): """ https://www.supersaas.com/schedule/login/GreekEmbassyInDublin/Visas 签证预约插件 适配爱尔兰希腊签证 (GR) 流程 """ def __init__(self, group_id: str): self.group_id = group_id self.config: Optional[VSPlgConfig] = None self.free_config: Dict[str, Any] = {} self.is_healthy = True self.logger = None self.session: Optional[requests.Session] = None self.resource_id = '1123832' self.session_create_time: float = 0 def get_group_id(self) -> str: return self.group_id def set_log(self, logger: Callable[[str], None]) -> None: self.logger = logger def set_config(self, config: VSPlgConfig): self.config = config self.free_config = config.free_config or {} def health_check(self) -> bool: if not self.is_healthy: return False if self.session is None: return False if self.config.session_max_life > 0: current_time = time.time() elapsed_time = current_time - self.session_create_time if elapsed_time > self.config.session_max_life * 60: self._log(f"Session Life ({int(elapsed_time)}s) out of max life limit ({self.config.session_max_life * 60}s), mark as unhealth session") return False return True def create_session(self): # 1. 初始化 Session curlopt = { const.CurlOpt.MAXAGE_CONN: 1800, const.CurlOpt.MAXLIFETIME_CONN: 1800, const.CurlOpt.VERBOSE: self.config.debug, } self.session = requests.Session( proxy=self._get_proxy_url(), impersonate="chrome124", curl_options=curlopt, use_thread_local_curl=False, http_version=const.CurlHttpVersion.V2TLS ) login_url = "https://www.supersaas.com/schedule/login/GreekEmbassyInDublin/Visas" self._perform_request("GET", login_url) headers = { "Referer": login_url, "Origin": "https://www.supersaas.com", "Content-Type": "application/x-www-form-urlencoded" } data = { "name": self.config.account.username, "password": self.config.account.password, "remember": "K", "cookie_fix": "1", "button": "" } resp = self._perform_request('POST', login_url, headers=headers, data=data) if "Sign out" in resp.text or "Signed in as" in resp.text: self.session_create_time = time.time() self._log(f"Session created successfully. (User: {self.config.account.username})") # 如果登录失败,SuperSaaS 通常会留在当前页面并显示错误信息 elif "Invalid email or password" in resp.text: self._save_debug_html(resp.text, prefix='login_auth_fail') raise BizLogicError(message='Login failed: Invalid email or password') else: # 其他未知错误 self._save_debug_html(resp.text, prefix='login_unknown_fail') # 打印 URL 辅助调试,看是否跳转了 self._log(f"Login check failed. Current URL: {resp.url}") raise BizLogicError(message='Login failed: Unknown response') def _get_daily_schedule(self, open_times, date_obj): """根据 open_times 获取当天的开始和结束分钟数""" if not open_times: return None, None weekday_py = date_obj.weekday() if weekday_py >= 4: return None, None js_day_index = (date_obj.weekday() + 1) % 7 start_min = open_times[js_day_index] end_min = open_times[js_day_index + 7] return start_min, end_min def _is_blocked_by_ecache(self, ecache_blocked, timestamp): """检查某个时间点是否在临时关闭范围内 (如节假日)""" for block in ecache_blocked: if block[0] <= timestamp < block[1]: return True return False def query(self, apt_type: AppointmentType) -> VSQueryResult: res = VSQueryResult() res.success = False url = "https://www.supersaas.com/schedule/GreekEmbassyInDublin/Visas" headers = { "Referer": "https://www.supersaas.com/schedule/GreekEmbassyInDublin/Visas", "Origin": "https://www.supersaas.com", } resp = self._perform_request("GET", url, headers=headers) if self.config.debug: self._save_debug_html(resp.text, prefix='Grc_Query_Slot_Page') if 'Log into Visas schedule' in resp.text: self.is_healthy = False raise SessionExpiredOrInvalidError(message='Session expired.') res_id_match = re.search(r'resource\[(\d+)\]\s*=', resp.text) if res_id_match: self.resource_id = res_id_match.group(1) default_length = None len_match = re.search(r'default_length\s*=\s*(\d+)', resp.text) if len_match: default_length = int(len_match.group(1)) # 提取每日营业时间 (open_times) open_times = None ot_match = re.search(r'open_times\s*=\s*\[(.*?)\]', resp.text) if ot_match: open_times = [int(x) for x in ot_match.group(1).split(',')] # 提取临时关闭/休息日 (ecache) ecache_blocked = None ec_match = re.search(r'var ecache\s*=\s*(\{.*?\})', resp.text) if ec_match: ec_data_str = ec_match.group(1) data_match = re.search(r'data:\s*(\[\[.*?\]\])', ec_data_str) if data_match: ecache_blocked = json.loads(data_match.group(1)) # 提取已预约数据 (app) booked_timestamps = None app_match = re.search(r'var app\s*=\s*(\[\[.*?\]\])', resp.text) if app_match: app_data = json.loads(app_match.group(1)) booked_timestamps = set(item[0] for item in app_data) # 提取 season (非常重要:学期/季度限制) season_range = None season_match = re.search(r'season\s*=\s*\[(\d+),(\d+)\]', resp.text) if season_match: season_range = [int(season_match.group(1)), int(season_match.group(2))] print(f"[*] 限制范围 (Season): 截止到 {datetime.fromtimestamp(season_range[1], timezone.utc)}") # 确定扫描起点 cursor_match = re.search(r'Date\.UTC\((\d+),(\d+),(\d+),(\d+)\)', resp.text) if cursor_match: y, m, d, h = map(int, cursor_match.groups()) start_date = datetime(y, m + 1, d, h, tzinfo=timezone.utc) else: start_date = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0) print(f"[*] 分析配置: 默认时长 {default_length/60} 分钟") days_to_scan = 5 * 7 valid_slots_map: dict[datetime.date, list[TimeSlot]] = {} for day_offset in range(days_to_scan): current_day = start_date + timedelta(days=day_offset) start_min, end_min = self._get_daily_schedule(open_times, current_day) # 如果开始=结束 (比如都是 600),或者没有定义,说明当天不营业 if start_min is None or start_min >= end_min: continue # 生成当天的所有 Slot, 将分钟转换为当天的具体时间 start_min 630 -> 10:30 current_slot_min = start_min while current_slot_min + (default_length / 60) <= end_min: # 计算 Slot 的具体时间对象 slot_hour = current_slot_min // 60 slot_minute = current_slot_min % 60 slot_dt = current_day.replace(hour=int(slot_hour), minute=int(slot_minute), second=0, microsecond=0) slot_ts = int(slot_dt.timestamp()) # 下一个 slot 开始时间 current_slot_min += (default_length / 60) # 检查是否过期 if slot_ts < time.time(): continue # Slot 时间 必须在 Season 范围内 if slot_ts >= season_range[1]: # print(f" [Skip] {slot_dt} 超出 Season 范围") continue # 检查是否在临时关闭列表 (ecache) 中 if self._is_blocked_by_ecache(ecache_blocked, slot_ts + 1): # print(f" [Skip] {slot_dt} 被 ecache (节假日/关闭) 屏蔽") continue # 检查是否已被预约 (在 app 数组中) if slot_ts in booked_timestamps: # print(f" [Skip] {slot_dt} 已被预约") continue booking_payload = { "timestamp": slot_ts, "datetime": slot_dt.strftime("%Y-%m-%d %H:%M:%S") } time_slot = TimeSlot( time=slot_dt.strftime("%H:%M"), label=json.dumps(booking_payload) # 序列化存入 label ) date_key = slot_dt.date() if date_key not in valid_slots_map: valid_slots_map[date_key] = [] valid_slots_map[date_key].append(time_slot) if valid_slots_map: res.success = True res.availability_status = AvailabilityStatus.Available # 按日期排序 sorted_dates = sorted(valid_slots_map.keys()) res.earliest_date = datetime.combine(sorted_dates[0], datetime.min.time()) res.availability = [] for d in sorted_dates: res.availability.append(DateAvailability( date=datetime.combine(d, datetime.min.time()), times=valid_slots_map[d] )) self._log(f"Found availability on {len(sorted_dates)} days.") else: self._log("No available slots found.") return res def book(self, slot_info: VSQueryResult, user_inputs: Dict = None) -> VSBookResult: res = VSBookResult() res.success = False # 1. 准备日期筛选参数 exp_start = user_inputs.get('expected_start_date', '') exp_end = user_inputs.get('expected_end_date', '') # 将 Availability 转换为 { "YYYY-MM-DD": DateAvailabilityObj } 的映射,方便查找 date_map = {} available_date_strs = [] for date_avail in slot_info.availability: # datetime 转 string (YYYY-MM-DD) d_str = date_avail.date.strftime("%Y-%m-%d") date_map[d_str] = date_avail available_date_strs.append(d_str) # 2. 筛选符合要求的日期 # _filter_dates 内部已经进行了 random.shuffle,所以返回列表的第一个即为随机选中的有效日期 valid_dates = self._filter_dates(available_date_strs, exp_start, exp_end) if not valid_dates: self._log(f"No available slots within the expected range ({exp_start} to {exp_end}).") return res # 3. 选择具体的 Slot target_date_str = valid_dates[0] target_day_data = date_map[target_date_str] if not target_day_data.times: res.message = f"Date {target_date_str} has no time slots." return res # 这里简单策略:选择该日期的第一个可用时间点 # 如果需要随机时间,可以使用 random.choice(target_day_data.times) target_slot = target_day_data.times[0] # 4. 解析 Slot Label 数据 # label 中存储了 {"timestamp": 123456, "datetime": "2026-02-06 10:00:00", ...} try: slot_data = json.loads(target_slot.label) start_time_str = slot_data.get('datetime') except Exception as e: self._log(f"Failed to parse slot label: {e}") return res # 5. 准备预定请求数据 url = "https://www.supersaas.com/schedule/GreekEmbassyInDublin/Visas" # 计算 finish_time (SuperSaaS通常需要 finish_time,默认时长1小时) start_dt = datetime.strptime(start_time_str, "%Y-%m-%d %H:%M:%S") finish_dt = start_dt + timedelta(hours=1) finish_time_str = finish_dt.strftime("%Y-%m-%d %H:%M:%S") # 映射用户信息 (假设 self.config.profile 包含这些字段) # 根据之前的 HTML 分析: # field_1_r -> Passport number # field_2_r -> Reason for Visa first_name = user_inputs.get('first_name', '') last_name = user_inputs.get('last_name', '') phone_country_code = user_inputs.get('phone_country_code', '353') phone_no = user_inputs.get('phone_no', '088888888') address = user_inputs.get('address', "Dublin, Ireland") passport_no = user_inputs.get('passport_no', "") payload = { "reservation[start_time]": start_time_str, "reservation[finish_time]": finish_time_str, "reservation[full_name]": f"{first_name} {last_name}", "reservation[mobile]": f'+{phone_country_code}{phone_no}', "reservation[address]": address, "reservation[description]": "", # 自定义必填字段 "reservation[field_1_r]": passport_no, "reservation[field_2_r]": "Tourism", # 系统字段 "reservation[resource_id]": self.resource_id, "reservation[xpos]": "", "reservation[ypos]": "", "button": "" } headers = { "Referer": url, "Origin": "https://www.supersaas.com", "Content-Type": "application/x-www-form-urlencoded", } self._log(f"Attempting to book slot: {start_time_str}") resp = self._perform_request('POST', url, data=payload, headers=headers) res.success = True res.book_date = start_dt.strftime("%Y-%m-%d") # 格式: YYYY-mm-dd res.book_time = start_dt.strftime("%H:%M") # 格式: hh:mm self._log(f"Booking successful for {res.book_date} at {res.book_time}") return res def _filter_dates(self, dates: List[str], start_str: str, end_str: str) -> List[str]: """ 根据用户的期望范围筛选可用日期 :param dates: API 返回的可用日期列表 (YYYY-MM-DD) :param start_str: 用户期望开始日期 (YYYY-MM-DD) :param end_str: 用户期望结束日期 (YYYY-MM-DD) :return: 符合要求的日期列表 """ # 如果没有设置范围,则不过滤,返回所有日期 if not start_str or not end_str: # 也要打乱一下,保证随机性 shuffled_dates = list(dates) random.shuffle(shuffled_dates) return shuffled_dates valid_dates = [] try: # 截取前10位以防带有时分秒 s_date = datetime.strptime(start_str[:10], "%Y-%m-%d") e_date = datetime.strptime(end_str[:10], "%Y-%m-%d") for date_str in dates: curr_date = datetime.strptime(date_str, "%Y-%m-%d") # 比较范围 (闭区间) if s_date <= curr_date <= e_date: valid_dates.append(date_str) except ValueError: self._log("Date format error in expected_start_date or expected_end_date. Ignoring filter.") shuffled_dates = list(dates) random.shuffle(shuffled_dates) return shuffled_dates random.shuffle(valid_dates) return valid_dates def _log(self, message): if self.logger: self.logger(f'[GrcPlugin] [{self.group_id}] {message}') def _save_debug_html(self, content: str, prefix: str = "debug"): save_dir = "debug_pages" if not os.path.exists(save_dir): os.makedirs(save_dir) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"{save_dir}/{prefix}_{timestamp}.html" with open(filename, "w", encoding="utf-8") as f: f.write(content) self._log(f"HTML saved to: {filename}") def _get_proxy_url(self): # 构造代理 proxy_url = "" if self.config.proxy.ip: s = self.config.proxy if s.username: proxy_url = f"{s.scheme}://{s.username}:{s.password}@{s.ip}:{s.port}" else: proxy_url = f"{s.scheme}://{s.ip}:{s.port}" return proxy_url def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None): """ 统一 HTTP 请求封装,严格复刻 C++ 逻辑: 1. 发送 OPTIONS 请求 2. 发送实际请求 """ resp = self.session.request(method, url, headers=headers, data=data, json=json_data, params=params, timeout=30) if self.config.debug: self._log(f'[perform request] Response={resp.text}\nMethod={method}, Url={url}, Data={data}, JsonData={json_data}, Params={params}') if resp.status_code == 200: return resp elif resp.status_code == 401: self.is_healthy = False raise SessionExpiredOrInvalidError() elif resp.status_code == 403: raise PermissionDeniedError() elif resp.status_code == 429: self.is_healthy = False raise RateLimiteddError() else: raise BizLogicError(message=f"HTTP Error {resp.status_code}: {resp.text[:100]}") def _filter_dates(self, dates: List[str], start_str: str, end_str: str) -> List[str]: """ 根据用户的期望范围筛选可用日期 :param dates: API 返回的可用日期列表 (YYYY-MM-DD) :param start_str: 用户期望开始日期 (YYYY-MM-DD) :param end_str: 用户期望结束日期 (YYYY-MM-DD) :return: 符合要求的日期列表 """ # 如果没有设置范围,则不过滤,返回所有日期 if not start_str or not end_str: return dates valid_dates = [] # 截取前10位以防带有时分秒 s_date = datetime.strptime(start_str[:10], "%Y-%m-%d") e_date = datetime.strptime(end_str[:10], "%Y-%m-%d") for date_str in dates: curr_date = datetime.strptime(date_str, "%Y-%m-%d") # 比较范围 (闭区间) if s_date <= curr_date <= e_date: valid_dates.append(date_str) random.shuffle(valid_dates) return valid_dates