| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475 |
- import time
- import json
- import random
- import re
- import os
- from datetime import datetime, timezone, timedelta
- from typing import List, Dict, Any, Callable, Optional
- from curl_cffi import requests, const
- from vs_plg import IVSPlg
- from vs_types import VSPlgConfig, AppointmentType, VSQueryResult, VSBookResult, AvailabilityStatus, TimeSlot, DateAvailability, NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError
class GrcPlugin(IVSPlg):
    """Visa appointment plugin for the Greek Embassy in Dublin (SuperSaaS).

    Schedule login page:
    https://www.supersaas.com/schedule/login/GreekEmbassyInDublin/Visas

    The plugin logs in with the configured account, derives free slots from
    the JavaScript variables embedded in the schedule page, and books a slot
    by posting the reservation form.
    """

    _ORIGIN = "https://www.supersaas.com"
    _LOGIN_URL = "https://www.supersaas.com/schedule/login/GreekEmbassyInDublin/Visas"
    _SCHEDULE_URL = "https://www.supersaas.com/schedule/GreekEmbassyInDublin/Visas"
    # Text that appears when the site serves the login form instead of the
    # schedule, i.e. the session is no longer authenticated.
    _LOGIN_PAGE_MARKER = 'Log into Visas schedule'
    # Number of days scanned forward from the schedule's starting date.
    _DAYS_TO_SCAN = 5 * 7
    # Appointment length used by book() when the slot label carries none.
    _FALLBACK_SLOT_SECONDS = 3600

    def __init__(self, group_id: str):
        self.group_id = group_id
        self.config: Optional[VSPlgConfig] = None
        self.free_config: Dict[str, Any] = {}
        self.is_healthy = True
        self.logger: Optional[Callable[[str], None]] = None
        self.session: Optional[requests.Session] = None
        # Default resource id; query() overwrites it with the id found on the page.
        self.resource_id = '1123832'
        self.session_create_time: float = 0

    def get_group_id(self) -> str:
        """Return the group id this plugin instance was created for."""
        return self.group_id

    def set_log(self, logger: Callable[[str], None]) -> None:
        """Install the callable that receives this plugin's log lines."""
        self.logger = logger

    def set_config(self, config: VSPlgConfig):
        """Store the plugin configuration and its free-form sub-config."""
        self.config = config
        self.free_config = config.free_config or {}

    def health_check(self) -> bool:
        """Return True while the current session is usable.

        The session is unusable when it was flagged unhealthy, was never
        created, or is older than ``config.session_max_life`` minutes
        (a value <= 0 disables the age limit).
        """
        if not self.is_healthy or self.session is None:
            return False
        max_life_minutes = (self.config.session_max_life or 0) if self.config else 0
        if max_life_minutes > 0:
            max_life_seconds = max_life_minutes * 60
            elapsed_time = time.time() - self.session_create_time
            if elapsed_time > max_life_seconds:
                self._log(f"Session Life ({int(elapsed_time)}s) out of max life limit ({max_life_seconds}s), mark as unhealth session")
                return False
        return True

    def create_session(self):
        """Open a fresh HTTP session and log in.

        Raises:
            BizLogicError: when the login response shows neither a
                signed-in marker nor a recognised error.
        """
        if self.session is not None:
            # Release the previous connection pool before replacing it.
            self.session.close()

        curl_options = {
            const.CurlOpt.MAXAGE_CONN: 1800,
            const.CurlOpt.MAXLIFETIME_CONN: 1800,
            const.CurlOpt.VERBOSE: self.config.debug,
        }
        self.session = requests.Session(
            proxy=self._get_proxy_url(),
            impersonate="chrome124",
            curl_options=curl_options,
            use_thread_local_curl=False,
            http_version=const.CurlHttpVersion.V2TLS,
        )

        login_url = self._LOGIN_URL
        # Fetch the login page first so the session picks up its cookies.
        self._perform_request("GET", login_url)

        headers = {
            "Referer": login_url,
            "Origin": self._ORIGIN,
            "Content-Type": "application/x-www-form-urlencoded",
        }
        data = {
            "name": self.config.account.username,
            "password": self.config.account.password,
            "remember": "K",
            "cookie_fix": "1",
            "button": "",
        }
        resp = self._perform_request('POST', login_url, headers=headers, data=data)

        if "Sign out" in resp.text or "Signed in as" in resp.text:
            self.session_create_time = time.time()
            # A successful login makes the instance usable again even if an
            # earlier request had flagged it unhealthy.
            self.is_healthy = True
            self._log(f"Session created successfully. (User: {self.config.account.username})")
        elif "Invalid email or password" in resp.text:
            # On a failed login the site stays on the page and shows an error.
            self._save_debug_html(resp.text, prefix='login_auth_fail')
            raise BizLogicError(message='Login failed: Invalid email or password')
        else:
            # Unknown failure: keep the page and the final URL for diagnosis.
            self._save_debug_html(resp.text, prefix='login_unknown_fail')
            self._log(f"Login check failed. Current URL: {resp.url}")
            raise BizLogicError(message='Login failed: Unknown response')

    def _get_daily_schedule(self, open_times, date_obj):
        """Return ``(start_minute, end_minute)`` of opening hours for a day.

        ``open_times`` is read as a flat list indexed by JavaScript weekday
        (Sunday = 0): entry ``i`` is the opening minute and entry ``i + 7``
        the closing minute. ``(None, None)`` means the day is not bookable.
        """
        if not open_times:
            return None, None
        weekday = date_obj.weekday()  # Python convention: Monday = 0
        # NOTE(review): this skips Friday (4) as well as the weekend --
        # confirm that Friday is intentionally excluded.
        if weekday >= 4:
            return None, None
        js_day_index = (weekday + 1) % 7
        if js_day_index + 7 >= len(open_times):
            # Shorter list than expected: treat the day as closed instead of
            # failing with IndexError.
            return None, None
        return open_times[js_day_index], open_times[js_day_index + 7]

    def _is_blocked_by_ecache(self, ecache_blocked, timestamp):
        """Return True if ``timestamp`` lies in any ``[start, end)`` closure range.

        ``ecache_blocked`` may be None or empty, meaning no closures.
        """
        return any(
            len(block) >= 2 and block[0] <= timestamp < block[1]
            for block in (ecache_blocked or ())
        )

    def query(self, apt_type: AppointmentType) -> VSQueryResult:
        """Fetch the schedule page and compute the free slots.

        Each returned ``TimeSlot.label`` is a JSON object with the keys
        ``timestamp``, ``datetime`` and ``length`` (seconds), which book()
        reads back.

        Raises:
            SessionExpiredOrInvalidError: when the login page is served.
            BizLogicError: when the page has no usable ``default_length``.
        """
        res = VSQueryResult()
        res.success = False

        headers = {
            "Referer": self._SCHEDULE_URL,
            "Origin": self._ORIGIN,
        }
        resp = self._perform_request("GET", self._SCHEDULE_URL, headers=headers)
        page = resp.text
        if self.config.debug:
            self._save_debug_html(page, prefix='Grc_Query_Slot_Page')
        if self._LOGIN_PAGE_MARKER in page:
            self.is_healthy = False
            raise SessionExpiredOrInvalidError(message='Session expired.')

        res_id_match = re.search(r'resource\[(\d+)\]\s*=', page)
        if res_id_match:
            self.resource_id = res_id_match.group(1)

        # Slot length; this code treats the value as seconds.
        len_match = re.search(r'default_length\s*=\s*(\d+)', page)
        default_length = int(len_match.group(1)) if len_match else 0
        slot_minutes = default_length // 60
        if slot_minutes < 1:
            # Without a positive step the slot loop could not advance.
            raise BizLogicError(message='Schedule page has no usable default_length.')

        # Daily opening hours (open_times).
        open_times = None
        ot_match = re.search(r'open_times\s*=\s*\[(.*?)\]', page)
        if ot_match:
            open_times = [int(x) for x in ot_match.group(1).split(',') if x.strip()]
        else:
            self._log("open_times not found on schedule page.")

        # Temporary closures such as holidays (ecache); absent means none.
        ecache_blocked = []
        ec_match = re.search(r'var ecache\s*=\s*(\{.*?\})', page)
        if ec_match:
            data_match = re.search(r'data:\s*(\[\[.*?\]\])', ec_match.group(1))
            if data_match:
                ecache_blocked = json.loads(data_match.group(1))

        # Already-booked start timestamps (app); absent means none booked.
        booked_timestamps = set()
        app_match = re.search(r'var app\s*=\s*(\[\[.*?\]\])', page)
        if app_match:
            booked_timestamps = {item[0] for item in json.loads(app_match.group(1))}

        # Season: slots at or after its end are not bookable.
        season_end = None
        season_match = re.search(r'season\s*=\s*\[(\d+),(\d+)\]', page)
        if season_match:
            season_range = [int(season_match.group(1)), int(season_match.group(2))]
            season_end = season_range[1]
            self._log(f"[*] 限制范围 (Season): 截止到 {datetime.fromtimestamp(season_range[1], timezone.utc)}")

        # Scan start: the page's Date.UTC(...) cursor, else today 00:00 UTC.
        cursor_match = re.search(r'Date\.UTC\((\d+),(\d+),(\d+),(\d+)\)', page)
        if cursor_match:
            y, m, d, h = map(int, cursor_match.groups())
            # JavaScript months are zero-based.
            start_date = datetime(y, m + 1, d, h, tzinfo=timezone.utc)
        else:
            start_date = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
        self._log(f"[*] 分析配置: 默认时长 {default_length/60} 分钟")

        now_ts = time.time()
        valid_slots_map: Dict[Any, List[TimeSlot]] = {}
        for day_offset in range(self._DAYS_TO_SCAN):
            current_day = start_date + timedelta(days=day_offset)
            start_min, end_min = self._get_daily_schedule(open_times, current_day)

            # Equal start/end (or no schedule) means the day is closed.
            if start_min is None or start_min >= end_min:
                continue

            # Every slot that still ends at or before closing time.
            for slot_min in range(start_min, end_min - slot_minutes + 1, slot_minutes):
                slot_dt = current_day.replace(
                    hour=slot_min // 60, minute=slot_min % 60, second=0, microsecond=0
                )
                slot_ts = int(slot_dt.timestamp())

                if slot_ts < now_ts:
                    continue  # already in the past
                if season_end is not None and slot_ts >= season_end:
                    continue  # beyond the bookable season
                if self._is_blocked_by_ecache(ecache_blocked, slot_ts + 1):
                    continue  # inside a temporary closure
                if slot_ts in booked_timestamps:
                    continue  # already booked

                booking_payload = {
                    "timestamp": slot_ts,
                    "datetime": slot_dt.strftime("%Y-%m-%d %H:%M:%S"),
                    "length": default_length,
                }
                time_slot = TimeSlot(
                    time=slot_dt.strftime("%H:%M"),
                    label=json.dumps(booking_payload),  # serialised for book()
                )
                valid_slots_map.setdefault(slot_dt.date(), []).append(time_slot)

        if valid_slots_map:
            res.success = True
            res.availability_status = AvailabilityStatus.Available

            sorted_dates = sorted(valid_slots_map)
            res.earliest_date = datetime.combine(sorted_dates[0], datetime.min.time())
            res.availability = [
                DateAvailability(
                    date=datetime.combine(d, datetime.min.time()),
                    times=valid_slots_map[d],
                )
                for d in sorted_dates
            ]
            self._log(f"Found availability on {len(sorted_dates)} days.")
        else:
            self._log("No available slots found.")
        return res

    def book(self, slot_info: VSQueryResult, user_inputs: Dict = None) -> VSBookResult:
        """Book the first slot of a randomly chosen acceptable date.

        Keys read from ``user_inputs`` (all optional): ``expected_start_date``,
        ``expected_end_date``, ``first_name``, ``last_name``,
        ``phone_country_code``, ``phone_no``, ``address``, ``passport_no``.

        Raises:
            SessionExpiredOrInvalidError: when the booking request is
                answered with the login page.
        """
        res = VSBookResult()
        res.success = False
        user_inputs = user_inputs or {}

        # 1. Date-range preferences.
        exp_start = user_inputs.get('expected_start_date', '')
        exp_end = user_inputs.get('expected_end_date', '')

        # Map "YYYY-MM-DD" -> DateAvailability for lookup after filtering.
        date_map = {}
        for date_avail in (getattr(slot_info, 'availability', None) or []):
            date_map[date_avail.date.strftime("%Y-%m-%d")] = date_avail

        # 2. _filter_dates shuffles its result, so the first entry is a
        #    randomly chosen acceptable date.
        valid_dates = self._filter_dates(list(date_map), exp_start, exp_end)
        if not valid_dates:
            res.message = f"No available slots within the expected range ({exp_start} to {exp_end})."
            self._log(res.message)
            return res

        # 3. Take the first time of that date.
        target_date_str = valid_dates[0]
        target_day_data = date_map[target_date_str]
        if not target_day_data.times:
            res.message = f"Date {target_date_str} has no time slots."
            return res
        target_slot = target_day_data.times[0]

        # 4. Decode the label written by query().
        try:
            slot_data = json.loads(target_slot.label)
            start_time_str = slot_data['datetime']
            start_dt = datetime.strptime(start_time_str, "%Y-%m-%d %H:%M:%S")
        except (TypeError, ValueError, KeyError) as e:
            res.message = f"Failed to parse slot label: {e}"
            self._log(res.message)
            return res

        # 5. finish_time = start + slot length; one hour when the label does
        #    not carry a usable length.
        length_seconds = slot_data.get('length')
        if not isinstance(length_seconds, (int, float)) or length_seconds <= 0:
            length_seconds = self._FALLBACK_SLOT_SECONDS
        finish_dt = start_dt + timedelta(seconds=length_seconds)
        finish_time_str = finish_dt.strftime("%Y-%m-%d %H:%M:%S")

        first_name = user_inputs.get('first_name', '')
        last_name = user_inputs.get('last_name', '')
        phone_country_code = user_inputs.get('phone_country_code', '353')
        phone_no = user_inputs.get('phone_no', '088888888')
        address = user_inputs.get('address', "Dublin, Ireland")
        passport_no = user_inputs.get('passport_no', "")

        url = self._SCHEDULE_URL
        payload = {
            "reservation[start_time]": start_time_str,
            "reservation[finish_time]": finish_time_str,
            "reservation[full_name]": f"{first_name} {last_name}".strip(),
            "reservation[mobile]": f'+{phone_country_code}{phone_no}',
            "reservation[address]": address,
            "reservation[description]": "",

            # Custom required fields: field_1_r = passport number,
            # field_2_r = reason for visa.
            "reservation[field_1_r]": passport_no,
            "reservation[field_2_r]": "Tourism",

            # System fields.
            "reservation[resource_id]": self.resource_id,
            "reservation[xpos]": "",
            "reservation[ypos]": "",
            "button": "",
        }
        headers = {
            "Referer": url,
            "Origin": self._ORIGIN,
            "Content-Type": "application/x-www-form-urlencoded",
        }
        self._log(f"Attempting to book slot: {start_time_str}")
        resp = self._perform_request('POST', url, data=payload, headers=headers)
        if self.config.debug:
            self._save_debug_html(resp.text, prefix='Grc_Book_Response')

        # Being bounced to the login form means nothing was booked.
        if self._LOGIN_PAGE_MARKER in resp.text:
            self.is_healthy = False
            raise SessionExpiredOrInvalidError(message='Session expired.')

        # NOTE(review): apart from the login check, an HTTP 200 is taken as a
        # successful booking; the body is not inspected for form validation
        # errors -- confirm what a rejected reservation looks like.
        res.success = True
        res.book_date = start_dt.strftime("%Y-%m-%d")  # YYYY-mm-dd
        res.book_time = start_dt.strftime("%H:%M")  # hh:mm
        self._log(f"Booking successful for {res.book_date} at {res.book_time}")
        return res

    def _filter_dates(self, dates: List[str], start_str: str, end_str: str) -> List[str]:
        """Return the dates inside the user's expected range, shuffled.

        :param dates: available dates (YYYY-MM-DD)
        :param start_str: expected start date (YYYY-MM-DD, extra text ignored)
        :param end_str: expected end date (YYYY-MM-DD, extra text ignored)
        :return: a new shuffled list; all of ``dates`` when either bound is
            missing or a date cannot be parsed. ``dates`` is never mutated.
        """
        candidates = list(dates)
        if start_str and end_str:
            try:
                # Only the first 10 characters are used, so values with a
                # time component are accepted too.
                s_date = datetime.strptime(str(start_str)[:10], "%Y-%m-%d")
                e_date = datetime.strptime(str(end_str)[:10], "%Y-%m-%d")
                # Inclusive range.
                candidates = [
                    date_str for date_str in dates
                    if s_date <= datetime.strptime(date_str, "%Y-%m-%d") <= e_date
                ]
            except ValueError:
                self._log("Date format error in expected_start_date or expected_end_date. Ignoring filter.")
                candidates = list(dates)
        random.shuffle(candidates)
        return candidates

    def _log(self, message):
        """Send ``message`` to the configured logger, if any."""
        if self.logger:
            self.logger(f'[GrcPlugin] [{self.group_id}] {message}')

    def _save_debug_html(self, content: str, prefix: str = "debug"):
        """Write ``content`` to ``debug_pages/<prefix>_<timestamp>.html``.

        Best effort: a filesystem error is logged rather than raised, so it
        cannot hide the error the caller is about to report.
        """
        save_dir = "debug_pages"
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{save_dir}/{prefix}_{timestamp}.html"
        try:
            os.makedirs(save_dir, exist_ok=True)
            with open(filename, "w", encoding="utf-8") as f:
                f.write(content)
        except OSError as e:
            self._log(f"Failed to save debug HTML to {filename}: {e}")
            return
        self._log(f"HTML saved to: {filename}")

    def _get_proxy_url(self):
        """Build the proxy URL from the config; "" when no proxy is set."""
        proxy = getattr(self.config, 'proxy', None)
        if not proxy or not proxy.ip:
            return ""
        if proxy.username:
            return f"{proxy.scheme}://{proxy.username}:{proxy.password}@{proxy.ip}:{proxy.port}"
        return f"{proxy.scheme}://{proxy.ip}:{proxy.port}"

    def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None):
        """Send one HTTP request through the session and map error statuses.

        Returns the response when the status is 200.

        Raises:
            SessionExpiredOrInvalidError: no session exists, or HTTP 401.
            PermissionDeniedError: HTTP 403.
            RateLimiteddError: HTTP 429.
            BizLogicError: any other status.
        """
        if self.session is None:
            raise SessionExpiredOrInvalidError(message='Session not created.')

        resp = self.session.request(method, url, headers=headers, data=data, json=json_data, params=params, timeout=30)
        if self.config.debug:
            logged_data = data
            if isinstance(data, dict) and 'password' in data:
                # Keep the account password out of debug logs.
                logged_data = {**data, 'password': '***'}
            self._log(f'[perform request] Response={resp.text}\nMethod={method}, Url={url}, Data={logged_data}, JsonData={json_data}, Params={params}')

        status = resp.status_code
        if status == 200:
            return resp
        if status == 401:
            self.is_healthy = False
            raise SessionExpiredOrInvalidError()
        if status == 403:
            raise PermissionDeniedError()
        if status == 429:
            self.is_healthy = False
            raise RateLimiteddError()
        raise BizLogicError(message=f"HTTP Error {status}: {resp.text[:100]}")
|