# plugins/vfs_plugin2.py import os import time import json import random import base64 import re import urllib.parse from datetime import datetime from typing import Dict, Any, Optional, List, Tuple, Callable # DrissionPage 核心引入 from DrissionPage import ChromiumPage, ChromiumOptions from DrissionPage.common import Settings # 加密库 from cryptography.hazmat.primitives import serialization, hashes from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.backends import default_backend from vs_plg import IVSPlg from vs_types import VSPlgConfig, VSQueryResult, VSBookResult, DateAvailability, AvailabilityStatus, NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError from toolkit.vs_cloud_api import VSCloudApi # ----------------- 静态常量与辅助数据 ----------------- VFS_PUBLIC_KEY_PEM = """-----BEGIN PUBLIC KEY----- MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAuupFgB+lYIOtSxrRoHzc LmCZKJ6+oSbgqgOPzFMM0TasOeLw0NXEn1XfIzXdx75+tegNKwyIZumoh0yhubKs t59GV321kN0iquYRHrdh3ygfDDHlS9rROQeBqRga0ncSADtbLMrBPqXJjPCoV76y t92towriKoH75BhiazY0mghm4LjmAWrV0u/GNpV3tk9bxbtHEXGaFmxCJqjg+7x6 1e5wXLfvpj9w1QsiSWOSJxLOyICz/9ByxXycQQFdNmjnnnwco9Gt/Mi33NYH71j0 5oXIjklFC4lvJqaqSY5lS7Vwb9oCt9zX9J0Yz4z4e/3V+0jgRnWOFGofyks4FKe2 GQIDAQAB -----END PUBLIC KEY-----""" # (Country Map 省略以节省篇幅,请保持原样) COUNTRY_MAP = { "afghanistan": "AFG", "albania": "ALB", "algeria": "DZA", "andorra": "AND", "angola": "AGO", "china": "CHN", "united kingdom": "GBR", "netherlands": "NLD", # ... 请保留你原来的完整映射 ... } def get_country_iso3(name: str) -> str: return COUNTRY_MAP.get(name.lower(), "CHN") def to_yyyymmdd(data_str: str, date_str_format: str, target_format: str="%Y-%m-%d"): try: dt = datetime.strptime(data_str, date_str_format) return dt.strftime(target_format) except: return data_str def create_proxy_auth_extension(ip, port, username, password, plugin_path="./chrome_proxy_auth_plugin"): """ 创建一个 Chrome 插件来自动处理代理认证 """ if not os.path.exists(plugin_path): os.makedirs(plugin_path) # 1. manifest.json manifest_json = """ { "version": "1.0.0", "manifest_version": 2, "name": "Chrome Proxy Auth Extension", "permissions": [ "proxy", "tabs", "unlimitedStorage", "storage", "", "webRequest", "webRequestBlocking" ], "background": { "scripts": ["background.js"] }, "minimum_chrome_version": "22.0.0" } """ # 2. background.js background_js = f""" var config = {{ mode: "fixed_servers", rules: {{ singleProxy: {{ scheme: "http", host: "{ip}", port: parseInt({port}) }}, bypassList: ["localhost"] }} }}; chrome.proxy.settings.set({{value: config, scope: "regular"}}, function() {{}}); function callbackFn(details) {{ return {{ authCredentials: {{ username: "{username}", password: "{password}" }} }}; }} chrome.webRequest.onAuthRequired.addListener( callbackFn, {{urls: [""]}}, ['blocking'] ); """ with open(os.path.join(plugin_path, "manifest.json"), "w") as f: f.write(manifest_json) with open(os.path.join(plugin_path, "background.js"), "w") as f: f.write(background_js) return os.path.abspath(plugin_path) # --- 模拟 Requests Response 对象 --- class BrowserResponse: def __init__(self, result_dict): result_dict = result_dict or {} self.status_code = result_dict.get('status', 0) self.text = result_dict.get('body', '') self.headers = result_dict.get('headers', {}) self.url = result_dict.get('url', '') self._json = None def json(self): if self._json is None: if not self.text: return {} try: self._json = json.loads(self.text) except: self._json = {} return self._json @property def content(self): return self.text.encode('utf-8') class VfsPlugin2(IVSPlg): def __init__(self, group_id: str): self.group_id = group_id self.config: Optional[VSPlgConfig] = None self.free_config: Dict[str, Any] = {} self.logger = None # 替换 requests.Session 为 DrissionPage self.page: Optional[ChromiumPage] = None self.jwt_token: str = "" self.real_ip: str = "" self.is_healthy: bool = True self.center_conf = None self.category_conf: Dict = {} self.subcategory_conf: Dict = {} self.public_key = serialization.load_pem_public_key( VFS_PUBLIC_KEY_PEM.encode(), backend=default_backend() ) self.session_create_time: float = 0 def get_group_id(self) -> str: return self.group_id def set_config(self, config: VSPlgConfig): self.config = config self.free_config = config.free_config or {} def set_log(self, logger: Callable[[str], None]) -> None: self.logger = logger def _log(self, message): if self.logger: self.logger(f'[VfsPlugin] [{self.group_id}] {message}') else: print(f'[VfsPlugin] [{self.group_id}] {message}') def health_check(self) -> bool: if not self.is_healthy: return False if self.page is None: return False # 检查页面是否还活着 try: if not self.page.run_js("return 1;"): return False except: return False if self.config.session_max_life > 0: current_time = time.time() elapsed_time = current_time - self.session_create_time if elapsed_time > self.config.session_max_life * 60: self._log(f"Session expired.") return False return True def create_session(self) -> None: """ 使用 DrissionPage 创建会话: 1. 启动浏览器 2. 导航到登录页 3. 自动过盾并提取 Token 4. JS fetch 登录 """ self._log("Initializing Browser Session...") # 0. 配置浏览器 co = ChromiumOptions() co.auto_port() # 自动分配端口 if self.config.proxy and self.config.proxy.ip: p = self.config.proxy # 情况 A: 有账号密码 -> 使用插件方案 if p.username and p.password: self._log(f"Configuring authenticated proxy: {p.ip}:{p.port}") plugin_path = create_proxy_auth_extension( ip=p.ip, port=p.port, username=p.username, password=p.password ) co.add_extension(plugin_path) # 情况 B: 无账号密码 (IP白名单模式) -> 直接设置 else: self._log(f"Configuring standard proxy: {p.ip}:{p.port}") co.set_proxy(f"{p.scheme}://{p.ip}:{p.port}") # 无头模式 (生产环境建议 True, 调试 False) # co.headless(True) co.headless(False) # 调试时设为 False 方便观察 # 反爬参数 co.set_argument('--no-sandbox') co.set_argument('--disable-gpu') co.set_argument('--window-size=1920,1080') # 禁用自动化特征 co.set_argument('--disable-blink-features=AutomationControlled') try: self.page = ChromiumPage(co) # 1. 导航到登录页面 (建立 Context) mission = self.free_config.get("mission_code", "") country = self.free_config.get("country_code", "") lang = self.free_config.get("language", "en") if not mission or not country: raise BizLogicError("Missing mission/country code config") login_page_url = f"https://visa.vfsglobal.com/{country}/{lang}/{mission}/login" self._log(f"Navigating to {login_page_url}...") self.page.get(login_page_url) # 2. 等待 Cloudflare 验证通过 # DrissionPage 会自动处理 Turnstile,我们只需要等待结果出现 # 通常 CF 的 widget 会生成一个 hidden input name="cf-turnstile-response" self._log("Waiting for Cloudflare challenge...") # 最多等待 30 秒 cf_token = "" for _ in range(10): # 间隔 1 秒 time.sleep(1) self._handle_cookie_banner() # 尝试从 DOM 获取 Token try: # 检查是否有 cf-turnstile-response 元素且有值 ele = self.page.ele('xpath://input[@name="cf-turnstile-response"]') if ele and ele.value: cf_token = ele.value self._log("Cloudflare Turnstile token extracted from DOM.") break except: pass # 也可以检查是否已经看到了登录框 (id="mat-input-0" 或 form) if self.page.ele('xpath://form'): self._log("Login form detected.") # 即使 form 出来了,有时候 token 还在生成,稍微再等一下 # 如果没拿到 token,尝试直接继续,或者报错 # 注意:有些 VFS 页面可能没有显式的 turnstile,而是隐式的 if not cf_token: self._log("[WARN] Could not extract Turnstile token. Trying to proceed anyway...") # 3. 准备登录 API 参数 email = self.config.account.username password = self.config.account.password enc_password = self._encrypt_password(password) client_src = self._get_client_source() orange_src = self._get_orange_source(email) url = "https://lift-api.vfsglobal.com/user/login" headers = self._get_common_headers(with_auth=False) headers.update({ "clientsource": client_src, "orangex": orange_src, # DrissionPage fetch 不需要 content-type,json参数会自动加 }) data = { "username": email, "password": enc_password, "missioncode": mission, "countrycode": country, "languageCode": "en-US", "captcha_version": "cloudflare-v1", "captcha_api_key": cf_token # 填入提取到的 Token } self._log("Sending Login Request via Browser Fetch...") resp = self._perform_request("POST", url, headers=headers, json_data=data) resp_json = resp.json() # 分支 1: 登录成功 if resp_json.get('accessToken'): self.jwt_token = resp_json["accessToken"] self._log("Login successful, JWT obtained.") # 分支 2: OTP elif resp_json.get("enableOTPAuthentication"): self._log("Login requires OTP.") otp = self._read_otp_email() self._submit_login_otp(cf_token, otp) else: raise BizLogicError(f"Login failed: {resp.text[:200]}") self.session_create_time = time.time() # 获取真实IP (用于日志) try: self.real_ip = self._get_realnetwork_ip() except: self.real_ip = "0.0.0.0" except Exception as e: self._log(f"Create Session Failed: {e}") if self.page: self.page.quit() self.page = None raise e def query(self) -> VSQueryResult: """查询可预约 Slot""" result = VSQueryResult() appt_types = self.free_config.get("appointment_types", []) if not appt_types: raise NotFoundError(message="No matching appointment configuration found.") apt_config = random.choice(appt_types) try: self._fetch_configurations(apt_config) earliest_date = self._query_earliest_slot(apt_config) result.success = False result.availability_status = AvailabilityStatus.NoneAvailable result.visa_type = apt_config.get("visa_type", "") result.city = apt_config.get("city", "") if earliest_date: result.success = True if "WaitList" in earliest_date: result.availability_status = AvailabilityStatus.Waitlist else: result.availability_status = AvailabilityStatus.Available result.earliest_date = earliest_date result.availability = [DateAvailability(date=earliest_date, times=[])] self._log(f"Slot Found! Date: {earliest_date}") else: self._log("No slots available.") except Exception as e: self._log(f"Query Error: {e}") raise e return result def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None): """ 核心方法:在 DrissionPage 浏览器上下文中注入 JS 执行 fetch """ if not self.page: raise BizLogicError("Browser session not initialized") # 1. 确保在正确的上下文 (VFS 登录页或 API 域名) # create_session 已经打开了页面,这里通常不需要额外跳转 # 如果页面崩溃或跳转了,可能需要恢复 # 2. 构造参数 if params: if '?' in url: url += '&' + urllib.parse.urlencode(params) else: url += '?' + urllib.parse.urlencode(params) fetch_options = { "method": method.upper(), "headers": headers or {}, "credentials": "include" # 关键:带上浏览器 Cookie } if json_data: fetch_options['body'] = json.dumps(json_data) fetch_options['headers']['Content-Type'] = 'application/json' elif data: if isinstance(data, dict): fetch_options['body'] = urllib.parse.urlencode(data) fetch_options['headers']['Content-Type'] = 'application/x-www-form-urlencoded' else: fetch_options['body'] = data # 3. 注入 JS js_script = f""" const url = "{url}"; const options = {json.dumps(fetch_options)}; return fetch(url, options) .then(async response => {{ const text = await response.text(); const headers = {{}}; response.headers.forEach((value, key) => headers[key] = value); return {{ status: response.status, body: text, headers: headers, url: response.url }}; }}) .catch(error => {{ return {{ status: 0, body: error.toString(), headers: {{}}, url: url }}; }}); """ if self.config.debug: self._log(f"[Browser Fetch] {method} {url}") try: # run_js 直接返回 return 的对象 res_dict = self.page.run_js(js_script, timeout=30) except Exception as e: raise BizLogicError(f"Browser JS Execution Error: {e}") resp = BrowserResponse(res_dict) # 4. 统一处理状态码 if resp.status_code == 200: return resp elif resp.status_code == 401: self.is_healthy = False raise SessionExpiredOrInvalidError(f"401 Unauthorized: {resp.text[:100]}") elif resp.status_code == 403: raise PermissionDeniedError(f"403 Forbidden: {resp.text[:100]}") elif resp.status_code == 429: self.is_healthy = False raise RateLimiteddError(f"429 Rate Limit: {resp.text[:100]}") elif resp.status_code == 0: raise BizLogicError(f"Network Error (Fetch Failed): {resp.text}") else: # 允许 400 业务错误通过,交给上层解析 (例如登录失败) if url.endswith("/login") and resp.status_code == 400: return resp raise BizLogicError(message=f"HTTP Error {resp.status_code}: {resp.text[:100]}") def _handle_cookie_banner(self): """ 处理 OneTrust Cookie 遮挡 策略:尝试点击“接受所有”,如果点不到就直接移除 DOM """ try: # 使用 JS 处理最快,且不会因为元素运动报错 js = """ try { // 1. 尝试点击 '接受所有' 按钮 var acceptBtn = document.getElementById('onetrust-accept-btn-handler'); if (acceptBtn) { acceptBtn.click(); return true; } // 2. 如果没有按钮,或者还在遮挡,直接把整个 banner 删掉 var banner = document.getElementById('onetrust-banner-sdk'); if (banner) { banner.style.display = 'none'; // 隐藏 banner.remove(); // 或者移除 return true; } } catch(e) {} return false; """ self.page.run_js(js) except: pass def _get_proxy_url(self): if self.config.proxy and self.config.proxy.ip: s = self.config.proxy if s.username: return f"{s.scheme}://{s.username}:{s.password}@{s.ip}:{s.port}" else: return f"{s.scheme}://{s.ip}:{s.port}" return None def _get_realnetwork_ip(self): """ 通过新建标签页获取 IP 解决 CORS 403 问题:新标签页请求属于 Top-Level Navigation, 不带 Origin: visa.vfsglobal.com,也不带 credentials,符合 ipify 规则。 """ try: # 1. 新建一个标签页 (后台静默打开) tab = self.page.new_tab("https://api.ipify.org/?format=json") # 2. 获取页面内容 (DrissionPage 会自动等待页面加载) # ipify 返回的是纯 JSON 文本,通常在 body 或 pre 标签里 if tab.ele('tag:pre'): json_text = tab.ele('tag:pre').text else: json_text = tab.ele('tag:body').text # 3. 提取 IP ip = json.loads(json_text)['ip'] # 4. 务必关闭标签页,释放资源 tab.close() self._log(f"Real Network IP: {ip}") return ip except Exception as e: self._log(f"[WARN] Failed to check IP via new tab: {e}") # 尝试清理可能没关掉的标签页 try: if self.page.tabs_count > 1: tab.close() except: pass return "0.0.0.0" def _get_common_headers(self, with_auth=True) -> Dict[str, str]: # DrissionPage 浏览器会自动带上 Origin, Referer, User-Agent, Sec-CH-UA 等 # 这里只需要补充业务特定的 Headers mission = self.free_config.get("mission_code", "") country = self.free_config.get("country_code", "") lang = self.free_config.get("language", "en") route = f"{country}/{lang}/{mission}" h = { "accept": "application/json, text/plain, */*", # "origin": ... 浏览器自动处理 # "referer": ... 浏览器自动处理 "route": route } # 即使是浏览器环境,VFS 也需要这两个加密参数 # 注意:这里可能需要从 JS 获取,或者保持 Python 生成 # 如果 Python 生成的总是报错,可以考虑把加密逻辑移到 JS 里跑 h["clientsource"] = self._get_client_source() if with_auth and self.jwt_token: h["authorize"] = self.jwt_token return h def _encrypt_password(self, password: str) -> str: ciphertext = self.public_key.encrypt( password.encode(), padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None ) ) return base64.b64encode(ciphertext).decode() def _get_orange_source(self, email: str) -> str: timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S") payload = f"{email};{timestamp}" return self._encrypt_password(payload) def _get_client_source(self) -> str: timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S") payload = f"GA;{timestamp}Z" return self._encrypt_password(payload) def _query_earliest_slot(self, apt_config) -> Optional[str]: url = "https://lift-api.vfsglobal.com/appointment/CheckIsSlotAvailable" data = { "missioncode": self.free_config.get("mission_code"), "countrycode": self.free_config.get("country_code"), "vacCode": apt_config.get("vac_code"), "visaCategoryCode": apt_config.get("subcategory_code"), "roleName": "Individual", "loginUser": self.config.account.username, "payCode": "" } headers = self._get_common_headers(with_auth=True) # fetch 不需要显式 content-type application/json,json_data会自动处理 # DrissionPage 不需要手动处理 403 绕盾,因为浏览器本身就在盾后面 resp = self._perform_request("POST", url, headers=headers, json_data=data) if "WaitList" in resp.text: return "WaitList" j = resp.json() if j.get("earliestSlotLists"): raw_date = j["earliestSlotLists"][0]["date"] return to_yyyymmdd(raw_date, "%m/%d/%Y %H:%M:%S") return "" def _fetch_configurations(self, apt_config: Dict[str, Any]): if not self.center_conf: self.center_conf = self._query_center() vac_code = apt_config.get("vac_code") category_code = apt_config.get("category_code") if category_code not in self.category_conf: visa_categories = self._query_visa_category(vac_code) found = False for vc in visa_categories: if vc.get("code") == category_code: self.category_conf[category_code] = vc found = True break if not found: self._log(f"WARN: Category {category_code} not found") sub_category_code = apt_config.get("subcategory_code") if sub_category_code not in self.subcategory_conf: visa_subcategories = self._query_visa_sub_category(vac_code, category_code) found = False for svc in visa_subcategories: if svc.get("code") == sub_category_code: self.subcategory_conf[sub_category_code] = svc found = True break if not found: self._log(f"WARN: SubCategory {sub_category_code} not found") def _query_center(self) -> List: mission = self.free_config.get("mission_code") country = self.free_config.get("country_code") url = f"https://lift-api.vfsglobal.com/master/center/{mission}/{country}/en-US" headers = self._get_common_headers(with_auth=False) resp = self._perform_request("GET", url, headers=headers) return resp.json() def _query_visa_category(self, center_code: str) -> List: mission = self.free_config.get("mission_code") country = self.free_config.get("country_code") enc_center = urllib.parse.quote(center_code) url = f"https://lift-api.vfsglobal.com/master/visacategory/{mission}/{country}/{enc_center}/en-US" headers = self._get_common_headers(with_auth=False) resp = self._perform_request("GET", url, headers=headers) return resp.json() def _query_visa_sub_category(self, center_code: str, category_code: str) -> List: mission = self.free_config.get("mission_code") country = self.free_config.get("country_code") enc_center = urllib.parse.quote(center_code) enc_cat = urllib.parse.quote(category_code) url = f"https://lift-api.vfsglobal.com/master/subvisacategory/{mission}/{country}/{enc_center}/{enc_cat}/en-US" headers = self._get_common_headers(with_auth=False) resp = self._perform_request("GET", url, headers=headers) return resp.json() def _read_otp_email(self) -> str: # 保持原样,这部分使用云API读取邮件,不依赖本地网络库 master_email = "visafly666@gmail.com" recipient = self.config.account.username sender = "donotreply at vfshelpline.com" subject_keywords = "One Time Password" body_keywords = "OTP" now_utc = datetime.utcnow() formatted_utc_time = now_utc.strftime("%Y-%m-%d %H:%M:%S") self._log(f"Waiting for OTP email...") for i in range(12): content_out = VSCloudApi.Instance().fetch_mail_content( master_email, sender, recipient, subject_keywords, body_keywords, formatted_utc_time, 300 ) if content_out: match = re.search(r'\b\d{6}\b', content_out) if match: return match.group(0) time.sleep(5) raise NotFoundError(message="OTP email not found") def _submit_login_otp(self, old_cf_token: str, otp: str): self._log("Submitting Login OTP...") # --- [新增] 必须刷新 Token --- # 旧的 old_cf_token 已经在第一步登录时失效了 new_cf_token = self._refresh_turnstile_token() # --------------------------- email = self.config.account.username password = self.config.account.password enc_password = self._encrypt_password(password) mission = self.free_config.get("mission_code", "") country = self.free_config.get("country_code", "") client_src = self._get_client_source() orange_src = self._get_orange_source(email) url = "https://lift-api.vfsglobal.com/user/login" headers = self._get_common_headers(with_auth=False) headers.update({ "clientsource": client_src, "orangex": orange_src }) data = { "username": email, "password": enc_password, "missioncode": mission, "countrycode": country, "languageCode": "en-US", "captcha_version": "cloudflare-v1", "captcha_api_key": new_cf_token, # <--- 使用新 Token "otp": otp } resp = self._perform_request("POST", url, headers=headers, json_data=data) resp_json = resp.json() if resp_json.get("accessToken"): self.jwt_token = resp_json["accessToken"] self._log("OTP Login successful.") return # 增加错误详情日志 error_desc = resp_json.get("description", resp.text) raise PermissionDeniedError(message=f"OTP Login Failed: {error_desc}") def _refresh_turnstile_token(self) -> str: """ 强制刷新 Cloudflare Turnstile 并获取新 Token (增强版) """ self._log("Refreshing Cloudflare Turnstile token...") # 1. JS 强制重置 # 加上 try-catch 防止页面没有 turnstile 对象导致崩溃 js_reset = """ try { var input = document.querySelector('input[name="cf-turnstile-response"]'); if (input) input.value = ""; window.turnstile.reset(); } catch(e) { console.log("Turnstile reset error:", e); } """ self.page.run_js(js_reset) # 2. 轮询等待 (增加到 30 秒) # 策略:检测 Token -> 如果没有且有 iframe -> 点击 iframe 触发验证 for i in range(60): # 60 * 0.5s = 30s time.sleep(0.5) # A. 尝试直接获取 Token (使用 JS 获取更稳定) token = self.page.run_js('return document.querySelector("input[name=\'cf-turnstile-response\']")?.value') if token: self._log("Turnstile token refreshed successfully.") return token # B. 如果等待了 3 秒还没结果,尝试寻找 iframe 并点击 # Cloudflare 有时需要用户点一下 "Verify you are human" if i > 6 and (i % 5 == 0): # 每隔 2.5 秒尝试点一次 try: # 查找包含 turnstile 或 cloudflare 的 iframe # VFS 页面通常只有一个 cf_iframe = self.page.ele('xpath://iframe[contains(@src, "turnstile") or contains(@src, "cloudflare")]') if cf_iframe: # 尝试点击 iframe 的中心位置 # self._log("Clicking Cloudflare widget to activate...") cf_iframe.click(by_js=True) except Exception: pass # 如果超时,为了调试,打印一下当前页面源码的一部分或截图(可选) raise BizLogicError("Failed to refresh Cloudflare Turnstile token (Timeout)") # ------------------------------------------------------------- # 核心预约逻辑 (DrissionPage 版) # ------------------------------------------------------------- def book(self, slot_info: VSQueryResult, user_inputs) -> VSBookResult: """ 执行完整的预约流程 """ self._log("Starting booking process...") # 1. 准备数据 user_email = user_inputs.get('email') # 生成别名邮箱 (防止邮箱被 VFS 黑名单) user_inputs['alias_email'] = get_alias_email(user_email, new_domain="gmail-app.com") res = VSBookResult() slot_routing_key = slot_info.routing_key # 如果没有 earliest_date,默认从今天开始 from_date = slot_info.earliest_date if slot_info.earliest_date else datetime.now().strftime("%Y-%m-%d") # 2. 查找对应的配置 apt_config = None appt_types = self.free_config.get("appointment_types", []) for apt in appt_types: if apt.get("routing_key") == slot_routing_key: apt_config = apt break if not apt_config: raise NotFoundError(message="Book: Config missing for this routing key.") # 确保配置已加载 (SubCategory 等) self._fetch_configurations(apt_config) sub_cc = apt_config.get("subcategory_code") sub_conf = self.subcategory_conf.get(sub_cc, {}) # 3. OCR 识别 / 文档上传 (如果需要) # 上传结果存入 user_inputs 供后续使用 ocr_enabled = sub_conf.get("isOCREnable", False) if ocr_enabled: self._log("OCR Enabled, uploading documents...") upload_res = self._upload_applicant_documents(apt_config, user_inputs) user_inputs["applicant_image"] = upload_res.get("passportImageFilename") user_inputs["applicant_image_data"] = upload_res.get("passportImageFileBytes") user_inputs["guid"] = upload_res.get("uploadDocumentGUID") enable_reference_number = sub_conf.get("enableReferenceNumber", False) # 4. 添加申请人 (核心步骤 1) final_urn = None is_waitlist = (slot_info.availability_status == AvailabilityStatus.Waitlist) # 重试机制:添加申请人有时候会因为并发冲突失败 MAX_RETRY = 3 for i in range(MAX_RETRY): try: final_urn = self._add_primary_applicant(apt_config, user_inputs, is_waitlist, ocr_enabled, enable_reference_number) if final_urn: break except Exception as e: self._log(f"Add Applicant retry {i+1}/{MAX_RETRY}: {e}") time.sleep(2) if not final_urn: raise BizLogicError(message="Failed to add primary applicant (Slot likely taken or API error)") self._log(f"Applicant Added. URN: {final_urn}") # 5. 申请人 OTP 验证 (核心步骤 2 - 视配置而定) otp_enabled = sub_conf.get("isApplicantOTPEnabled", False) if otp_enabled: self._log("Applicant OTP Required.") if not self._applicant_otp_send(apt_config, final_urn): raise BizLogicError(message='Applicant OTP send failed') # 复用之前的读邮件逻辑 otp_code = self._read_otp_email() if not self._applicant_otp_verify(apt_config, final_urn, otp_code): raise BizLogicError(message='Applicant OTP verify failed') # 6. Waitlist 模式直接返回 if is_waitlist: if self._confirm_waitlist(apt_config, final_urn): res.success = True res.urn = final_urn self._log("Waitlist confirmed.") return res raise BizLogicError(message='Confirm waitlist failed') # 7. 寻找具体的时间槽 (核心步骤 3) expected_start = user_inputs.get("expected_start_date", "") expected_end = user_inputs.get("expected_end_date", "") # 计算需要扫描的月份 months = self._get_filtered_covered_months(expected_start, expected_end, from_date) self._log(f"Scanning months: {months} (Start looking from: {from_date})") selected_slot_id = "" selected_slot_date = "" selected_slot_time_range = "" all_ads = set() forbidden_dates = set() found_slot = False for m_str in months: self._log(f"Checking calendar for {m_str}...") # 查询日历 ads = self._query_slot_calendar(apt_config, final_urn, m_str) # 去重 new_ads = [d for d in ads if d not in all_ads] all_ads.update(new_ads) # 尝试选中一个日期 # 这里做一个简单循环,如果选中日期没时间了,就换一个日期 for _ in range(3): avail_candidates = [d for d in list(all_ads) if d not in forbidden_dates] # 根据用户期望过滤 sel_dates = self._filter_dates(avail_candidates, expected_start, expected_end) if not sel_dates: break # 当前月没有符合要求的日期,去下一个月 tmp_date = sel_dates[0] # 取第一个(通常 _filter_dates 里已经 shuffle 过了) forbidden_dates.add(tmp_date) # 标记为已尝试 # 关键:Audit Log (锁定日期) # VFS 要求在查 timeslot 之前必须先发这个请求 if not self._saveuseractionaudit(apt_config, final_urn, tmp_date): self._log(f"Audit failed for {tmp_date}, skipping...") time.sleep(1) continue # 查询具体时间 ats = self._query_slot_time(apt_config, final_urn, tmp_date) if not ats: self._log(f"No timeslots for {tmp_date}") continue # 随机选一个时间 sel_tm = random.choice(ats) selected_slot_id = sel_tm.get("allocationId") selected_slot_date = tmp_date selected_slot_time_range = sel_tm.get("slot") found_slot = True break if found_slot: break if not found_slot: self._log("No valid slots found after scanning.") res.success = False return res self._log(f"Slot Selected: {selected_slot_date} {selected_slot_time_range} (ID: {selected_slot_id})") # 8. 服务与费用 (核心步骤 4) self._submit_no_addition_service(final_urn) amount, currency = self._query_fee(apt_config, final_urn) # 9. 最终提交 self._log("Submitting schedule...") schedule_res = self._schedule(apt_config, final_urn, amount, currency, selected_slot_id) if not schedule_res.get("IsAppointmentBooked"): self._log(f"Booking failed: {schedule_res}") res.success = False return res # 10. 构造成功结果 res.success = True res.account = self.config.account.username res.book_date = selected_slot_date res.book_time = selected_slot_time_range res.urn = final_urn res.fee_amount = int(amount * 100) res.fee_currency = currency # 11. 处理支付链接 if schedule_res.get("IsPaymentRequired", False): payload = schedule_res.get("payLoad", "") if payload: self._log("Processing payment link...") payment_url = self._pay_request(payload) if payment_url: res.payment_link = payment_url return res # ------------------------------------------------------------- # 辅助方法实现 (DrissionPage 适配版) # ------------------------------------------------------------- def _upload_applicant_documents(self, apt_config, user_inputs) -> Dict: """上传图片:先下载外部图片,再通过浏览器上传到 VFS""" import requests as standard_requests # 使用标准库下载外部资源 url = "https://lift-api.vfsglobal.com/appointment/UploadApplicantDocument" passport_url = user_inputs.get("passport_image_url") if not passport_url: raise NotFoundError(message="Missing passport_image_url") # 下载图片 (不走代理或走系统代理,不使用 DrissionPage,因为是外部链接) try: img_resp = standard_requests.get(passport_url, timeout=30) if img_resp.status_code != 200: raise BizLogicError(message=f"Failed to download passport image: {img_resp.status_code}") b64_str = base64.b64encode(img_resp.content).decode('utf-8') except Exception as e: raise BizLogicError(message=f"Image download error: {e}") headers = self._get_common_headers(with_auth=True) # DrissionPage fetch 不需要显式 content-type application/json,json_data会自动处理 data = { "missioncode": self.free_config.get("mission_code"), "countryCode": self.free_config.get("country_code"), "centerCode": apt_config.get("vac_code"), "loginUser": self.config.account.username, "languageCode": "en-US", "visaCategoryCode": apt_config.get("subcategory_code"), "fileBytes": b64_str, "selfiImageFileBytes": "" } resp = self._perform_request("POST", url, headers=headers, json_data=data) result = resp.json() # 补充返回数据供后续使用 result["passportImageFilename"] = "passport_img.jpg" result["passportImageFileBytes"] = b64_str return result def _add_primary_applicant(self, apt_config: Dict[str, Any], user_inputs: Dict[str, Any], is_waitlist: bool, ocr_enabled: bool, enable_ref: bool) -> str: """构造申请人 payload 并提交""" url = "https://lift-api.vfsglobal.com/appointment/applicants" headers = self._get_common_headers(with_auth=True) gender_str = str(user_inputs.get("gender", "")).lower() gender_code = 1 if gender_str == "male" else 2 raw_dial = user_inputs.get("phone_country_code", "86") dial_code = str(raw_dial) # 日期格式转换 YYYY-MM-DD -> DD/MM/YYYY def _to_ddmmyyyy(d_str): try: return datetime.strptime(str(d_str), "%Y-%m-%d").strftime("%d/%m/%Y") except: return str(d_str) dob = _to_ddmmyyyy(user_inputs.get("birthday", "")) ppt_exp = _to_ddmmyyyy(user_inputs.get("passport_expiry_date", "")) applicant = { "urn": "", "arn": "", "loginUser": self.config.account.username, "firstName": str(user_inputs.get("first_name", "")).upper(), "middleName": "", "lastName": str(user_inputs.get("last_name", "")).upper(), "employerFirstName": "", "employerLastName": "", "salutation": "", "gender": gender_code, "contactNumber": str(user_inputs.get("phone", "")), "dialCode": dial_code, "employerContactNumber": "", "employerDialCode": "", "emailId": str(user_inputs.get("alias_email", "")).upper(), "employerEmailId": "", "passportNumber": str(user_inputs.get("passport_no", "")).upper(), "confirmPassportNumber": "", "passportExpirtyDate": ppt_exp, "dateOfBirth": dob, "nationalId": None, "nationalityCode": get_country_iso3(str(user_inputs.get("nationality", ""))), "state": None, "city": None, "addressline1": None, "addressline2": None, "pincode": None, "isEndorsedChild": False, "applicantType": 0, "vlnNumber": None, "applicantGroupId": 0, "parentPassportNumber": "", "parentPassportExpiry": "", "dateOfDeparture": None, "entryType": "", "eoiVisaType": "", "passportType": "", "vfsReferenceNumber": "", "familyReunificationCerificateNumber": "", "PVRequestRefNumber": "", "PVStatus": "", "PVStatusDescription": "", "PVCanAllowRetry": True, "PVisVerified": False, "eefRegistrationNumber": "", "isAutoRefresh": True, "helloVerifyNumber": "", "OfflineCClink": "", "idenfystatuscheck": False, "vafStatus": None, "SpecialAssistance": "", "AdditionalRefNo": None, "juridictionCode": "", "canInitiateVAF": False, "canEditVAF": False, "canDeleteVAF": False, "canDownloadVAF": False, "Retryleft": "", # 这里的 IP 应该已经在 create_session 时获取到了 "ipAddress": self.real_ip } if enable_ref: applicant["referenceNumber"] = str(user_inputs.get("cover_letter", "")) else: applicant["referenceNumber"] = None if ocr_enabled: applicant["applicantImage"] = str(user_inputs.get("applicant_image", "")) applicant["applicantImageData"] = str(user_inputs.get("applicant_image_data", "")) applicant["GUID"] = str(user_inputs.get("guid", "")) payload = { "countryCode": self.free_config.get("country_code"), "missionCode": self.free_config.get("mission_code"), "centerCode": apt_config.get("vac_code"), "loginUser": self.config.account.username, "visaCategoryCode": apt_config.get("subcategory_code"), "applicantList": [applicant], "languageCode": "en-US", "isWaitlist": is_waitlist, "isEdit": False, "feeEntryTypeCode": None, "feeExemptionTypeCode": None, "feeExemptionDetailsCode": None, "juridictionCode": None, "regionCode": None } resp = self._perform_request("POST", url, headers=headers, json_data=payload) return resp.json().get("urn") def _applicant_otp_send(self, apt_config, urn) -> bool: url = "https://lift-api.vfsglobal.com/appointment/applicantotp" headers = self._get_common_headers(with_auth=True) data = { "urn": urn, "loginUser": self.config.account.username, "missionCode": self.free_config.get("mission_code"), "countryCode": self.free_config.get("country_code"), "centerCode": apt_config.get("vac_code"), "OTP": "", "otpAction": "GENERATE", "languageCode": "en-US" } resp = self._perform_request("POST", url, headers=headers, json_data=data) return resp.json().get("isOTPGenerated", False) def _applicant_otp_verify(self, apt_config, urn, otp) -> bool: url = "https://lift-api.vfsglobal.com/appointment/applicantotp" headers = self._get_common_headers(with_auth=True) # VFS 这里的 header 有时需要 datacenter,原代码有就加上 headers["datacenter"] = "GERMANY" data = { "urn": urn, "loginUser": self.config.account.username, "missionCode": self.free_config.get("mission_code"), "countryCode": self.free_config.get("country_code"), "centerCode": apt_config.get("vac_code"), "OTP": otp, "otpAction": "VALIDATE", "languageCode": "en-US" } resp = self._perform_request("POST", url, headers=headers, json_data=data) return resp.json().get("isOTPValidated", False) def _query_slot_calendar(self, apt_config, urn, from_date) -> List: url = "https://lift-api.vfsglobal.com/appointment/calendar" headers = self._get_common_headers(with_auth=True) # 将 YYYY-MM-DD 转为 DD/MM/YYYY 用于 API dt_m = datetime.strptime(from_date, "%Y-%m-%d") converted_date = dt_m.strftime("%d/%m/%Y") data = { "missionCode": self.free_config.get("mission_code"), "countryCode": self.free_config.get("country_code"), "centerCode": apt_config.get("vac_code"), "loginUser": self.config.account.username, "visaCategoryCode": apt_config.get("subcategory_code"), "fromDate": converted_date, "urn": urn, "payCode": "" } resp = self._perform_request("POST", url, headers=headers, json_data=data) calendars = resp.json().get("calendars") ads_out = [] if calendars: for item in calendars: # API 返回可能是 MM/DD/YYYY 或 DD/MM/YYYY,VFS 比较乱 # 通常是 MM/DD/YYYY raw = item.get("date") ads_out.append(to_yyyymmdd(raw, "%m/%d/%Y")) return ads_out def _query_slot_time(self, apt_config, urn, slot_date) -> List: url = "https://lift-api.vfsglobal.com/appointment/timeslot" headers = self._get_common_headers(with_auth=True) dt_m = datetime.strptime(slot_date, "%Y-%m-%d") converted_date = dt_m.strftime("%d/%m/%Y") data = { "missionCode": self.free_config.get("mission_code"), "countryCode": self.free_config.get("country_code"), "centerCode": apt_config.get("vac_code"), "loginUser": self.config.account.username, "visaCategoryCode": apt_config.get("subcategory_code"), "slotDate": converted_date, "urn": urn } resp = self._perform_request("POST", url, headers=headers, json_data=data) return resp.json().get("slots", []) def _saveuseractionaudit(self, apt_config, urn, earliest_date) -> bool: url = "https://lift-api.vfsglobal.com/appointment/saveuseractionaudit" headers = self._get_common_headers(with_auth=True) dt = datetime.strptime(earliest_date, "%Y-%m-%d") data = { "missionCode": self.free_config.get("mission_code"), "countryCode": self.free_config.get("country_code"), "centerCode": apt_config.get("vac_code"), "loginUser": self.config.account.username, "urn": urn, "firstEarliestSlotDate": dt.strftime("%d/%m/%Y"), "action": "schedule", "ipAddress": self.real_ip, "eadAppointmentDetail": dt.strftime("%Y-%m-%dT%H:%M:%S") } resp = self._perform_request("POST", url, headers=headers, json_data=data) return resp.json().get("isSavedSuccess", False) def _submit_no_addition_service(self, urn): url = "https://lift-api.vfsglobal.com/vas/mapvas" headers = self._get_common_headers(with_auth=True) data = { "loginUser": self.config.account.username, "missionCode": self.free_config.get("mission_code"), "countryCode": self.free_config.get("country_code"), "urn": urn, "applicants": [] } # 只要不报错即可 self._perform_request("POST", url, headers=headers, json_data=data) def _query_fee(self, apt_config, urn) -> Tuple[float, str]: url = "https://lift-api.vfsglobal.com/appointment/fees" headers = self._get_common_headers(with_auth=True) data = { "missionCode": self.free_config.get("mission_code"), "countryCode": self.free_config.get("country_code"), "centerCode": apt_config.get("vac_code"), "loginUser": self.config.account.username, "urn": urn, "languageCode": "en-US" } resp = self._perform_request("POST", url, headers=headers, json_data=data) j = resp.json() total = j.get("totalamount", 0.0) currency = "EUR" if j.get("feeDetails"): currency = j["feeDetails"][0].get("currency", "EUR") return total, currency def _schedule(self, apt_config, urn, amount, currency, slot_id) -> Dict: url = "https://lift-api.vfsglobal.com/appointment/schedule" headers = self._get_common_headers(with_auth=True) data = { "missionCode": self.free_config.get("mission_code"), "countryCode": self.free_config.get("country_code"), "centerCode": apt_config.get("vac_code"), "loginUser": self.config.account.username, "urn": urn, "notificationType": "none", "paymentdetails": { "paymentmode": "Online", "RequestRefNo": "", "clientId": "", "merchantId": "", "amount": amount, "currency": currency }, "allocationId": str(slot_id), "CanVFSReachoutToApplicant": True } resp = self._perform_request("POST", url, headers=headers, json_data=data) return resp.json() def _pay_request(self, payload) -> str: """ 解析支付重定向 URL (DrissionPage 新标签页版) """ # 初始 URL,通常是一个 Redirect 接口 start_url = f"https://online.vfsglobal.com/PG-Component/Payment/PayRequest?payLoad={payload}" final_url = "" try: self._log("Resolving payment redirect...") # 使用新标签页去跑,以免当前会话状态丢失 pay_tab = self.page.new_tab(start_url) # 等待跳转完成 (通常会跳到 Stripe, WorldPay 或其他支付网关) # 等待直到 URL 不再是 PayRequest pay_tab.wait.url_change(start_url, timeout=15) final_url = pay_tab.url self._log(f"Payment URL resolved: {final_url}") # 关闭标签页 pay_tab.close() except Exception as e: self._log(f"[WARN] Failed to resolve payment URL: {e}") try: pay_tab.close() except: pass return final_url def _confirm_waitlist(self, apt_config: Dict[str, Any], urn: str) -> bool: url = "https://lift-api.vfsglobal.com/appointment/ConfirmWaitlist" headers = self._get_common_headers(with_auth=True) data = { "missionCode": self.free_config.get("mission_code"), "countryCode": self.free_config.get("country_code"), "centerCode": apt_config.get("vac_code"), "loginUser": self.config.account.username, "urn": urn, "notificationType": "none", "CanVFSReachoutToApplicant": True } resp = self._perform_request("POST", url, headers=headers, json_data=data) return resp.json().get("isConfirmed", False) def _filter_dates(self, dates: List[str], start_str: str, end_str: str) -> List[str]: if not start_str or not end_str: return dates valid_dates = [] try: s_date = datetime.strptime(start_str[:10], "%Y-%m-%d") e_date = datetime.strptime(end_str[:10], "%Y-%m-%d") for date_str in dates: curr_date = datetime.strptime(date_str, "%Y-%m-%d") if s_date <= curr_date <= e_date: valid_dates.append(date_str) random.shuffle(valid_dates) return valid_dates except: return dates def _get_filtered_covered_months(self, start_date, end_date, from_date) -> List[str]: fmt = "%Y-%m-%d" try: dt_start = datetime.strptime(start_date, fmt) if start_date else datetime.now() dt_end = datetime.strptime(end_date, fmt) if end_date else datetime.now().replace(year=datetime.now().year + 1) try: dt_from = datetime.strptime(from_date, fmt) except: dt_from = datetime.now() except: return [] dt_start = dt_start.replace(day=1) dt_end = dt_end.replace(day=1) dt_from = dt_from.replace(day=1) curr = max(dt_start, dt_from) months = [] while curr <= dt_end: months.append(curr.strftime(fmt)) if curr.month == 12: curr = curr.replace(year=curr.year + 1, month=1) else: curr = curr.replace(month=curr.month + 1) return months