import time
import json
import random
import re
import os
from datetime import datetime, timezone, timedelta
from typing import List, Dict, Any, Callable, Optional, Set, Tuple

from curl_cffi import requests, const

from vs_plg import IVSPlg
from vs_types import (
    VSPlgConfig,
    AppointmentType,
    VSQueryResult,
    VSBookResult,
    AvailabilityStatus,
    TimeSlot,
    DateAvailability,
    NotFoundError,
    PermissionDeniedError,
    RateLimiteddError,
    SessionExpiredOrInvalidError,
    BizLogicError,
)

# Browser fingerprints that create_session() picks from at random.
MODERN_BROWSERS: List[str] = [
    "chrome",
    "edge",
    "safari",
    "safari_ios",
    "chrome_android",
    "firefox",
]


class GrcPlugin(IVSPlg):
    """
    https://www.supersaas.com/schedule/login/GreekEmbassyInDublin/Visas

    Visa appointment plugin adapted to the Greek visa (GR) flow in Ireland.
    """

    def __init__(self, group_id: str):
        """Initialise an unconfigured plugin with no session."""
        self.group_id = group_id
        self.config: Optional[VSPlgConfig] = None
        self.free_config: Dict[str, Any] = {}
        self.is_healthy = True
        self.logger = None
        self.session: Optional[requests.Session] = None
        # Fallback resource id; query() replaces it with the id scraped from the page.
        self.resource_id = '1123832'
        self.rp_id = None
        self.token = None
        self.session_create_time: float = 0

    def get_group_id(self) -> str:
        """Return the group id given at construction."""
        return self.group_id

    def set_log(self, logger: Callable[[str], None]) -> None:
        """Install the callable that receives log lines."""
        self.logger = logger

    def set_config(self, config: VSPlgConfig):
        """Store the plugin configuration and its free-form section."""
        self.config = config
        self.free_config = config.free_config or {}

    def keep_alive(self):
        """No-op: this plugin performs no keep-alive traffic."""
        pass

    def health_check(self) -> bool:
        """Return False when the session is missing, marked unhealthy, or too old."""
        if not self.is_healthy:
            return False
        if self.session is None:
            return False
        if self.config.session_max_life > 0:
            current_time = time.time()
            elapsed_time = current_time - self.session_create_time
            # NOTE(review): elapsed seconds are compared against session_max_life
            # directly, while the log message below prints session_max_life * 60
            # seconds. One of the two units is presumably wrong; confirm the
            # unit of session_max_life against VSPlgConfig.
            if elapsed_time > self.config.session_max_life:
                self._log(f"Session Life ({int(elapsed_time)}s) out of max life limit ({self.config.session_max_life * 60}s), mark as unhealth session")
                return False
        return True

    def create_session(self):
        """Open a fresh HTTP session and log in.

        Raises:
            BizLogicError: when the credentials are rejected, the account has
                hit its reservation quota, or the login response is not
                recognised.
        """
        # Release the previous session (book() re-logs in) so its connection
        # is not leaked. Best effort: a close failure must not block re-login.
        previous_session = self.session
        if previous_session is not None:
            try:
                previous_session.close()
            except Exception as exc:
                self._log(f"Failed to close previous session: {exc}")

        curlopt = {
            const.CurlOpt.MAXAGE_CONN: 1800,
            const.CurlOpt.MAXLIFETIME_CONN: 1800,
            const.CurlOpt.VERBOSE: self.config.debug,
        }
        chosen_browser = random.choice(MODERN_BROWSERS)
        self._log(f"Using browser fingerprint: {chosen_browser}")
        self.session = requests.Session(
            proxy=self._get_proxy_url(),
            impersonate=chosen_browser,
            curl_options=curlopt,
            use_thread_local_curl=False,
            http_version=const.CurlHttpVersion.V2TLS
        )

        login_url = "https://www.supersaas.com/schedule/login/GreekEmbassyInDublin/Visas"
        # Load the login page first, then post the credentials to the same URL.
        self._perform_request("GET", login_url)
        headers = {
            "Referer": login_url,
            "Origin": "https://www.supersaas.com",
            "Content-Type": "application/x-www-form-urlencoded"
        }
        data = {
            "name": self.config.account.username,
            "password": self.config.account.password,
            "remember": "K",
            "cookie_fix": "1",
            "button": ""
        }
        resp = self._perform_request('POST', login_url, headers=headers, data=data)

        if "Sign out" in resp.text or "Signed in as" in resp.text:
            if "reached the maximum number" in resp.text or "You cannot create new reservations" in resp.text:
                self.is_healthy = False
                self._save_debug_html(resp.text, prefix='login_quota_exceeded')
                self._log(f"Login failed: Account '{self.config.account.username}' has reached the maximum number of reservations.")
                raise BizLogicError(message='Login failed: Account has reached the maximum number of reservations.')
            self.session_create_time = time.time()
            self._log(f"Session created successfully. (User: {self.config.account.username})")
        elif "Invalid email or password" in resp.text:
            self._save_debug_html(resp.text, prefix='login_auth_fail')
            raise BizLogicError(message='Login failed: Invalid email or password')
        else:
            self._save_debug_html(resp.text, prefix='login_unknown_fail')
            self._log(f"Login check failed. Current URL: {resp.url}")
            raise BizLogicError(message='Login failed: Unknown response')

    def _extract_int(self, pattern: str, text: str, default: int = 0) -> int:
        """Return group 1 of the first regex match as an int, else ``default``."""
        match = re.search(pattern, text)
        if match:
            return int(match.group(1))
        return default

    def _get_daily_bounds_and_breaks(self, open_times: list, bit_prefs: int, date_ts: int, js_day_index: int) -> Tuple[Optional[int], Optional[int], Optional[Tuple[int, int]]]:
        """Compute one day's opening window and optional break, mirroring the page's JS.

        Args:
            open_times: flat list indexed by weekday (0=Sun .. 6=Sat):
                [0-6] opening minute, [7-13] closing minute,
                [14-20] break start minute, [21-27] break end minute.
                Entries may be None.
            bit_prefs: bitmask; when non-zero, bit ``js_day_index`` must be
                set for the day to count as open.
            date_ts: any UNIX timestamp inside the day of interest.
            js_day_index: weekday index, 0=Sunday .. 6=Saturday.

        Returns:
            ``(start_ts, end_ts, break_block)`` where ``break_block`` is a
            ``(start, end)`` tuple or None; ``(None, None, None)`` when the
            day is closed.
        """
        if not open_times:
            return None, None, None

        # The lowest 7 bits of bit_prefs switch Sunday(0)..Saturday(6) on or
        # off. A cleared bit means the day is closed whatever open_times says.
        if bit_prefs > 0:
            if not (bit_prefs & (1 << js_day_index)):
                return None, None, None

        def safe_get(idx):
            if idx < len(open_times):
                return open_times[idx]
            return None

        # [0-6] are per-day opening minutes, [7-13] the closing minutes.
        start_min = safe_get(js_day_index)
        end_min = safe_get(js_day_index + 7)

        # None means the day is not open.
        if start_min is None or end_min is None or start_min >= end_min:
            return None, None, None

        # Day base = 00:00:00 of the day, obtained by truncating the timestamp.
        day_base_ts = date_ts - (date_ts % 86400)
        start_ts = day_base_ts + start_min * 60
        end_ts = day_base_ts + end_min * 60

        # [14-20] are break start minutes, [21-27] the break end minutes.
        break_block = None
        break_start_min = safe_get(js_day_index + 14)
        break_end_min = safe_get(js_day_index + 21)
        if break_start_min is not None and break_end_min is not None:
            b_start = day_base_ts + break_start_min * 60
            b_end = day_base_ts + break_end_min * 60
            break_block = (b_start, b_end)

        return start_ts, end_ts, break_block

    def _fetch_schedule_data(self, start_dt: datetime, days: int) -> Dict:
        """Fetch the schedule data for ``days`` days from ``start_dt`` in one AJAX call.

        Uses ``self.rp_id`` and ``self.token`` and returns the decoded JSON body.
        """
        afrom_str = start_dt.strftime("%Y-%m-%d")
        ato_dt = start_dt + timedelta(days=days)
        ato_str = ato_dt.strftime("%Y-%m-%d 00:00")

        url = f"https://www.supersaas.com/ajax/resource/{self.rp_id}"
        params = {
            "token": self.token,
            "afrom": afrom_str,
            "ato": ato_str,
            "ad": "r",
            "efrom": afrom_str,
            "eto": ato_dt.strftime("%Y-%m-%d"),
        }
        headers = {
            "Accept": "*/*",
            "Referer": "https://www.supersaas.com/schedule/GreekEmbassyInDublin/Visas",
            "X-Requested-With": "XMLHttpRequest"
        }
        self._log(f"Fetching data from {afrom_str} to {ato_dt.strftime('%Y-%m-%d')}...")
        resp = self._perform_request("GET", url, params=params, headers=headers)
        return resp.json()

    def query(self, apt_type: AppointmentType) -> VSQueryResult:
        """Scan the next 60 days and return every bookable slot.

        Scheduling parameters are scraped from the schedule page. Booked and
        blocked ranges are fetched in 30-day chunks. Free slots are then
        derived by walking each day's opening window.

        Raises:
            SessionExpiredOrInvalidError: the page shows the login form.
            NotFoundError: resource id, rp_id or token is missing from the page.
        """
        res = VSQueryResult()
        res.success = False

        url = "https://www.supersaas.com/schedule/GreekEmbassyInDublin/Visas"
        headers = {
            "Referer": "https://www.supersaas.com/schedule/GreekEmbassyInDublin/Visas",
            "Origin": "https://www.supersaas.com",
        }
        resp = self._perform_request("GET", url, headers=headers)
        if 'Log into Visas schedule' in resp.text:
            self.is_healthy = False
            raise SessionExpiredOrInvalidError(message='Session expired.')

        # 1. Extract the core session parameters.
        # _extract_int returns 0 on a miss, so a missing value shows up as the
        # string '0' (which is truthy). Compare against '0' explicitly, and
        # store the values on self only once all three are present.
        resource_id = str(self._extract_int(r'resource\[(\d+)\]\s*=', resp.text))
        rp_id = str(self._extract_int(r'rp_id=(\d+)', resp.text))
        token = str(self._extract_int(r'token=(\d+)', resp.text))
        if '0' in (resource_id, rp_id, token):
            self._log("Failed to extract rp_id or token from HTML")
            raise NotFoundError(message='rp_id or token not found')
        self.resource_id = resource_id
        self.rp_id = rp_id
        self.token = token

        # 2. Extract the JS limit variables.
        default_length = self._extract_int(r'default_length\s*=\s*(\d+)', resp.text, 3600)
        rounding = self._extract_int(r'rounding\s*=\s*(\d+)', resp.text, 3600)
        buffer_time = self._extract_int(r'buffer\s*=\s*(\d+)', resp.text, 0)
        add_limit = self._extract_int(r'add_limit\s*=\s*(\d+)', resp.text, 0)
        early_limit = self._extract_int(r'early_limit\s*=\s*(\d+)', resp.text, 0)
        early_snap = self._extract_int(r'early_snap\s*=\s*(\d+)', resp.text, 0)
        bit_prefs = self._extract_int(r'bit_prefs\s*=\s*(\d+)', resp.text, 0)

        # A zero slot length would never advance the scan pointer (endless
        # loop) and a zero rounding would divide by zero; fall back to the
        # same defaults used when the variable is absent from the page.
        if default_length <= 0:
            self._log("default_length is 0 on page; falling back to 3600")
            default_length = 3600
        if rounding <= 0:
            self._log("rounding is 0 on page; falling back to 3600")
            rounding = 3600

        # 3. Extract open_times.
        open_times = []
        ot_match = re.search(r'open_times\s*=\s*\[(.*?)\]', resp.text)
        if ot_match:
            # json.loads handles the JS array text, including its null entries.
            open_times_str = f"[{ot_match.group(1)}]"
            open_times = json.loads(open_times_str)

        # 4. Extract the release window (season).
        season_start_ts = 0
        season_end_ts = float('inf')
        season_match = re.search(r'season\s*=\s*\[(\d+),\s*(\d+)\]', resp.text)
        if season_match:
            season_start_ts = int(season_match.group(1))
            season_end_ts = int(season_match.group(2))

        # 5. Extract the page-wide blocked exception periods (ecache).
        ecache_blocks = []
        ecache_match = re.search(r'ecache\s*=\s*\{data:\s*\[(.*?)\]\}', resp.text)
        if ecache_match:
            triplets = re.findall(r'\[(\d+),\s*(\d+),\s*\d+\]', ecache_match.group(1))
            for t0, t1 in triplets:
                ecache_blocks.append((int(t0), int(t1)))

        scan_start_dt = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
        days_total_scan = 60
        chunk_size = 30

        # Keys are datetime.date objects (UTC date of the slot start).
        valid_slots_map: Dict[datetime.date, List[TimeSlot]] = {}

        # Chunk loop.
        for i in range(0, days_total_scan, chunk_size):
            chunk_start_dt = scan_start_dt + timedelta(days=i)
            json_data = self._fetch_schedule_data(chunk_start_dt, chunk_size)

            # Collect existing bookings, exceptions and gcal blocks into one
            # list of obstacles.
            all_blocks: List[Tuple[int, int]] = ecache_blocks.copy()
            for key in ['app', 'exc', 'gcal']:
                # "or ()" tolerates a key that is present but null.
                for item in json_data.get(key) or ():
                    all_blocks.append((int(item[0]), int(item[1])))

            for day_offset in range(chunk_size):
                # Plain UNIX-time addition, so DST cannot shift the day start.
                current_day_ts = int(chunk_start_dt.timestamp()) + day_offset * 86400

                # JS weekday (0=Sun .. 6=Sat); the +4 offset comes from the
                # epoch day, 1970-01-01, being a Thursday.
                js_day_index = int((4 + (current_day_ts // 86400)) % 7)

                # bit_prefs is passed so days switched off in the mask are skipped.
                start_ts, end_ts, break_block = self._get_daily_bounds_and_breaks(
                    open_times, bit_prefs, current_day_ts, js_day_index
                )
                if start_ts is None:
                    continue

                # Per-day obstacles: add this day's break.
                daily_blocks = all_blocks.copy()
                if break_block:
                    daily_blocks.append(break_block)

                curr_ts = start_ts
                current_time_ts = int(time.time())

                # --- Mirror the JS limits and the early_snap mechanism ---
                min_bookable_ts = current_time_ts + add_limit + 90
                max_bookable_ts = float('inf')
                if early_limit > 0:
                    limit_ts = current_time_ts + early_limit
                    if early_snap:
                        k = limit_ts % 86400
                        y = int((4 + (limit_ts // 86400)) % 7)
                        # If the limit falls on an open day at or after its
                        # opening minute, extend it to the end of that day.
                        if (bit_prefs & (1 << y)) and open_times[y] is not None and open_times[y] <= (k / 60):
                            limit_ts += (86400 - k)
                    max_bookable_ts = limit_ts + default_length

                # ============ Core collision and snapping scan ============
                while curr_ts + default_length <= end_ts:
                    # 1. Not before the absolute start of the season.
                    if curr_ts < season_start_ts:
                        curr_ts = season_start_ts
                        continue

                    # 2. Past the season end or the furthest bookable time:
                    #    stop scanning this day.
                    if curr_ts >= season_end_ts or curr_ts >= max_bookable_ts:
                        break

                    # 3. Enforce the minimum lead time.
                    if curr_ts < min_bookable_ts:
                        curr_ts = min_bookable_ts
                        continue

                    # 4. Snap up to the next multiple of the rounding grid
                    #    (works for any grid size, not only whole hours).
                    rem = curr_ts % rounding
                    if rem > 0:
                        curr_ts += (rounding - rem)
                        continue

                    slot_end = curr_ts + default_length

                    # 5. Collision detection with buffer.
                    overlapping_end = 0
                    for b_start, b_end in daily_blocks:
                        # Each obstacle is widened by the buffer on both sides.
                        blocked_start = b_start - buffer_time
                        blocked_end = b_end + buffer_time

                        # Interval-intersection test.
                        if curr_ts < blocked_end and slot_end > blocked_start:
                            if blocked_end > overlapping_end:
                                overlapping_end = blocked_end

                    if overlapping_end > 0:
                        # Collision: jump to the latest buffered obstacle end.
                        # Step 4 re-snaps it to the grid on the next iteration.
                        curr_ts = overlapping_end
                    else:
                        # All checks passed: record the slot with its finish time.
                        dt_start = datetime.fromtimestamp(curr_ts, tz=timezone.utc)

                        finish_ts = curr_ts + default_length
                        dt_finish = datetime.fromtimestamp(finish_ts, tz=timezone.utc)

                        payload = {
                            "resource_id": self.resource_id,
                            "timestamp": curr_ts,
                            "datetime": dt_start.strftime("%Y-%m-%d %H:%M:%S"),
                            "finish_timestamp": finish_ts,
                            "finish_datetime": dt_finish.strftime("%Y-%m-%d %H:%M:%S")
                        }

                        time_slot = TimeSlot(
                            time=dt_start.strftime("%H:%M"),
                            label=json.dumps(payload)
                        )

                        date_key = dt_start.date()
                        if date_key not in valid_slots_map:
                            valid_slots_map[date_key] = []
                        valid_slots_map[date_key].append(time_slot)

                        # Advance the pointer to look for the next slot.
                        curr_ts += default_length

        # === Aggregate the results ===
        if valid_slots_map:
            res.success = True
            res.availability_status = AvailabilityStatus.Available
            sorted_dates = sorted(valid_slots_map.keys())
            res.earliest_date = datetime.combine(sorted_dates[0], datetime.min.time())

            for d in sorted_dates:
                da = DateAvailability(
                    date=datetime.combine(d, datetime.min.time()),
                    times=valid_slots_map[d]
                )
                res.availability.append(da)
            self._log(f"Found slots on {len(sorted_dates)} days.")
        else:
            self._log("No slots found.")

        return res

    def book(self, slot_info: VSQueryResult, user_inputs: Optional[Dict] = None) -> VSBookResult:
        """Try to book one of the slots found by query().

        Logs in again, filters the slots to the expected date range, shuffles
        them and attempts at most two candidates using the two-step
        reservation form.

        Args:
            slot_info: result of a previous query().
            user_inputs: optional applicant data; the keys read are
                expected_start_date, expected_end_date, first_name,
                last_name, phone_country_code, phone, address, passport_no,
                nationality, visa_reason and arrival_date.

        Returns:
            A VSBookResult whose ``success`` is True only when the site
            confirms the reservation.
        """
        if user_inputs is None:
            user_inputs = {}

        res = VSBookResult()
        res.success = False

        # Log in again to avoid booking on an expired session.
        self.create_session()

        exp_start = user_inputs.get('expected_start_date', '')
        exp_end = user_inputs.get('expected_end_date', '')

        date_map = {d.date.strftime("%Y-%m-%d"): d for d in slot_info.availability}
        all_dates = list(date_map.keys())
        valid_dates = self._filter_dates(all_dates, exp_start, exp_end)

        if not valid_dates:
            self._log(f"No available slots within the expected range ({exp_start} to {exp_end}).")
            return res

        candidate_slots = []
        for date_str in valid_dates:
            if date_str in date_map:
                candidate_slots.extend(date_map[date_str].times)

        random.shuffle(candidate_slots)

        if not candidate_slots:
            self._log("No slots found after filtering.")
            return res

        self._log(f"Found {len(candidate_slots)} candidate slots. Starting booking attempts...")

        # Treat empty-string inputs like missing ones, so the site never
        # receives a blank field.
        first_name = (user_inputs.get('first_name') or '').strip()
        last_name = (user_inputs.get('last_name') or '').strip()
        full_name = f"{first_name} {last_name}".strip() or "Traveler"

        phone_code = (user_inputs.get('phone_country_code') or '353').strip()
        phone_no = (user_inputs.get('phone') or '088888888').strip()
        mobile = f"{phone_code}{phone_no}"

        address = (user_inputs.get('address') or "Dublin, Ireland").strip()
        passport_no = (user_inputs.get('passport_no') or "P0000000").strip()
        nationality = (user_inputs.get('nationality') or 'Chinese').strip()
        visa_reason = (user_inputs.get('visa_reason') or 'tourism').strip()

        for target_slot in candidate_slots[0:2]:
            # Bound before the try so the except handler can always log it,
            # even when parsing the slot label itself fails.
            start_time_str = "<unparsed slot>"
            try:
                slot_data = json.loads(target_slot.label)
                start_time_str = slot_data.get('datetime')
                finish_time_str = slot_data.get('finish_datetime')
                bk_res_id = slot_data.get('resource_id', self.resource_id)

                start_dt = datetime.strptime(start_time_str, "%Y-%m-%d %H:%M:%S")
                if finish_time_str:
                    finish_dt = datetime.strptime(finish_time_str, "%Y-%m-%d %H:%M:%S")
                else:
                    finish_dt = start_dt + timedelta(hours=1)

                # Make sure the arrival date is never empty.
                arrival_date = (user_inputs.get('arrival_date') or '').strip()
                if not arrival_date:
                    arrival_date = (start_dt + timedelta(days=15)).strftime("%Y-%m-%d")

                request_url = f"https://www.supersaas.com/schedule/GreekEmbassyInDublin/Visas?view=day&day={start_dt.day}&month={start_dt.month}"

                fmt_start = f"{start_dt.day}/{start_dt.month}/{start_dt.year} {start_dt.strftime('%H:%M')}"
                fmt_finish = f"{finish_dt.day}/{finish_dt.month}/{finish_dt.year} {finish_dt.strftime('%H:%M')}"

                headers = {
                    "Referer": request_url,
                    "Origin": "https://www.supersaas.com",
                    "Content-Type": "application/x-www-form-urlencoded",
                }

                # ==========================
                # Request step 1: pre-check / open the reservation dialog
                # ==========================
                payload_stp_1 = {
                    "reservation[start_time]": fmt_start,
                    "reservation[finish_time]": fmt_finish,
                    "reservation[full_name]": full_name,
                    "reservation[mobile]": mobile,
                    "reservation[address]": address,
                    "reservation[resource_id]": bk_res_id,
                    "button": "",
                    "reservation[xpos]": "100",
                    "reservation[ypos]": "200"
                }

                self._log(f"[Attempt] Try slot {start_time_str} - Req step 1")
                resp_stp_1 = self._perform_request('POST', request_url, data=payload_stp_1, headers=headers)

                if "This spot has already been taken" in resp_stp_1.text:
                    self._log(f"Slot {start_time_str} is TAKEN. Trying next...")
                    continue

                if 'id="reservation_error"' in resp_stp_1.text or 'class="dbox-error"' in resp_stp_1.text:
                    # The first list item of the error box carries the reason.
                    err_match = re.search(r'<li>(.*?)</li>', resp_stp_1.text)
                    err_reason = err_match.group(1) if err_match else "Unknown error"
                    self._log(f"Slot {start_time_str} unavailable ({err_reason}). Trying next...")
                    continue

                # Random pause between the two steps.
                time.sleep(random.uniform(10, 30))

                # ==========================
                # Request step 2: submit the form
                # ==========================
                payload_stp_2 = {
                    "form[5]": passport_no,
                    "form[8]": nationality,
                    "form[6]": visa_reason,
                    "form[7]": arrival_date,
                    "form[9]": "",
                    "form_commit": "Submit",
                    "reservation[start_time]": fmt_start,
                    "reservation[finish_time]": fmt_finish,
                    "reservation[full_name]": full_name,
                    "reservation[mobile]": mobile,
                    "reservation[address]": address,
                    "reservation[resource_id]": bk_res_id,
                    "reservation[xpos]": "100",
                    "reservation[ypos]": "200"
                }

                self._log(f"[Attempt] Submitting form for {start_time_str} - Req step 2")
                resp_book_stp_2 = self._perform_request('POST', request_url, data=payload_stp_2, headers=headers)

                # HTTP 200 alone does not mean success: the site also answers
                # 200 when it re-renders the form with a validation error.
                if "Reservation successfully created" in resp_book_stp_2.text:
                    res.success = True
                    res.book_date = start_dt.strftime("%Y-%m-%d")
                    res.book_time = start_dt.strftime("%H:%M")
                    self._log(f"Booking SUCCESS for {res.book_date} at {res.book_time}")
                    return res
                else:
                    # Log the specific validation failure when the page shows one.
                    if 'id="errorExplanation"' in resp_book_stp_2.text or 'class="errorExplanation"' in resp_book_stp_2.text:
                        err_match = re.search(r'<li>(.*?)</li>', resp_book_stp_2.text)
                        err_reason = err_match.group(1) if err_match else "Form validation error"
                        self._log(f"Submission failed for {start_time_str} ({err_reason}). Trying next...")
                    else:
                        self._log(f"Submission failed for {start_time_str}: Unknown error. Trying next...")

                    if self.config.debug:
                        self._save_debug_html(resp_book_stp_2.text, prefix='book_step2_fail')
                    continue

            except Exception as e:
                # Deliberate best effort: any failure on one slot moves on to
                # the next candidate.
                self._log(f"Exception trying slot {start_time_str}: {e}")
                continue

        self._log("All candidate slots failed.")
        return res

    def _filter_dates(self, dates: List[str], start_str: str, end_str: str) -> List[str]:
        """Return ``dates`` restricted to [start_str, end_str], in random order.

        When either bound is empty, or a date fails to parse as YYYY-MM-DD,
        the filter is skipped and all dates are returned shuffled.
        """
        if not start_str or not end_str:
            shuffled_dates = list(dates)
            random.shuffle(shuffled_dates)
            return shuffled_dates

        valid_dates = []
        try:
            # Only the leading YYYY-MM-DD part of each bound is used.
            s_date = datetime.strptime(start_str[:10], "%Y-%m-%d")
            e_date = datetime.strptime(end_str[:10], "%Y-%m-%d")
            for date_str in dates:
                curr_date = datetime.strptime(date_str, "%Y-%m-%d")
                if s_date <= curr_date <= e_date:
                    valid_dates.append(date_str)
        except ValueError:
            self._log("Date format error in expected_start_date or expected_end_date. Ignoring filter.")
            shuffled_dates = list(dates)
            random.shuffle(shuffled_dates)
            return shuffled_dates

        random.shuffle(valid_dates)
        return valid_dates

    def _log(self, message):
        """Send ``message`` to the configured logger, prefixed with plugin and group."""
        if self.logger:
            self.logger(f'[GrcPlugin] [{self.group_id}] {message}')

    def _save_debug_html(self, content: str, prefix: str = "debug"):
        """Write ``content`` to a timestamped HTML file under data/debug_pages."""
        save_dir = "data/debug_pages"
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(save_dir, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{save_dir}/{prefix}_{timestamp}.html"
        with open(filename, "w", encoding="utf-8") as f:
            f.write(content)
        self._log(f"HTML saved to: {filename}")

    def _get_proxy_url(self):
        """Build the proxy URL from the config; empty string when no proxy IP is set."""
        proxy_url = ""
        if self.config.proxy.ip:
            s = self.config.proxy
            if s.username:
                proxy_url = f"{s.proto}://{s.username}:{s.password}@{s.ip}:{s.port}"
            else:
                proxy_url = f"{s.proto}://{s.ip}:{s.port}"
        return proxy_url

    def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None):
        """Send a request on the current session and map HTTP errors to exceptions.

        Returns:
            The response when the status code is 200.

        Raises:
            SessionExpiredOrInvalidError: on 401 (also marks the plugin unhealthy).
            PermissionDeniedError: on 403.
            RateLimiteddError: on 429 (also marks the plugin unhealthy).
            BizLogicError: on any other status code.
        """
        resp = self.session.request(method, url, headers=headers, data=data, json=json_data, params=params, timeout=30)
        if self.config.debug:
            # NOTE(review): in debug mode this logs the raw form data, which
            # for the login request includes the account password.
            self._log(f'[perform request] Response={resp.text[:200]}...\nMethod={method}, Url={url}, Data={data}, JsonData={json_data}, Params={params}')

        if resp.status_code == 200:
            return resp
        elif resp.status_code == 401:
            self.is_healthy = False
            raise SessionExpiredOrInvalidError()
        elif resp.status_code == 403:
            raise PermissionDeniedError()
        elif resp.status_code == 429:
            self.is_healthy = False
            raise RateLimiteddError()
        else:
            raise BizLogicError(message=f"HTTP Error {resp.status_code}: {resp.text[:100]}")