| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910 |
- import re
- import os
- import uuid
- import base64
- import time
- import json
- import random
- import string
- from datetime import datetime, timedelta
- from pathlib import Path
- from urllib.parse import urlparse, parse_qs, urlencode
- from typing import Dict, List, Optional, Any, Callable
- from curl_cffi import requests, const
- from bs4 import BeautifulSoup
- from cryptography.hazmat.primitives import serialization, hashes
- from cryptography.hazmat.primitives.asymmetric import padding
- from cryptography.hazmat.backends import default_backend
- # 框架依赖
- from vs_plg import IVSPlg
- from vs_types import VSPlgConfig, VSQueryResult, VSBookResult, AvailabilityStatus, NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError
- from toolkit.vs_cloud_api import VSCloudApi
- class BlsPlugin(IVSPlg):
- """
- BLS 签证预约插件 (精简版)
- """
- def __init__(self, group_id: str):
- self.group_id = group_id
- self.config: Optional[VSPlgConfig] = None
- self.free_config: Dict[str, Any] = {}
- self.logger = None
-
- self.session: Optional[requests.Session] = None
-
- # 运行时状态
- self.book_params: Dict = {}
- self.is_healthy = True
-
- # OCR 服务地址默认值
- self.local_service_url = ""
- def get_group_id(self) -> str:
- return self.group_id
-
- def set_log(self, logger: Callable[[str], None]) -> None:
- self.logger = logger
- def set_config(self, config: VSPlgConfig):
- self.config = config
- self.free_config = config.free_config or {}
- # 从配置中读取 OCR 服务地址,如果没有则使用默认
- if self.free_config.get("local_service_url"):
- self.local_service_url = self.free_config["local_service_url"]
- def health_check(self) -> bool:
- return self.is_healthy
- def create_session(self):
- self.session = requests.Session(
- proxy=self._get_proxy_url(),
- impersonate="chrome124",
- curl_options={
- const.CurlOpt.MAXAGE_CONN: 1800,
- const.CurlOpt.VERBOSE: self.config.debug
- }
- )
- domain = self.free_config.get("domain")
- if not domain:
- raise NotFoundError(message="Required field [domain] in free config")
- # 1.1 获取登录页 & 解析参数
- login_url = f"https://{domain}/Global/account/login"
-
- headers = {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/131.0.0.0 Safari/537.36',
- 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8'
- }
-
- resp = self._perform_request('GET', login_url, headers=headers)
- if self.config.debug:
- self._save_debug_html(resp.text, prefix="Bls_Login_Page")
- soup = BeautifulSoup(resp.text, 'html.parser')
- form_data = self._extract_hidden_fields(soup)
-
- real_user = None
- real_pass = None
-
- # 解析动态 ID (UserId1, Password1 等)
- for inp in soup.find_all('input'):
- name = inp.get('name', '')
- if inp.has_attr('required'):
- if 'UserId' in name:
- real_user = name
- elif 'Password' in name:
- real_pass = name
-
- # 解析 data 参数 (用于验证码)
- data_val = self._extract_js_var(resp.text, "iframeOpenUrl", r"data=([^']+)")
-
- # 1.2 处理验证码
- captcha_token = self._solve_bls_captcha(data_val)
-
- # 1.3 提交登录
- submit_url = f"https://{domain}/Global/account/loginsubmit"
- payload = form_data
- payload["X-Requested-With"] = "XMLHttpRequest"
- payload["CaptchaData"] = captcha_token
- # 填入账号密码
- payload[real_user] = self.config.account.username
- payload[real_pass] = self.config.account.password
- login_resp = self._perform_request('POST', submit_url, data=payload, headers=headers)
- if login_resp.json()['success']:
- return
- raise BizLogicError(message='Login failed')
- # =========================================================================
- # 2. 查询流程 (Query)
- # =========================================================================
- def query(self) -> VSQueryResult:
- res = VSQueryResult()
- domain = self.free_config.get("domain")
- # 2.1 签证类型验证
- url_vtv = f"https://{domain}/Global/bls/visatypeverification"
- headers = {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/131.0.0.0 Safari/537.36',
- 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8'
- }
- resp = self._perform_request('GET', url_vtv, headers=headers)
- if self.config.debug:
- self._save_debug_html(resp.text, prefix="Bls_Visatypeverification_Page")
- self._check_resp_is_session_expired_or_invalid('APPLICATION PROCESS', resp)
-
- form_vtv = self._extract_hidden_fields(BeautifulSoup(resp.text, 'html.parser'))
- captcha_token = self._solve_bls_captcha()
-
- form_vtv['CaptchaData'] = captcha_token
- form_vtv["X-Requested-With"] = "XMLHttpRequest"
-
- vtv_resp = self._perform_request('POST', f"https://{domain}/Global/bls/VisaTypeVerification", data=form_vtv, headers=headers)
- if not vtv_resp.json()['success']:
- raise BizLogicError(message='Submit VisaTypeVerification Failed')
-
- # 2.2 签证类型选择
- return_url = vtv_resp.json()['returnUrl'] # 包含 data=xxx
- data_val = re.search(r"data=([^&]+)", return_url).group(1)
-
- url_vt = f"https://{domain}/Global/bls/visatype?data={data_val}"
-
- vt_resp = self._perform_request('GET', url_vt, headers=headers)
- if self.config.debug:
- self._save_debug_html(resp.text, prefix="Bls_Visatype_Page")
- self._check_resp_is_session_expired_or_invalid('APPLICATION PROCESS', resp)
-
- # 这里需要极其复杂的 JS 变量提取 (JS Arrays -> Match Name -> Get ID)
- vt_payload = self._construct_visatype_payload(vt_resp.text, BeautifulSoup(vt_resp.text, 'html.parser'))
-
- res.city = self.free_config.get('city', '')
- res.country = self.free_config.get('country', '')
- res.visa_type = self.free_config.get('visa_type', '')
- res.routing_key = self.free_config.get('routing_key', '')
- vt_res = self._perform_request('POST', f"https://{domain}/Global/bls/VisaType", data=vt_payload, headers=headers)
- if not vt_res.json()['success']:
- if not vt_res.json()['available']:
- res.success = False
- res.availability_status = AvailabilityStatus.NoneAvailable
- return res
- # 2.3 获取预约参数
- final_url = vt_res.json()['returnUrl']
- q_params = parse_qs(urlparse(final_url).query)
- self.book_params = {k: v[0] for k, v in q_params.items()}
-
- # 2.4 查询日历
- url_ma = f"https://{domain}/Global/blsAppointment/ManageAppointment?{urlencode(self.book_params)}"
-
- resp_ma = self._perform_request('GET', url_ma, headers=headers)
- if self.config.debug:
- self._save_debug_html(resp.text, prefix="Bls_ManageAppointment_Page")
- self._check_resp_is_session_expired_or_invalid('APPLICATION PROCESS', resp)
-
- avail_str = self._extract_js_var(resp_ma.text, "var availDates", r"var availDates =(.*?);")
- if avail_str:
- avail_json = json.loads(avail_str)
- # 提取日期
- dates = [x['DateText'] for x in avail_json['ad'] if x['SingleSlotAvailable']]
- if dates:
- res.success = True
- res.availability_status = AvailabilityStatus.Available
- res.earliest_date = dates[0]
- for d in dates:
- da = VSQueryResult.DateAvailability()
- da.date = d
- da.times = []
- time_slot = VSQueryResult.DateAvailability.TimeSlot(time="00:00", label="Available")
- da.times.append(time_slot)
- res.availability.append(da)
- else:
- res.success = False
- res.availability_status = AvailabilityStatus.NoneAvailable
- return res
-
- raise BizLogicError(message='Query page not found required field [var availDates]')
- def book(self, slot_info: VSQueryResult, user_inputs: Dict) -> VSBookResult:
- res = VSBookResult()
- domain = self.free_config.get("domain")
-
- headers = {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/131.0.0.0 Safari/537.36',
- 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8'
- }
- # 3.1 获取 Manage Page (为了 Token 和 JS 变量)
- url_ma = f"https://{domain}/Global/blsAppointment/ManageAppointment?{urlencode(self.book_params)}"
-
- resp_ma = self._perform_request('GET', url_ma, headers=headers)
- ma_soup = BeautifulSoup(resp_ma.text, 'html.parser')
- ma_form = self._extract_hidden_fields(ma_soup)
- req_token = ma_form.get('__RequestVerificationToken')
-
- # 3.2 上传照片
- if 'passport_image_url' not in user_inputs:
- raise NotFoundError()
-
- photo_bytes = requests.get(user_inputs['passport_image_url']).content
- boundary = "----WebKitFormBoundary" + "".join(random.choices(string.ascii_letters + string.digits, k=16))
- upload_headers = {
- "content-type": f"multipart/form-data; boundary={boundary}",
- "requestverificationtoken": req_token,
- "x-requested-with": "XMLHttpRequest",
- }
- body = (f"--{boundary}\r\nContent-Disposition: form-data; name=\"file\"; filename=\"photo.jpg\"\r\n"
- f"Content-Type: image/jpeg\r\n\r\n").encode("utf-8") + photo_bytes + f"\r\n--{boundary}--\r\n".encode("utf-8")
-
- up_resp = self.session.post(f"https://{domain}/Global/query/UploadProfileImage", headers=upload_headers, data=body)
- if up_resp.status_code !=200:
- raise BizLogicError(message='Upload Passport Image failed')
-
- ma_form['ApplicantPhotoId'] = up_resp.json()['fileId']
- # 3.3 邮箱 OTP 流程
- data_val = self._extract_js_var(resp_ma.text, "win.iframeOpenUrl", r"data=([^&]+)")
-
- # 发送 OTP
- headers["X-Requested-With"] = "XMLHttpRequest"
- self._perform_request('GET', f"https://{domain}/Global/blsappointment/SendAppointmentVerificationCode?code={data_val}", headers=headers)
-
- # 读取 OTP (Wait 30s max)
- otp_code = self._read_otp_email(wait_sec=30)
-
- # 验证 OTP
- verify_payload = {
- "Code": otp_code,
- "Value": ma_form.get('EmailCode'),
- "Id": ma_form.get('Id')
- }
-
- headers['requestverificationtoken'] = req_token
- v_resp = self._perform_request('POST', f"https://{domain}/Global/blsappointment/VerifyEmail", data=verify_payload, headers=headers)
- headers.pop('requestverificationtoken')
- if not v_resp.json().get('success'):
- raise BizLogicError(message='Email verification failed')
-
- ma_form['EmailVerified'] = 'True'
- ma_form['EmailVerificationCode'] = otp_code
- # 3.4 锁定时间 (简单随机)
- target_date = slot_info.earliest_date
- # Query Slots in Day
- slot_url = f"https://{domain}/Global/blsappointment/GetAvailableSlotsByDate"
- # 构造复杂的 query params... 省略部分非关键参数
- slot_params = {
- "appointmentDate": target_date,
- "locationId": ma_form.get("LocationId"),
- "categoryId": ma_form.get("AppointmentCategoryId"),
- "visaType": ma_form.get("VisaType"),
- "visaSubType": ma_form.get("VisaSubTypeId"),
- "applicantCount": 1,
- "dataSource": ma_form.get("DataSource"),
- "missionId": ma_form.get("MissionId")
- }
-
- headers['requestverificationtoken'] = req_token
- slots_resp = self._perform_request('POST', slot_url, params=slot_params, headers=headers)
- headers.pop('requestverificationtoken')
- slots_data = sorted(slots_resp.json(), key=lambda x: -x["Count"]) # 选剩余最多的
- if not slots_data or slots_data[0]['Count'] <= 0:
- self._log('Available slot times not found')
- res.success = False
- return res
-
- target_time = slots_data[0]['Name']
- ma_form['ServerAppointmentDate'] = target_date
- ma_form['AppointmentDetailsList'] = '[]'
- # 这里的 key 是动态的 ID,需重新解析 ID
- date_id = re.search(r'AppointmentDate(\d+)', str(ma_soup)).group(1)
- slot_id = re.search(r'AppointmentSlot(\d+)', str(ma_soup)).group(1)
- ma_form[f'AppointmentDate{date_id}'] = target_date
- ma_form[f'AppointmentSlot{slot_id}'] = target_time
- # 3.5 再次验证码 & 提交 ManageAppointment
- captcha_token = self._solve_bls_captcha(data_val)
- ma_form['CaptchaData'] = captcha_token
-
- final_ma_resp = self._perform_request('POST', f"https://{domain}/Global/BLSAppointment/ManageAppointment", data=ma_form, headers=headers)
-
- appt_model_id = final_ma_resp.json().get('model', {}).get('Id')
- if not appt_model_id:
- raise NotFoundError(message='Appointment model id not found')
- # 3.6 填写申请表 (VisaAppointmentForm)
- # 获取页面 -> 解析 JS 变量 -> 映射 UserInfo -> 提交
- # 这里逻辑较深,核心是映射。简化为提交一个空的 applicants JSON,实际需完整映射。
- # 假设 _fill_applicant_form 做了这些工作
- self._submit_final_form(appt_model_id, user_inputs, self.book_params, req_token)
- # 成功,返回 Liveness 链接
- Liveness_page = f"https://{domain}/Global/BlsAppointment/livenessView?id={appt_model_id}"
- session_data = self._save_http_session(Liveness_page)
- res.success = True
- res.account = self.config.account.username
- res.session_id = session_data['session_id']
- res.book_date = target_date
- res.book_time = target_time
- self._log(f"Book Success. Liveness URL: {res.payment_link}")
- return res
-
- def _log(self, message):
- if self.logger:
- self.logger(f'[BlsPlugin] [{self.group_id}] {message}')
-
- def _get_proxy_url(self):
- # 构造代理
- proxy_url = ""
- if self.config.proxy.ip:
- s = self.config.proxy
- if s.username:
- proxy_url = f"{s.scheme}://{s.username}:{s.password}@{s.ip}:{s.port}"
- else:
- proxy_url = f"{s.scheme}://{s.ip}:{s.port}"
- return proxy_url
-
- def _save_debug_html(self, content: str, prefix: str = "debug"):
- save_dir = "debug_pages"
- if not os.path.exists(save_dir):
- os.makedirs(save_dir)
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- filename = f"{save_dir}/{prefix}_{timestamp}.html"
- with open(filename, "w", encoding="utf-8") as f:
- f.write(content)
- self._log(f"HTML saved to: {filename}")
-
- def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None):
- """
- 统一 HTTP 请求封装,严格复刻 C++ 逻辑:
- 1. 发送 OPTIONS 请求
- 2. 发送实际请求
- """
- resp = self.session.request(method, url, headers=headers, data=data, json=json_data, params=params, timeout=30)
- if self.config.debug:
- self._log(f'[perform request] Response={resp.text}\nMethod={method}, Url={url}, Data={data}, JsonData={json_data}, Params={params}')
- if resp.status_code == 200:
- return resp
- elif resp.status_code == 401:
- self.is_healthy = False
- raise SessionExpiredOrInvalidError()
- elif resp.status_code == 403:
- raise PermissionDeniedError()
- elif resp.status_code == 429:
- self.is_healthy = False
- raise RateLimiteddError()
- else:
- raise BizLogicError(message=f"HTTP Error {resp.status_code}: {resp.text[:100]}")
- def _solve_bls_captcha(self, data='') -> Optional[str]:
- """
- 验证码处理:获取图片 -> 调用远程 OCR 服务 -> 提交验证
- """
- domain = self.free_config.get("domain")
- url = f"https://{domain}/Global/NewCaptcha/GenerateCaptcha"
- if data: url = f"https://{domain}/Global/CaptchaPublic/GenerateCaptcha?data={data}"
- headers = {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/131.0.0.0 Safari/537.36',
- 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8'
- }
- resp = self._perform_request("GET", url, headers=headers)
- if self.config.debug:
- self._save_debug_html(resp.text, prefix="Bls_Captcha_Page")
- self._check_resp_is_session_expired_or_invalid('Please select all boxes with number', resp)
-
- soup = BeautifulSoup(resp.text, 'html.parser')
- resp = requests.post(
- f'{self.local_service_url}/browser/visable_captchas',
- data=resp.text,
- headers={"Content-Type": "text/plain"},
- timeout=10
- )
- result = resp.json()
- if result.get('status') != 'success':
- raise BizLogicError(message='Broswer task failed')
-
- numbers = result['data']['number']
- image_ids = result['data']['image_ids']
- selected_ids = []
- for sid in image_ids:
- div = soup.find("div", id=sid)
- img = div.find("img")
- src = img.get("src")
- base64_data = src.split("base64,", 1)[1]
- img_bytes = base64.b64decode(base64_data)
- ocr_resp = requests.post(
- f'{self.local_service_url}/predict/bls?model=pytorch',
- data=img_bytes,
- headers={"Content-Type": "application/octet-stream"},
- timeout=5
- )
- if ocr_resp.status_code == 200:
- res_json = ocr_resp.json()
- ocr_res = res_json.get('data', '').replace('$', '')[:3]
- self._log(f'ocr captcha id={sid} result={ocr_res}, target={numbers}')
- if ocr_res == numbers:
- selected_ids.append(sid)
- else:
- raise BizLogicError(message='Captcha server response error')
- if not selected_ids:
- raise BizLogicError(message='Captcha selected ids is empty')
-
- # 3. 提交选中结果
- self._log(f'select_ids={selected_ids}')
- form = self._extract_hidden_fields(soup)
- form['SelectedImages'] = ",".join(selected_ids)
- submit_url = f"https://{domain}/Global/{'CaptchaPublic' if data else 'NewCaptcha'}/SubmitCaptcha"
- headers["X-Requested-With"] = "XMLHttpRequest"
- resp = self._perform_request('POST', submit_url, headers=headers, data=form)
- j = resp.json()
- if j.get('success'):
- if data:
- return resp.json()['captcha']
- else:
- return resp.json()['cd']
- else:
- # 存盘所有错误验证码后续进行数据分析
- self._log('Captcha Selection Invalid, Saving important data to data/bls_captcha')
- for img in soup.select("img.captcha-img"):
- src = img.get("src", "")
- if not src.startswith("data:image"):
- continue
- b64 = src.split("base64,", 1)[1]
- with open(f'data/bls_captcha/{uuid.uuid4().hex}.jpg', "wb") as fp:
- fp.write(base64.b64decode(b64))
- raise BizLogicError(message="Sovle captcha failed")
- def _extract_hidden_fields(self, soup) -> Dict:
- params = {}
- form = soup.find("form")
- if form:
- for inp in form.find_all("input"):
- name = inp.get("name")
- if name: params[name] = inp.get("value", "")
- else:
- self._log('Form element not found')
- return params
- def _extract_js_var(self, html, context, pattern):
- # 简单正则提取
- if context in html:
- match = re.search(pattern, html)
- if match: return match.group(1)
- return ""
- def _construct_visatype_payload(self, html: str, soup: BeautifulSoup) -> Optional[Dict]:
- """
- 构造 VisaType 提交参数 (对应原代码 parse_visatype_form)
- """
- # 基础表单参数 (__RequestVerificationToken 等)
- params = self._extract_hidden_fields(soup)
-
- # 提取页面中的 JS 数据变量
- def get_js_data(var_name):
- try:
- # 匹配 var name = [...]; 结构
- pattern = f"var {var_name}\\s*=\\s*(.*?);"
- match = re.search(pattern, html, re.DOTALL)
- if match:
- return json.loads(match.group(1))
- except Exception as e:
- self._log(f"Failed to parse JS var {var_name}: {e}")
- return []
-
- # 读取配置
- query_selector = self.free_config.get("query_selector", {})
- cfg_jur = query_selector.get("jurisdiction")
- cfg_loc = query_selector.get("location")
- cfg_type = query_selector.get("visa_type")
- cfg_subtype = query_selector.get("visa_subtype")
- cfg_cat = query_selector.get("appointment_category")
-
- jur_value = None
- loc_value = None
- type_value = None
- subtype_value = None
- cat_value = None
-
- resp = requests.post(
- f'{self.local_service_url}/browser/visatype_visable',
- data=html,
- headers={"Content-Type": "text/plain"},
- timeout=10
- )
- result = resp.json()
- if result.get('status') != 'success':
- raise BizLogicError(message='Broswer task failed')
-
- jur_id = result['data']['jur_id']
- loc_id = result['data']['loc_id']
- type_id = result['data']['type_id']
- subtype_id = result['data']['subtype_id']
- cat_id = result['data']['cat_id']
-
- jurisdiction_list = get_js_data("jurisdictionData")
- location_list = get_js_data("locationData")
- visa_type_list = get_js_data("visaIdData")
- visa_subtype_list = get_js_data("visasubIdData")
- app_category_list = get_js_data("AppointmentCategoryIdData")
- # 4. 匹配 Value
- # (A) Appointment Category
- for item in app_category_list:
- if item.get("Name") == cfg_cat:
- cat_value = item.get("Id")
- break
-
- # (B) Jurisdiction (如果配置了)
- if cfg_jur and jurisdiction_list:
- for item in jurisdiction_list:
- if item.get("Name") == cfg_jur:
- jur_value = item.get("Id")
- break
- # (C) Location
- for item in location_list:
- if item.get("Name") == cfg_loc:
- loc_value = item.get("Id")
- break
-
- # (D) Visa Type (需匹配 LocationId)
- if loc_value:
- for item in visa_type_list:
- # 比较 Name 和 LocationId
- if item.get("Name") == cfg_type and str(item.get("LocationId")) == str(loc_value):
- type_value = item.get("Id")
- break
-
- # (E) Visa SubType (需匹配 VisaType Value)
- if type_value:
- for item in visa_subtype_list:
- # BLS 逻辑: visasubIdData 中的 Value 字段对应 VisaTypeId
- if item.get("Name") == cfg_subtype and str(item.get("Value")) == str(type_value):
- subtype_value = item.get("Id")
- break
- # 5. 构造动态参数 & 校验
- if not cat_value:
- raise NotFoundError(message=f"Config: AppCategory '{cfg_cat}' not found")
- params[f"AppointmentCategoryId{cat_id}"] = cat_value
- if cfg_jur:
- if not jur_value:
- raise NotFoundError(message=f"Config: Jurisdiction '{cfg_jur}' not found")
- params[f"JurisdictionId{jur_id}"] = jur_value
- if not loc_value:
- raise NotFoundError(message=f"Config: Location '{cfg_loc}' not found")
- params[f"Location{loc_id}"] = loc_value
- if not type_value:
- raise NotFoundError(message=f"Config: VisaType '{cfg_type}' not found for Loc '{cfg_loc}'")
- params[f"VisaType{type_id}"] = type_value
- if not subtype_value:
- raise NotFoundError(message=f"Config: VisaSubType '{cfg_subtype}' not found")
- params[f"VisaSubType{subtype_id}"] = subtype_value
- # 固定参数
- for k in list(params.keys()):
- if k.startswith("AppointmentFor"):
- params[k] = "Individual"
-
- # 6. 构造 ResponseData (行为轨迹模拟)
- # BLS 后端会校验这个字段,模拟用户选择下拉框的时间间隔
- response_data = []
- current_time = datetime.utcnow()
-
- def add_trace(prefix, val_id):
- nonlocal current_time
- # 模拟 1-3 秒的操作间隔
- duration = random.randint(1000, 3000)
- gap = random.randint(500, 1500)
-
- start = current_time
- end = start + timedelta(milliseconds=duration)
-
- # BLS 时间格式: 2023-10-27T10:00:00.123Z
- fmt = "%Y-%m-%dT%H:%M:%S.%f"
-
- response_data.append({
- "Id": f"{prefix}{val_id}",
- "Start": start.strftime(fmt)[:-3] + "Z",
- "End": end.strftime(fmt)[:-3] + "Z",
- "Total": duration,
- "Selected": True
- })
- current_time = end + timedelta(milliseconds=gap)
- # 按顺序添加轨迹
- add_trace("AppointmentCategoryId", cat_id)
- if jur_id: add_trace("JurisdictionId", jur_id)
- add_trace("Location", loc_id)
- add_trace("VisaType", type_id)
- add_trace("VisaSubType", subtype_id)
- params["ResponseData"] = json.dumps(response_data)
- params["X-Requested-With"] = "XMLHttpRequest"
-
- return params
- def _submit_final_form(self, model_id: str, user_inputs: Dict, book_params: Dict, token: str):
- """
- 提交最终签证申请表 (VisaAppointmentForm)
- 对应原代码的: get_visa_appointment_form_html -> parse -> fix_data -> submit
- """
- domain = self.free_config.get("domain")
- headers = {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/131.0.0.0 Safari/537.36',
- 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8'
- }
- # 1. 获取表单页面 (为了提取 JS 变量映射表)
- url_get = f"https://{domain}/Global/BlsAppointment/VisaAppointmentForm?appointmentId={model_id}"
- # 构造 Referer
- ref_query = urlencode(book_params)
- referer = f"Global/blsAppointment/ManageAppointment?{ref_query}"
-
- headers['X-Requested-With'] = "XMLHttpRequest"
- headers['Referer'] = f"https://{domain}/{referer}"
- resp = self._perform_request('GET', url_get, headers=headers)
- headers.pop['X-Requested-With']
- headers.pop['Referer']
-
- html = resp.text
- soup = BeautifulSoup(resp.text, 'html.parser')
-
- # 2. 提取基础隐藏域 (包含 __RequestVerificationToken 等)
- form_data = self._extract_hidden_fields(soup)
-
- # 3. 提取下拉菜单数据源 (JS Variables)
- # BLS 的页面里有很多 var countryData = [...]; 这种数据
- def get_list(name):
- val = self._extract_js_var(html, f"var {name}", rf"var {name}\s*=\s*(.*?);")
- return json.loads(val) if val else []
- # 提取关键数据源
- country_data = get_list("countryData")
- gender_data = get_list("genderData")
- marital_data = get_list("maritalStatusData")
- occupation_data = get_list("occupationData")
- # passport_type_data = get_list("passportTypeData") # 通常默认 Ordinary
-
- # 4. 辅助函数:根据文本找 ID
- def find_id(data_list, text_val, default=None):
- if not text_val: return default
- text_val = str(text_val).lower().strip()
- for item in data_list:
- if str(item.get("Name")).lower() == text_val:
- return item.get("Id")
- return default
- # 5. 准备日期 (YYYY-MM-DD)
- # uinfo 中的日期可能是不同格式,需统一
- def fmt_date(d_str):
- try:
- # 尝试解析常见格式
- for fmt in ["%Y-%m-%d", "%d/%m/%Y", "%d-%m-%Y"]:
- try:
- return datetime.strptime(d_str, fmt).strftime("%Y-%m-%d")
- except: pass
- except: pass
- return d_str # 原样返回 fallback
- dob = fmt_date(user_inputs.get("birthday", ""))
- ppt_issue = fmt_date(user_inputs.get("passport_issue_date", ""))
- ppt_expiry = fmt_date(user_inputs.get("passport_expiry_date", ""))
-
- # 自动计算行程日期 (如果未提供,默认一个月后)
- try:
- travel_date = (datetime.now() + timedelta(days=30)).strftime("%Y-%m-%d")
- except: travel_date = ""
- # 6. 构造申请人详细数据对象 (JSON)
- # 注意:这里的字段名必须严格匹配 BLS 后端实体定义
- applicant_detail = {
- "ApplicantSerialNo": "1",
- "ApplicantId": form_data.get("applicantId", "0"), # 从页面隐藏域提取
- "Id": form_data.get("applicantId", "0"),
- "ParentId": form_data.get("Id", model_id), # 关联的 Appointment ID
-
- # 基本信息
- "FirstName": user_inputs.get("first_name", ""),
- "SurName": user_inputs.get("last_name", ""),
- "LastName": user_inputs.get("last_name", ""),
- "SurnameAtBirth": user_inputs.get("last_name", ""), # 默认同名
- "GenderId": find_id(gender_data, user_inputs.get("gender"), "1"), # 默认 Male
- "MaritalStatusId": find_id(marital_data, user_inputs.get("marital_status", "Single"), "1"),
- "ServerDateOfBirth": dob,
-
- # 国籍/出生地
- "PlaceOfBirth": user_inputs.get("place_of_birth", "-"),
- "CountryOfBirthId": find_id(country_data, user_inputs.get("nationality"), "0"),
- "NationalityAtBirthId": find_id(country_data, user_inputs.get("nationality"), "0"),
- "NationalityId": find_id(country_data, user_inputs.get("nationality"), "0"),
-
- # 护照信息
- "PassportType": "Ordinary Passport", # 默认
- "PassportNo": user_inputs.get("passport_no", ""),
- "ServerPassportIssueDate": ppt_issue,
- "ServerPassportExpiryDate": ppt_expiry,
- "IssuePlace": user_inputs.get("place_of_issue", "-"),
- "IssueCountryId": find_id(country_data, user_inputs.get("nationality"), "0"),
-
- # 联系方式 (必填占位符)
- "HomeAddressLine1": "-",
- "HomeAddressCity": "-",
- "HomeAddressPostalCode": "-",
- "HomeAddressContactNumber": user_inputs.get("phone", "-"),
- "HomeAddressCountryId": find_id(country_data, user_inputs.get("nationality"), "0"),
- "EmployerName": "-",
- "EmployerAddress": "-",
-
- # 职业
- "CurrentOccupationId": find_id(occupation_data, user_inputs.get("occupation", "Others"), "20"),
-
- # 行程信息 (部分写死为常规旅游)
- "PurposeOfJourneyId": "Tourism",
- "MemberStateDestinationId": "Spain",
- "MemberStateFirstEntryId": "Spain",
- "NumberOfEntriesRequested": "Multiple Entries",
- "IntendedStayDuration": "5",
- "ServerTravelDate": travel_date,
- "ServerIntendedDateOfArrival": travel_date,
- "ServerIntendedDateOfDeparture": travel_date, # 简化
-
- # 费用承担
- "CostCoveredById": "By the Applicant himself / herself",
- "MeansOfSupportId": "Cash",
-
- # 杂项
- "IsMinor": False,
- "IsVisaIssuedBefore": False,
- "BlsInvitingAuthority": "1", # 这里的 1 通常代表 "No" 或者特定枚举
- "PreviousFingerPrintStatus": "2", # 2 通常代表 No
-
- # 邀请人信息 (旅游通常填酒店或空)
- "InvitingAuthorityName": "-",
- "InvitingAddress": "-",
- "InvitingCity": "-",
- "InvitingEmail": "no-reply@example.com"
- }
- # 7. 更新表单数据
- # ApplicantsDetailsList 需要是一个 JSON 字符串
- form_data['ApplicantsDetailsList'] = json.dumps([applicant_detail])
-
- # 补全其他可能需要的字段
- form_data['PreviousFingerPrintStatus_0'] = "2"
- form_data['BlsInvitingAuthority_0'] = "1"
- form_data["X-Requested-With"] = "XMLHttpRequest"
- # 8. 提交
- # 注意:提交地址通常和 manage appointment 相同,或者是特定的 Save 接口
- # 根据你的原代码,是 Global/BLSAppointment/ManageAppointment
- url_post = f"https://{domain}/Global/BLSAppointment/ManageAppointment"
-
- # Headers 需要 Token
- headers = {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/131.0.0.0 Safari/537.36',
- 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
- "Referer": f"https://{domain}/{referer}",
- "X-Requested-With": "XMLHttpRequest",
- "requestverificationtoken": token
- }
- # 这里的 form_data['params'] 逻辑在 _extract_hidden_fields 可能会有差异
- # 确保 form_data 是扁平的字典
- submit_resp = self._perform_request('POST', url_post, data=form_data, headers=headers)
-
- if submit_resp.json().get('success'):
- self._log("Final Form Submitted Successfully.")
- return True
- raise BizLogicError(message='Submit application form failed')
- def _read_otp_email(self, wait_sec: int = 60) -> str:
- """
- 读取 BLS 的 OTP 邮件
- """
- master_email = "visafly666@gmail.com"
- recipient = self.config.account.username
- sender = "Info@blsinternational.com"
- subject_keywords = "BLS"
- body_keywords = "verification code"
- # 设置时间起点 (UTC)
- now_utc = datetime.utcnow()
- formatted_utc_time = now_utc.strftime("%Y-%m-%d %H:%M:%S")
- self._log(f"Waiting for OTP from {sender}...")
- # 轮询查收, 每 5 秒查一次
- attempts = wait_sec // 5
- for i in range(attempts):
-
- # 调用云端接口获取邮件内容
- # expiry=300 表示邮件有效搜索窗口为 5 分钟
- content_out = VSCloudApi.Instance().fetch_mail_content(
- master_email,
- sender,
- recipient,
- subject_keywords,
- body_keywords,
- formatted_utc_time,
- 300
- )
- # 正则匹配 6 位数字验证码
- match = re.search(r'\b\d{6}\b', content_out)
- if match:
- otp = match.group(0)
- self._log("OTP code found: {otp}")
- return otp
-
- # 等待下一次轮询
- time.sleep(5)
- if i % 2 == 0:
- self._log("OTP not received yet, retrying...")
- # 超时处理
- raise NotFoundError(f"OTP email not found within {wait_sec}s")
-
- def _check_resp_is_session_expired_or_invalid(self, keyword, resp) -> bool:
- """
- 检测是否发生了 Session 过期
- """
- # 1. 检查最终 URL 是否包含登录页特征
- # 这里的判断依据是你提供的日志:Redirect to /Global/Account/LogIn
- if "/Account/LogIn" in resp.url or "/Account/Login" in resp.url:
- self.is_healthy = False
- raise SessionExpiredOrInvalidError()
-
- # 2. (备用) 如果 _perform_request 禁止了重定向,检查 302 Location
- if resp.status_code == 302:
- location = resp.headers.get("Location", "")
- if "/Account/LogIn" in location or "/Account/Login" in location:
- self.is_healthy = False
- raise SessionExpiredOrInvalidError()
-
- resp_text = resp.text
- if not resp_text:
- self.is_healthy = False
- raise SessionExpiredOrInvalidError()
-
- if keyword not in resp_text:
- if 'your session has expired, please login again.' in resp_text.lower():
- self.is_healthy = False
- raise SessionExpiredOrInvalidError()
-
- def _save_http_session(self, page_url):
- """
- 提取 cookies, local_storage, 存入 VSCloudApi
- """
- cookies_dict = {}
- # 方式 1: curl_cffi 的 cookies 对象通常支持 get_dict()
- if hasattr(self.session.cookies, "get_dict"):
- cookies_dict = self.session.cookies.get_dict()
- else:
- # 方式 2: 迭代 (兼容标准 CookieJar)
- for c in self.session.cookies:
- cookies_dict[c.name] = c.value
- cookies_str = json.dumps(cookies_dict)
-
- # 简单生成 SessionID hash
- ua_str = self.user_agent or "unknown_ua"
- raw = cookies_str + ua_str + page_url
-
- session_id = hashes.Hash(hashes.SHA256(), backend=default_backend())
- session_id.update(raw.encode())
- sid = session_id.finalize().hex()
-
- proxy_str = ""
- if self.config.proxy.ip:
- proxy_str = f"{self.config.proxy.scheme}://"
- if self.config.proxy.username:
- proxy_str += f"{self.config.proxy.username}:{self.config.proxy.password}@"
- proxy_str += f"{self.config.proxy.ip}:{self.config.proxy.port}"
-
- return VSCloudApi.Instance().create_http_session(
- sid, cookies_str, "", ua_str, proxy_str, page_url
- )
|