import base64
import json
import os
import random
import re
import time
from datetime import datetime, timezone
from typing import List, Dict, Optional, Any, Callable
from urllib.parse import urljoin

from curl_cffi import requests, const
from bs4 import BeautifulSoup

from vs_plg import IVSPlg
from vs_types import (
    VSPlgConfig,
    AppointmentType,
    VSQueryResult,
    VSBookResult,
    TimeSlot,
    DateAvailability,
    AvailabilityStatus,
    NotFoundError,
    PermissionDeniedError,
    RateLimiteddError,
    SessionExpiredOrInvalidError,
    BizLogicError,
)
from toolkit.vs_cloud_api import VSCloudApi
from toolkit.ocr_engine import DddOcrEngine


def to_yyyymmdd(data_str: str, date_str_format: str, target_format: str = "%Y-%m-%d"):
    """Re-format a date string.

    :param data_str: the date string to convert
    :param date_str_format: ``strptime`` format that ``data_str`` is written in
    :param target_format: ``strftime`` format of the result (default YYYY-MM-DD)
    :return: the date rendered with ``target_format``
    :raises ValueError: if ``data_str`` does not match ``date_str_format``
    """
    dt = datetime.strptime(data_str, date_str_format)
    # Honour the caller-supplied output format instead of a hard-coded one.
    return dt.strftime(target_format)


def get_alias_email(email: str, new_domain: str = "gmail-app.com") -> str:
    """Replace the domain of ``email`` with ``new_domain`` (default gmail-app.com).

    :raises ValueError: if ``email`` is not a string containing ``@``
    """
    if not isinstance(email, str) or "@" not in email:
        raise ValueError(f"Invalid email: {email}")
    local_part, _ = email.rsplit("@", 1)
    return f"{local_part}@{new_domain}"


class DePlugin(IVSPlg):
    """
    Germany (Visametric) visa appointment plugin.
    Adapted to the Visametric Ireland -> Germany flow.
    """

    def __init__(self, group_id: str):
        self.group_id = group_id
        self.config: Optional[VSPlgConfig] = None
        self.free_config: Dict[str, Any] = {}
        self.logger = None
        self.session: Optional[requests.Session] = None
        # Created in create_session(); declared here so the attribute always exists.
        self.ocr_engine: Optional[DddOcrEngine] = None

        # State
        self.is_healthy = True

        # Key context variables (extracted from the pages)
        self.base_url = "https://ie-appointment.visametric.com"
        self.csrf_token = ""
        self.personal_info_val = ""
        self.email_val_control = ""

        # time.time() stamp of the last successful create_session(); 0 = never
        self.session_create_time: float = 0

    def get_group_id(self) -> str:
        """Return the group id this plugin instance was created with."""
        return self.group_id

    def set_log(self, logger: Callable[[str], None]) -> None:
        """Install the callable that receives log lines."""
        self.logger = logger

    def set_config(self, config: VSPlgConfig):
        """Store the plugin config; ``free_config["base_url"]`` overrides the default site URL."""
        self.config = config
        self.free_config = config.free_config or {}
        if self.free_config.get("base_url"):
            self.base_url = self.free_config["base_url"].rstrip('/')

    def health_check(self) -> bool:
        """Return True while the session exists, is not flagged unhealthy and is not too old.

        ``config.session_max_life`` is compared in minutes; a value <= 0 disables
        the age check.
        """
        if not self.is_healthy:
            return False
        if self.session is None:
            return False
        if self.config.session_max_life > 0:
            current_time = time.time()
            elapsed_time = current_time - self.session_create_time
            if elapsed_time > self.config.session_max_life * 60:
                self._log(f"Session Life ({int(elapsed_time)}s) out of max life limit ({self.config.session_max_life * 60}s), mark as unhealth session")
                return False
        return True

    def create_session(self):
        """
        Initialise a session: pass the Cloudflare shield -> fetch the CSRF token ->
        recognise the image captcha -> submit the captcha -> capture page context.

        :raises NotFoundError: if the CSRF token, captcha image or personalinfo
            value cannot be found in the returned HTML
        """
        # Release the previous session (if any) so its connections are not leaked.
        if self.session is not None:
            self.session.close()

        # 1. Initialise the HTTP session
        curlopt = {
            const.CurlOpt.MAXAGE_CONN: 1800,
            const.CurlOpt.MAXLIFETIME_CONN: 1800,
            const.CurlOpt.VERBOSE: self.config.debug,
        }
        self.session = requests.Session(
            proxy=self._get_proxy_url(),
            impersonate="chrome124",
            curl_options=curlopt,
            use_thread_local_curl=False,
            http_version=const.CurlHttpVersion.V2TLS
        )
        self.ocr_engine = DddOcrEngine()

        # 2. Visit the home page to obtain the CSRF token and the captcha image.
        #    The Visametric home page is usually behind Cloudflare.
        url_home = f"{self.base_url}/en"

        # Try to pass the shield first
        self._solve_cloudflare5S_challenge()

        # This is a page navigation, not an XHR call, so drop the AJAX marker.
        default_headers = self._get_headers()
        default_headers.pop("X-Requested-With")
        resp = self._perform_request('GET', url_home, headers=default_headers)
        if self.config.debug:
            self._save_debug_html(resp.text, prefix="VisaMetric_Home_Page")

        html = resp.text
        soup = BeautifulSoup(html, 'html.parser')
        meta = soup.find('meta', {'name': 'csrf-token'})
        if not meta:
            raise NotFoundError(message='Missing csrf-token in html')
        self.csrf_token = meta.get('content', '')

        # 3. Extract the captcha image; the page embeds it as
        #    "data:image/png;base64," + "<payload>"
        match = re.search(r'"data:image/png;base64,"\s*\+\s*"(.*?)"', html)
        if not match:
            raise NotFoundError(message="Captcha image not found")

        captcha_bytes = base64.b64decode(match.group(1))
        captcha_code = self.ocr_engine.inference_captcha(captcha_bytes)
        self._log(f"Captcha recognized: {captcha_code}")

        # 4. Submit the captcha (/appointment-form) so the server validates the
        #    session and returns the page that contains personalinfo.
        self._submit_captcha(captcha_code)

        self.session_create_time = time.time()
        self._log("Session created successfully.")

    def query(self, apt_type: AppointmentType) -> VSQueryResult:
        """
        Query the available dates (/getdate).

        A 403 triggers a Cloudflare re-solve and a retry, up to
        ``free_config["slot_query_max_retries"]`` attempts (default 2, minimum 1).

        :raises PermissionDeniedError: if the last attempt is still blocked
        """
        res = VSQueryResult()
        res.apt_type = apt_type

        # Payload ids should be configured for the actual consulate.
        consular_id = self.free_config.get("consularid", "1")  # 1=Ireland?

        # Always make at least one attempt; otherwise `resp` would never be bound.
        try:
            max_retries = int(self.free_config.get("slot_query_max_retries", 2))
        except (TypeError, ValueError):
            max_retries = 2
        max_retries = max(1, max_retries)

        url = f"{self.base_url}/en/getdate"
        payload = {
            "consularid": consular_id,
            "exitid": "1",
            "servicetypeid": "1",
            "calendarType": "2",
            "totalperson": "1"
        }

        default_headers = self._get_headers()
        default_headers['X-CSRF-TOKEN'] = self.csrf_token

        resp = None
        for attempt in range(1, max_retries + 1):
            try:
                resp = self._perform_request('POST', url, data=payload, headers=default_headers)
                break  # request succeeded, leave the retry loop
            except PermissionDeniedError:
                self._log(f"Getdate blocked (403), attempt {attempt}/{max_retries}")
                # Do not bother solving the shield again after the last attempt.
                if attempt >= max_retries:
                    raise
                self._solve_cloudflare5S_challenge()
                self._log("Cloudflare bypass success, retrying...")

        # Visametric returns JSON: {"getDateEnable": ["15-01-2026", "16-01-2026"]}
        j = resp.json()
        dates = j.get("getDateEnable", [])

        if dates:
            res.success = True
            res.availability_status = AvailabilityStatus.Available
            # Visametric returns DD-MM-YYYY -> normalise to YYYY-MM-DD
            res.earliest_date = to_yyyymmdd(dates[0], "%d-%m-%Y")
            res.availability = [
                DateAvailability(
                    date=to_yyyymmdd(d, "%d-%m-%Y"),
                    times=[],
                )
                for d in dates
            ]
        else:
            # The request itself worked, but no date is open.
            res.success = False
            res.availability_status = AvailabilityStatus.NoneAvailable

        return res

    def book(self, slot_info: VSQueryResult, user_inputs: Dict) -> VSBookResult:
        """
        Book an appointment: pick a date -> pick a time -> trigger the OTP mail ->
        fill the form -> submit.

        :param slot_info: result of a previous ``query`` call
        :param user_inputs: applicant data; keys read here and in the helpers are
            ``expected_start_date``, ``expected_end_date``, ``email``, ``birthday``,
            ``first_name``, ``last_name``, ``passport_no``, ``passport_expiry_date``,
            ``phone_no`` and ``travel_date``
        :raises NotFoundError: no date/time/OTP could be obtained
        :raises BizLogicError: the server rejected the submitted form
        """
        res = VSBookResult()

        # 1. Filter the dates by the user's expected range
        available_dates = [da.date for da in slot_info.availability]
        exp_start = user_inputs.get('expected_start_date', '')
        exp_end = user_inputs.get('expected_end_date', '')

        valid_dates = self._filter_dates(available_dates, exp_start, exp_end)
        if not valid_dates:
            raise NotFoundError(message="No dates match user constraints")

        target_date = random.choice(valid_dates)
        self._log(f"Selected date: {target_date}")

        # 2. Fetch the time slots (/senddate)
        time_slot = self._get_slot_time(target_date)
        self._log(f"Selected time: {time_slot['time']}")

        # 3. Trigger the mail flow (step 1: /jky45fgd)
        alias_email = get_alias_email(user_inputs.get("email"), new_domain='gmail-app.com')
        self._send_email_step1(alias_email)

        # 4. Trigger the mail flow (step 2: /confirmCodeSendMail).
        #    Sending confirmCode "0" makes the site send the mail with the code.
        self._send_email_step2("0")

        # 5. Read the OTP from the mailbox
        recipient = alias_email
        otp_code = self._read_otp_email(recipient)

        # 6. Submit the code and confirm (/personal/appointment/create)
        book_res_html = self._confirm_appointment(target_date, time_slot, user_inputs, otp_code, alias_email)
        if "complete all required fields" in book_res_html.lower():
            raise BizLogicError(
                message='Confirm appointment rejected: server asked to complete all required fields'
            )

        # 7. Extract the result
        match = re.search(r'https:\/\/checkout\.stripe\.com\/c\/pay\/[^\s"]+', book_res_html)

        res.success = True
        res.fee_amount = 3000  # NOTE(review): presumably minor units (30.00 EUR) - confirm
        res.fee_currency = 'EUR'
        res.book_date = target_date
        res.book_time = time_slot['time']
        if match:
            res.payment_link = match.group(0)
            self._log(f"Payment Link Found: {res.payment_link}")
        else:
            self._log("Payment link not found in booking response")

        return res

    # ---------------------------------------------------------
    # Helper methods
    # ---------------------------------------------------------

    def _log(self, message):
        """Forward ``message`` to the configured logger, if one is set."""
        if self.logger:
            self.logger(f'[DePlugin] [{self.group_id}] {message}')

    def _get_headers(self) -> Dict[str, str]:
        """Return a fresh dict with the base request headers."""
        return {
            "Accept": "*/*",
            "Accept-Language": "en,zh-CN;q=0.9,zh;q=0.8",
            "Origin": self.base_url,
            "Referer": f"{self.base_url}/en/appointment-form",  # default Referer
            "X-Requested-With": "XMLHttpRequest"
        }

    def _get_proxy_url(self):
        """Build the proxy URL from ``config.proxy``; empty string when no proxy ip is set."""
        proxy_url = ""
        if self.config.proxy.ip:
            s = self.config.proxy
            if s.username:
                proxy_url = f"{s.scheme}://{s.username}:{s.password}@{s.ip}:{s.port}"
            else:
                proxy_url = f"{s.scheme}://{s.ip}:{s.port}"
        return proxy_url

    def _save_debug_html(self, content: str, prefix: str = "debug"):
        """Write ``content`` to ``debug_pages/<prefix>_<timestamp>.html`` and log the path."""
        save_dir = "debug_pages"
        # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
        os.makedirs(save_dir, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{save_dir}/{prefix}_{timestamp}.html"
        with open(filename, "w", encoding="utf-8") as f:
            f.write(content)
        self._log(f"HTML saved to: {filename}")

    def _submit_captcha(self, code):
        """POST the recognised captcha to /appointment-form and capture page context.

        Stores ``personal_info_val``, ``email_val_control`` and the refreshed
        ``csrf_token`` found in the response.

        :raises NotFoundError: if personalinfo or the csrf-token meta tag is missing
        """
        url = f"{self.base_url}/en/appointment-form"
        payload = {
            '_token': self.csrf_token,
            'cpJvnsControl': '',
            'mailConfirmCode': code
        }

        headers = self._get_headers()
        headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'

        resp = self._perform_request('POST', url, data=payload, headers=headers)
        if self.config.debug:
            self._save_debug_html(resp.text, prefix="VisaMetric_Make_Appointment_Page")

        # Key point: the HTML returned after the captcha carries the opaque
        # parameters the later requests need. Reset first so a value left over
        # from an earlier session cannot hide a failed extraction.
        match_pi = re.search(r"personalinfo:\s*'([^']*)'", resp.text)
        self.personal_info_val = match_pi.group(1) if match_pi else ""

        # emailValControl: '...'
        match_ev = re.search(r"emailValControl:\s*'([^']*)'", resp.text)
        self.email_val_control = match_ev.group(1) if match_ev else ""

        if not self.personal_info_val:
            raise NotFoundError(message="Personalinfo not found in captcha response")

        soup = BeautifulSoup(resp.text, 'html.parser')
        meta = soup.find('meta', {'name': 'csrf-token'})
        if not meta:
            raise NotFoundError(message='Missing csrf-token in html')
        self.csrf_token = meta.get('content', '')

    def _get_slot_time(self, date) -> Optional[Dict]:
        """Fetch the time slots of ``date`` (YYYY-MM-DD) via /senddate and pick one at random.

        :return: dict with keys ``time``, ``data_id`` and ``data_all``
        :raises NotFoundError: if the response contains no usable slot button
        """
        url = f"{self.base_url}/en/senddate"
        dt_m = datetime.strptime(date, "%Y-%m-%d")
        converted_date = dt_m.strftime("%d-%m-%Y")
        payload = {
            "fulldate": converted_date,
            "totalperson": "1",
            "set_new_consular_id": self.free_config.get("consularid", "1"),
            "set_new_exit_office_id": "1",
            "calendarType": "2",
            "set_new_service_type_id": "1",
            "personalinfo": self.personal_info_val
        }
        headers = self._get_headers()
        headers['X-CSRF-TOKEN'] = self.csrf_token  # CSRF is required here
        headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'

        resp = self._perform_request('POST', url, data=payload, headers=headers)
        soup = BeautifulSoup(resp.text, 'html.parser')
        buttons = soup.find_all('button')
        slots = []
        for btn in buttons:
            i_tag = btn.find('i')
            if not i_tag:
                continue
            # The time label is the text node right after the <i> icon. Skip the
            # button when that sibling is missing or is another tag, not text.
            sibling = i_tag.next_sibling
            if not isinstance(sibling, str):
                continue
            slots.append({
                'time': sibling.strip(),
                'data_id': btn.get('data-id'),
                'data_all': btn.get('data-all')
            })

        if slots:
            return random.choice(slots)
        else:
            raise NotFoundError(message='Not slot time available')

    def _send_email_step1(self, email):
        """POST ``email`` plus the personalinfo value to /jky45fgd (mail flow step 1)."""
        url = f"{self.base_url}/en/jky45fgd"
        payload = {
            "emailCheck": email,
            "personalinfo": self.personal_info_val
        }
        headers = self._get_headers()
        headers['X-CSRF-TOKEN'] = self.csrf_token
        headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
        self._perform_request('POST', url, data=payload, headers=headers)

    def _send_email_step2(self, code_val):
        """POST ``code_val`` plus emailValControl to /confirmCodeSendMail (mail flow step 2)."""
        url = f"{self.base_url}/en/confirmCodeSendMail"
        payload = {
            "confirmCode": code_val,
            "emailValControl": self.email_val_control
        }
        headers = self._get_headers()
        headers['X-CSRF-TOKEN'] = self.csrf_token
        headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
        self._perform_request('POST', url, data=payload, headers=headers)

    def _read_otp_email(self, recipient) -> str:
        """
        Poll the mailbox for the OTP mail addressed to ``recipient``.

        Polls up to 12 times, 5 seconds apart, and returns the first 6-digit
        number found in the mail content.

        :raises NotFoundError: if no mail with a 6-digit code arrives in time
        """
        master_email = "visafly666@gmail.com"
        sender = 'Visametric - verify at visametric.com'
        subject_keywords = 'Verification Code'
        body_keywords = 'Verification code'

        # Aware UTC "now"; datetime.utcnow() is deprecated. The formatted text is identical.
        now_utc = datetime.now(timezone.utc)
        formatted_utc_time = now_utc.strftime("%Y-%m-%d %H:%M:%S")

        self._log(f"Waiting for OTP email sent after {formatted_utc_time}...")

        # Poll the mailbox
        max_polls = 12
        for attempt in range(max_polls):
            content_out = VSCloudApi.Instance().fetch_mail_content(
                master_email,
                sender,
                recipient,
                subject_keywords,
                body_keywords,
                formatted_utc_time,
                300
            )
            if content_out:
                match = re.search(r'\b\d{6}\b', content_out)
                if match:
                    otp = match.group(0)
                    self._log(f"OTP code found: {otp}")
                    return otp
            # No point in waiting after the final poll.
            if attempt < max_polls - 1:
                time.sleep(5)

        raise NotFoundError(message="OTP email not found (timeout)")

    def _confirm_appointment(self, date, slot_data, user_inputs, otp, alias_email):
        """Submit the final booking form to /personal/appointment/create.

        :param date: selected appointment date (not sent; the slot ids identify it)
        :param slot_data: dict from ``_get_slot_time`` (``data_id``, ``data_all``)
        :param user_inputs: applicant data, dates given as YYYY-MM-DD[...]
        :param otp: the code read from the verification mail
        :param alias_email: the e-mail address used for the booking
        :return: response body (HTML) of the create request
        :raises ValueError: if the passport expiry or travel date cannot be parsed
        """
        url = f"{self.base_url}/en/personal/appointment/create"

        def _get_dob(d_str):
            # Input is YYYY-MM-DD, possibly followed by a time part.
            try:
                return datetime.strptime(d_str[:10], "%Y-%m-%d")
            except (TypeError, ValueError):
                # Keep the original best-effort fallback, but make it visible.
                self._log(f"Invalid birthday {d_str!r}, falling back to today's date")
                return datetime.now()

        def _to_site_date(key):
            # The site expects DD-MM-YYYY.
            raw = user_inputs.get(key) or ''
            return to_yyyymmdd(raw[:10], "%Y-%m-%d", "%d-%m-%Y")

        dob = _get_dob(user_inputs.get('birthday', ''))
        passport_expiry = _to_site_date('passport_expiry_date')
        travel_start = _to_site_date('travel_date')
        consular_id = str(self.free_config.get("consularid", "1"))
        phone = user_inputs.get('phone_no')

        payload = {
            "_token": self.csrf_token,
            "country": consular_id,
            "visitingcountry": consular_id,
            "city": "6",  # Dublin? should be configurable
            "office": "1",
            "officetype": "1",
            "totalPerson": "1",
            "name1": (user_inputs.get('first_name') or '').upper(),
            "surname1": (user_inputs.get('last_name') or '').upper(),
            "nationality1": "2",  # assumed value
            "birthday1": str(dob.day),
            "birthmonth1": str(dob.month),
            "birthyear1": str(dob.year),
            "passport1": user_inputs.get('passport_no'),
            "passportExpirationDate1": passport_expiry,
            "email1": alias_email,
            "phone1": phone,
            "alternativephone1": "",
        }

        # Persons 2-4 are left blank; key order matches the original form.
        for idx in (2, 3, 4):
            payload.update({
                f"name{idx}": "",
                f"surname{idx}": "",
                f"nationality{idx}": "0",
                f"birthday{idx}": "0",
                f"birthmonth{idx}": "0",
                f"birthyear{idx}": "0",
                f"passport{idx}": "",
                f"passportExpirationDate{idx}": "",
                f"email{idx}": alias_email,
                f"phone{idx}": phone,
                f"alternativephone{idx}": "",
            })

        payload.update({
            "mailConfirmCode": otp,
            "ctval": slot_data['data_id'],
            "qtallvert": slot_data['data_all'],
            "oldofficetype": "1",
            "oldtotalperson": "1",
            "rePaymentControl": "0",
            # Key point: "view set" fields (should be configurable)
            "view_set_app_country": "Schengen - Tourism/Family&Friend Visit/Transit Visa/Other Purposes",
            "view_set_app_office": "Dublin",
            "view_set_app_service_type": "NORMAL",
            "cargoactive": "0",
            "setnewcalendarstatus": "2",
            "availableDaycontrol": "0",
            "travelStartDate": travel_start,
            "personalapproveTerms": "1"
        })

        headers = self._get_headers()
        headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'

        resp = self._perform_request('POST', url, data=payload, headers=headers)
        return resp.text

    def _filter_dates(self, dates: List[str], start_str: str, end_str: str) -> List[str]:
        """
        Filter the available dates by the user's expected range.

        :param dates: available dates returned by the API (YYYY-MM-DD)
        :param start_str: expected start date (YYYY-MM-DD)
        :param end_str: expected end date (YYYY-MM-DD)
        :return: the dates inside the closed range, shuffled; ``dates`` itself,
            unchanged, when either bound is empty
        """
        # No range configured: do not filter.
        if not start_str or not end_str:
            return dates

        # Keep only the first 10 characters in case a time part is attached.
        s_date = datetime.strptime(start_str[:10], "%Y-%m-%d")
        e_date = datetime.strptime(end_str[:10], "%Y-%m-%d")

        # Closed interval comparison
        valid_dates = [
            date_str
            for date_str in dates
            if s_date <= datetime.strptime(date_str, "%Y-%m-%d") <= e_date
        ]

        random.shuffle(valid_dates)
        return valid_dates

    def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None):
        """
        Unified HTTP request wrapper: send one request and map the status code.

        :return: the response when the status code is 200
        :raises SessionExpiredOrInvalidError: on 401/419 (also marks the plugin unhealthy)
        :raises PermissionDeniedError: on 403
        :raises RateLimiteddError: on 429 (also marks the plugin unhealthy)
        :raises BizLogicError: on any other status code
        """
        resp = self.session.request(method, url, headers=headers, data=data, json=json_data, params=params, timeout=30)
        if self.config.debug:
            self._log(f'[perform request] Response={resp.text}\nMethod={method}, Url={url}, Data={data}, JsonData={json_data}, Params={params}')

        if resp.status_code == 200:
            return resp
        elif resp.status_code in [401, 419]:
            self.is_healthy = False
            raise SessionExpiredOrInvalidError()
        elif resp.status_code == 403:
            raise PermissionDeniedError()
        elif resp.status_code == 429:
            self.is_healthy = False
            raise RateLimiteddError()
        else:
            raise BizLogicError(message=f"HTTP Error {resp.status_code}: {resp.text[:100]}")

    def _solve_cloudflare5S_challenge(self):
        """
        Solve the Cloudflare "5s" shield through the cloud task API.

        Copies the ``__cf_bm`` / ``cf_clearance`` cookies and the user agent
        from the task result into the current session.
        """
        self._log("Solving Cloudflare 5s...")
        website_url = f'{self.base_url}/en'

        # 1. Format the proxy string; the task API usually expects host:port:user:pass.
        p = self.config.proxy
        if p.username:
            proxy_str = f"{p.ip}:{p.port}:{p.username}:{p.password}"
        else:
            proxy_str = f"{p.ip}:{p.port}"

        # 2. Submit the task and wait for its result
        task_id = VSCloudApi.Instance().create_task(
            command="AntiCloudflareTask",
            args={
                "proxy": proxy_str,
                "websiteUrl": website_url
            }
        )
        result_data = VSCloudApi.Instance().get_task_result(task_id, timeout=60)

        # Tolerate a missing/None result instead of failing with AttributeError.
        task_result = (result_data or {}).get("result") or {}
        cookies_list = task_result.get('cookies') or []

        for cookie in cookies_list:
            if cookie['name'] in ('__cf_bm', 'cf_clearance'):
                self.session.cookies.set(
                    cookie['name'],
                    cookie['value'],
                    domain=cookie['domain'],
                    path='/'
                )

        # Only override the User-Agent when the solver actually reported one.
        user_agent = task_result.get('userAgent')
        if user_agent:
            self.session.headers['User-Agent'] = user_agent
        self._log("Cloudflare 5s challenge solved.")