import re import os import base64 import time import json import random import string from datetime import datetime, timedelta from pathlib import Path from urllib.parse import urlparse, parse_qs, urlencode from typing import Dict, List, Optional, Any from curl_cffi import requests, const from bs4 import BeautifulSoup from cryptography.hazmat.primitives import serialization, hashes from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.backends import default_backend # 框架依赖 from vs_plg import IVSPlg, VSError from vs_types import VSPlgConfig, VSQueryResult, VSBookResult, AvailabilityStatus, NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError from vs_log_macros import VSC_INFO, VSC_ERROR, VSC_DEBUG, VSC_WARN from toolkit.vs_cloud_api import VSCloudApi from utils.browser_util import get_browser class BlsPlugin(IVSPlg): """ BLS 签证预约插件 (精简版) """ def __init__(self, group_id: str): self.group_id = group_id self.config: Optional[VSPlgConfig] = None self.free_config: Dict[str, Any] = {} self.session: Optional[requests.Session] = None # 运行时状态 self.book_params: Dict = {} self.last_error = VSError(0, "OK") self.is_healthy = True # OCR 服务地址默认值 self.ocr_service_url = "http://127.0.0.1:8085/predict/vfcode?model=pytorch" self.browser = get_browser() def get_group_id(self) -> str: return self.group_id def set_config(self, config: VSPlgConfig): self.config = config try: self.free_config = json.loads(config.free_config) if config.free_config else {} except: self.free_config = {} # 从配置中读取 OCR 服务地址,如果没有则使用默认 if self.free_config.get("ocr_service_url"): self.ocr_service_url = self.free_config["ocr_service_url"] def health_check(self) -> bool: return self.is_healthy def create_session(self): self.session = requests.Session( proxy=self._get_proxy_url(), impersonate="chrome131", curl_options={ const.CurlOpt.MAXAGE_CONN: 1800, const.CurlOpt.VERBOSE: False } ) domain = self.free_config.get("domain") if not domain: raise NotFoundError(message="Required field [domain] in free config") # 1.1 获取登录页 & 解析参数 login_url = f"https://{domain}/Global/account/login" headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/131.0.0.0 Safari/537.36', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8' } resp = self._perform_request('GET', login_url, headers=headers) soup = BeautifulSoup(resp.text, 'html.parser') form_data = self._extract_hidden_fields(soup) # 解析动态 ID (UserId1, Password1 等) for inp in soup.find_all('input'): iid = inp.get('id', '') if 'UserId' in iid and re.search(r'\d+', iid): form_data["UserIdKey"] = iid # 暂存 Key form_data["UserId"] = re.search(r'\d+', iid).group(0) if 'Password' in iid and re.search(r'\d+', iid): form_data["PasswordKey"] = iid # 暂存 Key form_data["Password"] = re.search(r'\d+', iid).group(0) # 解析 data 参数 (用于验证码) data_val = self._extract_js_var(resp.text, "iframeOpenUrl", r"data=([^']+)") # 1.2 处理验证码 captcha_token = self._solve_bls_captcha(data_val) # 1.3 提交登录 submit_url = f"https://{domain}/Global/account/loginsubmit" payload = form_data payload["X-Requested-With"] = "XMLHttpRequest" payload["CaptchaData"] = captcha_token # 填入账号密码 if "UserIdKey" in form_data: payload[form_data["UserIdKey"]] = self.config.account.username if "PasswordKey" in form_data: payload[form_data["PasswordKey"]] = self.config.account.password login_resp = self._perform_request('POST', submit_url, data=payload, headers=headers) if login_resp.json()['success']: return raise BizLogicError(message='Login failed') # ========================================================================= # 2. 查询流程 (Query) # ========================================================================= def query(self) -> VSQueryResult: res = VSQueryResult() domain = self.free_config.get("domain") # 2.1 签证类型验证 url_vtv = f"https://{domain}/Global/bls/visatypeverification" headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/131.0.0.0 Safari/537.36', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8' } resp = self._perform_request('GET', url_vtv, headers=headers) form_vtv = self._extract_hidden_fields(BeautifulSoup(resp.text, 'html.parser')) captcha_token = self._solve_bls_captcha() form_vtv['CaptchaData'] = captcha_token form_vtv["X-Requested-With"] = "XMLHttpRequest" vtv_resp = self._perform_request('POST', f"https://{domain}/Global/bls/VisaTypeVerification", data=form_vtv, headers=headers) if not vtv_resp.json()['success']: raise BizLogicError(message='Submit VisaTypeVerification Failed') # 2.2 签证类型选择 return_url = vtv_resp.json()['returnUrl'] # 包含 data=xxx data_val = re.search(r"data=([^&]+)", return_url).group(1) url_vt = f"https://{domain}/Global/bls/visatype?data={data_val}" vt_resp = self._perform_request('GET', url_vt, headers=headers) # 这里需要极其复杂的 JS 变量提取 (JS Arrays -> Match Name -> Get ID) vt_payload = self._construct_visatype_payload(vt_resp.text, BeautifulSoup(vt_resp.text, 'html.parser')) vt_res = self._perform_request('POST', f"https://{domain}/Global/bls/VisaType", data=vt_payload, headers=headers) if not vt_res.json()['success']: if not vt_res.json()['available']: res.success = False res.availability_status = AvailabilityStatus.NoneAvailable return res # 2.3 获取预约参数 final_url = vt_res.json()['returnUrl'] q_params = parse_qs(urlparse(final_url).query) self.book_params = {k: v[0] for k, v in q_params.items()} # 2.4 查询日历 url_ma = f"https://{domain}/Global/blsAppointment/ManageAppointment?{urlencode(self.book_params)}" resp_ma = self._perform_request('GET', url_ma, headers=headers) avail_str = self._extract_js_var(resp_ma.text, "var availDates", r"var availDates =(.*?);") if avail_str: avail_json = json.loads(avail_str) # 提取日期 dates = [x['DateText'] for x in avail_json['ad'] if x['SingleSlotAvailable']] if dates: res.success = True res.availability_status = AvailabilityStatus.Available res.earliest_date = dates[0] for d in dates: da = VSQueryResult.DateAvailability(date=d) da.times.append(VSQueryResult.DateAvailability.TimeSlot(time="00:00", label="Available")) res.availability.append(da) else: res.success = False res.availability_status = AvailabilityStatus.NoneAvailable return res raise BizLogicError(message='Query page not found required field [var availDates]') def book(self, slot_info: VSQueryResult, user_inputs: Dict) -> VSBookResult: res = VSBookResult() domain = self.free_config.get("domain") headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/131.0.0.0 Safari/537.36', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8' } # 3.1 获取 Manage Page (为了 Token 和 JS 变量) url_ma = f"https://{domain}/Global/blsAppointment/ManageAppointment?{urlencode(self.book_params)}" resp_ma = self._perform_request('GET', url_ma, headers=headers) ma_soup = BeautifulSoup(resp_ma.text, 'html.parser') ma_form = self._extract_hidden_fields(ma_soup) req_token = ma_form.get('__RequestVerificationToken') # 3.2 上传照片 if 'passport_image_url' not in user_inputs: raise NotFoundError() photo_bytes = requests.get(user_inputs['passport_image_url']).content boundary = "----WebKitFormBoundary" + "".join(random.choices(string.ascii_letters + string.digits, k=16)) upload_headers = { "content-type": f"multipart/form-data; boundary={boundary}", "requestverificationtoken": req_token, "x-requested-with": "XMLHttpRequest", } body = (f"--{boundary}\r\nContent-Disposition: form-data; name=\"file\"; filename=\"photo.jpg\"\r\n" f"Content-Type: image/jpeg\r\n\r\n").encode("utf-8") + photo_bytes + f"\r\n--{boundary}--\r\n".encode("utf-8") up_resp = self.session.post(f"https://{domain}/Global/query/UploadProfileImage", headers=upload_headers, data=body) if up_resp.status_code !=200: raise BizLogicError(message='Upload Passport Image failed') ma_form['ApplicantPhotoId'] = up_resp.json()['fileId'] # 3.3 邮箱 OTP 流程 data_val = self._extract_js_var(resp_ma.text, "win.iframeOpenUrl", r"data=([^&]+)") # 发送 OTP headers["X-Requested-With"] = "XMLHttpRequest" self._perform_request('GET', f"https://{domain}/Global/blsappointment/SendAppointmentVerificationCode?code={data_val}", headers=headers) # 读取 OTP (Wait 30s max) otp_code = self._read_otp_email(wait_sec=30) # 验证 OTP verify_payload = {"Code": otp_code, "Value": ma_form.get('EmailCode'), "Id": ma_form.get('Id')} headers['requestverificationtoken'] = req_token v_resp = self._perform_request('POST', f"https://{domain}/Global/blsappointment/VerifyEmail", data=verify_payload, headers=headers) headers.pop('requestverificationtoken') if not v_resp.json().get('success'): raise BizLogicError(message='Email verification failed') ma_form['EmailVerified'] = 'True' ma_form['EmailVerificationCode'] = otp_code # 3.4 锁定时间 (简单随机) target_date = slot_info.earliest_date # Query Slots in Day slot_url = f"https://{domain}/Global/blsappointment/GetAvailableSlotsByDate" # 构造复杂的 query params... 省略部分非关键参数 slot_params = { "appointmentDate": target_date, "locationId": ma_form.get("LocationId"), "categoryId": ma_form.get("AppointmentCategoryId"), "visaType": ma_form.get("VisaType"), "visaSubType": ma_form.get("VisaSubTypeId"), "applicantCount": 1, "dataSource": ma_form.get("DataSource"), "missionId": ma_form.get("MissionId") } headers['requestverificationtoken'] = req_token slots_resp = self._perform_request('POST', slot_url, params=slot_params, headers=headers) headers.pop('requestverificationtoken') slots_data = sorted(slots_resp.json(), key=lambda x: -x["Count"]) # 选剩余最多的 if not slots_data or slots_data[0]['Count'] <= 0: VSC_WARN('bls', 'Available slot times not found') res.success = False return res target_time = slots_data[0]['Name'] ma_form['ServerAppointmentDate'] = target_date ma_form['AppointmentDetailsList'] = '[]' # 这里的 key 是动态的 ID,需重新解析 ID date_id = re.search(r'AppointmentDate(\d+)', str(ma_soup)).group(1) slot_id = re.search(r'AppointmentSlot(\d+)', str(ma_soup)).group(1) ma_form[f'AppointmentDate{date_id}'] = target_date ma_form[f'AppointmentSlot{slot_id}'] = target_time # 3.5 再次验证码 & 提交 ManageAppointment captcha_token = self._solve_bls_captcha(data_val) ma_form['CaptchaData'] = captcha_token final_ma_resp = self._perform_request('POST', f"https://{domain}/Global/BLSAppointment/ManageAppointment", data=ma_form, headers=headers) appt_model_id = final_ma_resp.json().get('model', {}).get('Id') if not appt_model_id: raise NotFoundError(message='Appointment model id not found') # 3.6 填写申请表 (VisaAppointmentForm) # 获取页面 -> 解析 JS 变量 -> 映射 UserInfo -> 提交 # 这里逻辑较深,核心是映射。简化为提交一个空的 applicants JSON,实际需完整映射。 # 假设 _fill_applicant_form 做了这些工作 self._submit_final_form(appt_model_id, user_inputs, self.book_params, req_token) # 成功,返回 Liveness 链接 Liveness_page = f"https://{domain}/Global/BlsAppointment/livenessView?id={appt_model_id}" session_data = self._save_http_session(Liveness_page) res.success = True res.account = self.config.account.username res.session_id = session_data['session_id'] res.book_date = target_date res.book_time = target_time VSC_INFO("bls_plg", "[%s] Book Success. Liveness URL: %s", self.group_id, res.payment_link) return res def _save_debug_html(self, content: str, prefix: str = "debug"): save_dir = "debug_pages" if not os.path.exists(save_dir): os.makedirs(save_dir) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"{save_dir}/{prefix}_{self.group_id}_{timestamp}.html" with open(filename, "w", encoding="utf-8") as f: f.write(content) VSC_INFO("bls_plg", "[%s] HTML saved to: %s", self.group_id, filename) def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None): """ 统一 HTTP 请求封装,严格复刻 C++ 逻辑: 1. 发送 OPTIONS 请求 2. 发送实际请求 """ print(f'[perform request] {method} {url}') resp = self.session.request(method, url, headers=headers, data=data, json=json_data, params=params, timeout=30) VSC_INFO('bls_plg', resp.text) if resp.status_code == 200: return resp elif resp.status_code == 401: self.is_healthy = False raise SessionExpiredOrInvalidError() elif resp.status_code == 403: raise PermissionDeniedError() elif resp.status_code == 429: self.is_healthy = False raise RateLimiteddError() else: raise BizLogicError(message=f"HTTP Error {resp.status_code}: {resp.text[:100]}") def _solve_bls_captcha(self, data='') -> Optional[str]: """ 验证码处理:获取图片 -> 调用远程 OCR 服务 -> 提交验证 """ domain = self.free_config.get("domain") url = f"https://{domain}/Global/NewCaptcha/GenerateCaptcha" if data: url = f"https://{domain}/Global/CaptchaPublic/GenerateCaptcha?data={data}" headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/131.0.0.0 Safari/537.36', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8' } resp = self._perform_request("GET", url, headers=headers) with open("tmp.html", 'w') as f: f.write(resp.text) selected_ids = [] html_file_path = Path("tmp.html").resolve() file_url = f'file://{html_file_path}' self.browser.get(file_url) captions_ele = self.browser.ele('xpath://*[@id="captcha-main-div"]/div/div[1]', timeout=5) if not captions_ele: raise NotFoundError(message='Captions elements not found') caption_eles = captions_ele.children() caption_text = '' for caption in caption_eles: if not caption.states.is_covered: caption_text = caption.text numbers = re.findall(r'\d+', caption_text)[0] captcha_images_ele = self.browser.ele('xpath://*[@id="captcha-main-div"]/div/div[2]') captcha_image_eles = captcha_images_ele.children() rect_dict = {} for captcha_image in captcha_image_eles: img = captcha_image.ele('.captcha-img') if img.states.has_rect: rect_dict[img._backend_id] = img.states.has_rect for captcha_image in captcha_image_eles: img = captcha_image.ele('.captcha-img') if img.states.has_rect and img.states.is_covered == False: img_src = img.attr('src') if img_src and img_src.startswith('data:image'): base64_data = re.sub('^data:image/.+;base64,', '', img_src) img_bytes = base64.b64decode(base64_data) ocr_resp = requests.post( self.ocr_service_url, data=img_bytes, headers={"Content-Type": "application/octet-stream"}, timeout=5 ) if ocr_resp.status_code == 200: res_json = ocr_resp.json() ocr_res = res_json.get('data', '').replace('$', '')[:3] VSC_INFO("bls_plg", f'ocr captcha id={captcha_image.attr("id")} result={ocr_res}, target={numbers}') if ocr_res == numbers: eid = captcha_image.attr('id') selected_ids.append(eid) else: raise BizLogicError(message='Captcha server response error') if not selected_ids: raise BizLogicError(message='Captcha selected ids is empty') VSC_INFO("bls_plg", f'select_ids={selected_ids}') soup = BeautifulSoup(resp.text, 'html.parser') # 3. 提交选中结果 form = self._extract_hidden_fields(soup) form['SelectedImages'] = ",".join(selected_ids) submit_url = f"https://{domain}/Global/{'CaptchaPublic' if data else 'NewCaptcha'}/SubmitCaptcha" headers["X-Requested-With"] = "XMLHttpRequest" resp = self._perform_request('POST', submit_url, headers=headers, data=form) return resp.json()['captcha'] def _extract_hidden_fields(self, soup) -> Dict: params = {} form = soup.find("form") if form: for inp in form.find_all("input"): name = inp.get("name") if name: params[name] = inp.get("value", "") else: VSC_WARN('bls_plg', 'Form element not found') return params def _extract_js_var(self, html, context, pattern): # 简单正则提取 if context in html: match = re.search(pattern, html) if match: return match.group(1) return "" def _construct_visatype_payload(self, html: str, soup: BeautifulSoup) -> Optional[Dict]: """ 构造 VisaType 提交参数 (对应原代码 parse_visatype_form) """ # 1. 基础表单参数 (__RequestVerificationToken 等) params = self._extract_hidden_fields(soup) # 2. 提取页面中的 JS 数据变量 def get_js_data(var_name): try: # 匹配 var name = [...]; 结构 pattern = f"var {var_name}\\s*=\\s*(.*?);" match = re.search(pattern, html, re.DOTALL) if match: return json.loads(match.group(1)) except Exception as e: VSC_DEBUG("bls_plg", f"Failed to parse JS var {var_name}: {e}") return [] jurisdiction_list = get_js_data("jurisdictionData") location_list = get_js_data("locationData") visa_type_list = get_js_data("visaIdData") visa_subtype_list = get_js_data("visasubIdData") app_category_list = get_js_data("AppointmentCategoryIdData") # 3. 读取配置 cfg_jur = self.free_config.get("jurisdiction") cfg_loc = self.free_config.get("location") cfg_type = self.free_config.get("visaType") cfg_subtype = self.free_config.get("visaSubType") cfg_cat = self.free_config.get("appointmentCategory", "Normal") # 4. 匹配 ID jur_id = None loc_id = None type_id = None subtype_id = None cat_id = None # (A) Appointment Category for item in app_category_list: if item.get("Name") == cfg_cat: cat_id = item.get("Id") break # (B) Jurisdiction (如果配置了) if cfg_jur and jurisdiction_list: for item in jurisdiction_list: if item.get("Name") == cfg_jur: jur_id = item.get("Id") break # (C) Location for item in location_list: if item.get("Name") == cfg_loc: loc_id = item.get("Id") break # (D) Visa Type (需匹配 LocationId) if loc_id: for item in visa_type_list: # 比较 Name 和 LocationId if item.get("Name") == cfg_type and str(item.get("LocationId")) == str(loc_id): type_id = item.get("Id") break # (E) Visa SubType (需匹配 VisaType Value) if type_id: for item in visa_subtype_list: # BLS 逻辑: visasubIdData 中的 Value 字段对应 VisaTypeId if item.get("Name") == cfg_subtype and str(item.get("Value")) == str(type_id): subtype_id = item.get("Id") break # 5. 构造动态参数 & 校验 if not cat_id: raise NotFoundError(message=f"Config: AppCategory '{cfg_cat}' not found") params[f"AppointmentCategoryId{cat_id}"] = cat_id if cfg_jur: if not jur_id: raise NotFoundError(message=f"Config: Jurisdiction '{cfg_jur}' not found") params[f"JurisdictionId{jur_id}"] = jur_id if not loc_id: raise NotFoundError(message=f"Config: Location '{cfg_loc}' not found") params[f"Location{loc_id}"] = loc_id if not type_id: raise NotFoundError(message=f"Config: VisaType '{cfg_type}' not found for Loc '{cfg_loc}'") params[f"VisaType{type_id}"] = type_id if not subtype_id: raise NotFoundError(message=f"Config: VisaSubType '{cfg_subtype}' not found") params[f"VisaSubType{subtype_id}"] = subtype_id # 固定参数 params["AppointmentFor1"] = "Individual" # 6. 构造 ResponseData (行为轨迹模拟) # BLS 后端会校验这个字段,模拟用户选择下拉框的时间间隔 response_data = [] current_time = datetime.utcnow() def add_trace(prefix, val_id): nonlocal current_time # 模拟 1-3 秒的操作间隔 duration = random.randint(1000, 3000) gap = random.randint(500, 1500) start = current_time end = start + timedelta(milliseconds=duration) # BLS 时间格式: 2023-10-27T10:00:00.123Z fmt = "%Y-%m-%dT%H:%M:%S.%f" response_data.append({ "Id": f"{prefix}{val_id}", "Start": start.strftime(fmt)[:-3] + "Z", "End": end.strftime(fmt)[:-3] + "Z", "Total": duration, "Selected": True }) current_time = end + timedelta(milliseconds=gap) # 按顺序添加轨迹 add_trace("AppointmentCategoryId", cat_id) if jur_id: add_trace("JurisdictionId", jur_id) add_trace("Location", loc_id) add_trace("VisaType", type_id) add_trace("VisaSubType", subtype_id) params["ResponseData"] = json.dumps(response_data) params["X-Requested-With"] = "XMLHttpRequest" return params def _submit_final_form(self, model_id: str, user_inputs: Dict, book_params: Dict, token: str): """ 提交最终签证申请表 (VisaAppointmentForm) 对应原代码的: get_visa_appointment_form_html -> parse -> fix_data -> submit """ domain = self.free_config.get("domain") headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/131.0.0.0 Safari/537.36', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8' } # 1. 获取表单页面 (为了提取 JS 变量映射表) url_get = f"https://{domain}/Global/BlsAppointment/VisaAppointmentForm?appointmentId={model_id}" # 构造 Referer ref_query = urlencode(book_params) referer = f"Global/blsAppointment/ManageAppointment?{ref_query}" headers['X-Requested-With'] = "XMLHttpRequest" headers['Referer'] = f"https://{domain}/{referer}" resp = self._perform_request('GET', url_get, headers=headers) headers.pop['X-Requested-With'] headers.pop['Referer'] html = resp.text soup = BeautifulSoup(resp.text, 'html.parser') # 2. 提取基础隐藏域 (包含 __RequestVerificationToken 等) form_data = self._extract_hidden_fields(soup) # 3. 提取下拉菜单数据源 (JS Variables) # BLS 的页面里有很多 var countryData = [...]; 这种数据 def get_list(name): val = self._extract_js_var(html, f"var {name}", rf"var {name}\s*=\s*(.*?);") return json.loads(val) if val else [] # 提取关键数据源 country_data = get_list("countryData") gender_data = get_list("genderData") marital_data = get_list("maritalStatusData") occupation_data = get_list("occupationData") # passport_type_data = get_list("passportTypeData") # 通常默认 Ordinary # 4. 辅助函数:根据文本找 ID def find_id(data_list, text_val, default=None): if not text_val: return default text_val = str(text_val).lower().strip() for item in data_list: if str(item.get("Name")).lower() == text_val: return item.get("Id") return default # 5. 准备日期 (YYYY-MM-DD) # uinfo 中的日期可能是不同格式,需统一 def fmt_date(d_str): try: # 尝试解析常见格式 for fmt in ["%Y-%m-%d", "%d/%m/%Y", "%d-%m-%Y"]: try: return datetime.strptime(d_str, fmt).strftime("%Y-%m-%d") except: pass except: pass return d_str # 原样返回 fallback dob = fmt_date(user_inputs.get("birthday", "")) ppt_issue = fmt_date(user_inputs.get("passport_issue_date", "")) ppt_expiry = fmt_date(user_inputs.get("passport_expiry_date", "")) # 自动计算行程日期 (如果未提供,默认一个月后) try: travel_date = (datetime.now() + timedelta(days=30)).strftime("%Y-%m-%d") except: travel_date = "" # 6. 构造申请人详细数据对象 (JSON) # 注意:这里的字段名必须严格匹配 BLS 后端实体定义 applicant_detail = { "ApplicantSerialNo": "1", "ApplicantId": form_data.get("applicantId", "0"), # 从页面隐藏域提取 "Id": form_data.get("applicantId", "0"), "ParentId": form_data.get("Id", model_id), # 关联的 Appointment ID # 基本信息 "FirstName": user_inputs.get("first_name", ""), "SurName": user_inputs.get("last_name", ""), "LastName": user_inputs.get("last_name", ""), "SurnameAtBirth": user_inputs.get("last_name", ""), # 默认同名 "GenderId": find_id(gender_data, user_inputs.get("gender"), "1"), # 默认 Male "MaritalStatusId": find_id(marital_data, user_inputs.get("marital_status", "Single"), "1"), "ServerDateOfBirth": dob, # 国籍/出生地 "PlaceOfBirth": user_inputs.get("place_of_birth", "-"), "CountryOfBirthId": find_id(country_data, user_inputs.get("nationality"), "0"), "NationalityAtBirthId": find_id(country_data, user_inputs.get("nationality"), "0"), "NationalityId": find_id(country_data, user_inputs.get("nationality"), "0"), # 护照信息 "PassportType": "Ordinary Passport", # 默认 "PassportNo": user_inputs.get("passport_no", ""), "ServerPassportIssueDate": ppt_issue, "ServerPassportExpiryDate": ppt_expiry, "IssuePlace": user_inputs.get("place_of_issue", "-"), "IssueCountryId": find_id(country_data, user_inputs.get("nationality"), "0"), # 联系方式 (必填占位符) "HomeAddressLine1": "-", "HomeAddressCity": "-", "HomeAddressPostalCode": "-", "HomeAddressContactNumber": user_inputs.get("phone", "-"), "HomeAddressCountryId": find_id(country_data, user_inputs.get("nationality"), "0"), "EmployerName": "-", "EmployerAddress": "-", # 职业 "CurrentOccupationId": find_id(occupation_data, user_inputs.get("occupation", "Others"), "20"), # 行程信息 (部分写死为常规旅游) "PurposeOfJourneyId": "Tourism", "MemberStateDestinationId": "Spain", "MemberStateFirstEntryId": "Spain", "NumberOfEntriesRequested": "Multiple Entries", "IntendedStayDuration": "5", "ServerTravelDate": travel_date, "ServerIntendedDateOfArrival": travel_date, "ServerIntendedDateOfDeparture": travel_date, # 简化 # 费用承担 "CostCoveredById": "By the Applicant himself / herself", "MeansOfSupportId": "Cash", # 杂项 "IsMinor": False, "IsVisaIssuedBefore": False, "BlsInvitingAuthority": "1", # 这里的 1 通常代表 "No" 或者特定枚举 "PreviousFingerPrintStatus": "2", # 2 通常代表 No # 邀请人信息 (旅游通常填酒店或空) "InvitingAuthorityName": "-", "InvitingAddress": "-", "InvitingCity": "-", "InvitingEmail": "no-reply@example.com" } # 7. 更新表单数据 # ApplicantsDetailsList 需要是一个 JSON 字符串 form_data['ApplicantsDetailsList'] = json.dumps([applicant_detail]) # 补全其他可能需要的字段 form_data['PreviousFingerPrintStatus_0'] = "2" form_data['BlsInvitingAuthority_0'] = "1" form_data["X-Requested-With"] = "XMLHttpRequest" # 8. 提交 # 注意:提交地址通常和 manage appointment 相同,或者是特定的 Save 接口 # 根据你的原代码,是 Global/BLSAppointment/ManageAppointment url_post = f"https://{domain}/Global/BLSAppointment/ManageAppointment" # Headers 需要 Token headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/131.0.0.0 Safari/537.36', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8' "Referer": f"https://{domain}/{referer}", "X-Requested-With": "XMLHttpRequest", "requestverificationtoken": token } # 这里的 form_data['params'] 逻辑在 _extract_hidden_fields 可能会有差异 # 确保 form_data 是扁平的字典 submit_resp = self._perform_request('POST', url_post, data=form_data, headers=headers) if submit_resp.json().get('success'): VSC_INFO("bls_plg", "[%s] Final Form Submitted Successfully.", self.group_id) return True raise BizLogicError(message='Submit application form failed') def _read_otp_email(self, wait_sec: int = 60) -> str: """ 读取 BLS 的 OTP 邮件 """ master_email = "visafly666@gmail.com" recipient = self.config.account.username sender = "Info@blsinternational.com" subject_keywords = "BLS" body_keywords = "verification code" # 设置时间起点 (UTC) now_utc = datetime.utcnow() formatted_utc_time = now_utc.strftime("%Y-%m-%d %H:%M:%S") VSC_INFO("bls_plg", "[%s] Waiting for OTP from %s...", self.group_id, sender) # 轮询查收, 每 5 秒查一次 attempts = wait_sec // 5 for i in range(attempts): # 调用云端接口获取邮件内容 # expiry=300 表示邮件有效搜索窗口为 5 分钟 content_out = VSCloudApi.Instance().fetch_mail_content( master_email, sender, recipient, subject_keywords, body_keywords, formatted_utc_time, 300 ) # 正则匹配 6 位数字验证码 match = re.search(r'\b\d{6}\b', content_out) if match: otp = match.group(0) VSC_INFO("bls_plg", "[%s] OTP code found: %s", self.group_id, otp) return otp # 等待下一次轮询 time.sleep(5) if i % 2 == 0: VSC_DEBUG("bls_plg", "[%s] OTP not received yet, retrying...", self.group_id) # 超时处理 raise NotFoundError(f"OTP email not found within {wait_sec}s") def _save_http_session(self, page_url): """ 提取 cookies, local_storage, 存入 VSCloudApi """ cookies_dict = {} # 方式 1: curl_cffi 的 cookies 对象通常支持 get_dict() if hasattr(self.session.cookies, "get_dict"): cookies_dict = self.session.cookies.get_dict() else: # 方式 2: 迭代 (兼容标准 CookieJar) for c in self.session.cookies: cookies_dict[c.name] = c.value cookies_str = json.dumps(cookies_dict) # 简单生成 SessionID hash ua_str = self.user_agent or "unknown_ua" raw = cookies_str + ua_str + page_url session_id = hashes.Hash(hashes.SHA256(), backend=default_backend()) session_id.update(raw.encode()) sid = session_id.finalize().hex() proxy_str = "" if self.config.proxy.ip: proxy_str = f"{self.config.proxy.scheme}://" if self.config.proxy.username: proxy_str += f"{self.config.proxy.username}:{self.config.proxy.password}@" proxy_str += f"{self.config.proxy.ip}:{self.config.proxy.port}" return VSCloudApi.Instance().create_http_session( sid, cookies_str, "", ua_str, proxy_str, page_url )