import base64
import json
import random
import re
import secrets
import string
import time
from typing import Any, Dict, List, Optional
from urllib.parse import parse_qs, quote, urlencode, urlparse

try:
    from curl_cffi import requests, const
    from bs4 import BeautifulSoup
except ImportError as exc:
    raise ImportError("Missing dependencies. Run: pip install curl-cffi beautifulsoup4") from exc

# Framework dependencies
from vs_plg import IVSPlg, VSError  # type: ignore
from vs_types import VSPlgConfig, VSQueryResult, VSBookResult, AvailabilityStatus  # type: ignore
from vs_log_macros import VSC_INFO, VSC_ERROR, VSC_DEBUG, VSC_WARN  # type: ignore
from toolkit.vs_cloud_api import VSCloudApi  # type: ignore


class BlsPlugin(IVSPlg):
    """BLS visa appointment plugin (trimmed-down version).

    The plugin drives three flows against the BLS web site configured in
    ``free_config["domain"]``: login (``create_session``), availability
    lookup (``query``) and booking (``book``).  All site traffic goes through
    ``_request`` so that headers, timeout and status handling are uniform.
    """

    def __init__(self, group_id: str):
        """Store the group id and initialise empty runtime state."""
        self.group_id = group_id
        self.config: Optional[VSPlgConfig] = None
        self.free_config: Dict[str, Any] = {}
        self.session: Optional[requests.Session] = None

        # Runtime state
        self.book_params: Dict = {}  # query-string params captured by query(), reused by book()
        self.last_error = VSError(0, "OK")
        self.is_healthy = True

        # Default OCR service address; may be overridden by set_config()
        self.ocr_service_url = "http://127.0.0.1:8085/predict/vfcode"

    def get_group_id(self) -> str:
        """Return the group id given at construction."""
        return self.group_id

    def set_config(self, config: VSPlgConfig):
        """Store ``config`` and decode its ``free_config`` JSON string.

        Missing, invalid or non-object JSON results in an empty
        ``free_config`` dict.  An ``ocr_service_url`` entry, when present and
        non-empty, replaces the current OCR service address.
        """
        self.config = config
        parsed: Any = {}
        if config.free_config:
            try:
                parsed = json.loads(config.free_config)
            except (TypeError, ValueError) as exc:
                VSC_WARN("bls_plg", "[%s] Invalid free_config JSON: %s", self.group_id, exc)
        # Every consumer calls .get() on free_config, so only a dict is usable.
        self.free_config = parsed if isinstance(parsed, dict) else {}

        ocr_url = self.free_config.get("ocr_service_url")
        if ocr_url:
            self.ocr_service_url = ocr_url

    def health_check(self) -> bool:
        """Return the current health flag (cleared by 401/403 errors)."""
        return self.is_healthy

    def get_last_error(self) -> VSError:
        """Return the most recent error recorded by ``_set_error``."""
        return self.last_error

    def _set_error(self, code: int, message: str):
        """Record and log an error; codes 401 and 403 mark the plugin unhealthy."""
        self.last_error = VSError(code, message)
        VSC_ERROR("bls_plg", "[%s] Error %d: %s", self.group_id, code, message)
        if code in [401, 403]:
            self.is_healthy = False

    # =========================================================================
    # 1. Login flow
    # =========================================================================
    def create_session(self) -> bool:
        """Create a fresh HTTP session and log in.

        Returns:
            True when the login submit endpoint answers with a JSON object
            whose ``success`` field is truthy, False otherwise.
        """
        VSC_INFO("bls_plg", "[%s] Creating session...", self.group_id)
        self.is_healthy = True

        if self.config is None:
            # Proxy settings and account credentials both come from the config.
            self._set_error(2000, "Login Failed: plugin is not configured")
            return False

        # Initialise the session
        self.session = requests.Session(
            proxy=self._get_proxy_url(),
            impersonate="chrome131",
            curl_options={const.CurlOpt.MAXAGE_CONN: 1800, const.CurlOpt.VERBOSE: False}
        )

        domain = self.free_config.get("domain")
        if not domain:
            return False

        # 1.1 Fetch the login page and parse its parameters
        url = f"https://{domain}/Global/account/login"
        resp = self._request("GET", url)
        if not resp:
            return False

        soup = BeautifulSoup(resp.text, 'html.parser')
        form_data = self._extract_hidden_fields(soup)

        # Resolve the dynamic input ids (UserId1, Password1, ...)
        for inp in soup.find_all('input'):
            iid = inp.get('id', '')
            digits = re.search(r'\d+', iid)
            if not digits:
                continue
            if 'UserId' in iid:
                form_data["UserIdKey"] = iid  # remember the field name
                form_data["UserId"] = digits.group(0)
            if 'Password' in iid:
                form_data["PasswordKey"] = iid  # remember the field name
                form_data["Password"] = digits.group(0)

        # The "data" parameter is needed by the captcha endpoint
        data_val = self._extract_js_var(resp.text, "iframeOpenUrl", r"data=([^&]+)")

        # 1.2 Solve the captcha
        captcha_token = self._solve_bls_captcha(data_val, 'Global/account/login')
        if not captcha_token:
            return False

        # 1.3 Submit the login form
        submit_url = f"https://{domain}/Global/account/loginsubmit"
        # NOTE(review): payload is the same dict as form_data, so the
        # bookkeeping entries "UserIdKey"/"PasswordKey" are posted as well.
        # Kept as in the original; confirm the server ignores them.
        payload = form_data
        payload["X-Requested-With"] = "XMLHttpRequest"
        payload["CaptchaData"] = captcha_token

        # Fill in the account credentials under the dynamic field names
        if "UserIdKey" in form_data:
            payload[form_data["UserIdKey"]] = self.config.account.username
        if "PasswordKey" in form_data:
            payload[form_data["PasswordKey"]] = self.config.account.password

        login_res = self._request("POST", submit_url, data=payload, headers={"Referer": url})
        login_json = self._json_object(login_res)
        if login_json and login_json.get('success'):
            VSC_INFO("bls_plg", "[%s] Login Successful", self.group_id)
            return True

        self._set_error(2000, "Login Failed")
        return False

    # =========================================================================
    # 2. Query flow
    # =========================================================================
    def query(self) -> VSQueryResult:
        """Look up available appointment dates.

        Walks VisaTypeVerification -> VisaType -> ManageAppointment, stores
        the booking query parameters in ``self.book_params`` and fills the
        result with the dates flagged ``SingleSlotAvailable``.

        Returns:
            A ``VSQueryResult``.  It is returned unmodified (as constructed)
            whenever a step fails or a response cannot be parsed.
        """
        res = VSQueryResult()
        domain = self.free_config.get("domain")
        if not self.session:
            return res

        # 2.1 Visa type verification (VisaTypeVerification)
        url_vtv = f"https://{domain}/Global/bls/visatypeverification"
        resp = self._request("GET", url_vtv)
        if not resp:
            return res

        form_vtv = self._extract_hidden_fields(BeautifulSoup(resp.text, 'html.parser'))
        captcha_token = self._solve_bls_captcha(referer='Global/bls/visatypeverification')
        if not captcha_token:
            return res

        form_vtv['CaptchaData'] = captcha_token
        form_vtv["X-Requested-With"] = "XMLHttpRequest"

        vtv_res = self._request("POST", f"https://{domain}/Global/bls/VisaTypeVerification",
                                data=form_vtv, headers={"Referer": url_vtv})
        vtv_json = self._json_object(vtv_res)
        if not vtv_json or not vtv_json.get('success'):
            return res

        # 2.2 Visa type selection (VisaType); returnUrl carries data=xxx
        data_match = re.search(r"data=([^&]+)", str(vtv_json.get('returnUrl') or ""))
        if not data_match:
            VSC_WARN("bls_plg", "[%s] VisaTypeVerification returnUrl has no data parameter", self.group_id)
            return res
        data_val = data_match.group(1)

        url_vt = f"https://{domain}/Global/bls/visatype?data={data_val}"
        resp_vt = self._request("GET", url_vt)
        if not resp_vt:
            return res

        # The real page needs elaborate JS-variable extraction
        # (JS arrays -> match name -> get id); _construct_visatype_payload
        # is the place where that logic lives.
        vt_payload = self._construct_visatype_payload(resp_vt.text, BeautifulSoup(resp_vt.text, 'html.parser'))
        if not vt_payload:
            return res

        vt_res = self._request("POST", f"https://{domain}/Global/bls/VisaType",
                               data=vt_payload, headers={"Referer": url_vt})
        vt_json = self._json_object(vt_res)
        if vt_json is None:
            # No response or unparsable body: not evidence of "no availability".
            return res
        if not vt_json.get('success'):
            if not vt_json.get('available'):
                res.success = True
                res.availability_status = AvailabilityStatus.NoneAvailable
            return res

        # 2.3 Capture the booking parameters
        final_url = vt_json.get('returnUrl')
        if not final_url:
            VSC_WARN("bls_plg", "[%s] VisaType response has no returnUrl", self.group_id)
            return res
        q_params = parse_qs(urlparse(str(final_url)).query)
        self.book_params = {k: v[0] for k, v in q_params.items()}

        # 2.4 Query the calendar (ManageAppointment)
        url_ma = f"https://{domain}/Global/blsAppointment/ManageAppointment?{urlencode(self.book_params)}"
        resp_ma = self._request("GET", url_ma)
        if not resp_ma:
            return res

        avail_str = self._extract_js_var(resp_ma.text, "var availDates", r"var availDates =(.*?);")
        if avail_str:
            try:
                avail_json = json.loads(avail_str)
            except ValueError as exc:
                VSC_WARN("bls_plg", "[%s] Cannot parse availDates: %s", self.group_id, exc)
                return res

            entries = avail_json.get('ad') if isinstance(avail_json, dict) else None
            if not isinstance(entries, list):
                entries = []
            # Collect the dates that report an available single slot
            dates = [
                entry['DateText']
                for entry in entries
                if isinstance(entry, dict)
                and entry.get('SingleSlotAvailable')
                and entry.get('DateText')
            ]

            # NOTE(review): the original indentation was lost; the else branch
            # is assumed to pair with "if dates".
            if dates:
                res.success = True
                res.availability_status = AvailabilityStatus.Available
                res.earliest_date = dates[0]
                for d in dates:
                    da = VSQueryResult.DateAvailability(date=d)
                    da.times.append(VSQueryResult.DateAvailability.TimeSlot(time="00:00", label="Available"))
                    res.availability.append(da)
            else:
                res.success = True
                res.availability_status = AvailabilityStatus.NoneAvailable

        return res

    # =========================================================================
    # 3. Booking flow
    # =========================================================================
    def book(self, slot_info: VSQueryResult) -> VSBookResult:
        """Book the earliest date reported by a previous ``query``.

        Requires ``self.book_params`` to have been filled by ``query``.

        Args:
            slot_info: query result; only ``earliest_date`` is read.

        Returns:
            A ``VSBookResult``; ``success`` is set only after the final form
            was submitted.  Any failing step returns the result unmodified.
        """
        res = VSBookResult()
        domain = self.free_config.get("domain")
        if not self.book_params:
            return res

        uinfo = self.free_config.get("user_info") or {}

        # 3.1 Fetch the manage page (for the token and the JS variables)
        url_ma = f"https://{domain}/Global/blsAppointment/ManageAppointment?{urlencode(self.book_params)}"
        resp_ma = self._request("GET", url_ma)
        if not resp_ma:
            return res

        ma_soup = BeautifulSoup(resp_ma.text, 'html.parser')
        ma_form = self._extract_hidden_fields(ma_soup)
        req_token = ma_form.get('__RequestVerificationToken')

        # 3.2 Upload the photo (best effort: booking continues without it)
        photo_url = uinfo.get('passport_image_url')
        if photo_url:
            file_id = self._upload_applicant_photo(domain, photo_url, url_ma, req_token)
            if file_id is not None:
                ma_form['ApplicantPhotoId'] = file_id

        # 3.3 Email OTP flow
        data_val = self._extract_js_var(resp_ma.text, "win.iframeOpenUrl", r"data=([^&]+)")
        # Send the OTP
        self._request("GET", f"https://{domain}/Global/blsappointment/SendAppointmentVerificationCode?code={data_val}",
                      headers={"Referer": url_ma, "X-Requested-With": "XMLHttpRequest"})

        # Read the OTP (waits 30s at most)
        otp_code = self._read_otp_email(wait_sec=30)
        if not otp_code:
            self._set_error(3004, "OTP timeout")
            return res

        # Verify the OTP
        verify_payload = {"Code": otp_code, "Value": ma_form.get('EmailCode'), "Id": ma_form.get('Id')}
        v_res = self._request("POST", f"https://{domain}/Global/blsappointment/VerifyEmail",
                              data=verify_payload, headers={"Referer": url_ma, "requestverificationtoken": req_token})
        v_json = self._json_object(v_res)
        if not v_json or not v_json.get('success'):
            return res

        ma_form['EmailVerified'] = 'True'
        ma_form['EmailVerificationCode'] = otp_code

        # 3.4 Pick a time slot on the target date
        target_date = slot_info.earliest_date

        slot_url = f"https://{domain}/Global/blsappointment/GetAvailableSlotsByDate"
        # Query parameters; some non-essential ones are omitted
        slot_params = {
            "appointmentDate": target_date,
            "locationId": ma_form.get("LocationId"),
            "categoryId": ma_form.get("AppointmentCategoryId"),
            "visaType": ma_form.get("VisaType"),
            "visaSubType": ma_form.get("VisaSubTypeId"),
            "applicantCount": 1,
            "dataSource": ma_form.get("DataSource"),
            "missionId": ma_form.get("MissionId")
        }
        slots_res = self._request("POST", slot_url, params=slot_params,
                                  headers={"Referer": url_ma, "requestverificationtoken": req_token})
        if not slots_res:
            return res

        try:
            slots = slots_res.json()
        except ValueError as exc:
            VSC_WARN("bls_plg", "[%s] Cannot parse slot list: %s", self.group_id, exc)
            return res
        if not isinstance(slots, list) or not slots:
            return res

        try:
            # Choose the slot with the most remaining capacity; max() keeps
            # the first entry on ties, like a stable descending sort would.
            best_slot = max(slots, key=lambda s: s["Count"])
            if best_slot["Count"] <= 0:
                return res
            target_time = best_slot['Name']
        except (KeyError, TypeError) as exc:
            VSC_WARN("bls_plg", "[%s] Unexpected slot entry: %s", self.group_id, exc)
            return res

        ma_form['ServerAppointmentDate'] = target_date
        ma_form['AppointmentDetailsList'] = '[]'
        # The field names carry dynamic numeric ids that must be re-parsed
        page_html = str(ma_soup)
        date_match = re.search(r'AppointmentDate(\d+)', page_html)
        slot_match = re.search(r'AppointmentSlot(\d+)', page_html)
        if not date_match or not slot_match:
            VSC_WARN("bls_plg", "[%s] Dynamic appointment field ids not found", self.group_id)
            return res
        ma_form[f'AppointmentDate{date_match.group(1)}'] = target_date
        ma_form[f'AppointmentSlot{slot_match.group(1)}'] = target_time

        # 3.5 Solve the captcha again and submit ManageAppointment
        captcha_token = self._solve_bls_captcha(
            data_val, f'Global/blsAppointment/ManageAppointment?{urlencode(self.book_params)}')
        if not captcha_token:
            return res
        ma_form['CaptchaData'] = captcha_token

        final_ma_res = self._request("POST", f"https://{domain}/Global/BLSAppointment/ManageAppointment",
                                     data=ma_form, headers={"Referer": url_ma})
        final_json = self._json_object(final_ma_res)
        if not final_json:
            return res

        model = final_json.get('model')
        appt_model_id = model.get('Id') if isinstance(model, dict) else None
        if not appt_model_id:
            return res

        # 3.6 Fill in the application form (VisaAppointmentForm):
        # fetch page -> parse JS variables -> map user info -> submit.
        # _submit_final_form is expected to perform that mapping.
        if self._submit_final_form(appt_model_id, uinfo, self.book_params, req_token):
            # Success: hand back the liveness link
            res.success = True
            res.session_id = self._generate_id()
            res.order_id = res.session_id
            res.payment_link = f"https://{domain}/Global/BlsAppointment/livenessView?id={appt_model_id}"

            # Store the session in the cloud so the front end can take over
            self._save_session_to_cloud(res.session_id, res.payment_link)
            VSC_INFO("bls_plg", "[%s] Book Success. Liveness URL: %s", self.group_id, res.payment_link)

        return res

    def _upload_applicant_photo(self, domain, photo_url, referer, req_token):
        """Download the photo at ``photo_url`` and upload it as the profile image.

        Returns the ``fileId`` reported by the upload endpoint, or None when
        the download or the upload fails.
        """
        try:
            photo_resp = requests.get(photo_url, timeout=60)
        except Exception as exc:
            VSC_WARN("bls_plg", "[%s] Photo download failed: %s", self.group_id, exc)
            return None
        if photo_resp.status_code != 200:
            VSC_WARN("bls_plg", "[%s] Photo download returned HTTP %s", self.group_id, photo_resp.status_code)
            return None
        photo_bytes = photo_resp.content

        # Hand-built multipart body with a browser-like boundary
        boundary = "----WebKitFormBoundary" + "".join(random.choices(string.ascii_letters + string.digits, k=16))
        upload_headers = {
            "content-type": f"multipart/form-data; boundary={boundary}",
            "requestverificationtoken": req_token,
            "x-requested-with": "XMLHttpRequest",
            "Referer": referer
        }
        body = (f"--{boundary}\r\nContent-Disposition: form-data; name=\"file\"; filename=\"photo.jpg\"\r\n"
                f"Content-Type: image/jpeg\r\n\r\n").encode("utf-8") + photo_bytes + \
            f"\r\n--{boundary}--\r\n".encode("utf-8")

        up_res = self._request("POST", f"https://{domain}/Global/query/UploadProfileImage",
                               headers=upload_headers, data=body)
        up_json = self._json_object(up_res)
        if up_json and 'fileId' in up_json:
            return up_json['fileId']
        return None

    # =========================================================================
    # Helpers
    # =========================================================================
    def _json_object(self, resp) -> Optional[Dict[str, Any]]:
        """Return the body of ``resp`` as a dict, or None.

        None is returned for a missing response, a body that is not valid
        JSON (e.g. an HTML block page) or JSON that is not an object.
        """
        if not resp:
            return None
        try:
            parsed = resp.json()
        except (TypeError, ValueError) as exc:
            VSC_WARN("bls_plg", "[%s] Invalid JSON response: %s", self.group_id, exc)
            return None
        return parsed if isinstance(parsed, dict) else None

    def _request(self, method, url, **kwargs):
        """Send a request through the session with default browser headers.

        Caller-supplied ``headers`` override the defaults; ``timeout``
        defaults to 60 seconds.  401, 403 and 429 are recorded through
        ``_set_error``.

        Returns:
            The response when its status is 200, otherwise None (including
            when the request raises or no session exists).
        """
        VSC_DEBUG("bls_plg", "_request %s %s", method, url)
        if self.session is None:
            VSC_WARN("bls_plg", "Request Error: %s", "session not created")
            return None

        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/131.0.0.0 Safari/537.36',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8'
        }
        headers.update(kwargs.pop('headers', None) or {})
        kwargs['headers'] = headers
        kwargs.setdefault('timeout', 60)

        try:
            resp = self.session.request(method, url, **kwargs)
        except Exception as e:  # network boundary: any transport failure means "no response"
            VSC_WARN("bls_plg", "Request Error: %s", e)
            return None

        if resp.status_code == 401:
            self._set_error(401, "Unauthorized")
        elif resp.status_code in [403, 429]:
            self._set_error(resp.status_code, "Blocked")

        if resp.status_code == 200:
            return resp
        return None

    def _solve_bls_captcha(self, data='', referer='') -> Optional[str]:
        """Solve the image captcha: fetch images -> remote OCR -> submit.

        Args:
            data: when non-empty, the CaptchaPublic endpoints are used with
                this value as ``data`` query parameter; otherwise NewCaptcha.
            referer: site-relative path used to build the Referer header.

        Returns:
            The ``captcha`` token from the submit response, or None.
        """
        domain = self.free_config.get("domain")
        url = f"https://{domain}/Global/NewCaptcha/GenerateCaptcha"
        if data:
            url = f"https://{domain}/Global/CaptchaPublic/GenerateCaptcha?data={data}"

        resp = self._request("GET", url, headers={"Referer": f"https://{domain}/{referer}"})
        if not resp:
            return None

        soup = BeautifulSoup(resp.text, 'html.parser')

        # 1. Extract the target number from the "Select <n>" instruction
        target_match = re.search(r'Select\s*(\d+)', soup.get_text())
        if not target_match:
            return None
        target_num = target_match.group(1)

        selected_ids = []

        # 2. Run every inline (base64) image through the remote OCR service
        for img in soup.find_all('img', class_='captcha-img'):
            src = img.get('src', '')
            if 'base64,' not in src:
                continue
            try:
                img_bytes = base64.b64decode(src.split('base64,')[1])
                # The raw image bytes are sent as the request body
                ocr_resp = requests.post(
                    self.ocr_service_url,
                    data=img_bytes,
                    headers={"Content-Type": "application/octet-stream"},
                    timeout=5
                )
                if ocr_resp.status_code != 200:
                    continue
                ocr_res = ocr_resp.json().get('data', '').replace('$', '')
                VSC_DEBUG("bls_plg", "OCR: %s (Target: %s)", ocr_res, target_num)
                img_id = img.get('id')
                # An image without an id cannot be referenced in SelectedImages
                if ocr_res == target_num and img_id:
                    selected_ids.append(img_id)
            except Exception as e:  # best effort: one failed image must not abort the rest
                VSC_WARN("bls_plg", "OCR Service Failed: %s", e)

        if not selected_ids:
            return None

        # 3. Submit the selection
        form = self._extract_hidden_fields(soup)
        form['SelectedImages'] = ",".join(selected_ids)

        submit_url = f"https://{domain}/Global/{'CaptchaPublic' if data else 'NewCaptcha'}/SubmitCaptcha"
        res = self._request("POST", submit_url, data=form,
                            headers={"X-Requested-With": "XMLHttpRequest", "Referer": url})
        res_json = self._json_object(res)
        if res_json and res_json.get('captcha'):
            return res_json['captcha']
        return None

    def _extract_hidden_fields(self, soup) -> Dict:
        """Return name -> value for every named ``<input>`` of the first form.

        Despite the name, all inputs with a ``name`` attribute are collected,
        not only hidden ones.  Returns an empty dict when there is no form.
        """
        form = soup.find("form")
        if not form:
            return {}
        params = {}
        for inp in form.find_all("input"):
            name = inp.get("name")
            if name:
                params[name] = inp.get("value", "")
        return params

    def _extract_js_var(self, html, context, pattern):
        """Return group 1 of ``pattern`` in ``html`` if ``context`` occurs in it, else ""."""
        if context in html:
            match = re.search(pattern, html)
            if match:
                return match.group(1)
        return ""

    def _construct_visatype_payload(self, html, soup):
        """Build the VisaType POST payload (simplified).

        Starts from the form inputs of ``soup`` and adds the jurisdiction id
        looked up by name in the page's JS data.  The same lookup would have
        to be repeated for Location, VisaType and VisaSubType.
        """
        params = self._extract_hidden_fields(soup)

        def find_id(var_name, target_name, key="Name", val_key="Id"):
            """Find ``val_key`` of the item whose ``key`` equals ``target_name`` in a JS array."""
            json_str = self._extract_js_var(html, f"var {var_name}", rf"var {var_name}\s*=\s*(.*?);")
            if not json_str:
                return None
            try:
                for item in json.loads(json_str):
                    if item.get(key) == target_name:
                        return item.get(val_key)
            except (ValueError, TypeError, AttributeError):
                # Not valid JSON, or not a list of objects: treat as "not found"
                pass
            return None

        # Example: Jurisdiction
        if self.free_config.get('jurisdiction'):
            jid = find_id("jurisdictionData", self.free_config['jurisdiction'])
            if jid:
                params[f'JurisdictionId{jid}'] = jid  # the key itself is dynamic on this site

        # ... repeat for Location, VisaType, VisaSubType ...

        params["X-Requested-With"] = "XMLHttpRequest"
        params["ResponseData"] = "[]"  # mandatory field
        return params

    def _submit_final_form(self, model_id, uinfo, book_params, token):
        """Submit the applicant form.

        TODO: not implemented; always returns True.  The intended steps are:
        get form HTML -> parse JS data -> map ``uinfo`` -> post.
        """
        return True

    def _read_otp_email(self, wait_sec=30):
        """Wait for the OTP email and return the code.

        TODO: mock implementation; it sleeps in 5-second steps for
        ``wait_sec`` seconds and then returns a fixed code.  The cloud mail
        lookup still has to be wired in.
        """
        for _ in range(wait_sec // 5):
            time.sleep(5)
            # Poll the cloud API here once available
        return "123456"  # Mock

    def _save_session_to_cloud(self, sid, url):
        """Store the session cookies in the cloud under ``sid`` together with ``url``."""
        jar = self.session.cookies
        # "requests" is curl_cffi.requests here, which may not expose the
        # python-requests helper; keep it when present, otherwise use the
        # cookie container's own get_dict().
        to_dict = getattr(getattr(requests, "utils", None), "dict_from_cookiejar", None)
        cookie_map = to_dict(jar) if to_dict else jar.get_dict()
        cookies = json.dumps(cookie_map)
        VSCloudApi.Instance().create_http_session(sid, cookies, "", "", "", url, {})

    def _get_proxy_url(self):
        """Return the proxy URL built from the config, or "" when no proxy is set.

        Credentials are percent-encoded so that characters such as ``@`` or
        ``:`` cannot corrupt the URL.
        """
        p = self.config.proxy if self.config is not None else None
        if not p or not p.ip:
            return ""
        if p.username:
            user = quote(str(p.username), safe="")
            password = quote(str(p.password or ""), safe="")
            return f"{p.scheme}://{user}:{password}@{p.ip}:{p.port}"
        return f"{p.scheme}://{p.ip}:{p.port}"

    def _generate_id(self):
        """Return a random 8-character alphanumeric id.

        The id keys a stored cookie session, so it is drawn from ``secrets``.
        """
        alphabet = string.ascii_letters + string.digits
        return "".join(secrets.choice(alphabet) for _ in range(8))