| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563 |
- import time
- import json
- import random
- import re
- import os
- import base64
- from datetime import datetime
- from typing import List, Dict, Optional, Any
- from urllib.parse import urljoin
- from curl_cffi import requests, const
- from bs4 import BeautifulSoup
- from vs_plg import IVSPlg
- from vs_types import VSPlgConfig, VSQueryResult, VSBookResult, AvailabilityStatus, NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError
- from vs_log_macros import VSC_INFO, VSC_ERROR, VSC_DEBUG, VSC_WARN
- from toolkit.vs_cloud_api import VSCloudApi
- def to_yyyymmdd(data_str: str, date_str_format: str, target_format: str="%Y-%m-%d"):
- # 转换日期到YYYY-MM-DD 固定格式
- dt = datetime.strptime(data_str, date_str_format)
- return dt.strftime("%Y-%m-%d")
- def get_alias_email(email: str, new_domain: str = "gmail-app.com") -> str:
- """
- 将邮箱域名替换为指定域名(默认 gmail-app.com)
- """
- if "@" not in email:
- raise ValueError(f"Invalid email: {email}")
- local_part, _ = email.rsplit("@", 1)
- return f"{local_part}@{new_domain}"
- class DePlugin(IVSPlg):
- """
- Germany (Visametric) 签证预约插件
- 适配 Visametric Ireland -> Germany 流程
- """
- def __init__(self, group_id: str):
- self.group_id = group_id
- self.config: Optional[VSPlgConfig] = None
- self.free_config: Dict[str, Any] = {}
-
- self.session: Optional[requests.Session] = None
- self.user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
-
- # 状态
- self.is_healthy = True
-
- # 关键上下文变量 (从页面提取)
- self.base_url = "https://ie-appointment.visametric.com"
- self.csrf_token = ""
- self.personal_info_val = ""
- self.email_val_control = ""
-
- # 默认 OCR 服务地址
- self.local_service_url = "http://127.0.0.1:8085"
- def get_group_id(self) -> str:
- return self.group_id
- def set_config(self, config: VSPlgConfig):
- self.config = config
- try:
- self.free_config = json.loads(config.free_config) if config.free_config else {}
- except:
- self.free_config = {}
-
- if self.free_config.get("base_url"):
- self.base_url = self.free_config["base_url"].rstrip('/')
-
- if self.free_config.get("local_service_url"):
- self.local_service_url = self.free_config["local_service_url"]
- def health_check(self) -> bool:
- return self.is_healthy
- def create_session(self):
- """
- 初始化会话:过盾 -> 获取 CSRF -> 识别图片验证码 -> 提交验证码 -> 获取上下文
- """
- # 1. 初始化 Session
- curlopt = {
- const.CurlOpt.MAXAGE_CONN: 1800,
- const.CurlOpt.MAXLIFETIME_CONN: 1800,
- const.CurlOpt.VERBOSE: False,
- }
- self.session = requests.Session(
- proxy=self._get_proxy_url(),
- impersonate="chrome124",
- curl_options=curlopt,
- use_thread_local_curl=False,
- http_version=const.CurlHttpVersion.V2TLS
- )
-
- # 2. 访问首页,获取 CSRF 和 Captcha 图片
- # Visametric 首页通常有 Cloudflare
- url_home = f"{self.base_url}/en"
-
- # 尝试过盾
- self._solve_cloudflare5S_challenge()
- default_headers = self._get_headers()
- default_headers.pop("X-Requested-With")
-
- resp = self._perform_request('GET', url_home, headers=default_headers)
-
- html = resp.text
- soup = BeautifulSoup(html, 'html.parser')
- meta = soup.find('meta', {'name': 'csrf-token'})
- if not meta:
- raise NotFoundError(message='Missing csrf-token in html')
- self.csrf_token = meta.get('content', '')
- # 提取验证码图片 Base64, 正则匹配: "data:image/png;base64," + "..."
- match = re.search(r'"data:image/png;base64,"\s*\+\s*"(.*?)"', html)
- if not match:
- raise NotFoundError(message="Captcha image not found")
-
- captcha_b64 = base64.b64decode(match.group(1))
- # 3. 识别验证码
- resp = requests.post(
- f'{self.local_service_url}/predict/visametric',
- data=captcha_b64,
- headers={"Content-Type": "application/octet-stream"},
- timeout=10
- )
- if resp.status_code != 200:
- raise BizLogicError(message='Captcha ocr server failed')
- captcha_code = resp.json().get('data', '').replace('$', '')
- VSC_INFO("de_plg", "[%s] Captcha recognized: %s", self.group_id, captcha_code)
- # 4. 提交验证码 (/appointment-form)
- # 这一步是为了让服务器验证 Session,并返回包含 personalinfo 的页面
- self._submit_captcha(captcha_code)
- VSC_INFO("de_plg", "[%s] Session created successfully.", self.group_id)
- def query(self) -> VSQueryResult:
- """
- 查询可用日期 (/getdate)
- """
- res = VSQueryResult()
- # 构造 Payload (参考 get_slot_day) 这里的 ID 需要根据实际情况配置,或者使用原代码的默认值
- consular_id = self.free_config.get("consularid", "1") # 1=Ireland?
- max_retries = self.free_config.get("slot_query_max_retries", 2)
- url = f"{self.base_url}/en/getdate"
- payload = {
- "consularid": consular_id,
- "exitid": "1",
- "servicetypeid": "1",
- "calendarType": "2",
- "totalperson": "1"
- }
-
- default_headers = self._get_headers()
- default_headers['X-CSRF-TOKEN'] = self.csrf_token
-
-
- for attempt in range(1, max_retries + 1):
- try:
- resp = self._perform_request('POST', url, data=payload, headers=default_headers)
- break # ✅ 请求成功,跳出重试循环
- except PermissionDeniedError:
- VSC_WARN(
- "de_plg",
- "[Visamtric] getdate blocked (403), attempt %d/%d",
- attempt, max_retries
- )
- # 最后一次就不再绕盾了
- if attempt >= max_retries:
- raise PermissionDeniedError()
- self._solve_cloudflare5S_challenge()
- VSC_INFO("de_plg", "[Visamtric] Cloudflare bypass success, retrying...")
- continue
- # Visametric 返回 JSON: {"getDateEnable": ["15-01-2026", "16-01-2026"]}
- j = resp.json()
- dates = j.get("getDateEnable", [])
-
- res.city = self.free_config.get('city', '')
- res.country = self.free_config.get('country', '')
- res.visa_type = self.free_config.get('visa_type', '')
- res.routing_key = self.free_config.get('routing_key', '')
- if dates:
- res.success = True
- res.availability_status = AvailabilityStatus.Available
- # Visametric 返回 DD-MM-YYYY, 标准化为 YYYY-MM-DD
- res.earliest_date = to_yyyymmdd(dates[0], '%d-%m-%Y')
- for d in dates:
- da = VSQueryResult.DateAvailability()
- da.date = to_yyyymmdd(d, '%d-%m-%Y')
- da.times = []
- time_slot = VSQueryResult.DateAvailability.TimeSlot(time="00:00", label="Available")
- da.times.append(time_slot)
- res.availability.append(da)
- else:
- res.success = False # 获取成功,只是没号,所以 success 依然是 True
- res.availability_status = AvailabilityStatus.NoneAvailable
- return res
- def book(self, slot_info: VSQueryResult, user_inputs: Dict) -> VSBookResult:
- """
- 执行预约:选择日期 -> 选择时间 -> 发邮件 -> 填表 -> 提交
- """
- res = VSBookResult()
- # 1. 筛选日期
- available_dates = [da.date for da in slot_info.availability]
- exp_start = user_inputs.get('expected_start_date', '')
- exp_end = user_inputs.get('expected_end_date', '')
-
- valid_dates = self._filter_dates(available_dates, exp_start, exp_end)
- if not valid_dates:
- raise NotFoundError(message="No dates match user constraints")
-
- target_date = random.choice(valid_dates)
- VSC_INFO("de_plg", "[%s] Selected date: %s", self.group_id, target_date)
-
- # 2. 获取时间 (/senddate)
- time_slot = self._get_slot_time(target_date)
- VSC_INFO("de_plg", "[%s] Selected time: %s", self.group_id, time_slot['time'])
- # 3. 触发邮件流程 (Step 1: /jky45fgd)
- alias_email = get_alias_email(user_inputs.get("email"), new_domain='gmail-app.com')
- self._send_email_step1(alias_email)
-
- # 4. 触发邮件流程 (Step 2: /confirmCodeSendMail) 这一步会发送包含验证码的邮件 根据原代码逻辑: send_email("0") 触发发送
- self._send_email_step2("0")
- # 5. 读取 OTP
- # Visametric 邮件发送者
- recipient = alias_email
- otp_code = self._read_otp_email(recipient)
-
- # 6. 提交验证码并确认 (/personal/appointment/create)
- book_res_html = self._confirm_appointment(target_date, time_slot, user_inputs, otp_code, alias_email)
-
- if "complete all required fields" in book_res_html.lower():
- raise BizLogicError(message='Comfirm appointment response <complete all required fields>')
-
- # 7. 提取结果
- match = re.search(r'https:\/\/checkout\.stripe\.com\/c\/pay\/[^\s"]+', book_res_html)
-
-
- res.success = True
- res.fee_amount = 3000
- res.fee_currency = 'EUR'
- res.book_date = target_date
- res.book_time = time_slot['time']
-
- if match:
- res.payment_link = match.group(0)
- VSC_INFO("de_plg", "[%s] Payment Link Found: %s", self.group_id, res.payment_link)
- return res
- # ---------------------------------------------------------
- # 辅助方法
- # ---------------------------------------------------------
-
- def _get_headers(self) -> Dict[str, str]:
- """基础 Header"""
- return {
- "User-Agent": self.user_agent,
- "Accept": "*/*",
- "Accept-Language": "en,zh-CN;q=0.9,zh;q=0.8",
- "Origin": self.base_url,
- "Referer": f"{self.base_url}/en/appointment-form", # 默认 Referer
- "X-Requested-With": "XMLHttpRequest"
- }
-
- def _get_proxy_url(self):
- # 构造代理
- proxy_url = ""
- if self.config.proxy.ip:
- s = self.config.proxy
- if s.username:
- proxy_url = f"{s.scheme}://{s.username}:{s.password}@{s.ip}:{s.port}"
- else:
- proxy_url = f"{s.scheme}://{s.ip}:{s.port}"
- return proxy_url
- def _submit_captcha(self, code):
- url = f"{self.base_url}/en/appointment-form"
- payload = {
- '_token': self.csrf_token,
- 'cpJvnsControl': '',
- 'mailConfirmCode': code
- }
- headers = self._get_headers()
- headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
-
- resp = self._perform_request('POST', url, data=payload, headers=headers)
-
- # 关键:提交验证码后,返回的 HTML 中包含了后续需要的加密参数
- match_pi = re.search(r"personalinfo:\s*'([^']*)'", resp.text)
- if match_pi:
- self.personal_info_val = match_pi.group(1)
-
- # emailValControl: '...'
- match_ev = re.search(r"emailValControl:\s*'([^']*)'", resp.text)
- if match_ev:
- self.email_val_control = match_ev.group(1)
-
- if not self.personal_info_val:
- raise NotFoundError(message="Personalinfo not found in captcha response")
-
- soup = BeautifulSoup(resp.text, 'html.parser')
- meta = soup.find('meta', {'name': 'csrf-token'})
- if not meta:
- raise NotFoundError(message='Missing csrf-token in html')
- self.csrf_token = meta.get('content', '')
-
- def _get_slot_time(self, date) -> Optional[Dict]:
- url = f"{self.base_url}/en/senddate"
- dt_m = datetime.strptime(date, "%Y-%m-%d")
- converted_date = dt_m.strftime("%d-%m-%Y")
- payload = {
- "fulldate": converted_date,
- "totalperson": "1",
- "set_new_consular_id": self.free_config.get("consularid", "1"),
- "set_new_exit_office_id": "1",
- "calendarType": "2",
- "set_new_service_type_id": "1",
- "personalinfo": self.personal_info_val
- }
- headers = self._get_headers()
- headers['X-CSRF-TOKEN'] = self.csrf_token # 这里需要 CSRF
- headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
-
- resp = self._perform_request('POST', url, data=payload, headers=headers)
- soup = BeautifulSoup(resp.text, 'html.parser')
- buttons = soup.find_all('button')
- slots = []
- for btn in buttons:
- i_tag = btn.find('i')
- if i_tag:
- time_val = i_tag.next_sibling.strip()
- slots.append({
- 'time': time_val,
- 'data_id': btn.get('data-id'),
- 'data_all': btn.get('data-all')
- })
- if slots:
- return random.choice(slots)
- else:
- raise NotFoundError(message='Not slot time available')
-
- def _send_email_step1(self, email):
- url = f"{self.base_url}/en/jky45fgd"
- payload = {
- "emailCheck": email,
- "personalinfo": self.personal_info_val
- }
- headers = self._get_headers()
- headers['X-CSRF-TOKEN'] = self.csrf_token
- headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
- self._perform_request('POST', url, data=payload, headers=headers)
-
- def _send_email_step2(self, code_val):
- url = f"{self.base_url}/en/confirmCodeSendMail"
- payload = {
- "confirmCode": code_val,
- "emailValControl": self.email_val_control
- }
- headers = self._get_headers()
- headers['X-CSRF-TOKEN'] = self.csrf_token
- headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
- self._perform_request('POST', url, data=payload, headers=headers)
-
- def _read_otp_email(self, recipient) -> str:
- """
- 读取 OTP 邮件
- """
- master_email = "visafly666@gmail.com"
- sender = 'Visametric - verify at visametric.com'
- subject_keywords = 'Verification Code'
- body_keywords = 'Verification code'
- now_utc = datetime.utcnow()
- formatted_utc_time = now_utc.strftime("%Y-%m-%d %H:%M:%S")
- VSC_INFO("de_plg", "[%s] Waiting for OTP email sent after %s...", self.group_id, formatted_utc_time)
- # 3. 轮询查收
- for i in range(12):
- content_out = VSCloudApi.Instance().fetch_mail_content(
- master_email,
- sender,
- recipient,
- subject_keywords,
- body_keywords,
- formatted_utc_time,
- 300
- )
- if content_out:
- match = re.search(r'\b\d{6}\b', content_out)
- if match:
- otp = match.group(0)
- VSC_INFO("de_plg", "[%s] OTP code found: %s", self.group_id, otp)
- return otp
-
- time.sleep(5)
- raise NotFoundError(message="OTP email not found (timeout)")
- def _confirm_appointment(self, date, slot_data, user_inputs, otp, alias_email):
- url = f"{self.base_url}/en/personal/appointment/create"
-
- # 处理日期格式 YYYY-MM-DD
- def _get_dob(d_str):
- try: return datetime.strptime(d_str[:10], "%Y-%m-%d")
- except: return datetime.now()
-
- dob = _get_dob(user_inputs.get('birthday', ''))
-
- payload = {
- "_token": self.csrf_token,
- "country": str(self.free_config.get("consularid", "1")),
- "visitingcountry": str(self.free_config.get("consularid", "1")),
- "city": "6", # Dublin? 需配置
- "office": "1",
- "officetype": "1",
- "totalPerson": "1",
-
- "name1": user_inputs.get('first_name', '').upper(),
- "surname1": user_inputs.get('last_name', '').upper(),
- "nationality1": "2", # 假设值
-
- "birthday1": str(dob.day),
- "birthmonth1": str(dob.month),
- "birthyear1": str(dob.year),
-
- "passport1": user_inputs.get('passport_no'),
- # 原代码 passport_expried 是 DD-MM-YYYY
- "passportExpirationDate1": datetime.strptime(user_inputs.get('passport_expiry_date', '')[:10], "%Y-%m-%d").strftime("%d-%m-%Y"),
- "email1": alias_email,
- "phone1": user_inputs.get('phone_no'),
- "alternativephone1": "",
-
- # 其他 person 留空
- "name2": "", "surname2": "", "nationality2": "0", "birthday2": "0", "birthmonth2": "0", "birthyear2": "0", "passport2": "", "passportExpirationDate2": "", "email2": alias_email, "phone2": user_inputs.get('phone_no'), "alternativephone2": "",
- "name3": "", "surname3": "", "nationality3": "0", "birthday3": "0", "birthmonth3": "0", "birthyear3": "0", "passport3": "", "passportExpirationDate3": "", "email3": alias_email, "phone3": user_inputs.get('phone_no'), "alternativephone3": "",
- "name4": "", "surname4": "", "nationality4": "0", "birthday4": "0", "birthmonth4": "0", "birthyear4": "0", "passport4": "", "passportExpirationDate4": "", "email4": alias_email, "phone4": user_inputs.get('phone_no'), "alternativephone4": "",
- "mailConfirmCode": otp,
- "ctval": slot_data['data_id'],
- "qtallvert": slot_data['data_all'],
-
- "oldofficetype": "1",
- "oldtotalperson": "1",
- "rePaymentControl": "0",
-
- # 关键:View Set
- "view_set_app_country": "Schengen - Tourism/Family&Friend Visit/Transit Visa/Other Purposes", # 需配置
- "view_set_app_office": "Dublin",
- "view_set_app_service_type": "NORMAL",
-
- "cargoactive": "0",
- "setnewcalendarstatus": "2",
- "availableDaycontrol": "0",
-
- "travelStartDate": datetime.strptime(user_inputs.get('travel_date', '')[:10], "%Y-%m-%d").strftime("%d-%m-%Y"),
- "personalapproveTerms": "1"
- }
-
- headers = self._get_headers()
- headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
-
- resp = self._perform_request('POST', url, data=payload, headers=headers)
- return resp.text
- def _filter_dates(self, dates: List[str], start_str: str, end_str: str) -> List[str]:
- """
- 根据用户的期望范围筛选可用日期
-
- :param dates: API 返回的可用日期列表 (YYYY-MM-DD)
- :param start_str: 用户期望开始日期 (YYYY-MM-DD)
- :param end_str: 用户期望结束日期 (YYYY-MM-DD)
- :return: 符合要求的日期列表
- """
- # 如果没有设置范围,则不过滤,返回所有日期
- if not start_str or not end_str:
- return dates
-
- valid_dates = []
- # 截取前10位以防带有时分秒
- s_date = datetime.strptime(start_str[:10], "%Y-%m-%d")
- e_date = datetime.strptime(end_str[:10], "%Y-%m-%d")
-
- for date_str in dates:
- curr_date = datetime.strptime(date_str, "%Y-%m-%d")
- # 比较范围 (闭区间)
- if s_date <= curr_date <= e_date:
- valid_dates.append(date_str)
- random.shuffle(valid_dates)
- return valid_dates
-
- def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None):
- """
- 统一 HTTP 请求封装,严格复刻 C++ 逻辑:
- 1. 发送 OPTIONS 请求
- 2. 发送实际请求
- """
- print(f'[perform request] {method} {url} {data} {json_data} {params}')
- resp = self.session.request(method, url, headers=headers, data=data, json=json_data, params=params, timeout=30)
- VSC_INFO('tls_plg', resp.text)
- if resp.status_code == 200:
- return resp
- elif resp.status_code == 401:
- self.is_healthy = False
- raise SessionExpiredOrInvalidError()
- elif resp.status_code == 403:
- raise PermissionDeniedError()
- elif resp.status_code == 429:
- self.is_healthy = False
- raise RateLimiteddError()
- else:
- raise BizLogicError(message=f"HTTP Error {resp.status_code}: {resp.text[:100]}")
- def _solve_cloudflare5S_challenge(self):
- """
- 解决 Cloudflare 5s 盾
- """
- VSC_INFO("de_plg", f"[{self.group_id}] Solving Cloudflare 5s...")
- website_url = f'{self.base_url}/en'
-
- # 1. 格式化代理字符串, 这里的接口要求格式通常是: host:port:user:pass (根据你的脚本示例)
- p = self.config.proxy
- if p.username:
- proxy_str = f"{p.ip}:{p.port}:{p.username}:{p.password}"
- else:
- proxy_str = f"{p.ip}:{p.port}"
- # 2. 提交任务
- task = VSCloudApi.Instance().submit_anticloudflare_task(proxy_str, website_url)
- # 3. 等待结果
- task_id = str(task['id'])
- result = VSCloudApi.Instance().get_anticloudflare_result(task_id)
- parsed = json.loads(result.get('result', '{}'))
- cookies_list = parsed.get('cookies', [])
- for cookie in cookies_list:
- if cookie['name'] in ['__cf_bm', 'cf_clearance']:
- self.session.cookies.set(
- cookie['name'],
- cookie['value'],
- domain=cookie['domain'],
- path='/'
- )
- ua = parsed.get('userAgent')
- if ua:
- self.user_agent = ua
- self.session.headers['User-Agent'] = ua
- VSC_INFO("de_plg", "[%s] Cloudflare 5s challenge solved.", self.group_id)
|