|
|
@@ -0,0 +1,527 @@
|
|
|
+import time
|
|
|
+import json
|
|
|
+import random
|
|
|
+import re
|
|
|
+import os
|
|
|
+import uuid
|
|
|
+import shutil
|
|
|
+import base64
|
|
|
+import socket
|
|
|
+from datetime import datetime
|
|
|
+from typing import List, Dict, Optional, Any, Callable
|
|
|
+from urllib.parse import urljoin, urlparse, urlencode
|
|
|
+
|
|
|
+# DrissionPage 核心
|
|
|
+from DrissionPage import ChromiumPage, ChromiumOptions
|
|
|
+
|
|
|
+from vs_plg import IVSPlg
|
|
|
+from vs_types import VSPlgConfig, VSQueryResult, VSBookResult, AvailabilityStatus, TimeSlot, DateAvailability, NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError
|
|
|
+from toolkit.vs_cloud_api import VSCloudApi
|
|
|
+from utils.cloudflare_bypass_for_scraping import CloudflareBypasser
|
|
|
+from toolkit.proxy_tunnel import ProxyTunnel
|
|
|
+from toolkit.ocr_engine import DddOcrEngine
|
|
|
+
|
|
|
+
|
|
|
+class BrowserResponse:
|
|
|
+ def __init__(self, result_dict):
|
|
|
+ result_dict = result_dict or {}
|
|
|
+ self.status_code = result_dict.get('status', 0)
|
|
|
+ self.text = result_dict.get('body', '')
|
|
|
+ self.headers = result_dict.get('headers', {})
|
|
|
+ self.url = result_dict.get('url', '')
|
|
|
+ self._json = None
|
|
|
+ def json(self):
|
|
|
+ if self._json is None:
|
|
|
+ if not self.text: return {}
|
|
|
+ try: self._json = json.loads(self.text)
|
|
|
+ except: self._json = {}
|
|
|
+ return self._json
|
|
|
+
|
|
|
+def to_yyyymmdd(data_str: str, date_str_format: str, target_format: str="%Y-%m-%d"):
|
|
|
+ dt = datetime.strptime(data_str, date_str_format)
|
|
|
+ return dt.strftime("%Y-%m-%d")
|
|
|
+
|
|
|
+def get_alias_email(email: str, new_domain: str = "gmail-app.com") -> str:
|
|
|
+ if "@" not in email: raise ValueError(f"Invalid email: {email}")
|
|
|
+ local_part, _ = email.rsplit("@", 1)
|
|
|
+ return f"{local_part}@{new_domain}"
|
|
|
+
|
|
|
+class DePlugin2(IVSPlg):
|
|
|
+ """
|
|
|
+ Germany (Visametric) 签证预约插件 (Browser + Tunnel Mode)
|
|
|
+ """
|
|
|
+
|
|
|
+ def __init__(self, group_id: str):
|
|
|
+ self.group_id = group_id
|
|
|
+ self.config: Optional[VSPlgConfig] = None
|
|
|
+ self.free_config: Dict[str, Any] = {}
|
|
|
+ self.logger = None
|
|
|
+
|
|
|
+ # 浏览器实例
|
|
|
+ self.page: Optional[ChromiumPage] = None
|
|
|
+
|
|
|
+ # 资源隔离
|
|
|
+ self.instance_id = uuid.uuid4().hex[:8]
|
|
|
+ self.root_workspace = os.path.abspath(os.path.join("temp_browser_data", f"{self.group_id}_{self.instance_id}"))
|
|
|
+ self.user_data_path = os.path.join(self.root_workspace, "user_data")
|
|
|
+
|
|
|
+ if not os.path.exists(self.root_workspace):
|
|
|
+ os.makedirs(self.root_workspace)
|
|
|
+
|
|
|
+ self.tunnel = None # 代理隧道
|
|
|
+ self.is_healthy = True
|
|
|
+ self.session_create_time: float = 0
|
|
|
+
|
|
|
+ # 字符识别引擎
|
|
|
+ self.ocr_engine: Optional[DddOcrEngine] = None
|
|
|
+
|
|
|
+ # 业务状态
|
|
|
+ self.base_url = "https://ie-appointment.visametric.com"
|
|
|
+ self.csrf_token = ""
|
|
|
+ self.personal_info_val = ""
|
|
|
+ self.email_val_control = ""
|
|
|
+
|
|
|
+ def get_group_id(self) -> str:
|
|
|
+ return self.group_id
|
|
|
+
|
|
|
+ def set_log(self, logger: Callable[[str], None]) -> None:
|
|
|
+ self.logger = logger
|
|
|
+
|
|
|
+ def _log(self, message):
|
|
|
+ if self.logger:
|
|
|
+ self.logger(f'[DePlugin] [{self.group_id}] {message}')
|
|
|
+ else:
|
|
|
+ print(f'[DePlugin] [{self.group_id}] {message}')
|
|
|
+
|
|
|
+ def set_config(self, config: VSPlgConfig):
|
|
|
+ self.config = config
|
|
|
+ self.free_config = config.free_config or {}
|
|
|
+ if self.free_config.get("base_url"):
|
|
|
+ self.base_url = self.free_config["base_url"].rstrip('/')
|
|
|
+
|
|
|
+ def health_check(self) -> bool:
|
|
|
+ if not self.is_healthy:
|
|
|
+ return False
|
|
|
+ if not self.page:
|
|
|
+ return False
|
|
|
+ try:
|
|
|
+ if not self.page.run_js("return 1;"):
|
|
|
+ return False
|
|
|
+ except:
|
|
|
+ return False
|
|
|
+ if self.config.session_max_life > 0:
|
|
|
+ if time.time() - self.session_create_time > self.config.session_max_life * 60:
|
|
|
+ self._log("Session expired.")
|
|
|
+ return False
|
|
|
+ return True
|
|
|
+
|
|
|
+ def create_session(self):
|
|
|
+ """
|
|
|
+ 创建会话:启动浏览器 -> 代理隧道 -> 过盾 -> 提取 Captcha -> 本地识别 -> 提交 -> 获取 Context
|
|
|
+ """
|
|
|
+ self._log(f"Initializing Session (ID: {self.instance_id})...")
|
|
|
+ self.ocr_engine = DddOcrEngine()
|
|
|
+ co = ChromiumOptions()
|
|
|
+ # 端口分配 (Docker 适配)
|
|
|
+ def get_free_port():
|
|
|
+ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
|
|
+ s.bind(('', 0)); return s.getsockname()[1]
|
|
|
+ co.set_local_port(get_free_port())
|
|
|
+
|
|
|
+ # 路径与隔离
|
|
|
+ co.set_user_data_path(self.user_data_path)
|
|
|
+ chrome_path = os.getenv("CHROME_BIN")
|
|
|
+ if chrome_path and os.path.exists(chrome_path):
|
|
|
+ co.set_paths(browser_path=chrome_path)
|
|
|
+
|
|
|
+ # 代理隧道
|
|
|
+ if self.config.proxy and self.config.proxy.ip:
|
|
|
+ p = self.config.proxy
|
|
|
+ if p.username and p.password:
|
|
|
+ self._log(f"Starting Tunnel for {p.ip}...")
|
|
|
+ self.tunnel = ProxyTunnel(p.ip, p.port, p.username, p.password)
|
|
|
+ local_proxy = self.tunnel.start()
|
|
|
+ self._log(f"Tunnel started at {local_proxy}")
|
|
|
+ co.set_argument(f'--proxy-server={local_proxy}')
|
|
|
+ else:
|
|
|
+ proxy_str = f"{p.scheme}://{p.ip}:{p.port}"
|
|
|
+ co.set_argument(f'--proxy-server={proxy_str}')
|
|
|
+ else:
|
|
|
+ self._log("[WARN] No proxy configured!")
|
|
|
+
|
|
|
+ # Docker 核心参数
|
|
|
+ co.headless(False)
|
|
|
+ co.set_argument('--no-sandbox')
|
|
|
+ co.set_argument('--disable-gpu')
|
|
|
+ co.set_argument('--disable-dev-shm-usage')
|
|
|
+ co.set_argument('--window-size=1920,1080')
|
|
|
+ co.set_argument('--disable-blink-features=AutomationControlled')
|
|
|
+ co.set_argument('--ignore-certificate-errors')
|
|
|
+
|
|
|
+ try:
|
|
|
+ self.page = ChromiumPage(co)
|
|
|
+
|
|
|
+ # 1. 访问首页
|
|
|
+ url_home = f"{self.base_url}/en"
|
|
|
+ self._log(f"Navigating to {url_home}")
|
|
|
+ self.page.get(url_home)
|
|
|
+
|
|
|
+ # 2. Cloudflare 过盾
|
|
|
+ cf = CloudflareBypasser(self.page, log=self.config.debug)
|
|
|
+ if not cf.bypass(max_retry=15):
|
|
|
+ if "access denied" in self.page.title.lower():
|
|
|
+ raise BizLogicError("Cloudflare Access Denied")
|
|
|
+ raise BizLogicError("Cloudflare bypass timeout")
|
|
|
+
|
|
|
+ # 3. 提取 CSRF 和 验证码
|
|
|
+ # 等待页面加载
|
|
|
+ meta_ele = self.page.ele('xpath://meta[@name="csrf-token"]', timeout=30)
|
|
|
+
|
|
|
+ if not meta_ele:
|
|
|
+ # 截图调试,看看是不是还在 Cloudflare 或者加载失败
|
|
|
+ self.page.get_screenshot(path='csrf_not_found.jpg')
|
|
|
+ raise NotFoundError("CSRF Token meta tag not found (Page load failed?)")
|
|
|
+
|
|
|
+ self.csrf_token = meta_ele.attr('content')
|
|
|
+
|
|
|
+ # 提取验证码图片 (Visametric Base64)
|
|
|
+ html = self.page.html
|
|
|
+ match = re.search(r'"data:image/png;base64,"\s*\+\s*"(.*?)"', html)
|
|
|
+ if not match:
|
|
|
+ # 尝试直接找 img
|
|
|
+ try:
|
|
|
+ img_ele = self.page.ele('xpath://img[contains(@src, "data:image")]')
|
|
|
+ if img_ele:
|
|
|
+ b64_src = img_ele.attr('src')
|
|
|
+ captcha_b64 = b64_src.split(',')[1]
|
|
|
+ else:
|
|
|
+ raise NotFoundError("Captcha image not found")
|
|
|
+ except:
|
|
|
+ raise NotFoundError("Captcha image not found (Regex failed)")
|
|
|
+ else:
|
|
|
+ captcha_b64 = match.group(1)
|
|
|
+ image_bytes = base64.b64decode(captcha_b64)
|
|
|
+ # 4. 识别验证码 (本地 OCR 服务)
|
|
|
+ captcha_code = self.ocr_engine.inference_captcha(image_bytes)
|
|
|
+
|
|
|
+ # 5. 提交验证码 (获取 PersonalInfo)
|
|
|
+ self._submit_captcha(captcha_code)
|
|
|
+
|
|
|
+ self.session_create_time = time.time()
|
|
|
+ self._log("Session created successfully.")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ self._log(f"Session Create Failed: {e}")
|
|
|
+ self.cleanup()
|
|
|
+ raise e
|
|
|
+
|
|
|
+ def _submit_captcha(self, code):
|
|
|
+ """
|
|
|
+ 提交验证码,获取 personalinfo 和 emailValControl
|
|
|
+ """
|
|
|
+ url = f"{self.base_url}/en/appointment-form"
|
|
|
+ payload = {
|
|
|
+ '_token': self.csrf_token,
|
|
|
+ 'cpJvnsControl': '',
|
|
|
+ 'mailConfirmCode': code
|
|
|
+ }
|
|
|
+
|
|
|
+ # 使用 Fetch 提交 (Form-UrlEncoded)
|
|
|
+ resp = self._perform_request('POST', url, data=payload, headers={
|
|
|
+ 'X-Requested-With': 'XMLHttpRequest'
|
|
|
+ })
|
|
|
+
|
|
|
+ # 解析返回的 HTML 片段
|
|
|
+ html = resp.text
|
|
|
+
|
|
|
+ # 提取 personalinfo
|
|
|
+ match_pi = re.search(r"personalinfo:\s*'([^']*)'", html)
|
|
|
+ if match_pi: self.personal_info_val = match_pi.group(1)
|
|
|
+
|
|
|
+ # 提取 emailValControl
|
|
|
+ match_ev = re.search(r"emailValControl:\s*'([^']*)'", html)
|
|
|
+ if match_ev: self.email_val_control = match_ev.group(1)
|
|
|
+
|
|
|
+ if not self.personal_info_val:
|
|
|
+ raise NotFoundError(message="Personalinfo not found in captcha response")
|
|
|
+
|
|
|
+ # 更新 CSRF (如果返回了新的)
|
|
|
+ m = re.search(r'name="csrf-token" content="([^"]+)"', html)
|
|
|
+ if m: self.csrf_token = m.group(1)
|
|
|
+
|
|
|
+ def query(self) -> VSQueryResult:
|
|
|
+ res = VSQueryResult()
|
|
|
+ res.success = False
|
|
|
+
|
|
|
+ consular_id = self.free_config.get("consularid", "1")
|
|
|
+ url = f"{self.base_url}/en/getdate"
|
|
|
+ payload = {
|
|
|
+ "consularid": consular_id,
|
|
|
+ "exitid": "1",
|
|
|
+ "servicetypeid": "1",
|
|
|
+ "calendarType": "2",
|
|
|
+ "totalperson": "1"
|
|
|
+ }
|
|
|
+
|
|
|
+ headers = {
|
|
|
+ 'X-CSRF-TOKEN': self.csrf_token,
|
|
|
+ 'X-Requested-With': 'XMLHttpRequest'
|
|
|
+ }
|
|
|
+
|
|
|
+ try:
|
|
|
+ resp = self._perform_request('POST', url, data=payload, headers=headers, retry_count=1)
|
|
|
+ except Exception as e:
|
|
|
+ self._log(f"Query Error: {e}")
|
|
|
+ raise e
|
|
|
+
|
|
|
+ j = resp.json()
|
|
|
+ dates = j.get("getDateEnable", [])
|
|
|
+
|
|
|
+ res.city = self.free_config.get('city', '')
|
|
|
+ res.country = self.free_config.get('country', '')
|
|
|
+ res.visa_type = self.free_config.get('visa_type', '')
|
|
|
+ res.routing_key = self.free_config.get('routing_key', '')
|
|
|
+
|
|
|
+ if dates:
|
|
|
+ res.success = True
|
|
|
+ res.availability_status = AvailabilityStatus.Available
|
|
|
+ res.earliest_date = to_yyyymmdd(dates[0], "%d-%m-%Y")
|
|
|
+ res.availability = [
|
|
|
+ DateAvailability(date=to_yyyymmdd(d, "%d-%m-%Y"), times=[])
|
|
|
+ for d in dates
|
|
|
+ ]
|
|
|
+ else:
|
|
|
+ res.availability_status = AvailabilityStatus.NoneAvailable
|
|
|
+
|
|
|
+ return res
|
|
|
+
|
|
|
+ def book(self, slot_info: VSQueryResult, user_inputs: Dict) -> VSBookResult:
|
|
|
+ res = VSBookResult()
|
|
|
+ available_dates = [da.date for da in slot_info.availability]
|
|
|
+ exp_start = user_inputs.get('expected_start_date', '')
|
|
|
+ exp_end = user_inputs.get('expected_end_date', '')
|
|
|
+
|
|
|
+ valid_dates = self._filter_dates(available_dates, exp_start, exp_end)
|
|
|
+ if not valid_dates:
|
|
|
+ raise NotFoundError("No dates match constraints")
|
|
|
+
|
|
|
+ target_date = random.choice(valid_dates)
|
|
|
+ self._log(f"Selected date: {target_date}")
|
|
|
+
|
|
|
+ # 1. 获取时间 Slot
|
|
|
+ time_slot = self._get_slot_time(target_date)
|
|
|
+
|
|
|
+ # 2. 发送邮件流程
|
|
|
+ alias_email = get_alias_email(user_inputs.get("email"), new_domain='gmail-app.com')
|
|
|
+ self._send_email_step1(alias_email)
|
|
|
+ self._send_email_step2("0")
|
|
|
+
|
|
|
+ # 3. 读取 OTP
|
|
|
+ otp_code = self._read_otp_email(alias_email)
|
|
|
+
|
|
|
+ # 4. 提交确认
|
|
|
+ book_res_html = self._confirm_appointment(target_date, time_slot, user_inputs, otp_code, alias_email)
|
|
|
+
|
|
|
+ if "complete all required fields" in book_res_html.lower():
|
|
|
+ raise BizLogicError("Incomplete fields response")
|
|
|
+
|
|
|
+ match = re.search(r'https:\/\/checkout\.stripe\.com\/c\/pay\/[^\s"]+', book_res_html)
|
|
|
+
|
|
|
+ res.success = True
|
|
|
+ res.fee_amount = 3000
|
|
|
+ res.fee_currency = 'EUR'
|
|
|
+ res.book_date = target_date
|
|
|
+ res.book_time = time_slot['time']
|
|
|
+
|
|
|
+ if match:
|
|
|
+ res.payment_link = match.group(0)
|
|
|
+ self._log(f"Payment Link: {res.payment_link}")
|
|
|
+
|
|
|
+ return res
|
|
|
+
|
|
|
+ # ---------------------------------------------------------
|
|
|
+ # 辅助方法
|
|
|
+ # ---------------------------------------------------------
|
|
|
+
|
|
|
+ def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None, retry_count=0):
|
|
|
+ if not self.page:
|
|
|
+ raise BizLogicError("Browser not init")
|
|
|
+
|
|
|
+ req_url = url
|
|
|
+ if params:
|
|
|
+ sep = '&' if '?' in req_url else '?'
|
|
|
+ req_url += sep + urlencode(params)
|
|
|
+
|
|
|
+ fetch_opts = { "method": method.upper(), "headers": headers or {}, "credentials": "include" }
|
|
|
+
|
|
|
+ if json_data:
|
|
|
+ fetch_opts['body'] = json.dumps(json_data)
|
|
|
+ fetch_opts['headers']['Content-Type'] = 'application/json'
|
|
|
+ elif data:
|
|
|
+ if isinstance(data, dict):
|
|
|
+ fetch_opts['body'] = urlencode(data)
|
|
|
+ fetch_opts['headers']['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
|
|
|
+ else:
|
|
|
+ fetch_opts['body'] = data
|
|
|
+
|
|
|
+ js = f"""
|
|
|
+ return fetch("{req_url}", {json.dumps(fetch_opts)})
|
|
|
+ .then(async r => {{
|
|
|
+ const h = {{}}; r.headers.forEach((v, k) => h[k] = v);
|
|
|
+ return {{ status: r.status, body: await r.text(), headers: h, url: r.url }};
|
|
|
+ }}).catch(e => {{ return {{ status: 0, body: e.toString() }}; }});
|
|
|
+ """
|
|
|
+
|
|
|
+ resp = BrowserResponse(self.page.run_js(js, timeout=60))
|
|
|
+
|
|
|
+ if resp.status_code == 200:
|
|
|
+ return resp
|
|
|
+ elif resp.status_code == 403:
|
|
|
+ if "Just a moment" in resp.text and retry_count < 2:
|
|
|
+ self._log("Cloudflare 403. Refreshing...")
|
|
|
+ if self._refresh_firewall_session():
|
|
|
+ return self._perform_request(method, url, headers, data, json_data, params, retry_count+1)
|
|
|
+ raise PermissionDeniedError(f"HTTP 403: {resp.text[:100]}")
|
|
|
+ elif resp.status_code == 429:
|
|
|
+ self.is_healthy = False
|
|
|
+ raise RateLimiteddError()
|
|
|
+ elif resp.status_code in [401, 419]:
|
|
|
+ self.is_healthy = False
|
|
|
+ raise SessionExpiredOrInvalidError()
|
|
|
+ else:
|
|
|
+ raise BizLogicError(f"HTTP {resp.status_code}: {resp.text[:100]}")
|
|
|
+
|
|
|
+ def _refresh_firewall_session(self):
|
|
|
+ try:
|
|
|
+ self.page.refresh()
|
|
|
+ cf = CloudflareBypasser(self.page, log=self.config.debug)
|
|
|
+ return cf.bypass(max_retry=10)
|
|
|
+ except: return False
|
|
|
+
|
|
|
+ def _get_slot_time(self, date) -> Dict:
|
|
|
+ url = f"{self.base_url}/en/senddate"
|
|
|
+ dt_m = datetime.strptime(date, "%Y-%m-%d")
|
|
|
+ converted_date = dt_m.strftime("%d-%m-%Y")
|
|
|
+ payload = {
|
|
|
+ "fulldate": converted_date,
|
|
|
+ "totalperson": "1",
|
|
|
+ "set_new_consular_id": self.free_config.get("consularid", "1"),
|
|
|
+ "set_new_exit_office_id": "1",
|
|
|
+ "calendarType": "2",
|
|
|
+ "set_new_service_type_id": "1",
|
|
|
+ "personalinfo": self.personal_info_val
|
|
|
+ }
|
|
|
+ headers = {'X-CSRF-TOKEN': self.csrf_token, 'X-Requested-With': 'XMLHttpRequest'}
|
|
|
+ resp = self._perform_request('POST', url, data=payload, headers=headers)
|
|
|
+
|
|
|
+ # 使用 Regex 提取 Slot
|
|
|
+ times = []
|
|
|
+ # pattern: data-id="123" ... <i>09:00</i>
|
|
|
+ for m in re.finditer(r'data-id="([^"]+)"[^>]*data-all="([^"]+)"[^>]*>.*?<i>(.*?)</i>', resp.text, re.DOTALL):
|
|
|
+ times.append({'data_id': m.group(1), 'data_all': m.group(2), 'time': m.group(3).strip()})
|
|
|
+
|
|
|
+ if not times: raise NotFoundError("No time slots")
|
|
|
+ return random.choice(times)
|
|
|
+
|
|
|
+ def _send_email_step1(self, email):
|
|
|
+ url = f"{self.base_url}/en/jky45fgd"
|
|
|
+ payload = { "emailCheck": email, "personalinfo": self.personal_info_val }
|
|
|
+ headers = {'X-CSRF-TOKEN': self.csrf_token, 'X-Requested-With': 'XMLHttpRequest'}
|
|
|
+ self._perform_request('POST', url, data=payload, headers=headers)
|
|
|
+
|
|
|
+ def _send_email_step2(self, code_val):
|
|
|
+ url = f"{self.base_url}/en/confirmCodeSendMail"
|
|
|
+ payload = { "confirmCode": code_val, "emailValControl": self.email_val_control }
|
|
|
+ headers = {'X-CSRF-TOKEN': self.csrf_token, 'X-Requested-With': 'XMLHttpRequest'}
|
|
|
+ self._perform_request('POST', url, data=payload, headers=headers)
|
|
|
+
|
|
|
+ def _read_otp_email(self, recipient) -> str:
|
|
|
+ master_email = "visafly666@gmail.com"
|
|
|
+ sender = 'Visametric - verify at visametric.com'
|
|
|
+ now_utc = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
+ for i in range(12):
|
|
|
+ c = VSCloudApi.Instance().fetch_mail_content(master_email, sender, recipient, 'Verification Code', 'Verification code', now_utc, 300)
|
|
|
+ if c:
|
|
|
+ m = re.search(r'\b\d{6}\b', c)
|
|
|
+ if m: return m.group(0)
|
|
|
+ time.sleep(5)
|
|
|
+ raise NotFoundError("OTP timeout")
|
|
|
+
|
|
|
+ def _confirm_appointment(self, date, slot_data, user_inputs, otp, alias_email):
|
|
|
+ url = f"{self.base_url}/en/personal/appointment/create"
|
|
|
+ def _get_dob(d):
|
|
|
+ try: return datetime.strptime(d[:10], "%Y-%m-%d")
|
|
|
+ except: return datetime.now()
|
|
|
+ dob = _get_dob(user_inputs.get('birthday', ''))
|
|
|
+
|
|
|
+ payload = {
|
|
|
+ "_token": self.csrf_token,
|
|
|
+ "country": str(self.free_config.get("consularid", "1")),
|
|
|
+ "visitingcountry": str(self.free_config.get("consularid", "1")),
|
|
|
+ "city": "6",
|
|
|
+ "office": "1",
|
|
|
+ "officetype": "1",
|
|
|
+ "totalPerson": "1",
|
|
|
+ "name1": user_inputs.get('first_name', '').upper(),
|
|
|
+ "surname1": user_inputs.get('last_name', '').upper(),
|
|
|
+ "nationality1": "2",
|
|
|
+ "birthday1": str(dob.day),
|
|
|
+ "birthmonth1": str(dob.month),
|
|
|
+ "birthyear1": str(dob.year),
|
|
|
+ "passport1": user_inputs.get('passport_no'),
|
|
|
+ "passportExpirationDate1": datetime.strptime(user_inputs.get('passport_expiry_date', '')[:10], "%Y-%m-%d").strftime("%d-%m-%Y"),
|
|
|
+ "email1": alias_email,
|
|
|
+ "phone1": user_inputs.get('phone_no'),
|
|
|
+ "alternativephone1": "",
|
|
|
+ "mailConfirmCode": otp,
|
|
|
+ "ctval": slot_data['data_id'],
|
|
|
+ "qtallvert": slot_data['data_all'],
|
|
|
+ "oldofficetype": "1",
|
|
|
+ "oldtotalperson": "1",
|
|
|
+ "rePaymentControl": "0",
|
|
|
+ "view_set_app_country": "Schengen - Tourism/Family&Friend Visit/Transit Visa/Other Purposes",
|
|
|
+ "view_set_app_office": "Dublin",
|
|
|
+ "view_set_app_service_type": "NORMAL",
|
|
|
+ "cargoactive": "0",
|
|
|
+ "setnewcalendarstatus": "2",
|
|
|
+ "availableDaycontrol": "0",
|
|
|
+ "travelStartDate": datetime.strptime(user_inputs.get('travel_date', '')[:10], "%Y-%m-%d").strftime("%d-%m-%Y"),
|
|
|
+ "personalapproveTerms": "1"
|
|
|
+ }
|
|
|
+
|
|
|
+ # 补全空字段 (Person 2-4)
|
|
|
+ for i in range(2, 5):
|
|
|
+ payload.update({
|
|
|
+ f"name{i}": "", f"surname{i}": "", f"nationality{i}": "0", f"birthday{i}": "0", f"birthmonth{i}": "0", f"birthyear{i}": "0", f"passport{i}": "", f"passportExpirationDate{i}": "", f"email{i}": alias_email, f"phone{i}": user_inputs.get('phone_no'), f"alternativephone{i}": ""
|
|
|
+ })
|
|
|
+
|
|
|
+ headers = {'X-Requested-With': 'XMLHttpRequest'}
|
|
|
+ return self._perform_request('POST', url, data=payload, headers=headers).text
|
|
|
+
|
|
|
+ def _filter_dates(self, dates, start, end):
|
|
|
+ if not start or not end: return dates
|
|
|
+ valid = []
|
|
|
+ s = datetime.strptime(start[:10], "%Y-%m-%d")
|
|
|
+ e = datetime.strptime(end[:10], "%Y-%m-%d")
|
|
|
+ for d in dates:
|
|
|
+ c = datetime.strptime(d, "%Y-%m-%d")
|
|
|
+ if s <= c <= e: valid.append(d)
|
|
|
+ random.shuffle(valid)
|
|
|
+ return valid
|
|
|
+
|
|
|
+ def cleanup(self):
|
|
|
+ if self.page:
|
|
|
+ try: self.page.quit()
|
|
|
+ except: pass
|
|
|
+ self.page = None
|
|
|
+ if os.path.exists(self.root_workspace):
|
|
|
+ for _ in range(3):
|
|
|
+ try: time.sleep(0.2); shutil.rmtree(self.root_workspace, ignore_errors=True); break
|
|
|
+ except: time.sleep(0.5)
|
|
|
+ if self.tunnel:
|
|
|
+ try: self.tunnel.stop()
|
|
|
+ except: pass
|
|
|
+ self.tunnel = None
|
|
|
+
|
|
|
+ def __del__(self):
|
|
|
+ self.cleanup()
|