import time
import json
import random
import re
import os
import uuid
import shutil
import base64
import socket
from datetime import datetime
from typing import List, Dict, Optional, Any, Callable
from urllib.parse import urljoin, urlparse, urlencode
# DrissionPage core
from DrissionPage import ChromiumPage, ChromiumOptions
from vs_plg import IVSPlg
from vs_types import VSPlgConfig, VSQueryResult, VSBookResult, AvailabilityStatus, TimeSlot, DateAvailability, NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError
from toolkit.vs_cloud_api import VSCloudApi
from utils.cloudflare_bypass_for_scraping import CloudflareBypasser
from toolkit.proxy_tunnel import ProxyTunnel
from toolkit.ocr_engine import DddOcrEngine
class BrowserResponse:
    """Minimal response wrapper around the dict returned by an in-browser ``fetch``.

    The constructor reads the ``status``, ``body``, ``headers`` and ``url`` keys
    of *result_dict*; missing keys fall back to ``0`` / ``''`` / ``{}`` / ``''``.
    """

    def __init__(self, result_dict):
        # A falsy or non-dict payload (e.g. the JS call returned nothing or a
        # bare string) is treated as an empty response instead of crashing.
        if not isinstance(result_dict, dict):
            result_dict = {}
        self.status_code = result_dict.get('status', 0)
        self.text = result_dict.get('body', '')
        self.headers = result_dict.get('headers', {})
        self.url = result_dict.get('url', '')
        self._json = None

    def json(self):
        """Return the body parsed as JSON.

        Returns ``{}`` when the body is empty or cannot be parsed. The parsed
        value is cached on the instance after the first successful call.
        """
        if self._json is None:
            if not self.text:
                return {}
            try:
                self._json = json.loads(self.text)
            except (TypeError, ValueError):
                # json.JSONDecodeError is a ValueError; TypeError covers a
                # body that is not str/bytes.
                self._json = {}
        return self._json
def to_yyyymmdd(data_str: str, date_str_format: str, target_format: str = "%Y-%m-%d") -> str:
    """Re-format a date string.

    Args:
        data_str: The date text to convert.
        date_str_format: ``strptime`` format describing *data_str*.
        target_format: ``strftime`` format of the result (default ``%Y-%m-%d``).

    Returns:
        The date rendered with *target_format*.

    Raises:
        ValueError: If *data_str* does not match *date_str_format*.
    """
    dt = datetime.strptime(data_str, date_str_format)
    # Honour the caller-supplied output format instead of a hard-coded one.
    return dt.strftime(target_format)
def get_alias_email(email: str, new_domain: str = "gmail-app.com") -> str:
    """Return *email* with its domain replaced by *new_domain*.

    Only the part after the last ``@`` is replaced.

    Raises:
        ValueError: If *email* is not a string, contains no ``@``, or has an
            empty local part.
    """
    # Reject None / non-strings up front so callers get a ValueError rather
    # than an opaque TypeError from the membership test.
    if not isinstance(email, str) or "@" not in email:
        raise ValueError(f"Invalid email: {email}")
    local_part, _ = email.rsplit("@", 1)
    if not local_part:
        raise ValueError(f"Invalid email: {email}")
    return f"{local_part}@{new_domain}"
class DePlugin2(IVSPlg):
    """
    Germany (Visametric) visa appointment plugin (browser + proxy tunnel mode).

    Every HTTP call is issued via ``fetch`` inside a DrissionPage-controlled
    Chromium instance with ``credentials: "include"``. Each instance works in
    its own directory under ``temp_browser_data`` which ``cleanup()`` removes.
    """

    def __init__(self, group_id: str):
        self.group_id = group_id
        self.config: Optional[VSPlgConfig] = None
        self.free_config: Dict[str, Any] = {}
        self.logger = None
        # Browser instance
        self.page: Optional[ChromiumPage] = None
        # Proxy tunnel (only created for authenticated proxies)
        self.tunnel = None
        self.is_healthy = True
        self.session_create_time: float = 0
        # Captcha recognition engine
        self.ocr_engine: Optional[DddOcrEngine] = None
        # Business state
        self.base_url = "https://ie-appointment.visametric.com"
        self.csrf_token = ""
        self.personal_info_val = ""
        self.email_val_control = ""
        # Resource isolation: one workspace per plugin instance
        self.instance_id = uuid.uuid4().hex[:8]
        self.root_workspace = os.path.abspath(os.path.join("temp_browser_data", f"{self.group_id}_{self.instance_id}"))
        self.user_data_path = os.path.join(self.root_workspace, "user_data")
        # Done last so that every attribute cleanup() touches already exists
        # if directory creation fails.
        os.makedirs(self.root_workspace, exist_ok=True)

    def get_group_id(self) -> str:
        """Return the group id passed to the constructor."""
        return self.group_id

    def set_log(self, logger: Callable[[str], None]) -> None:
        """Install a callable that receives formatted log lines."""
        self.logger = logger

    def _log(self, message):
        """Emit *message* via the installed logger, or ``print`` if none is set."""
        line = f'[DePlugin] [{self.group_id}] {message}'
        if self.logger:
            self.logger(line)
        else:
            print(line)

    def set_config(self, config: VSPlgConfig):
        """Store *config*; a ``base_url`` entry in its free config overrides the default."""
        self.config = config
        self.free_config = config.free_config or {}
        if self.free_config.get("base_url"):
            self.base_url = self.free_config["base_url"].rstrip('/')

    def health_check(self) -> bool:
        """Return True while the browser responds and the session is within its max life."""
        if not self.is_healthy or not self.page:
            return False
        try:
            if not self.page.run_js("return 1;"):
                return False
        except Exception:
            return False
        # session_max_life is compared in minutes; a missing config or a
        # non-positive value disables the age check.
        max_life = getattr(self.config, "session_max_life", 0) or 0
        if max_life > 0 and time.time() - self.session_create_time > max_life * 60:
            self._log("Session expired.")
            return False
        return True

    def create_session(self):
        """
        Create a session: start browser -> proxy tunnel -> pass Cloudflare ->
        extract captcha -> local OCR -> submit -> store the returned context.

        On any failure the browser, tunnel and workspace are cleaned up and the
        exception is re-raised.
        """
        if self.config is None:
            raise BizLogicError("Plugin config not set; call set_config() first")
        # Release a browser/tunnel left over from an earlier session so a
        # repeated call does not leak them.
        if self.page or self.tunnel:
            self.cleanup()
        os.makedirs(self.root_workspace, exist_ok=True)

        self._log(f"Initializing Session (ID: {self.instance_id})...")
        try:
            self.ocr_engine = DddOcrEngine()
            co = self._build_browser_options()
            self.page = ChromiumPage(co)
            # 1. Open the landing page
            url_home = f"{self.base_url}/en"
            self._log(f"Navigating to {url_home}")
            self.page.get(url_home)
            # 2. Pass the Cloudflare challenge
            cf = CloudflareBypasser(self.page, log=self.config.debug)
            if not cf.bypass(max_retry=15):
                if "access denied" in self.page.title.lower():
                    raise BizLogicError("Cloudflare Access Denied")
                raise BizLogicError("Cloudflare bypass timeout")
            # 3. Extract CSRF token and captcha (waits for the page to load)
            meta_ele = self.page.ele('xpath://meta[@name="csrf-token"]', timeout=30)
            if not meta_ele:
                # Screenshot for debugging: still on Cloudflare, or load failed?
                self.page.get_screenshot(path='csrf_not_found.jpg')
                raise NotFoundError("CSRF Token meta tag not found (Page load failed?)")
            self.csrf_token = meta_ele.attr('content')
            image_bytes = base64.b64decode(self._extract_captcha_b64())
            # 4. Recognise the captcha (local OCR)
            captcha_code = self.ocr_engine.inference_captcha(image_bytes)
            # 5. Submit the captcha (yields personalinfo)
            self._submit_captcha(captcha_code)
            self.session_create_time = time.time()
            # A freshly created session starts healthy even if an earlier one
            # was flagged by a 429/401/419 response.
            self.is_healthy = True
            self._log("Session created successfully.")
        except Exception as e:
            self._log(f"Session Create Failed: {e}")
            self.cleanup()
            raise

    @staticmethod
    def _get_free_port() -> int:
        """Ask the OS for a currently unused TCP port."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(('', 0))
            return s.getsockname()[1]

    def _build_browser_options(self) -> ChromiumOptions:
        """Build the Chromium options; starts the proxy tunnel when credentials are set."""
        co = ChromiumOptions()
        # Port allocation (Docker friendly)
        co.set_local_port(self._get_free_port())
        # Paths and isolation
        co.set_user_data_path(self.user_data_path)
        chrome_path = os.getenv("CHROME_BIN")
        if chrome_path and os.path.exists(chrome_path):
            co.set_paths(browser_path=chrome_path)
        # Proxy tunnel
        p = self.config.proxy
        if p and p.ip:
            if p.username and p.password:
                self._log(f"Starting Tunnel for {p.ip}...")
                self.tunnel = ProxyTunnel(p.ip, p.port, p.username, p.password)
                local_proxy = self.tunnel.start()
                self._log(f"Tunnel started at {local_proxy}")
                co.set_argument(f'--proxy-server={local_proxy}')
            else:
                proxy_str = f"{p.scheme}://{p.ip}:{p.port}"
                co.set_argument(f'--proxy-server={proxy_str}')
        else:
            self._log("[WARN] No proxy configured!")
        # Core Docker arguments
        co.headless(False)
        for arg in (
            '--no-sandbox',
            '--disable-gpu',
            '--disable-dev-shm-usage',
            '--window-size=1920,1080',
            '--disable-blink-features=AutomationControlled',
            '--ignore-certificate-errors',
        ):
            co.set_argument(arg)
        return co

    def _extract_captcha_b64(self) -> str:
        """Return the base64 payload of the captcha image on the current page.

        Raises:
            NotFoundError: If neither the inline script pattern nor an
                ``<img>`` with a ``data:image`` source yields a payload.
        """
        html = self.page.html
        match = re.search(r'"data:image/png;base64,"\s*\+\s*"(.*?)"', html)
        if match:
            return match.group(1)
        # Fallback: look for the <img> element directly
        try:
            img_ele = self.page.ele('xpath://img[contains(@src, "data:image")]')
            b64_src = img_ele.attr('src') if img_ele else ''
        except Exception as exc:
            raise NotFoundError("Captcha image not found (Regex failed)") from exc
        if not b64_src or ',' not in b64_src:
            raise NotFoundError("Captcha image not found (Regex failed)")
        return b64_src.split(',')[1]

    def _submit_captcha(self, code):
        """
        Submit the captcha code and store ``personalinfo`` / ``emailValControl``
        parsed from the returned HTML fragment.

        Raises:
            NotFoundError: If the response contains no ``personalinfo`` value.
        """
        url = f"{self.base_url}/en/appointment-form"
        payload = {
            '_token': self.csrf_token,
            'cpJvnsControl': '',
            'mailConfirmCode': code
        }
        # Submitted through fetch as form-urlencoded
        resp = self._perform_request('POST', url, data=payload, headers={
            'X-Requested-With': 'XMLHttpRequest'
        })
        html = resp.text
        # Drop values from any previous session so a response without them is
        # detected instead of silently reusing stale tokens.
        self.personal_info_val = ""
        self.email_val_control = ""
        match_pi = re.search(r"personalinfo:\s*'([^']*)'", html)
        if match_pi:
            self.personal_info_val = match_pi.group(1)
        match_ev = re.search(r"emailValControl:\s*'([^']*)'", html)
        if match_ev:
            self.email_val_control = match_ev.group(1)
        if not self.personal_info_val:
            raise NotFoundError(message="Personalinfo not found in captcha response")
        # Refresh the CSRF token if the response carries a new one
        m = re.search(r'name="csrf-token" content="([^"]+)"', html)
        if m:
            self.csrf_token = m.group(1)

    def query(self) -> VSQueryResult:
        """Ask ``/en/getdate`` for open dates and return them as a VSQueryResult.

        Errors raised by the request are logged and re-raised.
        """
        res = VSQueryResult()
        res.success = False
        consular_id = self.free_config.get("consularid", "1")
        url = f"{self.base_url}/en/getdate"
        payload = {
            "consularid": consular_id,
            "exitid": "1",
            "servicetypeid": "1",
            "calendarType": "2",
            "totalperson": "1"
        }
        headers = {
            'X-CSRF-TOKEN': self.csrf_token,
            'X-Requested-With': 'XMLHttpRequest'
        }
        try:
            resp = self._perform_request('POST', url, data=payload, headers=headers, retry_count=1)
        except Exception as e:
            self._log(f"Query Error: {e}")
            raise
        j = resp.json()
        # A body that is not a JSON object (or has a null list) counts as "no dates".
        dates = (j.get("getDateEnable") or []) if isinstance(j, dict) else []
        res.city = self.free_config.get('city', '')
        res.country = self.free_config.get('country', '')
        res.visa_type = self.free_config.get('visa_type', '')
        res.routing_key = self.free_config.get('routing_key', '')
        if dates:
            res.success = True
            res.availability_status = AvailabilityStatus.Available
            res.earliest_date = to_yyyymmdd(dates[0], "%d-%m-%Y")
            res.availability = [
                DateAvailability(date=to_yyyymmdd(d, "%d-%m-%Y"), times=[])
                for d in dates
            ]
        else:
            res.availability_status = AvailabilityStatus.NoneAvailable
        return res

    def book(self, slot_info: VSQueryResult, user_inputs: Dict) -> VSBookResult:
        """Book a random slot on a random date inside the user's expected range.

        Raises:
            NotFoundError: No date matches the range, no time slot, or the OTP
                mail did not arrive.
            BizLogicError: Missing applicant email, or the site reports
                incomplete fields.
        """
        res = VSBookResult()
        available_dates = [da.date for da in (slot_info.availability or [])]
        exp_start = user_inputs.get('expected_start_date', '')
        exp_end = user_inputs.get('expected_end_date', '')
        valid_dates = self._filter_dates(available_dates, exp_start, exp_end)
        if not valid_dates:
            raise NotFoundError("No dates match constraints")
        email = user_inputs.get("email")
        if not email:
            # Fail before any request is sent rather than deep inside the flow.
            raise BizLogicError("Applicant email is required")
        target_date = random.choice(valid_dates)
        self._log(f"Selected date: {target_date}")
        # 1. Fetch a time slot
        time_slot = self._get_slot_time(target_date)
        # 2. Trigger the verification mail
        alias_email = get_alias_email(email, new_domain='gmail-app.com')
        self._send_email_step1(alias_email)
        self._send_email_step2("0")
        # 3. Read the OTP
        otp_code = self._read_otp_email(alias_email)
        # 4. Submit the confirmation
        book_res_html = self._confirm_appointment(target_date, time_slot, user_inputs, otp_code, alias_email)
        if "complete all required fields" in book_res_html.lower():
            raise BizLogicError("Incomplete fields response")
        match = re.search(r'https:\/\/checkout\.stripe\.com\/c\/pay\/[^\s"]+', book_res_html)
        res.success = True
        # Hard-coded fee; the unit of 3000 is not visible here — TODO confirm.
        res.fee_amount = 3000
        res.fee_currency = 'EUR'
        res.book_date = target_date
        res.book_time = time_slot['time']
        if match:
            res.payment_link = match.group(0)
            self._log(f"Payment Link: {res.payment_link}")
        return res

    # ---------------------------------------------------------
    # Helpers
    # ---------------------------------------------------------
    def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None, retry_count=0):
        """Run an HTTP request via ``fetch`` inside the browser page.

        Args:
            method: HTTP verb.
            url: Target URL; *params* are appended as a query string.
            headers: Extra request headers (not modified).
            data: Form body; a dict is urlencoded, anything else is sent as-is.
            json_data: JSON body; takes precedence over *data*.
            retry_count: Number of Cloudflare refresh retries already used.

        Returns:
            BrowserResponse for a 200 reply.

        Raises:
            PermissionDeniedError: HTTP 403 that could not be recovered.
            RateLimiteddError: HTTP 429 (marks the session unhealthy).
            SessionExpiredOrInvalidError: HTTP 401/419 (marks the session unhealthy).
            BizLogicError: Browser not started, or any other status.
        """
        if not self.page:
            raise BizLogicError("Browser not init")
        req_url = url
        if params:
            sep = '&' if '?' in req_url else '?'
            req_url += sep + urlencode(params)
        # Copy so that adding Content-Type never mutates the caller's dict.
        fetch_headers = dict(headers or {})
        fetch_opts = {"method": method.upper(), "headers": fetch_headers, "credentials": "include"}
        if json_data:
            fetch_opts['body'] = json.dumps(json_data)
            fetch_headers['Content-Type'] = 'application/json'
        elif data:
            if isinstance(data, dict):
                fetch_opts['body'] = urlencode(data)
                fetch_headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
            else:
                fetch_opts['body'] = data
        # json.dumps yields a properly escaped JS string literal for the URL,
        # so quotes or backslashes in it cannot break the script.
        js = (
            "\nreturn fetch(" + json.dumps(req_url) + ", " + json.dumps(fetch_opts) + ")\n"
            ".then(async r => {\n"
            "const h = {}; r.headers.forEach((v, k) => h[k] = v);\n"
            "return { status: r.status, body: await r.text(), headers: h, url: r.url };\n"
            "}).catch(e => { return { status: 0, body: e.toString() }; });\n"
        )
        resp = BrowserResponse(self.page.run_js(js, timeout=60))
        status = resp.status_code
        if status == 200:
            return resp
        if status == 403:
            if "Just a moment" in resp.text and retry_count < 2:
                self._log("Cloudflare 403. Refreshing...")
                if self._refresh_firewall_session():
                    return self._perform_request(method, url, headers, data, json_data, params, retry_count + 1)
            raise PermissionDeniedError(f"HTTP 403: {resp.text[:100]}")
        if status == 429:
            self.is_healthy = False
            raise RateLimiteddError()
        if status in (401, 419):
            self.is_healthy = False
            raise SessionExpiredOrInvalidError()
        raise BizLogicError(f"HTTP {status}: {resp.text[:100]}")

    def _refresh_firewall_session(self):
        """Reload the page and re-run the Cloudflare bypass; False on any error."""
        try:
            self.page.refresh()
            cf = CloudflareBypasser(self.page, log=self.config.debug)
            return cf.bypass(max_retry=10)
        except Exception:
            return False

    def _get_slot_time(self, date) -> Dict:
        """Fetch the time slots of *date* (``YYYY-MM-DD``) and return one at random.

        Returns:
            Dict with ``data_id``, ``data_all`` and ``time``. ``time`` is the
            first ``HH:MM`` text found after the slot's opening tag and before
            the next slot, or ``''`` if none is found. The slot markup is not
            visible here — TODO confirm the label location.

        Raises:
            NotFoundError: If the response contains no slot elements.
        """
        url = f"{self.base_url}/en/senddate"
        dt_m = datetime.strptime(date, "%Y-%m-%d")
        converted_date = dt_m.strftime("%d-%m-%Y")
        payload = {
            "fulldate": converted_date,
            "totalperson": "1",
            "set_new_consular_id": self.free_config.get("consularid", "1"),
            "set_new_exit_office_id": "1",
            "calendarType": "2",
            "set_new_service_type_id": "1",
            "personalinfo": self.personal_info_val
        }
        headers = {'X-CSRF-TOKEN': self.csrf_token, 'X-Requested-With': 'XMLHttpRequest'}
        resp = self._perform_request('POST', url, data=payload, headers=headers)
        html = resp.text
        # Slots are opening tags carrying data-id followed by data-all.
        slot_tags = list(re.finditer(r'data-id="([^"]+)"[^>]*data-all="([^"]+)"[^>]*>', html))
        times = []
        for idx, tag in enumerate(slot_tags):
            # Look for the label only up to the next slot so one slot never
            # picks up its neighbour's time.
            segment_end = slot_tags[idx + 1].start() if idx + 1 < len(slot_tags) else len(html)
            label = re.search(r'\b\d{1,2}:\d{2}\b', html[tag.end():segment_end])
            times.append({
                'data_id': tag.group(1),
                'data_all': tag.group(2),
                'time': label.group(0) if label else '',
            })
        if not times:
            raise NotFoundError("No time slots")
        return random.choice(times)

    def _send_email_step1(self, email):
        """POST the applicant email together with the stored personalinfo token."""
        url = f"{self.base_url}/en/jky45fgd"
        payload = {"emailCheck": email, "personalinfo": self.personal_info_val}
        headers = {'X-CSRF-TOKEN': self.csrf_token, 'X-Requested-With': 'XMLHttpRequest'}
        self._perform_request('POST', url, data=payload, headers=headers)

    def _send_email_step2(self, code_val):
        """POST *code_val* with the stored emailValControl token to ``confirmCodeSendMail``."""
        url = f"{self.base_url}/en/confirmCodeSendMail"
        payload = {"confirmCode": code_val, "emailValControl": self.email_val_control}
        headers = {'X-CSRF-TOKEN': self.csrf_token, 'X-Requested-With': 'XMLHttpRequest'}
        self._perform_request('POST', url, data=payload, headers=headers)

    def _read_otp_email(self, recipient) -> str:
        """Poll the cloud mail API for the verification mail and return its 6-digit code.

        Polls up to 12 times, 5 seconds apart.

        Raises:
            NotFoundError: If no code is found within the polling window.
        """
        from datetime import timezone  # local import: the file only imports `datetime`

        master_email = "visafly666@gmail.com"
        sender = 'Visametric - verify at visametric.com'
        # Same UTC wall-clock string as utcnow(), without the deprecated call.
        now_utc = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
        attempts = 12
        for attempt in range(attempts):
            c = VSCloudApi.Instance().fetch_mail_content(master_email, sender, recipient, 'Verification Code', 'Verification code', now_utc, 300)
            if c:
                m = re.search(r'\b\d{6}\b', c)
                if m:
                    return m.group(0)
            # No point in waiting after the final attempt.
            if attempt < attempts - 1:
                time.sleep(5)
        raise NotFoundError("OTP timeout")

    def _confirm_appointment(self, date, slot_data, user_inputs, otp, alias_email):
        """Submit the appointment form and return the response body text.

        Raises:
            ValueError: If ``passport_expiry_date`` or ``travel_date`` in
                *user_inputs* does not start with a ``YYYY-MM-DD`` date.
        """
        url = f"{self.base_url}/en/personal/appointment/create"

        def _get_dob(d):
            try:
                return datetime.strptime(d[:10], "%Y-%m-%d")
            except (TypeError, ValueError):
                # Best-effort fallback kept from the existing behaviour; make
                # it visible because the submitted birthday will be wrong.
                self._log("[WARN] Invalid or missing birthday; falling back to today's date")
                return datetime.now()

        def _site_date(value):
            # The form expects DD-MM-YYYY.
            return datetime.strptime(value[:10], "%Y-%m-%d").strftime("%d-%m-%Y")

        dob = _get_dob(user_inputs.get('birthday', ''))
        consular_id = str(self.free_config.get("consularid", "1"))
        phone_no = user_inputs.get('phone_no')
        payload = {
            "_token": self.csrf_token,
            "country": consular_id,
            "visitingcountry": consular_id,
            "city": "6",
            "office": "1",
            "officetype": "1",
            "totalPerson": "1",
            "name1": user_inputs.get('first_name', '').upper(),
            "surname1": user_inputs.get('last_name', '').upper(),
            "nationality1": "2",
            "birthday1": str(dob.day),
            "birthmonth1": str(dob.month),
            "birthyear1": str(dob.year),
            "passport1": user_inputs.get('passport_no'),
            "passportExpirationDate1": _site_date(user_inputs.get('passport_expiry_date', '')),
            "email1": alias_email,
            "phone1": phone_no,
            "alternativephone1": "",
            "mailConfirmCode": otp,
            "ctval": slot_data['data_id'],
            "qtallvert": slot_data['data_all'],
            "oldofficetype": "1",
            "oldtotalperson": "1",
            "rePaymentControl": "0",
            "view_set_app_country": "Schengen - Tourism/Family&Friend Visit/Transit Visa/Other Purposes",
            "view_set_app_office": "Dublin",
            "view_set_app_service_type": "NORMAL",
            "cargoactive": "0",
            "setnewcalendarstatus": "2",
            "availableDaycontrol": "0",
            "travelStartDate": _site_date(user_inputs.get('travel_date', '')),
            "personalapproveTerms": "1"
        }
        # Fill the empty fields for persons 2-4
        for i in range(2, 5):
            payload.update({
                f"name{i}": "",
                f"surname{i}": "",
                f"nationality{i}": "0",
                f"birthday{i}": "0",
                f"birthmonth{i}": "0",
                f"birthyear{i}": "0",
                f"passport{i}": "",
                f"passportExpirationDate{i}": "",
                f"email{i}": alias_email,
                f"phone{i}": phone_no,
                f"alternativephone{i}": "",
            })
        headers = {'X-Requested-With': 'XMLHttpRequest'}
        return self._perform_request('POST', url, data=payload, headers=headers).text

    def _filter_dates(self, dates, start, end):
        """Return the dates within ``[start, end]`` in random order.

        If either bound is empty, *dates* is returned unchanged. Dates are
        ``YYYY-MM-DD`` strings; only the first 10 characters of the bounds are used.
        """
        if not start or not end:
            return dates
        lower = datetime.strptime(start[:10], "%Y-%m-%d")
        upper = datetime.strptime(end[:10], "%Y-%m-%d")
        valid = [d for d in dates if lower <= datetime.strptime(d, "%Y-%m-%d") <= upper]
        random.shuffle(valid)
        return valid

    def cleanup(self):
        """Quit the browser, delete the workspace directory and stop the tunnel.

        Best-effort: errors from any step are suppressed.
        """
        page = getattr(self, "page", None)
        if page:
            try:
                page.quit()
            except Exception:
                pass
        self.page = None

        workspace = getattr(self, "root_workspace", None)
        if workspace and os.path.exists(workspace):
            # Files may still be held briefly after the browser quits, so
            # retry a few times before giving up silently.
            for _ in range(3):
                time.sleep(0.2)
                try:
                    shutil.rmtree(workspace)
                    break
                except OSError:
                    if not os.path.exists(workspace):
                        break
                    time.sleep(0.5)
            else:
                shutil.rmtree(workspace, ignore_errors=True)

        tunnel = getattr(self, "tunnel", None)
        if tunnel:
            try:
                tunnel.stop()
            except Exception:
                pass
        self.tunnel = None

    def __del__(self):
        # Finalizers may run on a half-constructed object or during
        # interpreter shutdown; never let cleanup errors escape.
        try:
            self.cleanup()
        except Exception:
            pass