| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085 |
- import re
- import os
- import uuid
- import base64
- import time
- import json
- import shutil
- import random
- import string
- from datetime import datetime, timedelta
- from pathlib import Path
- from urllib.parse import urlparse, parse_qs, urlencode
- from typing import Dict, List, Optional, Any, Callable
- from curl_cffi import requests, const
- from bs4 import BeautifulSoup
- # DrissionPage 核心
- from DrissionPage import ChromiumPage, ChromiumOptions
- from cryptography.hazmat.primitives import hashes
- from cryptography.hazmat.backends import default_backend
- # 框架依赖
- from vs_plg import IVSPlg
- from vs_types import VSPlgConfig, AppointmentType, VSQueryResult, VSBookResult, DateAvailability, TimeSlot, AvailabilityStatus, NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError
- from toolkit.vs_cloud_api import VSCloudApi
- from toolkit.ocr_engine import PyTorchEngine
- class BlsPlugin(IVSPlg):
- """
- BLS 签证预约插件 (精简版)
- """
def __init__(self, group_id: str):
    """Initialise per-instance state; no browser or HTTP session is opened here.

    Args:
        group_id: Identifier of the group this plugin instance serves.
    """
    self.group_id = group_id

    # Concurrency isolation: every instance gets its own short random id and
    # its own workspace directory derived from it.
    self.instance_id = uuid.uuid4().hex[:8]
    workspace_name = f"{self.group_id}_{self.instance_id}"
    self.root_workspace = os.path.abspath(
        os.path.join("temp_browser_data", workspace_name)
    )
    # Browser user-data directory, nested inside the instance workspace.
    self.user_data_path = os.path.join(self.root_workspace, "user_data")

    # Configuration and logging (filled in by set_config / set_log).
    self.config: Optional[VSPlgConfig] = None
    self.free_config: Dict[str, Any] = {}
    self.logger = None

    # Runtime state.
    self.session: Optional[requests.Session] = None
    self.session_create_time: float = 0
    self.book_params: Dict = {}
    self.is_healthy: bool = True

    # Browser instance and character-recognition engine (created in create_session).
    self.page: Optional[ChromiumPage] = None
    self.ocr_engine: Optional[PyTorchEngine] = None

    # OCR service address; empty until the configuration provides one.
    self.local_service_url: str = ""
def get_group_id(self) -> str:
    """Return the group identifier this instance was constructed with."""
    group_id = self.group_id
    return group_id
-
def set_log(self, logger: Callable[[str], None]) -> None:
    """Install the callable that ``_log`` uses to emit messages.

    Args:
        logger: Function accepting a single formatted message string.
    """
    self.logger = logger
    return None
-
def _log(self, message):
    """Emit ``message`` prefixed with the plugin name and group id.

    The logger installed through ``set_log`` is used when present;
    otherwise the line is printed.
    """
    line = f'[BlsPlugin] [{self.group_id}] {message}'
    emit = self.logger if self.logger else print
    emit(line)
def set_config(self, config: VSPlgConfig):
    """Store the plugin configuration and cache its free-form section.

    Args:
        config: Plugin configuration. Its ``free_config`` attribute may be
            falsy, in which case an empty dict is used.
    """
    self.config = config
    self.free_config = config.free_config or {}

    # Take the OCR service address from the configuration when it supplies a
    # non-empty one; otherwise the current value is left untouched.
    service_url = self.free_config.get("local_service_url")
    if service_url:
        self.local_service_url = service_url
-
def keep_alive(self):
    """Do nothing: this plugin has no periodic keep-alive work."""
    return None
def health_check(self) -> bool:
    """Report whether the current session is still usable.

    Returns:
        False when the instance was flagged unhealthy, when no session
        exists, or when ``config.session_max_life`` (minutes, > 0) has been
        exceeded since ``session_create_time``; True otherwise.
    """
    if not self.is_healthy or self.session is None:
        return False

    max_life_minutes = self.config.session_max_life
    if max_life_minutes > 0:
        age_seconds = time.time() - self.session_create_time
        if age_seconds > max_life_minutes * 60:
            self._log(f"Session expired.")
            return False
    return True
def create_session(self):
    """Launch the browser, build the HTTP session and log in to BLS.

    Steps: validate configuration, start an isolated Chromium instance,
    create the OCR engine and the curl_cffi session, fetch the login page,
    solve its captcha and submit the credentials.

    Raises:
        NotFoundError: ``domain`` is missing from the free config, or the
            login page does not expose the dynamic user/password fields.
        BizLogicError: The login submission is not reported as successful.
        Exception: Whatever ``ChromiumPage`` raises on start-up is re-raised
            after ``cleanup()`` has been called.
    """
    self._log(f"Initializing Session (ID: {self.instance_id})...")

    # Validate configuration BEFORE spawning anything, so a misconfigured
    # plugin does not leave a browser process behind.
    domain = self.free_config.get("domain")
    if not domain:
        raise NotFoundError(message="Required field [domain] in free config")

    # Only needed here, to probe for a free TCP port.
    import socket

    def get_free_port():
        """Ask the OS for a currently unused TCP port."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(('', 0))
            return s.getsockname()[1]

    co = ChromiumOptions()

    # co.auto_port() is deliberately not used: per the original author's
    # notes it relies on parsing stdout and breaks ('not enough values to
    # unpack') when DBus errors are printed. With an explicit port
    # DrissionPage connects directly instead of parsing logs.
    debug_port = get_free_port()
    self._log(f"Assigned Debug Port: {debug_port}")
    co.set_local_port(debug_port)

    # Dedicated user-data directory: cache, cookies and local storage are
    # isolated per instance, and instances do not fight over a shared
    # default profile folder.
    co.set_user_data_path(self.user_data_path)

    # Browser binary can be overridden through CHROME_BIN (Docker setups).
    chrome_path = os.getenv("CHROME_BIN")
    if chrome_path and os.path.exists(chrome_path):
        co.set_paths(browser_path=chrome_path)

    co.headless(False)
    co.set_argument('--no-sandbox')
    co.set_argument('--disable-gpu')
    # Docker's default /dev/shm is small (64MB per the original notes),
    # which makes Chromium prone to out-of-memory crashes.
    co.set_argument('--disable-dev-shm-usage')
    co.set_argument('--window-size=1920,1080')
    co.set_argument('--disable-blink-features=AutomationControlled')
    try:
        self.page = ChromiumPage(co)
    except Exception as e:
        self._log(f"Session Create Error: {e}")
        self.cleanup()
        raise  # bare raise keeps the original traceback

    self.ocr_engine = PyTorchEngine(self.free_config.get('ocr_model'))
    self.session = requests.Session(
        proxy=self._get_proxy_url(),
        impersonate="chrome124",
        curl_options={
            const.CurlOpt.MAXAGE_CONN: 1800,
            const.CurlOpt.VERBOSE: self.config.debug
        }
    )

    # 1.1 Fetch the login page and parse its parameters.
    login_url = f"https://{domain}/Global/account/login"
    resp = self._perform_request('GET', login_url)
    if self.config.debug:
        self._save_debug_html(resp.text, prefix="Bls_Login_Page")
    soup = BeautifulSoup(resp.text, 'html.parser')
    form_data = self._extract_hidden_fields(soup)

    # The real credential fields have dynamic names (UserId1, Password1, ...);
    # they are the ones carrying the `required` attribute.
    real_user = None
    real_pass = None
    for inp in soup.find_all('input'):
        if not inp.has_attr('required'):
            continue
        name = inp.get('name', '')
        if 'UserId' in name:
            real_user = name
        elif 'Password' in name:
            real_pass = name
    if not real_user or not real_pass:
        # Without this guard the credentials would be posted under a None key.
        raise NotFoundError(message="Login page: required UserId/Password fields not found")

    # `data` parameter embedded in the page script, needed by the captcha endpoint.
    data_val = self._extract_js_var(resp.text, "iframeOpenUrl", r"data=([^']+)")

    # 1.2 Solve the captcha.
    captcha_token = self._solve_bls_captcha(data_val)

    # 1.3 Submit the login form.
    submit_url = f"https://{domain}/Global/account/loginsubmit"
    payload = form_data
    payload["X-Requested-With"] = "XMLHttpRequest"
    payload["CaptchaData"] = captcha_token
    payload[real_user] = self.config.account.username
    payload[real_pass] = self.config.account.password
    login_resp = self._perform_request('POST', submit_url, data=payload)
    if not login_resp.json().get('success'):
        raise BizLogicError(message='Login failed')

    self.session_create_time = time.time()
    self._log("Session created successfully.")
- # =========================================================================
- # 2. 查询流程 (Query)
- # =========================================================================
def query(self, apt_type: AppointmentType) -> VSQueryResult:
    """Walk the BLS pre-booking flow and report which dates are available.

    Flow: visa type verification (captcha) -> visa type selection ->
    ManageAppointment page, whose ``availDates`` JS variable lists the
    dates. The query-string parameters of the final redirect are stored in
    ``self.book_params`` for ``book``.

    Args:
        apt_type: Appointment type; its ``routing_key`` selects the entry
            of ``free_config["apt_configs"]`` to use.

    Returns:
        A ``VSQueryResult`` with ``availability_status`` set and, when dates
        exist, ``earliest_date`` (first listed date) and ``availability``.

    Raises:
        NotFoundError: No appointment config exists for ``routing_key``.
        BizLogicError: A step is rejected or an expected field is missing.
    """
    res = VSQueryResult()
    domain = self.free_config.get("domain")
    apt_config = self.free_config.get("apt_configs", {}).get(apt_type.routing_key)
    if not apt_config:
        # Fail with a clear error instead of an AttributeError deep in the
        # payload construction.
        raise NotFoundError(message=f"Appointment config [{apt_type.routing_key}] not found in free config")

    # 2.1 Visa type verification
    url_vtv = f"https://{domain}/Global/bls/visatypeverification"
    resp = self._perform_request('GET', url_vtv)
    if self.config.debug:
        self._save_debug_html(resp.text, prefix="Bls_Visatypeverification_Page")
    self._check_resp_is_session_expired_or_invalid('APPLICATION PROCESS', resp)

    form_vtv = self._extract_hidden_fields(BeautifulSoup(resp.text, 'html.parser'))
    captcha_token = self._solve_bls_captcha()
    form_vtv['CaptchaData'] = captcha_token
    form_vtv["X-Requested-With"] = "XMLHttpRequest"

    vtv_resp = self._perform_request('POST', f"https://{domain}/Global/bls/VisaTypeVerification", data=form_vtv)
    vtv_json = vtv_resp.json()
    if not vtv_json['success']:
        raise BizLogicError(message='Submit VisaTypeVerification Failed')

    # 2.2 Visa type selection; returnUrl carries data=xxx
    return_url = vtv_json['returnUrl']
    data_match = re.search(r"data=([^&]+)", return_url)
    if not data_match:
        raise BizLogicError(message='VisaTypeVerification returnUrl has no [data] parameter')
    data_val = data_match.group(1)

    url_vt = f"https://{domain}/Global/bls/visatype?data={data_val}"
    vt_resp = self._perform_request('GET', url_vt)
    # Dump and check THIS response (previously the first page was re-used).
    # NOTE(review): assumes the 'APPLICATION PROCESS' marker is present on
    # this page as on the verification page - confirm.
    if self.config.debug:
        self._save_debug_html(vt_resp.text, prefix="Bls_Visatype_Page")
    self._check_resp_is_session_expired_or_invalid('APPLICATION PROCESS', vt_resp)

    # Field names are dynamic; the payload builder maps configured option
    # names to ids using the JS arrays embedded in the page.
    vt_payload = self._construct_visatype_payload(apt_config, vt_resp.text, BeautifulSoup(vt_resp.text, 'html.parser'))

    vt_res = self._perform_request('POST', f"https://{domain}/Global/bls/VisaType", data=vt_payload)
    vt_json = vt_res.json()
    if not vt_json['success']:
        if not vt_json['available']:
            res.success = False
            res.availability_status = AvailabilityStatus.NoneAvailable
            return res

    # 2.3 Booking parameters come from the redirect's query string.
    final_url = vt_json.get('returnUrl')
    if not final_url:
        raise BizLogicError(message='Submit VisaType Failed: returnUrl missing')
    q_params = parse_qs(urlparse(final_url).query)
    self.book_params = {k: v[0] for k, v in q_params.items()}

    # 2.4 Calendar
    url_ma = f"https://{domain}/Global/blsAppointment/ManageAppointment?{urlencode(self.book_params)}"
    resp_ma = self._perform_request('GET', url_ma)
    # NOTE(review): same marker assumption as above - confirm.
    if self.config.debug:
        self._save_debug_html(resp_ma.text, prefix="Bls_ManageAppointment_Page")
    self._check_resp_is_session_expired_or_invalid('APPLICATION PROCESS', resp_ma)

    avail_str = self._extract_js_var(resp_ma.text, "var availDates", r"var availDates =(.*?);")
    if not avail_str:
        raise BizLogicError(message='Query page not found required field [var availDates]')

    avail_json = json.loads(avail_str)
    open_days = [
        datetime.strptime(x['DateText'], "%Y-%m-%d")
        for x in avail_json['ad']
        if x['SingleSlotAvailable']
    ]
    res.success = True
    if open_days:
        res.availability_status = AvailabilityStatus.Available
        # First listed date, as delivered by the server.
        res.earliest_date = open_days[0]
        res.availability = [DateAvailability(date=day, times=[]) for day in open_days]
    else:
        # The query itself worked, there is simply nothing to book.
        res.availability_status = AvailabilityStatus.NoneAvailable
        res.availability = []
    return res
def book(self, slot_info: VSQueryResult, user_inputs: Dict) -> VSBookResult:
    """Book the earliest date from ``slot_info`` for the applicant.

    Flow: load ManageAppointment -> upload the passport photo -> e-mail OTP
    verification -> pick the time slot with the most remaining capacity ->
    captcha -> submit ManageAppointment -> submit the applicant form ->
    persist the HTTP session for the liveness step.

    Args:
        slot_info: Result of ``query``; ``earliest_date`` is the target day.
        user_inputs: Applicant data; must contain ``passport_image_url``.

    Returns:
        ``VSBookResult``; ``success`` is False when the target day has no
        slot with remaining capacity.

    Raises:
        NotFoundError: Required input, page field or appointment id missing.
        BizLogicError: Photo upload or e-mail verification failed.
    """
    res = VSBookResult()
    domain = self.free_config.get("domain")

    if 'passport_image_url' not in user_inputs:
        raise NotFoundError(message='Required field [passport_image_url] in user inputs')
    target_dt = slot_info.earliest_date
    if not target_dt:
        raise NotFoundError(message='Required field [earliest_date] in slot info')
    target_date = target_dt.strftime("%Y-%m-%d")

    # 3.1 Manage page (source of the anti-forgery token and JS variables)
    url_ma = f"https://{domain}/Global/blsAppointment/ManageAppointment?{urlencode(self.book_params)}"
    resp_ma = self._perform_request('GET', url_ma)
    ma_soup = BeautifulSoup(resp_ma.text, 'html.parser')
    ma_form = self._extract_hidden_fields(ma_soup)
    req_token = ma_form.get('__RequestVerificationToken')

    # 3.2 Upload the photo as a hand-built multipart body.
    photo_bytes = requests.get(user_inputs['passport_image_url']).content
    boundary = "----WebKitFormBoundary" + "".join(random.choices(string.ascii_letters + string.digits, k=16))
    upload_headers = {
        "content-type": f"multipart/form-data; boundary={boundary}",
        "requestverificationtoken": req_token,
        "x-requested-with": "XMLHttpRequest",
    }
    body = (f"--{boundary}\r\nContent-Disposition: form-data; name=\"file\"; filename=\"photo.jpg\"\r\n"
            f"Content-Type: image/jpeg\r\n\r\n").encode("utf-8") + photo_bytes + f"\r\n--{boundary}--\r\n".encode("utf-8")

    up_resp = self.session.post(f"https://{domain}/Global/query/UploadProfileImage", headers=upload_headers, data=body)
    if up_resp.status_code != 200:
        raise BizLogicError(message='Upload Passport Image failed')
    ma_form['ApplicantPhotoId'] = up_resp.json()['fileId']

    # 3.3 E-mail OTP
    data_val = self._extract_js_var(resp_ma.text, "win.iframeOpenUrl", r"data=([^&]+)")
    xhr_headers = {"X-Requested-With": "XMLHttpRequest"}
    # Same headers plus the anti-forgery token, for the calls that need it.
    token_headers = dict(xhr_headers, requestverificationtoken=req_token)

    self._perform_request('GET', f"https://{domain}/Global/blsappointment/SendAppointmentVerificationCode?code={data_val}", headers=xhr_headers)

    # Wait up to 30 seconds for the code to arrive.
    otp_code = self._read_otp_email(wait_sec=30)

    verify_payload = {
        "Code": otp_code,
        "Value": ma_form.get('EmailCode'),
        "Id": ma_form.get('Id')
    }
    v_resp = self._perform_request('POST', f"https://{domain}/Global/blsappointment/VerifyEmail", data=verify_payload, headers=token_headers)
    if not v_resp.json().get('success'):
        raise BizLogicError(message='Email verification failed')

    ma_form['EmailVerified'] = 'True'
    ma_form['EmailVerificationCode'] = otp_code

    # 3.4 Choose a time slot on the target day.
    slot_url = f"https://{domain}/Global/blsappointment/GetAvailableSlotsByDate"
    slot_params = {
        "appointmentDate": target_date,
        "locationId": ma_form.get("LocationId"),
        "categoryId": ma_form.get("AppointmentCategoryId"),
        "visaType": ma_form.get("VisaType"),
        "visaSubType": ma_form.get("VisaSubTypeId"),
        "applicantCount": 1,
        "dataSource": ma_form.get("DataSource"),
        "missionId": ma_form.get("MissionId")
    }
    slots_resp = self._perform_request('POST', slot_url, params=slot_params, headers=token_headers)
    # Highest remaining count first.
    slots_data = sorted(slots_resp.json(), key=lambda x: -x["Count"])
    if not slots_data or slots_data[0]['Count'] <= 0:
        self._log('Available slot times not found')
        res.success = False
        return res

    target_time = slots_data[0]['Name']
    ma_form['ServerAppointmentDate'] = target_date
    ma_form['AppointmentDetailsList'] = '[]'

    # The date/slot field names carry a dynamic numeric suffix.
    page_markup = str(ma_soup)
    date_match = re.search(r'AppointmentDate(\d+)', page_markup)
    slot_match = re.search(r'AppointmentSlot(\d+)', page_markup)
    if not date_match or not slot_match:
        raise NotFoundError(message='Dynamic AppointmentDate/AppointmentSlot field not found')
    ma_form[f'AppointmentDate{date_match.group(1)}'] = target_date
    ma_form[f'AppointmentSlot{slot_match.group(1)}'] = target_time

    # 3.5 Second captcha, then submit ManageAppointment (no token header here,
    # matching the original request).
    captcha_token = self._solve_bls_captcha(data_val)
    ma_form['CaptchaData'] = captcha_token

    final_ma_resp = self._perform_request('POST', f"https://{domain}/Global/BLSAppointment/ManageAppointment", data=ma_form, headers=xhr_headers)

    appt_model_id = final_ma_resp.json().get('model', {}).get('Id')
    if not appt_model_id:
        raise NotFoundError(message='Appointment model id not found')

    # 3.6 Applicant form (VisaAppointmentForm)
    self._submit_final_form(appt_model_id, user_inputs, self.book_params, req_token)

    # Success: hand back the liveness link and the persisted session.
    liveness_page = f"https://{domain}/Global/BlsAppointment/livenessView?id={appt_model_id}"
    session_data = self._save_http_session(liveness_page)
    res.success = True
    res.account = self.config.account.username
    res.session_id = session_data['session_id']
    res.book_date = target_date
    res.book_time = target_time
    # Previously the URL was logged from this field without ever being stored.
    res.payment_link = liveness_page
    self._log(f"Book Success. Liveness URL: {res.payment_link}")
    return res
-
def _get_proxy_url(self):
    """Build the proxy URL from ``self.config.proxy``.

    Returns:
        ``scheme://[username:password@]ip:port``, or an empty string when no
        proxy IP is configured.
    """
    proxy = self.config.proxy
    if not proxy.ip:
        return ""
    credentials = f"{proxy.username}:{proxy.password}@" if proxy.username else ""
    return f"{proxy.scheme}://{credentials}{proxy.ip}:{proxy.port}"
-
def _save_debug_html(self, content: str, prefix: str = "debug"):
    """Write ``content`` to ``debug_pages/<prefix>_<timestamp>.html`` and log the path.

    Args:
        content: Text to store (UTF-8 encoded on disk).
        prefix: File name prefix identifying the page.
    """
    save_dir = "debug_pages"
    # exist_ok avoids the check-then-create race when several plugin
    # instances dump pages at the same time.
    os.makedirs(save_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"{save_dir}/{prefix}_{timestamp}.html"
    with open(filename, "w", encoding="utf-8") as f:
        f.write(content)
    self._log(f"HTML saved to: {filename}")
-
def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None):
    """Send one request through the shared session and map error statuses to exceptions.

    Args:
        method: HTTP verb.
        url: Target URL.
        headers, data, json_data, params: Forwarded to ``session.request``
            (``json_data`` is passed as ``json``). A 30 second timeout is used.

    Returns:
        The response object when the status code is 200.

    Raises:
        SessionExpiredOrInvalidError: Status 401 (instance marked unhealthy).
        PermissionDeniedError: Status 403.
        RateLimiteddError: Status 429 (instance marked unhealthy).
        BizLogicError: Any other non-200 status.
    """
    resp = self.session.request(
        method,
        url,
        headers=headers,
        data=data,
        json=json_data,
        params=params,
        timeout=30,
    )
    if self.config.debug:
        self._log(f'[perform request] Response={resp.text}\nMethod={method}, Url={url}, Data={data}, JsonData={json_data}, Params={params}')

    status = resp.status_code
    if status == 200:
        return resp
    if status == 401:
        self.is_healthy = False
        raise SessionExpiredOrInvalidError()
    if status == 403:
        raise PermissionDeniedError()
    if status == 429:
        self.is_healthy = False
        raise RateLimiteddError()
    raise BizLogicError(message=f"HTTP Error {resp.status_code}: {resp.text[:100]}")
-
def _extract_captcha_data(self, tmp_file):
    """Render a saved captcha page in the browser and read what is visible.

    Args:
        tmp_file: Path of the HTML file holding the captcha page.

    Returns:
        Dict with ``number`` (first run of digits in the visible caption, as
        a string) and ``image_ids`` (ids of the parent elements of the
        visible, uncovered ``captcha-img`` images that have a data-URI src).

    Raises:
        BizLogicError: The captcha container is missing or the caption
            contains no digits.
    """
    # 1. Load the file in the browser.
    page_path = Path(tmp_file).resolve()
    self.page.get(f'file://{page_path}')

    # 2. Main container; all later lookups are relative to it.
    container = self.page.ele('#captcha-main-div', timeout=5)
    if not container:
        raise BizLogicError(message='Captcha main container not found')

    # 3. Caption: scan the children of the header element for one that is
    #    displayed and not covered, stopping at the first with text.
    caption_text = ""
    header = container.ele('xpath:./div/div[1]')
    if header:
        for candidate in header.children():
            if not candidate.states.is_displayed or candidate.states.is_covered:
                continue
            caption_text = candidate.text
            if caption_text:
                break

    digits = re.search(r'\d+', caption_text)
    if not digits:
        raise BizLogicError(message="No number found in caption")

    # 4. Image ids: the id sits on the parent element of each image.
    visible_ids = []
    for image in container.eles('tag:img@@class:captcha-img'):
        if not image.states.has_rect or image.states.is_covered:
            continue
        source = image.attr('src')
        if not (source and source.startswith('data:image')):
            continue
        holder_id = image.parent().attr('id')
        if holder_id:
            visible_ids.append(holder_id)

    return {
        "number": digits.group(),
        "image_ids": visible_ids,
    }
def _solve_bls_captcha(self, data='') -> Optional[str]:
    """Fetch a BLS image captcha, select the matching tiles via OCR and submit it.

    Args:
        data: ``data`` token from the calling page. When non-empty the
            ``CaptchaPublic`` endpoints are used, otherwise ``NewCaptcha``.

    Returns:
        The token from the submit response: field ``captcha`` for the
        ``CaptchaPublic`` flow, field ``cd`` otherwise.

    Raises:
        BizLogicError: No tile matched the target number, or the server
            rejected the selection.
    """
    domain = self.free_config.get("domain")
    url = f"https://{domain}/Global/NewCaptcha/GenerateCaptcha"
    if data:
        url = f"https://{domain}/Global/CaptchaPublic/GenerateCaptcha?data={data}"
    resp = self._perform_request("GET", url)
    if self.config.debug:
        self._save_debug_html(resp.text, prefix="Bls_Captcha_Page")
    self._check_resp_is_session_expired_or_invalid('Please select all boxes with number', resp)

    # The page is rendered in the browser to learn which caption and tiles
    # are really visible; make sure the workspace exists before writing.
    os.makedirs(self.root_workspace, exist_ok=True)
    tmpfile = os.path.join(self.root_workspace, "tmp.html")
    with open(tmpfile, 'w', encoding='utf-8') as tfp:
        tfp.write(resp.text)

    soup = BeautifulSoup(resp.text, 'html.parser')
    extract_data = self._extract_captcha_data(tmpfile)

    numbers = extract_data['number']
    image_ids = extract_data['image_ids']
    selected_ids = []
    for sid in image_ids:
        div = soup.find("div", id=sid)
        img = div.find("img") if div else None
        src = img.get("src") if img else None
        if not src or "base64," not in src:
            # Tile cannot be decoded from the static markup; skip it rather
            # than abort the whole captcha with an AttributeError.
            self._log(f'captcha id={sid} has no decodable image, skipped')
            continue
        img_bytes = base64.b64decode(src.split("base64,", 1)[1])

        ocr_output = self.ocr_engine.inference_bytes(img_bytes)
        ocr_res = ocr_output.replace('$', '')[:3]
        self._log(f'ocr captcha id={sid} result={ocr_res}, target={numbers}')
        if ocr_res == numbers:
            selected_ids.append(sid)
    if not selected_ids:
        raise BizLogicError(message='Captcha selected ids is empty')

    # Submit the selection.
    self._log(f'select_ids={selected_ids}')
    form = self._extract_hidden_fields(soup)
    form['SelectedImages'] = ",".join(selected_ids)
    submit_url = f"https://{domain}/Global/{'CaptchaPublic' if data else 'NewCaptcha'}/SubmitCaptcha"
    headers = {
        "X-Requested-With": "XMLHttpRequest"
    }
    resp = self._perform_request('POST', submit_url, headers=headers, data=form)
    j = resp.json()
    if j.get('success'):
        return j['captcha'] if data else j['cd']

    # Keep every tile of a rejected captcha on disk for later analysis.
    self._log('Captcha Selection Invalid, Saving important data to data/bls_captcha')
    dump_dir = 'data/bls_captcha'
    # Without this the dump raised FileNotFoundError on a fresh deployment
    # and hid the real failure below.
    os.makedirs(dump_dir, exist_ok=True)
    for img in soup.select("img.captcha-img"):
        src = img.get("src", "")
        if not src.startswith("data:image") or "base64," not in src:
            continue
        b64 = src.split("base64,", 1)[1]
        with open(f'{dump_dir}/{uuid.uuid4().hex}.jpg', "wb") as fp:
            fp.write(base64.b64decode(b64))
    raise BizLogicError(message="Sovle captcha failed")
def _extract_hidden_fields(self, soup) -> Dict:
    """Collect name/value pairs of the ``<input>`` elements in the first form.

    Every named input of the first ``<form>`` is included (not only hidden
    ones); a missing ``value`` becomes an empty string.

    Args:
        soup: Parsed document exposing ``find``.

    Returns:
        Dict of field name to value; empty (and a message is logged) when
        the document has no form.
    """
    form = soup.find("form")
    if not form:
        self._log('Form element not found')
        return {}

    fields = {}
    for element in form.find_all("input"):
        field_name = element.get("name")
        if field_name:
            fields[field_name] = element.get("value", "")
    return fields
def _extract_js_var(self, html, context, pattern):
    """Return group 1 of ``pattern`` in ``html``, provided ``context`` occurs in it.

    Args:
        html: Page source to search.
        context: Literal substring that must be present for a search to run.
        pattern: Regular expression with at least one capture group.

    Returns:
        The first capture group of the first match, or an empty string when
        ``context`` is absent or nothing matches.
    """
    if context not in html:
        return ""
    found = re.search(pattern, html)
    return found.group(1) if found else ""
def _construct_visatype_payload(self, apt_config, html: str, soup: BeautifulSoup) -> Optional[Dict]:
    """Build the form payload for the VisaType submission.

    The page uses field names with a dynamic numeric suffix (for example
    ``Location3``) and ships the selectable options as JS arrays. This
    method renders the page in the browser to find the suffix of each
    visible field, maps the configured option names to ids through the JS
    arrays, and appends a synthetic ``ResponseData`` interaction trace.

    Args:
        apt_config: Mapping with ``appointment_category``, ``location``,
            ``visa_type``, ``visa_subtype`` and optionally ``jurisdiction``.
        html: Raw source of the VisaType page.
        soup: The same page, parsed.

    Returns:
        Dict of form fields ready to be posted.

    Raises:
        NotFoundError: A configured option name is not offered by the page,
            or the dynamic id of a required field cannot be determined.
    """
    # Base form fields (__RequestVerificationToken etc.)
    params = self._extract_hidden_fields(soup)

    def get_js_data(var_name):
        """Return the JSON list assigned to ``var <var_name>`` in the page, else []."""
        match = re.search(f"var {var_name}\\s*=\\s*(.*?);", html, re.DOTALL)
        if not match:
            return []
        try:
            parsed = json.loads(match.group(1))
        except ValueError as e:
            self._log(f"Failed to parse JS var {var_name}: {e}")
            return []
        return parsed if isinstance(parsed, list) else []

    def find_field_id(label_text, input_index=1):
        """Return the numeric id suffix of the input following the first visible label, else None."""
        for label in self.page.eles(label_text, timeout=1):
            if label.states.has_rect and label.tag == 'label':
                eid = label.after('tag:input', index=input_index).attr('id')
                digits = ''.join(filter(str.isdigit, eid or ''))
                return int(digits) if digits else None
        return None

    def find_value(options, name, **must_match):
        """Return ``Id`` of the first option called ``name`` whose extra fields match (compared as strings)."""
        for item in options:
            if item.get("Name") != name:
                continue
            if all(str(item.get(key)) == str(expected) for key, expected in must_match.items()):
                return item.get("Id")
        return None

    def require_id(field_id, label):
        """Raise instead of silently producing a key such as ``LocationNone``."""
        if field_id is None:
            raise NotFoundError(message=f"Page: input id for '{label}' not found")
        return field_id

    cfg_jur = apt_config.get("jurisdiction")
    cfg_loc = apt_config.get("location")
    cfg_type = apt_config.get("visa_type")
    cfg_subtype = apt_config.get("visa_subtype")
    cfg_cat = apt_config.get("appointment_category")

    # Render the page so that only labels that actually have a rectangle
    # are considered when resolving the dynamic ids.
    os.makedirs(self.root_workspace, exist_ok=True)
    tmpfile = os.path.join(self.root_workspace, "tmp.html")
    with open(tmpfile, 'w', encoding='utf-8') as tfp:
        tfp.write(html)
    self.page.get(f'file://{tmpfile}')

    cat_id = find_field_id('Appointment Category')
    jur_id = find_field_id('Jurisdiction')
    # Location takes the second input after its label (as in the original).
    loc_id = find_field_id('Location', input_index=2)
    type_id = find_field_id('Visa Type')
    subtype_id = find_field_id('Visa Sub Type')

    # Map configured names to option ids. Visa types are scoped to the
    # location; sub types reference their visa type through ``Value``.
    cat_value = find_value(get_js_data("AppointmentCategoryIdData"), cfg_cat)
    jur_value = find_value(get_js_data("jurisdictionData"), cfg_jur) if cfg_jur else None
    loc_value = find_value(get_js_data("locationData"), cfg_loc)
    type_value = None
    if loc_value:
        type_value = find_value(get_js_data("visaIdData"), cfg_type, LocationId=loc_value)
    subtype_value = None
    if type_value:
        subtype_value = find_value(get_js_data("visasubIdData"), cfg_subtype, Value=type_value)

    # Validate and fill the dynamic fields.
    if not cat_value:
        raise NotFoundError(message=f"Config: AppCategory '{cfg_cat}' not found")
    params[f"AppointmentCategoryId{require_id(cat_id, 'Appointment Category')}"] = cat_value
    if cfg_jur:
        if not jur_value:
            raise NotFoundError(message=f"Config: Jurisdiction '{cfg_jur}' not found")
        params[f"JurisdictionId{require_id(jur_id, 'Jurisdiction')}"] = jur_value
    if not loc_value:
        raise NotFoundError(message=f"Config: Location '{cfg_loc}' not found")
    params[f"Location{require_id(loc_id, 'Location')}"] = loc_value
    if not type_value:
        raise NotFoundError(message=f"Config: VisaType '{cfg_type}' not found for Loc '{cfg_loc}'")
    params[f"VisaType{require_id(type_id, 'Visa Type')}"] = type_value
    if not subtype_value:
        raise NotFoundError(message=f"Config: VisaSubType '{cfg_subtype}' not found")
    params[f"VisaSubType{require_id(subtype_id, 'Visa Sub Type')}"] = subtype_value

    # Fixed parameter
    for key in list(params):
        if key.startswith("AppointmentFor"):
            params[key] = "Individual"

    # ResponseData: simulated timing of the user working through the
    # dropdowns (per the original notes the backend validates this field).
    response_data = []
    current_time = datetime.utcnow()
    # Example of the emitted format: 2023-10-27T10:00:00.123Z
    fmt = "%Y-%m-%dT%H:%M:%S.%f"

    def add_trace(prefix, val_id):
        """Append one interaction lasting 1-3 s, followed by a 0.5-1.5 s gap."""
        nonlocal current_time
        duration = random.randint(1000, 3000)
        gap = random.randint(500, 1500)
        start = current_time
        end = start + timedelta(milliseconds=duration)
        response_data.append({
            "Id": f"{prefix}{val_id}",
            # Trim microseconds to milliseconds.
            "Start": start.strftime(fmt)[:-3] + "Z",
            "End": end.strftime(fmt)[:-3] + "Z",
            "Total": duration,
            "Selected": True
        })
        current_time = end + timedelta(milliseconds=gap)

    add_trace("AppointmentCategoryId", cat_id)
    if jur_id:
        add_trace("JurisdictionId", jur_id)
    add_trace("Location", loc_id)
    add_trace("VisaType", type_id)
    add_trace("VisaSubType", subtype_id)
    params["ResponseData"] = json.dumps(response_data)
    params["X-Requested-With"] = "XMLHttpRequest"

    return params
- def _submit_final_form(self, model_id: str, user_inputs: Dict, book_params: Dict, token: str):
- """
- 提交最终签证申请表 (VisaAppointmentForm)
- 对应原代码的: get_visa_appointment_form_html -> parse -> fix_data -> submit
- """
- domain = self.free_config.get("domain")
- # 1. 获取表单页面 (为了提取 JS 变量映射表)
- url_get = f"https://{domain}/Global/BlsAppointment/VisaAppointmentForm?appointmentId={model_id}"
- # 构造 Referer
- ref_query = urlencode(book_params)
- referer = f"Global/blsAppointment/ManageAppointment?{ref_query}"
-
- headers = {
- 'X-Requested-With': "XMLHttpRequest"
- }
- resp = self._perform_request('GET', url_get, headers=headers)
- headers.pop['X-Requested-With']
-
- html = resp.text
- soup = BeautifulSoup(resp.text, 'html.parser')
-
- # 2. 提取基础隐藏域 (包含 __RequestVerificationToken 等)
- form_data = self._extract_hidden_fields(soup)
-
- # 3. 提取下拉菜单数据源 (JS Variables)
- # BLS 的页面里有很多 var countryData = [...]; 这种数据
- def get_list(name):
- val = self._extract_js_var(html, f"var {name}", rf"var {name}\s*=\s*(.*?);")
- return json.loads(val) if val else []
- # 提取关键数据源
- country_data = get_list("countryData")
- gender_data = get_list("genderData")
- marital_data = get_list("maritalStatusData")
- occupation_data = get_list("occupationData")
- # passport_type_data = get_list("passportTypeData") # 通常默认 Ordinary
-
- # 4. 辅助函数:根据文本找 ID
- def find_id(data_list, text_val, default=None):
- if not text_val: return default
- text_val = str(text_val).lower().strip()
- for item in data_list:
- if str(item.get("Name")).lower() == text_val:
- return item.get("Id")
- return default
- # 5. 准备日期 (YYYY-MM-DD)
- # uinfo 中的日期可能是不同格式,需统一
- def fmt_date(d_str):
- try:
- # 尝试解析常见格式
- for fmt in ["%Y-%m-%d", "%d/%m/%Y", "%d-%m-%Y"]:
- try:
- return datetime.strptime(d_str, fmt).strftime("%Y-%m-%d")
- except: pass
- except: pass
- return d_str # 原样返回 fallback
- dob = fmt_date(user_inputs.get("birthday", ""))
- ppt_issue = fmt_date(user_inputs.get("passport_issue_date", ""))
- ppt_expiry = fmt_date(user_inputs.get("passport_expiry_date", ""))
-
- # 自动计算行程日期 (如果未提供,默认一个月后)
- try:
- travel_date = (datetime.now() + timedelta(days=30)).strftime("%Y-%m-%d")
- except: travel_date = ""
- # 6. 构造申请人详细数据对象 (JSON)
- # 注意:这里的字段名必须严格匹配 BLS 后端实体定义
- applicant_detail = {
- "ApplicantSerialNo": "1",
- "ApplicantId": form_data.get("applicantId", "0"), # 从页面隐藏域提取
- "Id": form_data.get("applicantId", "0"),
- "ParentId": form_data.get("Id", model_id), # 关联的 Appointment ID
-
- # 基本信息
- "FirstName": user_inputs.get("first_name", ""),
- "SurName": user_inputs.get("last_name", ""),
- "LastName": user_inputs.get("last_name", ""),
- "SurnameAtBirth": user_inputs.get("last_name", ""), # 默认同名
- "GenderId": find_id(gender_data, user_inputs.get("gender"), "1"), # 默认 Male
- "MaritalStatusId": find_id(marital_data, user_inputs.get("marital_status", "Single"), "1"),
- "ServerDateOfBirth": dob,
-
- # 国籍/出生地
- "PlaceOfBirth": user_inputs.get("place_of_birth", "-"),
- "CountryOfBirthId": find_id(country_data, user_inputs.get("nationality"), "0"),
- "NationalityAtBirthId": find_id(country_data, user_inputs.get("nationality"), "0"),
- "NationalityId": find_id(country_data, user_inputs.get("nationality"), "0"),
-
- # 护照信息
- "PassportType": "Ordinary Passport", # 默认
- "PassportNo": user_inputs.get("passport_no", ""),
- "ServerPassportIssueDate": ppt_issue,
- "ServerPassportExpiryDate": ppt_expiry,
- "IssuePlace": user_inputs.get("place_of_issue", "-"),
- "IssueCountryId": find_id(country_data, user_inputs.get("nationality"), "0"),
-
- # 联系方式 (必填占位符)
- "HomeAddressLine1": "-",
- "HomeAddressCity": "-",
- "HomeAddressPostalCode": "-",
- "HomeAddressContactNumber": user_inputs.get("phone", "-"),
- "HomeAddressCountryId": find_id(country_data, user_inputs.get("nationality"), "0"),
- "EmployerName": "-",
- "EmployerAddress": "-",
-
- # 职业
- "CurrentOccupationId": find_id(occupation_data, user_inputs.get("occupation", "Others"), "20"),
-
- # 行程信息 (部分写死为常规旅游)
- "PurposeOfJourneyId": "Tourism",
- "MemberStateDestinationId": "Spain",
- "MemberStateFirstEntryId": "Spain",
- "NumberOfEntriesRequested": "Multiple Entries",
- "IntendedStayDuration": "5",
- "ServerTravelDate": travel_date,
- "ServerIntendedDateOfArrival": travel_date,
- "ServerIntendedDateOfDeparture": travel_date, # 简化
-
- # 费用承担
- "CostCoveredById": "By the Applicant himself / herself",
- "MeansOfSupportId": "Cash",
-
- # 杂项
- "IsMinor": False,
- "IsVisaIssuedBefore": False,
- "BlsInvitingAuthority": "1", # 这里的 1 通常代表 "No" 或者特定枚举
- "PreviousFingerPrintStatus": "2", # 2 通常代表 No
-
- # 邀请人信息 (旅游通常填酒店或空)
- "InvitingAuthorityName": "-",
- "InvitingAddress": "-",
- "InvitingCity": "-",
- "InvitingEmail": "no-reply@example.com"
- }
- # 7. 更新表单数据
- # ApplicantsDetailsList 需要是一个 JSON 字符串
- form_data['ApplicantsDetailsList'] = json.dumps([applicant_detail])
-
- # 补全其他可能需要的字段
- form_data['PreviousFingerPrintStatus_0'] = "2"
- form_data['BlsInvitingAuthority_0'] = "1"
- form_data["X-Requested-With"] = "XMLHttpRequest"
- # 8. 提交
- # 注意:提交地址通常和 manage appointment 相同,或者是特定的 Save 接口
- # 根据你的原代码,是 Global/BLSAppointment/ManageAppointment
- url_post = f"https://{domain}/Global/BLSAppointment/ManageAppointment"
-
- # Headers 需要 Token
- headers = {
- "Referer": f"https://{domain}/{referer}",
- "X-Requested-With": "XMLHttpRequest",
- "requestverificationtoken": token
- }
- # 这里的 form_data['params'] 逻辑在 _extract_hidden_fields 可能会有差异
- # 确保 form_data 是扁平的字典
- submit_resp = self._perform_request('POST', url_post, data=form_data, headers=headers)
-
- if submit_resp.json().get('success'):
- self._log("Final Form Submitted Successfully.")
- return True
- raise BizLogicError(message='Submit application form failed')
def _read_otp_email(self, wait_sec: int = 60) -> str:
    """Poll the mailbox for the BLS OTP e-mail and return the code.

    The mail content is fetched through
    ``VSCloudApi.Instance().fetch_mail_content`` every 5 seconds and the
    first standalone 6-digit number found in it is returned.

    Args:
        wait_sec: Approximate time budget in seconds. At least one poll is
            always performed, even when ``wait_sec`` is below the polling
            interval.

    Returns:
        The 6-digit OTP as a string.

    Raises:
        NotFoundError: No OTP was found within the allotted polls.
    """
    # Local import: only the ``datetime`` class is known to be in scope here.
    from datetime import timezone

    master_email = "visafly666@gmail.com"
    recipient = self.config.account.username
    sender = "Info@blsinternational.com"
    subject_keywords = "BLS"
    body_keywords = "verification code"
    poll_interval = 5

    # Search starting point, formatted in UTC.
    formatted_utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    self._log(f"Waiting for OTP from {sender}...")

    attempts = max(1, wait_sec // poll_interval)
    for i in range(attempts):
        # NOTE(review): per the original note, the trailing 300 is the mail
        # search window in seconds (5 minutes) -- confirm against VSCloudApi.
        content_out = VSCloudApi.Instance().fetch_mail_content(
            master_email,
            sender,
            recipient,
            subject_keywords,
            body_keywords,
            formatted_utc_time,
            300,
        )
        # An empty/None result means "nothing yet" rather than an error.
        match = re.search(r'\b\d{6}\b', content_out or "")
        if match:
            otp = match.group(0)
            self._log(f"OTP code found: {otp}")
            return otp

        if i == attempts - 1:
            # No further poll follows, so sleeping again would only delay
            # the timeout error.
            break
        time.sleep(poll_interval)
        if i % 2 == 0:
            self._log("OTP not received yet, retrying...")

    raise NotFoundError(f"OTP email not found within {wait_sec}s")
-
def _check_resp_is_session_expired_or_invalid(self, keyword, resp) -> bool:
    """Raise if ``resp`` shows that the session has expired or is invalid.

    The session is considered expired when any of the following holds:

    1. the final URL points at the login page;
    2. the response is a 302 whose ``Location`` header points at the login
       page (covers the case where redirects are not followed);
    3. the response body is empty;
    4. ``keyword`` is absent from the body and the body contains the
       "session has expired" notice.

    Args:
        keyword: Text expected in a healthy response body.
        resp: Response object exposing ``url``, ``status_code``, ``headers``
            and ``text``.

    Returns:
        ``False`` when none of the expiry conditions is met.

    Raises:
        SessionExpiredOrInvalidError: An expiry condition was met;
            ``self.is_healthy`` is set to ``False`` before raising.
    """
    login_markers = ("/Account/LogIn", "/Account/Login")

    def points_to_login(target) -> bool:
        # Tolerate None / non-string URL values.
        target = str(target or "")
        return any(marker in target for marker in login_markers)

    expired = points_to_login(resp.url)

    if not expired and resp.status_code == 302:
        expired = points_to_login(resp.headers.get("Location", ""))

    if not expired:
        resp_text = resp.text
        if not resp_text:
            expired = True
        elif keyword not in resp_text:
            expired = (
                'your session has expired, please login again.'
                in resp_text.lower()
            )

    if expired:
        self.is_healthy = False
        raise SessionExpiredOrInvalidError()
    return False
-
def _save_http_session(self, page_url):
    """Store the current HTTP session through ``VSCloudApi``.

    The session cookies are serialised to JSON, a SHA-256 hex digest of
    ``cookies + user agent + page_url`` is used as the session id, and a
    proxy URL is assembled from ``self.config.proxy`` when an IP is
    configured. The local-storage argument is always sent as an empty
    string.

    Args:
        page_url: URL of the page the session belongs to.

    Returns:
        Whatever ``VSCloudApi.Instance().create_http_session`` returns.
    """
    jar = self.session.cookies
    if hasattr(jar, "get_dict"):
        # Cookie containers exposing get_dict() give the mapping directly.
        cookie_map = jar.get_dict()
    else:
        # Otherwise iterate the jar and read name/value off each cookie.
        cookie_map = {cookie.name: cookie.value for cookie in jar}
    cookies_json = json.dumps(cookie_map)

    user_agent = self.user_agent or "unknown_ua"

    # Session id: SHA-256 over cookies + user agent + page URL.
    fingerprint = cookies_json + user_agent + page_url
    digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
    digest.update(fingerprint.encode())
    session_key = digest.finalize().hex()

    proxy = self.config.proxy
    proxy_url = ""
    if proxy.ip:
        credentials = (
            f"{proxy.username}:{proxy.password}@" if proxy.username else ""
        )
        proxy_url = f"{proxy.scheme}://{credentials}{proxy.ip}:{proxy.port}"

    api = VSCloudApi.Instance()
    return api.create_http_session(
        session_key, cookies_json, "", user_agent, proxy_url, page_url
    )
-
- # --- 资源清理核心方法 ---
def cleanup(self):
    """Close the browser and remove the temporary workspace directory.

    Safe to call more than once and on a partially initialised instance
    (it is also invoked from ``__del__``): missing ``page`` or
    ``root_workspace`` attributes are treated as "nothing to clean up".

    Removal is attempted up to three times because the browser may still
    hold file locks briefly after it has been closed. A warning is logged
    if the workspace still exists afterwards.
    """
    # 1. Close the browser.
    page = getattr(self, "page", None)
    if page:
        try:
            # Per the original note, quit() terminates the Chrome process.
            page.quit()
        except Exception:
            # Best effort: the browser may already be closed.
            pass
        self.page = None

    # 2. Delete the workspace files.
    workspace = getattr(self, "root_workspace", None)
    if not workspace or not os.path.exists(workspace):
        return

    max_attempts = 3
    for attempt in range(1, max_attempts + 1):
        # Give the browser a moment to release its file locks.
        time.sleep(0.2)
        # ignore_errors=True never raises, so success is decided by
        # checking whether the directory is really gone.
        shutil.rmtree(workspace, ignore_errors=True)
        if not os.path.exists(workspace):
            return
        if attempt < max_attempts:
            self._log(f"Cleanup retry {attempt}: workspace still present")
            time.sleep(0.5)

    self._log(f"[WARN] Failed to fully remove workspace: {workspace}")
-
def __del__(self):
    """Finalizer: run ``cleanup()`` when the object is garbage-collected.

    Any failure is suppressed. A finalizer cannot propagate exceptions
    anyway, and it may run during interpreter shutdown when module globals
    are already being torn down.
    """
    try:
        self.cleanup()
    except Exception:
        # Nothing useful can be done with an error at this point.
        pass
|