| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522 |
- import time
- import json
- import random
- import re
- import os
- import uuid
- import shutil
- import base64
- import socket
- from datetime import datetime
- from typing import List, Dict, Optional, Any, Callable
- from urllib.parse import urljoin, urlparse, urlencode
- # DrissionPage 核心
- from DrissionPage import ChromiumPage, ChromiumOptions
- from vs_plg import IVSPlg
- from vs_types import VSPlgConfig, AppointmentType, VSQueryResult, VSBookResult, AvailabilityStatus, TimeSlot, DateAvailability, NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError
- from toolkit.vs_cloud_api import VSCloudApi
- from utils.cloudflare_bypass_for_scraping import CloudflareBypasser
- from toolkit.proxy_tunnel import ProxyTunnel
- from toolkit.ocr_engine import DddOcrEngine
class BrowserResponse:
    """Minimal requests-like wrapper around the dict returned by an in-page fetch().

    The wrapped dict is expected to carry the keys ``status``, ``body``,
    ``headers`` and ``url``; any missing (or null) key falls back to a neutral
    default so attribute access never fails.
    """

    def __init__(self, result_dict):
        """Copy the fields of ``result_dict`` (may be ``None``) onto the instance."""
        result_dict = result_dict or {}
        self.status_code = result_dict.get('status', 0)
        # ``or`` also normalises an explicit null coming back from JavaScript,
        # so callers can safely slice ``text`` or iterate ``headers``.
        self.text = result_dict.get('body') or ''
        self.headers = result_dict.get('headers') or {}
        self.url = result_dict.get('url') or ''
        self._json = None  # lazily parsed body, cached after the first call

    def json(self):
        """Return the body parsed as JSON, or ``{}`` when it is empty or invalid.

        The parsed value is cached, so repeated calls return the same object.
        """
        if self._json is None:
            if not self.text:
                return {}
            try:
                self._json = json.loads(self.text)
            except (TypeError, ValueError):
                # json.JSONDecodeError is a ValueError; TypeError covers a
                # non-string body. Anything else should propagate.
                self._json = {}
        return self._json
def to_yyyymmdd(data_str: str, date_str_format: str, target_format: str = "%Y-%m-%d"):
    """Re-format a date string.

    Args:
        data_str: The date text to convert.
        date_str_format: ``strptime`` format describing ``data_str``.
        target_format: ``strftime`` format of the result (ISO date by default).

    Returns:
        ``data_str`` rendered with ``target_format``.

    Raises:
        ValueError: If ``data_str`` does not match ``date_str_format``.
    """
    dt = datetime.strptime(data_str, date_str_format)
    # Honour the caller-supplied output format instead of a hard-coded one.
    return dt.strftime(target_format)
def get_alias_email(email: str, new_domain: str = "gmail-app.com") -> str:
    """Return ``email`` with its domain replaced by ``new_domain``.

    Only the part after the last ``@`` is treated as the domain.

    Raises:
        ValueError: If ``email`` is not a string, contains no ``@``, or has an
            empty local part.
    """
    if not isinstance(email, str) or "@" not in email:
        raise ValueError(f"Invalid email: {email}")
    local_part, _ = email.rsplit("@", 1)
    if not local_part:
        # "@example.com" would otherwise produce the unusable "@<new_domain>".
        raise ValueError(f"Invalid email: {email}")
    return f"{local_part}@{new_domain}"
class DePlugin2(IVSPlg):
    """Germany (Visametric) visa appointment plugin (Browser + Tunnel mode).

    A Chromium instance is driven through DrissionPage. Every site request is
    issued with ``fetch()`` from inside the loaded page (see
    ``_perform_request``), so it is sent with ``credentials: "include"``.
    Each plugin instance owns a private workspace directory, an optional proxy
    tunnel and an OCR engine used to solve the entry captcha.
    """

    # Date formats: ISO is used by callers, day-first is what the site exchanges.
    _ISO_DATE = "%Y-%m-%d"
    _SITE_DATE = "%d-%m-%Y"

    # Parameters of the OTP mailbox polling done by _read_otp_email().
    _OTP_MASTER_EMAIL = "visafly666@gmail.com"
    _OTP_SENDER = 'Visametric - verify at visametric.com'
    _OTP_ATTEMPTS = 12
    _OTP_POLL_SECONDS = 5

    def __init__(self, group_id: str):
        """Initialise instance state and create the isolated workspace directory.

        Args:
            group_id: Identifier returned by ``get_group_id()``; also used in
                log prefixes and in the workspace directory name.
        """
        self.group_id = group_id
        self.config: Optional[VSPlgConfig] = None
        self.free_config: Dict[str, Any] = {}
        self.logger = None

        # Browser instance
        self.page: Optional[ChromiumPage] = None

        # Every attribute cleanup() touches is assigned before any filesystem
        # work, so __del__ stays safe even if directory creation fails.
        self.tunnel = None  # proxy tunnel
        self.is_healthy = True
        self.session_create_time: float = 0

        # OCR engine used for the captcha
        self.ocr_engine: Optional[DddOcrEngine] = None

        # Business state
        self.base_url = "https://ie-appointment.visametric.com"
        self.csrf_token = ""
        self.personal_info_val = ""
        self.email_val_control = ""

        # Resource isolation: one workspace per plugin instance
        self.instance_id = uuid.uuid4().hex[:8]
        self.root_workspace = os.path.abspath(
            os.path.join("temp_browser_data", f"{self.group_id}_{self.instance_id}")
        )
        self.user_data_path = os.path.join(self.root_workspace, "user_data")
        # exist_ok avoids the check-then-create race of os.path.exists().
        os.makedirs(self.root_workspace, exist_ok=True)

    def get_group_id(self) -> str:
        """Return the group id this plugin instance was created with."""
        return self.group_id

    def set_log(self, logger: Callable[[str], None]) -> None:
        """Install the callable that receives formatted log lines."""
        self.logger = logger

    def _log(self, message):
        """Send ``message`` to the configured logger, or to stdout when none is set."""
        line = f'[DePlugin] [{self.group_id}] {message}'
        if self.logger:
            self.logger(line)
        else:
            print(line)

    def set_config(self, config: VSPlgConfig):
        """Store ``config`` and apply an optional ``base_url`` override from its free config."""
        self.config = config
        self.free_config = config.free_config or {}
        if self.free_config.get("base_url"):
            self.base_url = self.free_config["base_url"].rstrip('/')

    def health_check(self) -> bool:
        """Return True while the browser answers and the session is within its lifetime.

        The session lifetime is ``config.session_max_life`` minutes; a value
        of 0 or less disables the age check.
        """
        if not self.is_healthy or not self.page:
            return False
        try:
            if not self.page.run_js("return 1;"):
                return False
        except Exception:
            # Any failure talking to the browser means it is unusable.
            return False
        max_life = self.config.session_max_life if self.config else 0
        if max_life > 0 and time.time() - self.session_create_time > max_life * 60:
            self._log("Session expired.")
            return False
        return True

    def create_session(self):
        """Create a session.

        Steps: start browser -> proxy tunnel -> pass Cloudflare -> extract
        captcha -> local OCR -> submit -> capture the form context.

        Raises:
            BizLogicError: No config was set, or the Cloudflare check failed.
            NotFoundError: CSRF token, captcha image or ``personalinfo`` missing.

        On any failure the browser, tunnel and workspace are released through
        ``cleanup()`` before the exception propagates.
        """
        if self.config is None:
            raise BizLogicError("Config not set")
        if self.page or self.tunnel:
            # A repeated call must not orphan the previous browser/tunnel.
            self.cleanup()
        # Never carry tokens of an earlier session into the new one.
        self.csrf_token = ""
        self.personal_info_val = ""
        self.email_val_control = ""

        self._log(f"Initializing Session (ID: {self.instance_id})...")
        try:
            # cleanup() removes the workspace, so make sure it exists again.
            os.makedirs(self.root_workspace, exist_ok=True)
            self.ocr_engine = DddOcrEngine()
            # Options are built inside the try block so that a tunnel started
            # there is stopped again if anything later fails.
            self.page = ChromiumPage(self._build_browser_options())

            # 1. Open the landing page
            url_home = f"{self.base_url}/en"
            self._log(f"Navigating to {url_home}")
            self.page.get(url_home)

            # 2. Pass the Cloudflare check
            cf = CloudflareBypasser(self.page, log=self.config.debug)
            if not cf.bypass(max_retry=15):
                if "access denied" in (self.page.title or "").lower():
                    raise BizLogicError("Cloudflare Access Denied")
                raise BizLogicError("Cloudflare bypass timeout")

            # 3. Extract the CSRF token (waits for the page to load)
            meta_ele = self.page.ele('xpath://meta[@name="csrf-token"]', timeout=30)
            if not meta_ele:
                # Screenshot for debugging: still on Cloudflare, or load failed?
                self._save_debug_screenshot('csrf_not_found.jpg')
                raise NotFoundError("CSRF Token meta tag not found (Page load failed?)")
            self.csrf_token = meta_ele.attr('content')

            # 4. Extract the captcha image and solve it with the local OCR engine
            image_bytes = base64.b64decode(self._extract_captcha_b64())
            captcha_code = self.ocr_engine.inference_captcha(image_bytes)

            # 5. Submit the captcha (yields personalinfo / emailValControl)
            self._submit_captcha(captcha_code)

            self.session_create_time = time.time()
            self._log("Session created successfully.")
        except Exception as e:
            self._log(f"Session Create Failed: {e}")
            self.cleanup()
            raise

    @staticmethod
    def _get_free_port() -> int:
        """Ask the OS for a currently unused TCP port (port allocation for Docker)."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(('', 0))
            return s.getsockname()[1]

    def _build_browser_options(self) -> ChromiumOptions:
        """Assemble the Chromium options; starts the proxy tunnel when credentials are set."""
        co = ChromiumOptions()
        co.set_local_port(self._get_free_port())

        # Paths and isolation
        co.set_user_data_path(self.user_data_path)
        chrome_path = os.getenv("CHROME_BIN")
        if chrome_path and os.path.exists(chrome_path):
            co.set_paths(browser_path=chrome_path)

        # Proxy / tunnel
        proxy = self.config.proxy
        if proxy and proxy.ip:
            if proxy.username and proxy.password:
                # A proxy with credentials is reached through a local tunnel,
                # and the browser is pointed at the tunnel's local address.
                self._log(f"Starting Tunnel for {proxy.ip}...")
                self.tunnel = ProxyTunnel(proxy.ip, proxy.port, proxy.username, proxy.password)
                local_proxy = self.tunnel.start()
                self._log(f"Tunnel started at {local_proxy}")
                co.set_argument(f'--proxy-server={local_proxy}')
            else:
                proxy_str = f"{proxy.scheme}://{proxy.ip}:{proxy.port}"
                co.set_argument(f'--proxy-server={proxy_str}')
        else:
            self._log("[WARN] No proxy configured!")

        # Core arguments for running inside Docker
        co.headless(False)
        co.set_argument('--no-sandbox')
        co.set_argument('--disable-gpu')
        co.set_argument('--disable-dev-shm-usage')
        co.set_argument('--window-size=1920,1080')
        co.set_argument('--disable-blink-features=AutomationControlled')
        co.set_argument('--ignore-certificate-errors')
        return co

    def _save_debug_screenshot(self, path):
        """Best-effort screenshot; a failure here must not mask the real error."""
        try:
            self.page.get_screenshot(path=path)
        except Exception as exc:
            self._log(f"Screenshot failed: {exc}")

    def _extract_captcha_b64(self) -> str:
        """Return the base64 payload of the captcha image found on the current page.

        Raises:
            NotFoundError: If neither the inline script nor an ``<img>`` with a
                data URI provides the image.
        """
        # Visametric assembles the data URI in script: "data:image/png;base64," + "<payload>"
        match = re.search(r'"data:image/png;base64,"\s*\+\s*"(.*?)"', self.page.html)
        if match:
            return match.group(1)

        # Fallback: look for an <img> element carrying a data URI directly.
        try:
            img_ele = self.page.ele('xpath://img[contains(@src, "data:image")]')
        except Exception as exc:
            raise NotFoundError("Captcha image not found (Regex failed)") from exc
        if not img_ele:
            raise NotFoundError("Captcha image not found")
        _, sep, payload = (img_ele.attr('src') or '').partition(',')
        if not sep or not payload:
            raise NotFoundError("Captcha image not found (Regex failed)")
        return payload

    def _submit_captcha(self, code):
        """Submit the captcha answer and capture ``personalinfo`` / ``emailValControl``.

        Raises:
            NotFoundError: If the response does not contain ``personalinfo``.
        """
        url = f"{self.base_url}/en/appointment-form"
        payload = {
            '_token': self.csrf_token,
            'cpJvnsControl': '',
            'mailConfirmCode': code
        }

        # Submitted through fetch as a form-urlencoded body
        resp = self._perform_request('POST', url, data=payload, headers={
            'X-Requested-With': 'XMLHttpRequest'
        })

        # The response is an HTML fragment
        html = resp.text

        # Reset first so a value left over from an earlier session can never
        # satisfy the "not found" check below.
        self.personal_info_val = ""
        self.email_val_control = ""

        match_pi = re.search(r"personalinfo:\s*'([^']*)'", html)
        if match_pi:
            self.personal_info_val = match_pi.group(1)

        match_ev = re.search(r"emailValControl:\s*'([^']*)'", html)
        if match_ev:
            self.email_val_control = match_ev.group(1)

        if not self.personal_info_val:
            raise NotFoundError(message="Personalinfo not found in captcha response")

        # Refresh the CSRF token when the response carries a new one
        m = re.search(r'name="csrf-token" content="([^"]+)"', html)
        if m:
            self.csrf_token = m.group(1)

    def query(self, apt_type: AppointmentType) -> VSQueryResult:
        """Query the dates that currently have open appointments.

        ``apt_type`` is not used by this plugin. ``success`` is only set to
        True when at least one date is returned; request errors are logged
        and re-raised.
        """
        res = VSQueryResult()
        res.success = False

        consular_id = self.free_config.get("consularid", "1")
        url = f"{self.base_url}/en/getdate"
        payload = {
            "consularid": consular_id,
            "exitid": "1",
            "servicetypeid": "1",
            "calendarType": "2",
            "totalperson": "1"
        }
        headers = {
            'X-CSRF-TOKEN': self.csrf_token,
            'X-Requested-With': 'XMLHttpRequest'
        }

        try:
            resp = self._perform_request('POST', url, data=payload, headers=headers, retry_count=1)
        except Exception as e:
            self._log(f"Query Error: {e}")
            raise

        body = resp.json()
        # Guard against a non-object JSON body or an explicit null list.
        dates = (body.get("getDateEnable") or []) if isinstance(body, dict) else []

        if dates:
            res.success = True
            res.availability_status = AvailabilityStatus.Available
            res.earliest_date = to_yyyymmdd(dates[0], self._SITE_DATE)
            res.availability = [
                DateAvailability(date=to_yyyymmdd(d, self._SITE_DATE), times=[])
                for d in dates
            ]
        else:
            res.availability_status = AvailabilityStatus.NoneAvailable
        return res

    def book(self, slot_info: VSQueryResult, user_inputs: Dict) -> VSBookResult:
        """Book a random date/time from ``slot_info`` that fits the user's date window.

        Raises:
            BizLogicError: Missing email, or the site reports incomplete fields.
            NotFoundError: No matching date, no time slot, or no OTP mail.
        """
        res = VSBookResult()

        email = user_inputs.get("email")
        if not email:
            # Fail before any request is sent instead of crashing later.
            raise BizLogicError("Missing email in user inputs")

        available_dates = [da.date for da in (slot_info.availability or [])]
        exp_start = user_inputs.get('expected_start_date', '')
        exp_end = user_inputs.get('expected_end_date', '')

        valid_dates = self._filter_dates(available_dates, exp_start, exp_end)
        if not valid_dates:
            raise NotFoundError("No dates match constraints")

        target_date = random.choice(valid_dates)
        self._log(f"Selected date: {target_date}")

        # 1. Fetch a time slot for the chosen date
        time_slot = self._get_slot_time(target_date)

        # 2. Trigger the verification e-mail
        alias_email = get_alias_email(email, new_domain='gmail-app.com')
        self._send_email_step1(alias_email)
        self._send_email_step2("0")

        # 3. Read the OTP
        otp_code = self._read_otp_email(alias_email)

        # 4. Submit the confirmation
        book_res_html = self._confirm_appointment(target_date, time_slot, user_inputs, otp_code, alias_email)

        if "complete all required fields" in book_res_html.lower():
            raise BizLogicError("Incomplete fields response")

        match = re.search(r'https:\/\/checkout\.stripe\.com\/c\/pay\/[^\s"]+', book_res_html)

        res.success = True
        res.fee_amount = 3000
        res.fee_currency = 'EUR'
        res.book_date = target_date
        res.book_time = time_slot['time']

        if match:
            res.payment_link = match.group(0)
            self._log(f"Payment Link: {res.payment_link}")
        else:
            # The booking is still reported as successful, as before.
            self._log("[WARN] No payment link found in booking response")

        return res

    # ---------------------------------------------------------
    # Helper methods
    # ---------------------------------------------------------

    def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None, retry_count=0):
        """Run an HTTP request from inside the page via ``fetch()``.

        Args:
            method: HTTP verb (upper-cased before use).
            url: Target URL; ``params`` are appended as a query string.
            headers: Extra request headers; the dict is not modified.
            data: Dict (sent form-urlencoded) or a pre-encoded body.
            json_data: Object sent as a JSON body; takes precedence over ``data``.
            params: Optional query parameters.
            retry_count: Number of Cloudflare refresh attempts already used; a
                403 challenge page is only retried while this is below 2.

        Returns:
            A ``BrowserResponse`` when the status is 200.

        Raises:
            PermissionDeniedError: HTTP 403 that could not be recovered.
            RateLimiteddError: HTTP 429 (the instance is marked unhealthy).
            SessionExpiredOrInvalidError: HTTP 401/419 (marked unhealthy).
            BizLogicError: Browser not started, or any other status.
        """
        if not self.page:
            raise BizLogicError("Browser not init")

        req_url = url
        if params:
            sep = '&' if '?' in req_url else '?'
            req_url += sep + urlencode(params)

        # Copy so the Content-Type added below never leaks into the caller's dict.
        req_headers = dict(headers or {})
        fetch_opts = {"method": method.upper(), "headers": req_headers, "credentials": "include"}

        if json_data is not None:
            fetch_opts['body'] = json.dumps(json_data)
            req_headers['Content-Type'] = 'application/json'
        elif data:
            if isinstance(data, dict):
                fetch_opts['body'] = urlencode(data)
                req_headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
            else:
                fetch_opts['body'] = data

        # The URL is emitted with json.dumps so quotes or backslashes in it
        # cannot break out of the JavaScript string literal.
        js = f"""
        return fetch({json.dumps(req_url)}, {json.dumps(fetch_opts)})
            .then(async r => {{
                const h = {{}}; r.headers.forEach((v, k) => h[k] = v);
                return {{ status: r.status, body: await r.text(), headers: h, url: r.url }};
            }}).catch(e => {{ return {{ status: 0, body: e.toString() }}; }});
        """

        resp = BrowserResponse(self.page.run_js(js, timeout=60))
        status = resp.status_code

        if status == 200:
            return resp
        if status == 403:
            if "Just a moment" in resp.text and retry_count < 2:
                self._log("Cloudflare 403. Refreshing...")
                if self._refresh_firewall_session():
                    return self._perform_request(method, url, headers, data, json_data, params, retry_count + 1)
            raise PermissionDeniedError(f"HTTP 403: {resp.text[:100]}")
        if status == 429:
            self.is_healthy = False
            raise RateLimiteddError()
        if status in (401, 419):
            self.is_healthy = False
            raise SessionExpiredOrInvalidError()
        raise BizLogicError(f"HTTP {status}: {resp.text[:100]}")

    def _refresh_firewall_session(self):
        """Reload the page and re-run the Cloudflare bypass; returns False on any error."""
        try:
            self.page.refresh()
            cf = CloudflareBypasser(self.page, log=self.config.debug)
            return cf.bypass(max_retry=10)
        except Exception as exc:
            self._log(f"Firewall refresh failed: {exc}")
            return False

    def _get_slot_time(self, date) -> Dict:
        """Return one randomly chosen time slot for ``date`` (``YYYY-MM-DD``).

        The result has the keys ``data_id``, ``data_all`` and ``time``.

        Raises:
            NotFoundError: If the response lists no slot.
        """
        url = f"{self.base_url}/en/senddate"
        converted_date = datetime.strptime(date, self._ISO_DATE).strftime(self._SITE_DATE)
        payload = {
            "fulldate": converted_date,
            "totalperson": "1",
            "set_new_consular_id": self.free_config.get("consularid", "1"),
            "set_new_exit_office_id": "1",
            "calendarType": "2",
            "set_new_service_type_id": "1",
            "personalinfo": self.personal_info_val
        }
        headers = {'X-CSRF-TOKEN': self.csrf_token, 'X-Requested-With': 'XMLHttpRequest'}
        resp = self._perform_request('POST', url, data=payload, headers=headers)

        # Slots are pulled out with a regex.
        # pattern: data-id="123" ... data-all="..." ... <i>09:00</i>
        slot_pattern = r'data-id="([^"]+)"[^>]*data-all="([^"]+)"[^>]*>.*?<i>(.*?)</i>'
        times = [
            {'data_id': m.group(1), 'data_all': m.group(2), 'time': m.group(3).strip()}
            for m in re.finditer(slot_pattern, resp.text, re.DOTALL)
        ]

        if not times:
            raise NotFoundError("No time slots")
        return random.choice(times)

    def _send_email_step1(self, email):
        """POST ``email`` together with the current ``personalinfo`` value (mail step 1)."""
        url = f"{self.base_url}/en/jky45fgd"
        payload = {"emailCheck": email, "personalinfo": self.personal_info_val}
        headers = {'X-CSRF-TOKEN': self.csrf_token, 'X-Requested-With': 'XMLHttpRequest'}
        self._perform_request('POST', url, data=payload, headers=headers)

    def _send_email_step2(self, code_val):
        """POST ``code_val`` together with ``emailValControl`` (mail step 2)."""
        url = f"{self.base_url}/en/confirmCodeSendMail"
        payload = {"confirmCode": code_val, "emailValControl": self.email_val_control}
        headers = {'X-CSRF-TOKEN': self.csrf_token, 'X-Requested-With': 'XMLHttpRequest'}
        self._perform_request('POST', url, data=payload, headers=headers)

    def _read_otp_email(self, recipient) -> str:
        """Poll the cloud mail API until a 6-digit code for ``recipient`` shows up.

        Raises:
            NotFoundError: If no code is found after all polling attempts.
        """
        # UTC timestamp taken before polling starts; same text format that
        # datetime.utcnow() produced, without the deprecated call.
        started_utc = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
        for attempt in range(self._OTP_ATTEMPTS):
            if attempt:
                # Wait between attempts only, not after the final one.
                time.sleep(self._OTP_POLL_SECONDS)
            # NOTE(review): the trailing 300 is presumably a look-back window
            # in seconds -- confirm against VSCloudApi.fetch_mail_content.
            content = VSCloudApi.Instance().fetch_mail_content(
                self._OTP_MASTER_EMAIL, self._OTP_SENDER, recipient,
                'Verification Code', 'Verification code', started_utc, 300
            )
            if content:
                m = re.search(r'\b\d{6}\b', content)
                if m:
                    return m.group(0)
        raise NotFoundError("OTP timeout")

    @classmethod
    def _to_site_date(cls, value) -> str:
        """Convert the leading ``YYYY-MM-DD`` of ``value`` to the site's ``DD-MM-YYYY``.

        Raises:
            ValueError: If ``value`` is empty or not a valid ISO date.
        """
        return datetime.strptime((value or '')[:10], cls._ISO_DATE).strftime(cls._SITE_DATE)

    def _parse_birthday(self, value):
        """Parse the birthday; falls back to the current date when it cannot be parsed."""
        try:
            return datetime.strptime(value[:10], self._ISO_DATE)
        except (TypeError, ValueError):
            # Best-effort fallback kept from the original flow, but made
            # visible because it submits a wrong date of birth.
            self._log(f"[WARN] Invalid birthday {value!r}; using current date")
            return datetime.now()

    def _confirm_appointment(self, date, slot_data, user_inputs, otp, alias_email):
        """Submit the final appointment form and return the raw response text.

        ``date`` is accepted for symmetry with the caller but is not part of
        the payload; the slot is identified by ``slot_data``.

        Raises:
            ValueError: If ``passport_expiry_date`` or ``travel_date`` is
                missing or not an ISO date.
        """
        url = f"{self.base_url}/en/personal/appointment/create"
        dob = self._parse_birthday(user_inputs.get('birthday', ''))
        consular_id = str(self.free_config.get("consularid", "1"))
        phone_no = user_inputs.get('phone_no')

        payload = {
            "_token": self.csrf_token,
            "country": consular_id,
            "visitingcountry": consular_id,
            "city": "6",
            "office": "1",
            "officetype": "1",
            "totalPerson": "1",
            # ``or ''`` keeps an explicit None from crashing .upper()
            "name1": (user_inputs.get('first_name') or '').upper(),
            "surname1": (user_inputs.get('last_name') or '').upper(),
            "nationality1": "2",
            "birthday1": str(dob.day),
            "birthmonth1": str(dob.month),
            "birthyear1": str(dob.year),
            "passport1": user_inputs.get('passport_no'),
            "passportExpirationDate1": self._to_site_date(user_inputs.get('passport_expiry_date', '')),
            "email1": alias_email,
            "phone1": phone_no,
            "alternativephone1": "",
            "mailConfirmCode": otp,
            "ctval": slot_data['data_id'],
            "qtallvert": slot_data['data_all'],
            "oldofficetype": "1",
            "oldtotalperson": "1",
            "rePaymentControl": "0",
            "view_set_app_country": "Schengen - Tourism/Family&Friend Visit/Transit Visa/Other Purposes",
            "view_set_app_office": "Dublin",
            "view_set_app_service_type": "NORMAL",
            "cargoactive": "0",
            "setnewcalendarstatus": "2",
            "availableDaycontrol": "0",
            "travelStartDate": self._to_site_date(user_inputs.get('travel_date', '')),
            "personalapproveTerms": "1"
        }

        # Fill the placeholder fields for persons 2-4
        for i in range(2, 5):
            payload.update({
                f"name{i}": "",
                f"surname{i}": "",
                f"nationality{i}": "0",
                f"birthday{i}": "0",
                f"birthmonth{i}": "0",
                f"birthyear{i}": "0",
                f"passport{i}": "",
                f"passportExpirationDate{i}": "",
                f"email{i}": alias_email,
                f"phone{i}": phone_no,
                f"alternativephone{i}": ""
            })

        headers = {'X-Requested-With': 'XMLHttpRequest'}
        return self._perform_request('POST', url, data=payload, headers=headers).text

    def _filter_dates(self, dates, start, end):
        """Return the dates within ``[start, end]`` (inclusive), in random order.

        When either bound is missing the input list is returned unchanged.
        All values are ``YYYY-MM-DD`` strings; only the first 10 characters of
        the bounds are used.
        """
        if not start or not end:
            return dates
        lower = datetime.strptime(start[:10], self._ISO_DATE)
        upper = datetime.strptime(end[:10], self._ISO_DATE)
        valid = [d for d in dates if lower <= datetime.strptime(d, self._ISO_DATE) <= upper]
        random.shuffle(valid)
        return valid

    def cleanup(self):
        """Release the browser, the proxy tunnel and the workspace directory.

        Every step is best-effort: failures are logged and never raised, so
        the method is safe to call repeatedly and from ``__del__``.
        """
        page = getattr(self, "page", None)
        if page:
            try:
                page.quit()
            except Exception as exc:
                self._log(f"Browser quit failed: {exc}")
            self.page = None

        tunnel = getattr(self, "tunnel", None)
        if tunnel:
            try:
                tunnel.stop()
            except Exception as exc:
                self._log(f"Tunnel stop failed: {exc}")
            self.tunnel = None

        workspace = getattr(self, "root_workspace", None)
        if workspace and os.path.exists(workspace):
            for _ in range(3):
                # Short pause so the closing browser can release its files.
                time.sleep(0.2)
                shutil.rmtree(workspace, ignore_errors=True)
                # rmtree(ignore_errors=True) never raises, so success has to
                # be checked explicitly for the retry to mean anything.
                if not os.path.exists(workspace):
                    break
                time.sleep(0.5)

    def __del__(self):
        """Last-resort cleanup when the instance is garbage collected."""
        try:
            self.cleanup()
        except Exception:
            # Never let an error escape a finalizer (e.g. at interpreter exit).
            pass
|