| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525 |
- # plugins/vfs_plugin2.py
- import os
- import time
- import json
- import random
- import base64
- import uuid
- import shutil
- import re
- import urllib.parse
- from datetime import datetime
- from typing import Dict, Any, Optional, List, Tuple, Callable
# DrissionPage core imports
- from DrissionPage import ChromiumPage, ChromiumOptions
- from DrissionPage.common import Settings
# Cryptography library
- from cryptography.hazmat.primitives import serialization, hashes
- from cryptography.hazmat.primitives.asymmetric import padding
- from cryptography.hazmat.backends import default_backend
- from vs_plg import IVSPlg
- from vs_types import VSPlgConfig, AppointmentType, VSQueryResult, VSBookResult, DateAvailability, AvailabilityStatus, NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError
- from toolkit.vs_cloud_api import VSCloudApi
- from toolkit.proxy_tunnel import ProxyTunnel
- from utils.cloudflare_bypass_for_scraping import CloudflareBypasser
# ----------------- Static constants and helper data -----------------
# PEM-encoded public key. VfsPlugin2.__init__ loads it with
# serialization.load_pem_public_key, and _encrypt_password uses the loaded key
# (OAEP / SHA-256) to encrypt the password and the clientsource / orangex payloads.
VFS_PUBLIC_KEY_PEM = """-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAuupFgB+lYIOtSxrRoHzc
LmCZKJ6+oSbgqgOPzFMM0TasOeLw0NXEn1XfIzXdx75+tegNKwyIZumoh0yhubKs
t59GV321kN0iquYRHrdh3ygfDDHlS9rROQeBqRga0ncSADtbLMrBPqXJjPCoV76y
t92towriKoH75BhiazY0mghm4LjmAWrV0u/GNpV3tk9bxbtHEXGaFmxCJqjg+7x6
1e5wXLfvpj9w1QsiSWOSJxLOyICz/9ByxXycQQFdNmjnnnwco9Gt/Mi33NYH71j0
5oXIjklFC4lvJqaqSY5lS7Vwb9oCt9zX9J0Yz4z4e/3V+0jgRnWOFGofyks4FKe2
GQIDAQAB
-----END PUBLIC KEY-----"""
# Lower-cased English country name -> ISO 3166-1 alpha-3 code.
COUNTRY_MAP = {
    "afghanistan": "AFG", "albania": "ALB", "algeria": "DZA", "andorra": "AND", "angola": "AGO",
    "antigua and barbuda": "ATG", "argentina": "ARG", "armenia": "ARM", "australia": "AUS", "austria": "AUT",
    "azerbaijan": "AZE", "bahamas": "BHS", "bahrain": "BHR", "bangladesh": "BGD", "barbados": "BRB", "belarus": "BLR",
    "belgium": "BEL", "belize": "BLZ", "benin": "BEN", "bhutan": "BTN", "bolivia": "BOL", "bosnia and herzegovina": "BIH",
    "botswana": "BWA", "brazil": "BRA", "brunei": "BRN", "bulgaria": "BGR", "burkina faso": "BFA", "burundi": "BDI",
    "cabo verde": "CPV", "cambodia": "KHM", "cameroon": "CMR", "canada": "CAN", "central african republic": "CAF",
    "chad": "TCD", "chile": "CHL", "china": "CHN", "colombia": "COL", "comoros": "COM", "congo (brazzaville)": "COG",
    "congo (kinshasa)": "COD", "costa rica": "CRI", "croatia": "HRV", "cuba": "CUB", "cyprus": "CYP", "czech republic": "CZE",
    "denmark": "DNK", "djibouti": "DJI", "dominica": "DMA", "dominican republic": "DOM", "ecuador": "ECU", "egypt": "EGY",
    "el salvador": "SLV", "equatorial guinea": "GNQ", "eritrea": "ERI", "estonia": "EST", "eswatini": "SWZ", "ethiopia": "ETH",
    "fiji": "FJI", "finland": "FIN", "france": "FRA", "gabon": "GAB", "gambia": "GMB", "georgia": "GEO", "germany": "DEU",
    "ghana": "GHA", "greece": "GRC", "grenada": "GRD", "guatemala": "GTM", "guinea": "GIN", "guinea-bissau": "GNB", "guyana": "GUY",
    "haiti": "HTI", "honduras": "HND", "hungary": "HUN", "iceland": "ISL", "india": "IND", "indonesia": "IDN", "iran": "IRN",
    "iraq": "IRQ", "ireland": "IRL", "israel": "ISR", "italy": "ITA", "jamaica": "JAM", "japan": "JPN", "jordan": "JOR",
    "kazakhstan": "KAZ", "kenya": "KEN", "kiribati": "KIR", "korea, north": "PRK", "korea, south": "KOR", "kuwait": "KWT",
    "kyrgyzstan": "KGZ", "laos": "LAO", "latvia": "LVA", "lebanon": "LBN", "lesotho": "LSO", "liberia": "LBR", "libya": "LBY",
    "liechtenstein": "LIE", "lithuania": "LTU", "luxembourg": "LUX", "madagascar": "MDG", "malawi": "MWI", "malaysia": "MYS",
    "maldives": "MDV", "mali": "MLI", "malta": "MLT", "marshall islands": "MHL", "mauritania": "MRT", "mauritius": "MUS",
    "mexico": "MEX", "micronesia": "FSM", "moldova": "MDA", "monaco": "MCO", "mongolia": "MNG", "montenegro": "MNE", "morocco": "MAR",
    "mozambique": "MOZ", "myanmar": "MMR", "namibia": "NAM", "nauru": "NRU", "nepal": "NPL", "netherlands": "NLD", "new zealand": "NZL",
    "nicaragua": "NIC", "niger": "NER", "nigeria": "NGA", "north macedonia": "MKD", "norway": "NOR", "oman": "OMN", "pakistan": "PAK",
    "palau": "PLW", "panama": "PAN", "papua new guinea": "PNG", "paraguay": "PRY", "peru": "PER", "philippines": "PHL", "poland": "POL",
    "portugal": "PRT", "qatar": "QAT", "romania": "ROU", "russia": "RUS", "rwanda": "RWA", "saudi arabia": "SAU", "senegal": "SEN",
    "serbia": "SRB", "seychelles": "SYC", "sierra leone": "SLE", "singapore": "SGP", "slovakia": "SVK", "slovenia": "SVN",
    "solomon islands": "SLB", "somalia": "SOM", "south africa": "ZAF", "spain": "ESP", "sri lanka": "LKA", "sudan": "SDN",
    "suriname": "SUR", "sweden": "SWE", "switzerland": "CHE", "syria": "SYR", "tajikistan": "TJK", "tanzania": "TZA", "thailand": "THA",
    "timor-leste": "TLS", "togo": "TGO", "tonga": "TON", "tunisia": "TUN", "turkey": "TUR", "turkmenistan": "TKM", "uganda": "UGA",
    "ukraine": "UKR", "united arab emirates": "ARE", "united kingdom": "GBR", "united states": "USA", "uruguay": "URY", "uzbekistan": "UZB",
    "vanuatu": "VUT", "venezuela": "VEN", "vietnam": "VNM", "yemen": "YEM", "zambia": "ZMB", "zimbabwe": "ZWE"
}


def get_country_iso3(name: str) -> str:
    """Return the ISO alpha-3 code for an English country name.

    The lookup ignores case and surrounding whitespace. Unknown, empty or
    ``None`` names fall back to ``"CHN"``, the same default the function has
    always used for names missing from COUNTRY_MAP.
    """
    if not name:
        # None / "" used to raise AttributeError (None) or miss the map (""):
        # both now take the documented fallback.
        return "CHN"
    return COUNTRY_MAP.get(str(name).strip().lower(), "CHN")
def to_yyyymmdd(data_str: str, date_str_format: str, target_format: str = "%Y-%m-%d"):
    """Re-format a date string from ``date_str_format`` to ``target_format``.

    Args:
        data_str: the date text to convert.
        date_str_format: ``strptime`` pattern describing ``data_str``.
        target_format: ``strftime`` pattern for the result (ISO date by default).

    Returns:
        The re-formatted date, or ``data_str`` unchanged when it cannot be
        parsed or formatted (best-effort conversion, never raises for bad input).
    """
    try:
        parsed = datetime.strptime(data_str, date_str_format)
        return parsed.strftime(target_format)
    except (ValueError, TypeError):
        # ValueError: text does not match the pattern; TypeError: non-string input.
        # Narrowed from a bare except so KeyboardInterrupt/SystemExit still propagate.
        return data_str
-
def get_alias_email(email: str, new_domain: str = "gmail-app.com") -> str:
    """
    Swap the domain of ``email`` for ``new_domain`` (gmail-app.com by default).

    Only the part after the last "@" is replaced.

    Raises:
        ValueError: if ``email`` contains no "@".
    """
    if "@" in email:
        mailbox = email.rpartition("@")[0]
        return f"{mailbox}@{new_domain}"
    raise ValueError(f"Invalid email: {email}")
# --- Minimal stand-in for a requests.Response object ---
class BrowserResponse:
    """Wrap the dict returned by the in-page JS fetch in a requests-like object.

    The dict is expected to carry ``status``, ``body``, ``headers`` and ``url``
    keys; missing or null values fall back to ``0`` / ``''`` / ``{}`` / ``''``.
    """

    def __init__(self, result_dict):
        result_dict = result_dict or {}
        self.status_code = result_dict.get('status') or 0
        # `or` (instead of a .get default) also covers an explicit null body,
        # which would otherwise crash .content and leak None to callers.
        self.text = result_dict.get('body') or ''
        self.headers = result_dict.get('headers') or {}
        self.url = result_dict.get('url') or ''
        self._json = None

    def json(self):
        """Return the body parsed as JSON, or ``{}`` for an empty/invalid body.

        A successful parse is cached on the instance.
        """
        if self._json is None:
            if not self.text:
                return {}
            try:
                self._json = json.loads(self.text)
            except (TypeError, ValueError):
                # json.JSONDecodeError is a ValueError; TypeError covers a non-text body.
                self._json = {}
        return self._json

    @property
    def content(self):
        """The body encoded as UTF-8 bytes."""
        return self.text.encode('utf-8')
- class VfsPlugin2(IVSPlg):
def __init__(self, group_id: str):
    """
    Set up per-instance state and an isolated on-disk workspace.

    Args:
        group_id: identifier reported by get_group_id(); also used in log
            prefixes and in the workspace directory name.
    """
    self.group_id = group_id
    self.config: Optional[VSPlgConfig] = None
    self.free_config: Dict[str, Any] = {}
    self.logger = None

    # DrissionPage browser page (takes the place of a requests.Session)
    self.page: Optional[ChromiumPage] = None

    self.jwt_token: str = ""
    self.real_ip: str = ""
    self.is_healthy: bool = True

    # Lazily filled caches for center / category / sub-category lookups
    self.center_conf = None
    self.category_conf: Dict = {}
    self.subcategory_conf: Dict = {}

    self.booking_wait_applied = False

    self.public_key = serialization.load_pem_public_key(
        VFS_PUBLIC_KEY_PEM.encode(),
        backend=default_backend()
    )

    # --- Concurrency isolation and resource management ---
    # A short random id keeps the workspaces of instances that share a
    # group_id apart.
    self.instance_id = uuid.uuid4().hex[:8]
    self.root_workspace = os.path.abspath(os.path.join("temp_browser_data", f"{self.group_id}_{self.instance_id}"))
    self.user_data_path = os.path.join(self.root_workspace, "user_data")

    # Proxy tunnel instance, created in create_session when the proxy needs credentials
    self.tunnel = None

    # Make sure the workspace root exists (sub-directories are created on demand).
    # exist_ok=True replaces the exists()/makedirs() pair, which could raise
    # FileExistsError if the directory appeared between the two calls.
    os.makedirs(self.root_workspace, exist_ok=True)

    self.session_create_time: float = 0
def get_group_id(self) -> str:
    """Return the group identifier this plugin instance was created with."""
    group_id = self.group_id
    return group_id
def set_config(self, config: VSPlgConfig):
    """Store the plugin configuration and expose its free-form section.

    ``free_config`` is never left as None: a missing/empty
    ``config.free_config`` becomes an empty dict.
    """
    free_section = config.free_config
    self.config = config
    self.free_config = free_section if free_section else {}
-
def set_log(self, logger: Callable[[str], None]) -> None:
    """Install the callable that _log forwards its formatted messages to."""
    sink = logger
    self.logger = sink
-
def _log(self, message):
    """Emit ``message`` with the plugin/group prefix.

    The line goes to the logger installed through set_log, or to ``print``
    when no logger is set.
    """
    line = f'[VfsPlugin] [{self.group_id}] {message}'
    emit = self.logger if self.logger else print
    emit(line)
def health_check(self) -> bool:
    """
    Report whether the current browser session is still usable.

    Returns False when the instance was flagged unhealthy, no page exists,
    the page does not answer a trivial JS probe, or the session is older
    than ``config.session_max_life`` minutes (a value <= 0, or no config at
    all, disables the age check). Returns True otherwise.
    """
    if not self.is_healthy or self.page is None:
        return False

    # Probe the page: a dead browser either raises or returns nothing.
    try:
        if not self.page.run_js("return 1;"):
            return False
    except Exception:
        # Narrowed from a bare except so KeyboardInterrupt/SystemExit propagate.
        return False

    # config may still be None if set_config was never called; treat that
    # (and a None limit) as "no lifetime limit" instead of raising.
    max_life_minutes = getattr(self.config, "session_max_life", 0) or 0
    if max_life_minutes > 0:
        session_age = time.time() - self.session_create_time
        if session_age > max_life_minutes * 60:
            self._log("Session expired.")
            return False
    return True
def create_session(self) -> None:
    """
    Create a logged-in session through a DrissionPage-controlled browser.

    Steps:
    1. Launch Chromium on a self-assigned debug port with this instance's
       own user-data directory (behind a local proxy tunnel when the
       configured proxy needs credentials).
    2. Navigate to the VFS login page.
    3. Poll up to 40 rounds for the Cloudflare Turnstile token, dismissing
       the cookie banner and clicking the challenge via CloudflareBypasser.
    4. Log in with an in-page JS fetch, completing the OTP step if requested.

    On success ``jwt_token``, ``session_create_time`` and ``real_ip`` are set.

    Raises:
        BizLogicError: mission/country config missing, no Turnstile token
            could be extracted, or the login response carried neither an
            access token nor an OTP request.
        Exception: any other failure is logged, ``self.cleanup()`` is
            called, and the original exception is re-raised.
    """
    self._log(f"Initializing Session (ID: {self.instance_id})...")

    # The try block also covers browser/proxy setup, so a tunnel that was
    # started before a later failure is still handed to cleanup().
    try:
        # 0. Browser options
        co = ChromiumOptions()
        # Works around 'not enough values to unpack': co.auto_port() relies on
        # parsing stdout, which DBus error output can disturb, so a free port
        # is picked here and set explicitly instead.
        import socket

        def get_free_port():
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.bind(('', 0))
                return s.getsockname()[1]

        debug_port = get_free_port()
        self._log(f"Assigned Debug Port: {debug_port}")

        # With an explicit port DrissionPage connects directly.
        co.set_local_port(debug_port)

        # Dedicated user-data directory: cache, cookies and local storage are
        # isolated per instance, and instances do not fight over one profile.
        co.set_user_data_path(self.user_data_path)

        # Browser binary override (e.g. for Docker images)
        chrome_path = os.getenv("CHROME_BIN")
        if chrome_path and os.path.exists(chrome_path):
            co.set_paths(browser_path=chrome_path)

        # Proxy configuration
        if self.config.proxy and self.config.proxy.ip:
            p = self.config.proxy

            if p.username and p.password:
                self._log(f"Starting Proxy Tunnel for {p.ip}...")

                # Authenticated proxy: start a local tunnel and point Chrome
                # at its credential-free local endpoint.
                self.tunnel = ProxyTunnel(p.ip, p.port, p.username, p.password)
                local_proxy = self.tunnel.start()

                self._log(f"Tunnel started at {local_proxy}")
                co.set_argument(f'--proxy-server={local_proxy}')
            else:
                # Proxy without credentials: use it directly.
                proxy_str = f"{p.scheme}://{p.ip}:{p.port}"
                co.set_argument(f'--proxy-server={proxy_str}')
        else:
            self._log("[WARN] No proxy configured!")

        co.headless(False)
        co.set_argument('--no-sandbox')
        co.set_argument('--disable-gpu')
        # Docker's default /dev/shm is only 64MB, which easily crashes Chromium
        co.set_argument('--disable-dev-shm-usage')

        co.set_argument('--window-size=1920,1080')
        co.set_argument('--disable-blink-features=AutomationControlled')

        self.page = ChromiumPage(co)

        # 1. Navigate to the login page
        mission = self.free_config.get("mission_code", "")
        country = self.free_config.get("country_code", "")
        lang = self.free_config.get("language", "en")

        if not mission or not country:
            raise BizLogicError("Missing mission/country code config")
        login_page_url = f"https://visa.vfsglobal.com/{country}/{lang}/{mission}/login"
        self._log(f"Navigating to {login_page_url}...")

        self.page.get(login_page_url)

        # 2. Cloudflare challenge handling
        self._log("Handling Cloudflare challenge...")

        cf_bypasser = CloudflareBypasser(self.page, log=self.config.debug)
        cf_token = ""

        # Poll once per round, 40 rounds at most
        for i in range(40):
            time.sleep(1)

            # A. Dismiss the cookie banner first; it can cover the challenge
            #    widget so that clicks never reach it.
            self._handle_cookie_banner()

            # B. Read the token from the DOM (a non-interactive challenge
            #    may resolve by itself).
            try:
                ele = self.page.ele('@name=cf-turnstile-response')
                if ele and ele.value:
                    cf_token = ele.value
                    self._log("Cloudflare Turnstile token extracted.")
                    break
            except Exception:
                pass  # best effort: try again next round

            # C. After the first rounds without a token, start clicking.
            if i > 2:
                try:
                    # NOTE(review): the original comment says deep (DFS) search
                    # should switch on after ~10 s, yet the flag is hard-coded
                    # to False. Kept as-is; confirm the intent.
                    use_dfs = False
                    cf_bypasser.click_verification_button(is_dfs=use_dfs)
                except Exception:
                    pass  # click errors are ignored; the next round retries

            # D. The login form may already be visible while the token is
            #    still missing; log it and keep polling.
            if self.page.ele('tag:form') or self.page.ele('#mat-input-0'):
                self._log("Login form detected.")
                if i > 5 and not cf_token:
                    self._log("Form visible but token not found yet...")

        if not cf_token:
            # One last attempt after the polling loop
            try:
                cf_token = self.page.ele('@name=cf-turnstile-response').value
            except Exception:
                pass

        if not cf_token:
            self._log("[WARN] Could not extract Turnstile token.")
            raise BizLogicError("Could not extract Turnstile token.")

        # 3. Build the login API request
        email = self.config.account.username
        password = self.config.account.password
        enc_password = self._encrypt_password(password)

        client_src = self._get_client_source()
        orange_src = self._get_orange_source(email)

        url = "https://lift-api.vfsglobal.com/user/login"
        headers = self._get_common_headers(with_auth=False)
        headers.update({
            "clientsource": client_src,
            "orangex": orange_src
        })

        data = {
            "username": email,
            "password": enc_password,
            "missioncode": mission,
            "countrycode": country,
            "languageCode": "en-US",
            "captcha_version": "cloudflare-v1",
            "captcha_api_key": cf_token
        }

        self._log("Sending Login Request via Browser Fetch...")
        resp = self._perform_request("POST", url, headers=headers, json_data=data)
        resp_json = resp.json()

        if resp_json.get('accessToken'):
            # Branch 1: logged in directly
            self.jwt_token = resp_json["accessToken"]
            self._log("Login successful, JWT obtained.")
        elif resp_json.get("enableOTPAuthentication"):
            # Branch 2: OTP required. _submit_login_otp fetches a fresh
            # Turnstile token itself, so the token passed here is unused.
            self._log("Login requires OTP.")
            otp = self._read_otp_email()
            self._submit_login_otp(cf_token, otp)
        else:
            raise BizLogicError(f"Login failed: {resp.text[:200]}")

        self.session_create_time = time.time()
        try:
            self.real_ip = self._get_realnetwork_ip()
        except Exception:
            self.real_ip = "0.0.0.0"

    except Exception as e:
        self._log(f"Create Session Failed: {e}")
        self.cleanup()
        # Bare raise keeps the original traceback intact.
        raise
def query(self, apt_type: AppointmentType) -> VSQueryResult:
    """
    Query the earliest bookable slot for one appointment type.

    The appointment settings are read from
    ``free_config["apt_configs"][apt_type.routing_key]``.

    Returns:
        A VSQueryResult whose ``success`` is True when a date or a waitlist
        was found; ``availability_status`` is Available, Waitlist or
        NoneAvailable accordingly, and for Available ``earliest_date`` /
        ``availability`` carry the date.

    Raises:
        BizLogicError: no configuration exists for the routing key.
        Exception: any error from the underlying requests is logged and
            re-raised unchanged.
    """
    result = VSQueryResult()
    apt_config = (self.free_config.get("apt_configs") or {}).get(apt_type.routing_key)
    try:
        if not apt_config:
            # Without this guard the failure surfaced later as an opaque
            # AttributeError on None.
            raise BizLogicError(f"Missing apt_configs entry for '{apt_type.routing_key}'")

        self._fetch_configurations(apt_config)
        query_result = self._query_earliest_slot(apt_config)

        result.success = False
        result.availability_status = AvailabilityStatus.NoneAvailable
        if not query_result:
            self._log("No slots available.")
        else:
            result.success = True
            if "WaitList" in query_result:
                result.availability_status = AvailabilityStatus.Waitlist
            else:
                earliest_dt = datetime.strptime(query_result, "%Y-%m-%d")
                result.availability_status = AvailabilityStatus.Available
                result.earliest_date = earliest_dt
                result.availability = [DateAvailability(date=earliest_dt, times=[])]
            self._log(f"Slot Found! -> {query_result}")

    except Exception as e:
        self._log(f"Query Error: {e}")
        # Bare raise keeps the original traceback intact.
        raise

    return result
def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None, retry_count=0):
    """
    Run an HTTP request as a JS ``fetch`` inside the browser page and log the traffic.

    Args:
        method: HTTP verb; upper-cased before use.
        url: target URL; ``params`` (if given) are url-encoded and appended.
        headers: optional header dict. It is copied, so the caller's dict is
            never modified.
        data: optional body; a dict is form-url-encoded, anything else is
            sent as given.
        json_data: optional JSON body; takes precedence over ``data``.
        params: optional query parameters.
        retry_count: Cloudflare re-verification retries already used; a 403
            Cloudflare page is retried while this is below 3.

    Returns:
        BrowserResponse for HTTP 200, and for HTTP 400 when ``url`` ends
        with "/login" (so the caller can parse the login error).

    Raises:
        SessionExpiredOrInvalidError: HTTP 401 (the instance is also marked unhealthy).
        PermissionDeniedError: HTTP 403 that is not a Cloudflare page, or
            whose retries are exhausted / produced no new token.
        RateLimiteddError: HTTP 429 (the instance is also marked unhealthy).
        BizLogicError: no browser page, JS execution failure, failed fetch
            (status 0), or any other status code.
    """
    if not self.page:
        raise BizLogicError("Browser session not initialized")

    # 1. Final request URL
    req_url = url
    if params:
        sep = '&' if '?' in req_url else '?'
        req_url += sep + urllib.parse.urlencode(params)

    # 2. Body and fetch options.
    # Copy the headers: Content-Type is added below and must not leak into
    # the caller's dict.
    final_headers = dict(headers) if headers else {}

    fetch_options = {
        "method": method.upper(),
        "headers": final_headers,
        "credentials": "include"  # send the browser's cookies along
    }

    # Body as text, for the traffic log only
    log_body = "None"
    if json_data:
        json_str = json.dumps(json_data)
        fetch_options['body'] = json_str
        final_headers['Content-Type'] = 'application/json'
        log_body = json_str
    elif data:
        if isinstance(data, dict):
            encoded_data = urllib.parse.urlencode(data)
            fetch_options['body'] = encoded_data
            final_headers['Content-Type'] = 'application/x-www-form-urlencoded'
            log_body = encoded_data
        else:
            fetch_options['body'] = data
            log_body = str(data)

    # NOTE(review): headers and body are logged in full and may contain the
    # JWT and the encrypted password — confirm this is acceptable for the
    # log destination.
    self._log(f"┌── [TRAFFIC REQUEST] {method} {req_url}")
    self._log(f"├── Headers: {json.dumps(final_headers)}")
    self._log(f"└── Body: {log_body}")

    # 3. Inject the fetch call.
    # json.dumps() renders the URL as a properly escaped JS string literal,
    # so quotes or backslashes in it can no longer break the script.
    js_script = f"""
    const url = {json.dumps(req_url)};
    const options = {json.dumps(fetch_options)};

    const startTime = Date.now();

    return fetch(url, options)
        .then(async response => {{
            const text = await response.text();
            const headers = {{}};
            response.headers.forEach((value, key) => headers[key] = value);
            const endTime = Date.now();

            return {{
                status: response.status,
                body: text,
                headers: headers,
                url: response.url,
                duration: endTime - startTime
            }};
        }})
        .catch(error => {{
            return {{
                status: 0,
                body: error.toString(),
                headers: {{}},
                url: url,
                duration: Date.now() - startTime
            }};
        }});
    """

    try:
        # run_js hands back the object returned by the script; the longer
        # timeout keeps slow networks from failing on the Python side.
        res_dict = self.page.run_js(js_script, timeout=60)
    except Exception as e:
        self._log(f"[TRAFFIC ERROR] JS Execution failed: {e}")
        raise BizLogicError(f"Browser JS Execution Error: {e}")

    # Anything that is not a dict (e.g. None) is treated as an empty result,
    # which ends in the status-0 "Network Error" branch below.
    if not isinstance(res_dict, dict):
        res_dict = {}
    resp = BrowserResponse(res_dict)

    duration = res_dict.get('duration', 0)
    # Cap the logged body at 1000 characters to keep the log small
    resp_preview = resp.text[:1000] + "..." if len(resp.text) > 1000 else resp.text

    self._log(f"┌── [TRAFFIC RESPONSE] Status: {resp.status_code} | Time: {duration}ms")
    self._log(f"└── Body: {resp_preview}")

    # 4. Uniform status handling
    if resp.status_code == 200:
        return resp

    elif resp.status_code == 401:
        self.is_healthy = False
        raise SessionExpiredOrInvalidError(f"401 Unauthorized: {resp.text[:100]}")

    elif resp.status_code == 403:
        # Is this a Cloudflare block page?
        if "Just a moment" in resp.text or "cloudflare" in resp.text.lower():
            self._log(f"[TRAFFIC] HTTP 403 (Cloudflare) detected. Re-verifying (Try {retry_count+1}/3)...")

            if retry_count < 3:
                new_token = self._refresh_turnstile_token()

                if new_token:
                    self._log("[TRAFFIC] In-page verification success. Retrying...")

                    # Refresh the captcha field when the request carries one
                    if json_data and "captcha_api_key" in json_data:
                        json_data["captcha_api_key"] = new_token

                    # Retry with the caller's original (unmodified) headers
                    return self._perform_request(method, url, headers, data, json_data, params, retry_count+1)

        # Not Cloudflare, or retries exhausted
        raise PermissionDeniedError(f"HTTP 403 Forbidden: {resp.text[:100]}")

    elif resp.status_code == 429:
        self.is_healthy = False
        raise RateLimiteddError(f"429 Rate Limit: {resp.text[:100]}")

    elif resp.status_code == 0:
        raise BizLogicError(f"Network Error (Fetch Failed): {resp.text}")

    else:
        # Let a 400 from the login endpoint through so the caller can parse
        # the business error (e.g. a failed login).
        if url.endswith("/login") and resp.status_code == 400:
            return resp

        # Everything else is a business-logic failure
        raise BizLogicError(message=f"HTTP Error {resp.status_code}: {resp.text[:100]}")
def _handle_cookie_banner(self):
    """
    Get the OneTrust cookie banner out of the way (best effort).

    The injected script clicks the "accept all" button when present;
    otherwise it hides and removes the banner element. Failures are ignored
    on purpose: the banner is often absent and callers poll this repeatedly.
    """
    try:
        # Done in JS: fastest, and unaffected by the element animating
        js = """
        try {
            // 1. 尝试点击 '接受所有' 按钮
            var acceptBtn = document.getElementById('onetrust-accept-btn-handler');
            if (acceptBtn) {
                acceptBtn.click();
                return true;
            }

            // 2. 如果没有按钮,或者还在遮挡,直接把整个 banner 删掉
            var banner = document.getElementById('onetrust-banner-sdk');
            if (banner) {
                banner.style.display = 'none'; // 隐藏
                banner.remove(); // 或者移除
                return true;
            }
        } catch(e) {}
        return false;
        """
        self.page.run_js(js)
    except Exception:
        # Narrowed from a bare except: page errors stay ignored, while
        # KeyboardInterrupt/SystemExit propagate.
        pass
def _get_proxy_url(self):
    """
    Build the proxy URL from ``config.proxy``.

    Returns:
        ``scheme://user:password@ip:port`` when a username is configured,
        ``scheme://ip:port`` otherwise, or None when no proxy (or no proxy
        ip) is configured. Username and password are percent-encoded so
        characters such as "@", ":" or "/" cannot corrupt the URL; a missing
        password is rendered as empty rather than the text "None".
    """
    proxy = self.config.proxy
    if not (proxy and proxy.ip):
        return None

    endpoint = f"{proxy.ip}:{proxy.port}"
    if proxy.username:
        user = urllib.parse.quote(str(proxy.username), safe="")
        secret = urllib.parse.quote(str(proxy.password or ""), safe="")
        return f"{proxy.scheme}://{user}:{secret}@{endpoint}"
    return f"{proxy.scheme}://{endpoint}"
-
def _get_realnetwork_ip(self):
    """
    Look up the browser's outbound IP by opening api.ipify.org in a new tab.

    A separate tab is used to get around the CORS 403 seen otherwise: per the
    original author's note, the request is then a top-level navigation that
    carries neither the VFS Origin nor credentials.

    Returns:
        The IP reported by ipify, or "0.0.0.0" when the lookup fails. The
        tab is closed in every case once it has been opened.
    """
    tab = None
    try:
        tab = self.page.new_tab("https://api.ipify.org/?format=json")

        # The JSON text normally sits in a <pre>; fall back to <body>.
        holder = tab.ele('tag:pre') or tab.ele('tag:body')
        ip = json.loads(holder.text)['ip']

        self._log(f"Real Network IP: {ip}")
        return ip

    except Exception as e:
        self._log(f"[WARN] Failed to check IP via new tab: {e}")
        return "0.0.0.0"

    finally:
        # Always release the tab. `tab` stays None when new_tab itself
        # failed, which previously led to an unbound-name error here.
        if tab is not None:
            try:
                tab.close()
            except Exception:
                pass
def _get_common_headers(self, with_auth=True) -> Dict[str, str]:
    """
    Build the business-specific headers sent with every VFS API call.

    Origin, Referer, User-Agent and similar headers are left to the browser.
    The result always holds ``accept``, ``route`` ("country/lang/mission"
    from free_config) and a freshly generated ``clientsource``;
    ``authorize`` is added when ``with_auth`` is true and a JWT is present.
    """
    settings = self.free_config
    mission = settings.get("mission_code", "")
    country = settings.get("country_code", "")
    lang = settings.get("language", "en")

    common = {
        "accept": "application/json, text/plain, */*",
        "route": f"{country}/{lang}/{mission}",
        # VFS expects this encrypted value even from a real browser.
        "clientsource": self._get_client_source(),
    }

    if not with_auth or not self.jwt_token:
        return common

    common["authorize"] = self.jwt_token
    return common
def _encrypt_password(self, password: str) -> str:
    """Encrypt ``password`` with the instance public key and return it base64-encoded.

    Uses OAEP padding with SHA-256 for both the mask generation function
    and the hash, and no label.
    """
    oaep = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA256()),
        algorithm=hashes.SHA256(),
        label=None,
    )
    encrypted = self.public_key.encrypt(password.encode(), oaep)
    encoded = base64.b64encode(encrypted)
    return encoded.decode()
def _get_orange_source(self, email: str) -> str:
    """Return the encrypted ``orangex`` header value: "<email>;<UTC timestamp>".

    The timestamp has second precision and no zone suffix.
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware UTC "now"
    # formats to exactly the same text.
    from datetime import timezone

    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
    payload = f"{email};{timestamp}"
    return self._encrypt_password(payload)
def _get_client_source(self) -> str:
    """Return the encrypted ``clientsource`` header value: "GA;<UTC timestamp>Z".

    The timestamp has second precision and carries a literal "Z" suffix.
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware UTC "now"
    # formats to exactly the same text.
    from datetime import timezone

    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
    payload = f"GA;{timestamp}Z"
    return self._encrypt_password(payload)
def _query_earliest_slot(self, apt_config) -> Optional[str]:
    """
    Ask the CheckIsSlotAvailable endpoint for the earliest slot.

    Returns:
        "WaitList" when the raw response mentions a wait list, the earliest
        date converted through to_yyyymmdd when slots are listed, and ""
        when there are none.
    """
    settings = self.free_config
    payload = {
        "missioncode": settings.get("mission_code"),
        "countrycode": settings.get("country_code"),
        "vacCode": apt_config.get("vac_code"),
        "visaCategoryCode": apt_config.get("subcategory_code"),
        "roleName": "Individual",
        "loginUser": self.config.account.username,
        "payCode": ""
    }

    # json_data makes the request helper set the JSON content type itself.
    # Starting at retry_count=2 leaves at most one Cloudflare re-verification.
    resp = self._perform_request(
        "POST",
        "https://lift-api.vfsglobal.com/appointment/CheckIsSlotAvailable",
        headers=self._get_common_headers(with_auth=True),
        json_data=payload,
        retry_count=2,
    )

    if "WaitList" in resp.text:
        return "WaitList"

    slots = resp.json().get("earliestSlotLists")
    if not slots:
        return ""
    return to_yyyymmdd(slots[0]["date"], "%m/%d/%Y %H:%M:%S")
def _fetch_configurations(self, apt_config: Dict[str, Any]):
    """
    Load and cache the center list plus the category and sub-category
    entries named in ``apt_config``.

    Each lookup is skipped when its result is already cached on the
    instance. A code missing from the server's list is only logged as a
    warning; nothing is cached for it.
    """
    if not self.center_conf:
        self.center_conf = self._query_center()

    vac_code = apt_config.get("vac_code")
    category_code = apt_config.get("category_code")

    if category_code not in self.category_conf:
        category = next(
            (entry for entry in self._query_visa_category(vac_code)
             if entry.get("code") == category_code),
            None,
        )
        if category is None:
            self._log(f"WARN: Category {category_code} not found")
        else:
            self.category_conf[category_code] = category

    sub_category_code = apt_config.get("subcategory_code")
    if sub_category_code not in self.subcategory_conf:
        sub_category = next(
            (entry for entry in self._query_visa_sub_category(vac_code, category_code)
             if entry.get("code") == sub_category_code),
            None,
        )
        if sub_category is None:
            self._log(f"WARN: SubCategory {sub_category_code} not found")
        else:
            self.subcategory_conf[sub_category_code] = sub_category
def _query_center(self) -> List:
    """Fetch the center list for the configured mission/country (en-US) and return the parsed JSON."""
    settings = self.free_config
    endpoint = "https://lift-api.vfsglobal.com/master/center/{}/{}/en-US".format(
        settings.get("mission_code"),
        settings.get("country_code"),
    )
    response = self._perform_request(
        "GET", endpoint, headers=self._get_common_headers(with_auth=False)
    )
    return response.json()
-
def _query_visa_category(self, center_code: str) -> List:
    """Fetch the visa categories offered by one center (en-US) and return the parsed JSON.

    ``center_code`` is percent-encoded before it is placed in the URL path.
    """
    settings = self.free_config
    endpoint = "https://lift-api.vfsglobal.com/master/visacategory/{}/{}/{}/en-US".format(
        settings.get("mission_code"),
        settings.get("country_code"),
        urllib.parse.quote(center_code),
    )
    response = self._perform_request(
        "GET", endpoint, headers=self._get_common_headers(with_auth=False)
    )
    return response.json()
-
def _query_visa_sub_category(self, center_code: str, category_code: str) -> List:
    """Fetch the sub-categories of one category at one center (en-US) and return the parsed JSON.

    Both codes are percent-encoded before they are placed in the URL path.
    """
    settings = self.free_config
    endpoint = "https://lift-api.vfsglobal.com/master/subvisacategory/{}/{}/{}/{}/en-US".format(
        settings.get("mission_code"),
        settings.get("country_code"),
        urllib.parse.quote(center_code),
        urllib.parse.quote(category_code),
    )
    response = self._perform_request(
        "GET", endpoint, headers=self._get_common_headers(with_auth=False)
    )
    return response.json()
def _read_otp_email(self) -> str:
    """
    Poll the cloud mail API for the login OTP mail and return its 6-digit code.

    Up to 12 lookups are made, 5 seconds apart, for mail addressed to the
    account's username and matching the expected sender/subject/body
    keywords, using the UTC time at which polling started.

    Raises:
        NotFoundError: no mail containing a 6-digit code showed up in time.
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware UTC "now"
    # formats to exactly the same text.
    from datetime import timezone

    max_attempts = 12
    poll_interval_seconds = 5

    # Mail is read through the cloud API, not through the browser
    master_email = "visafly666@gmail.com"
    recipient = self.config.account.username
    sender = "donotreply at vfshelpline.com"
    subject_keywords = "One Time Password"
    body_keywords = "OTP"
    formatted_utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

    self._log("Waiting for OTP email...")
    for attempt in range(max_attempts):
        content_out = VSCloudApi.Instance().fetch_mail_content(
            master_email, sender, recipient, subject_keywords, body_keywords, formatted_utc_time, 300
        )
        if content_out:
            match = re.search(r'\b\d{6}\b', content_out)
            if match:
                return match.group(0)
        # No point sleeping after the final lookup: nothing follows it.
        if attempt < max_attempts - 1:
            time.sleep(poll_interval_seconds)
    raise NotFoundError(message="OTP email not found")
def _submit_login_otp(self, old_cf_token: str, otp: str):
    """
    Repeat the login request with the OTP attached.

    A fresh Turnstile token is obtained first; ``old_cf_token`` is not used
    (the original author notes it is already spent by the first login call).
    On success the JWT is stored in ``jwt_token``.

    Raises:
        PermissionDeniedError: the response carries no access token.
    """
    self._log("Submitting Login OTP...")

    fresh_cf_token = self._refresh_turnstile_token()

    account = self.config.account
    email = account.username
    enc_password = self._encrypt_password(account.password)

    client_src = self._get_client_source()
    orange_src = self._get_orange_source(email)

    headers = {
        **self._get_common_headers(with_auth=False),
        "clientsource": client_src,
        "orangex": orange_src,
    }
    payload = {
        "username": email,
        "password": enc_password,
        "missioncode": self.free_config.get("mission_code", ""),
        "countrycode": self.free_config.get("country_code", ""),
        "languageCode": "en-US",
        "captcha_version": "cloudflare-v1",
        "captcha_api_key": fresh_cf_token,
        "otp": otp
    }

    resp = self._perform_request(
        "POST", "https://lift-api.vfsglobal.com/user/login", headers=headers, json_data=payload
    )
    body = resp.json()

    if not body.get("accessToken"):
        # Surface the server's description when it provides one
        error_desc = body.get("description", resp.text)
        raise PermissionDeniedError(message=f"OTP Login Failed: {error_desc}")

    self.jwt_token = body["accessToken"]
    self._log("OTP Login successful.")
-
def _refresh_turnstile_token(self) -> str:
    """
    Force the Cloudflare Turnstile widget to reset and return the new token.

    The widget is reset through injected JavaScript, then the hidden
    ``cf-turnstile-response`` input is polled every 0.5 s for 60 polls
    (about 30 s). From the sixth poll on, ``_handle_cookie_banner`` is called
    and ``CloudflareBypasser`` is asked to click the verification box; from
    the sixteenth poll on the click is requested with ``is_dfs=True``.

    Returns:
        The non-empty value of the ``cf-turnstile-response`` input.

    Raises:
        BizLogicError: no token appeared within the polling window.
    """
    self._log("Refreshing Cloudflare Turnstile token...")

    # 1. Clear any stale token and ask the widget to reset itself.
    js_reset = """
    try {
        var input = document.querySelector('input[name="cf-turnstile-response"]');
        if (input) input.value = "";
        window.turnstile.reset();
    } catch(e) {
        console.log("Turnstile reset error:", e);
    }
    """
    self.page.run_js(js_reset)

    # 2. Helper that knows how to locate and click the verification box.
    cf_bypasser = CloudflareBypasser(self.page, log=self.config.debug)

    # 3. Poll for the token (60 x 0.5 s).
    for i in range(60):
        time.sleep(0.5)

        # A. Has the token been produced yet?
        try:
            ele = self.page.ele('@name=cf-turnstile-response')
            if ele and ele.value:
                self._log("Turnstile token refreshed successfully.")
                return ele.value
        except Exception:
            # The element may be missing or stale while the widget re-renders;
            # keep polling. KeyboardInterrupt/SystemExit still propagate.
            pass

        # B. Give the widget a few polls to resolve on its own, then start clicking.
        if i > 4:
            # Cookie banners can cover the widget, so try to dismiss them first.
            self._handle_cookie_banner()

            try:
                # After ten more polls switch to is_dfs=True (per the original
                # author: a slower, recursive search through iframes).
                use_dfs = (i > 14)
                cf_bypasser.click_verification_button(is_dfs=use_dfs)
            except Exception:
                # A failed click must not abort the polling loop.
                pass

    raise BizLogicError(message="Failed to refresh Cloudflare Turnstile token (Timeout)")
- # -------------------------------------------------------------
- # 核心预约逻辑 (DrissionPage 版)
- # -------------------------------------------------------------
def book(self, slot_info: VSQueryResult, user_inputs) -> VSBookResult:
    """
    Run the complete booking flow for one applicant.

    Steps, in order: build an alias email, load the appointment config,
    optionally upload the passport image (OCR), add the primary applicant
    (up to 3 attempts), optionally verify the applicant OTP, then either
    confirm the waitlist or pick a date/time slot, submit "no additional
    services", query the fee, schedule, and resolve the payment link.

    Args:
        slot_info: query result; ``apt_type``, ``earliest_date`` and
            ``availability_status`` are read from it.
        user_inputs: mutable mapping of applicant data. This method writes
            ``alias_email`` and, when OCR is enabled, ``applicant_image``,
            ``applicant_image_data`` and ``guid`` into it.

    Returns:
        A ``VSBookResult``. ``success`` is False when no slot could be
        selected or the schedule call did not report a booked appointment.

    Raises:
        NotFoundError: no ``apt_configs`` entry exists for the routing key.
        BizLogicError: adding the applicant, the applicant OTP step, or the
            waitlist confirmation failed.
    """
    self._log("Starting booking process...")

    # 1. Prepare data.
    user_email = user_inputs.get('email')
    # Use an alias address (the original author's intent: avoid the real
    # mailbox being blacklisted by VFS).
    user_inputs['alias_email'] = get_alias_email(user_email, new_domain="gmail-app.com")

    res = VSBookResult()
    app_type = slot_info.apt_type
    # Without an earliest_date, start looking from today.
    from_date = slot_info.earliest_date.strftime("%Y-%m-%d") if slot_info.earliest_date else datetime.now().strftime("%Y-%m-%d")

    apt_config = self.free_config.get("apt_configs", {}).get(app_type.routing_key)

    if not apt_config:
        raise NotFoundError(message="Book: Config missing for this routing key.")
    # 2. Make sure the sub-category configuration is loaded.
    self._fetch_configurations(apt_config)
    sub_cc = apt_config.get("subcategory_code")
    sub_conf = self.subcategory_conf.get(sub_cc, {})

    # 3. OCR / document upload, when the sub-category requires it.
    #    The upload result is stored in user_inputs for the applicant payload.
    ocr_enabled = sub_conf.get("isOCREnable", False)
    if ocr_enabled:
        self._log("OCR Enabled, uploading documents...")
        upload_res = self._upload_applicant_documents(apt_config, user_inputs)
        user_inputs["applicant_image"] = upload_res.get("passportImageFilename")
        user_inputs["applicant_image_data"] = upload_res.get("passportImageFileBytes")
        user_inputs["guid"] = upload_res.get("uploadDocumentGUID")
    enable_reference_number = sub_conf.get("enableReferenceNumber", False)

    # 4. Add the applicant.
    final_urn = None
    is_waitlist = (slot_info.availability_status == AvailabilityStatus.Waitlist)

    if not self.booking_wait_applied:
        # One-off delay per instance before the first booking attempt.
        self._log("pre-booking wait: sleeping 20s before booking to avoid risk control")
        time.sleep(20)
        self.booking_wait_applied = True

    # Adding the applicant can fail transiently, so retry a few times.
    MAX_RETRY = 3
    for i in range(MAX_RETRY):
        try:
            final_urn = self._add_primary_applicant(apt_config, user_inputs, is_waitlist, ocr_enabled, enable_reference_number)
            if final_urn:
                break
        except Exception as e:
            self._log(f"Add Applicant retry {i+1}/{MAX_RETRY}: {e}")
            time.sleep(2)

    if not final_urn:
        raise BizLogicError(message="Failed to add primary applicant (Slot likely taken or API error)")
    self._log(f"Applicant Added. URN: {final_urn}")

    # 5. Applicant OTP verification, when enabled for the sub-category.
    otp_enabled = sub_conf.get("isApplicantOTPEnabled", False)
    if otp_enabled:
        self._log("Applicant OTP Required.")
        if not self._applicant_otp_send(apt_config, final_urn):
            raise BizLogicError(message='Applicant OTP send failed')

        # Same mailbox-polling routine as the login OTP.
        otp_code = self._read_otp_email()
        if not self._applicant_otp_verify(apt_config, final_urn, otp_code):
            raise BizLogicError(message='Applicant OTP verify failed')

    # 6. Waitlist mode ends here: confirm and return.
    if is_waitlist:
        if self._confirm_waitlist(apt_config, final_urn):
            res.success = True
            res.urn = final_urn
            res.account = self.config.account.username
            self._log("Waitlist confirmed.")
            return res
        raise BizLogicError(message='Confirm waitlist failed')

    # 7. Find a concrete date and time slot.
    expected_start = user_inputs.get("expected_start_date", "")
    expected_end = user_inputs.get("expected_end_date", "")

    # Months whose calendars need to be scanned.
    months = self._get_filtered_covered_months(expected_start, expected_end, from_date)
    self._log(f"Scanning months: {months} (Start looking from: {from_date})")

    selected_slot_id = ""
    selected_slot_date = ""
    selected_slot_time_range = ""

    all_ads = set()          # every available date seen so far
    forbidden_dates = set()  # dates already tried
    found_slot = False

    for m_str in months:
        self._log(f"Checking calendar for {m_str}...")
        all_ads.update(self._query_slot_calendar(apt_config, final_urn, m_str))

        # Try up to three dates per month; move on when a date has no times.
        for _ in range(3):
            avail_candidates = [d for d in all_ads if d not in forbidden_dates]
            # Restrict to the user's expected window.
            sel_dates = self._filter_dates(avail_candidates, expected_start, expected_end)

            if not sel_dates:
                break  # nothing suitable so far; go to the next month

            tmp_date = sel_dates[0]
            forbidden_dates.add(tmp_date)

            # The audit call is sent before the timeslot query; the original
            # author notes VFS requires it in this order.
            if not self._saveuseractionaudit(apt_config, final_urn, tmp_date):
                self._log(f"Audit failed for {tmp_date}, skipping...")
                time.sleep(1)
                continue

            ats = self._query_slot_time(apt_config, final_urn, tmp_date)
            if not ats:
                self._log(f"No timeslots for {tmp_date}")
                continue

            # Pick a random time on that date.
            sel_tm = random.choice(ats)

            selected_slot_id = sel_tm.get("allocationId")
            selected_slot_date = tmp_date
            selected_slot_time_range = sel_tm.get("slot")

            found_slot = True
            break

        if found_slot:
            break

    if not found_slot:
        self._log("No valid slots found after scanning.")
        res.success = False
        return res
    self._log(f"Slot Selected: {selected_slot_date} {selected_slot_time_range} (ID: {selected_slot_id})")

    # 8. Services and fee.
    self._submit_no_addition_service(final_urn)
    amount, currency = self._query_fee(apt_config, final_urn)

    # 9. Final submission.
    self._log("Submitting schedule...")
    schedule_res = self._schedule(apt_config, final_urn, amount, currency, selected_slot_id)

    if not schedule_res.get("IsAppointmentBooked"):
        self._log(f"Booking failed: {schedule_res}")
        res.success = False
        return res

    # 10. Build the success result.
    res.success = True
    res.account = self.config.account.username
    res.book_date = selected_slot_date
    res.book_time = selected_slot_time_range
    res.urn = final_urn
    # Convert to minor units with rounding: plain int() truncates binary
    # float artefacts (19.99 * 100 == 1998.9999999999998 -> 1998).
    res.fee_amount = int(round(float(amount or 0) * 100))
    res.fee_currency = currency

    # 11. Resolve the payment link when payment is required.
    if schedule_res.get("IsPaymentRequired", False):
        payload = schedule_res.get("payLoad", "")
        if payload:
            self._log("Processing payment link...")
            payment_url = self._pay_request(payload)
            if payment_url:
                res.payment_link = payment_url

    return res
- # -------------------------------------------------------------
- # 辅助方法实现 (DrissionPage 适配版)
- # -------------------------------------------------------------
def _upload_applicant_documents(self, apt_config, user_inputs) -> Dict:
    """
    Download the passport image from an external URL and upload it to VFS.

    Args:
        apt_config: appointment config; ``vac_code`` and ``subcategory_code`` are read.
        user_inputs: applicant data; ``passport_image_url`` is required.

    Returns:
        The upload response JSON, with ``passportImageFilename`` and
        ``passportImageFileBytes`` (base64 text of the image) set by this
        method for later steps.

    Raises:
        NotFoundError: ``passport_image_url`` is missing or empty.
        BizLogicError: the download raised, or returned a non-200 status.
    """
    # Plain `requests` is used for the external download; it does not go
    # through the browser-backed `_perform_request`.
    import requests as standard_requests

    url = "https://lift-api.vfsglobal.com/appointment/UploadApplicantDocument"
    passport_url = user_inputs.get("passport_image_url")
    if not passport_url:
        raise NotFoundError(message="Missing passport_image_url")

    # Keep the try body to the network call only, so the status-code error
    # below is not caught and re-wrapped by this handler.
    try:
        img_resp = standard_requests.get(passport_url, timeout=30)
    except Exception as e:
        raise BizLogicError(message=f"Image download error: {e}") from e
    if img_resp.status_code != 200:
        raise BizLogicError(message=f"Failed to download passport image: {img_resp.status_code}")
    b64_str = base64.b64encode(img_resp.content).decode('utf-8')

    headers = self._get_common_headers(with_auth=True)

    data = {
        "missioncode": self.free_config.get("mission_code"),
        "countryCode": self.free_config.get("country_code"),
        "centerCode": apt_config.get("vac_code"),
        "loginUser": self.config.account.username,
        "languageCode": "en-US",
        "visaCategoryCode": apt_config.get("subcategory_code"),
        "fileBytes": b64_str,
        "selfiImageFileBytes": ""
    }

    resp = self._perform_request("POST", url, headers=headers, json_data=data)
    result = resp.json()

    # Add the fields later steps expect alongside the server response.
    result["passportImageFilename"] = "passport_img.jpg"
    result["passportImageFileBytes"] = b64_str
    return result
-
def _add_primary_applicant(self, apt_config: Dict[str, Any], user_inputs: Dict[str, Any],
                           is_waitlist: bool, ocr_enabled: bool, enable_ref: bool) -> str:
    """
    Build the applicant payload and submit it.

    Args:
        apt_config: appointment config; ``vac_code`` and ``subcategory_code`` are read.
        user_inputs: applicant data (names, passport, contact, ``alias_email``, ...).
        is_waitlist: sent as ``isWaitlist``.
        ocr_enabled: when true, the uploaded image fields and GUID are attached.
        enable_ref: when true, ``cover_letter`` is sent as ``referenceNumber``;
            otherwise ``referenceNumber`` is null.

    Returns:
        The ``urn`` field of the response (``None`` when absent).
    """
    url = "https://lift-api.vfsglobal.com/appointment/applicants"
    headers = self._get_common_headers(with_auth=True)

    # Only the literal "male" maps to 1; every other value is sent as 2.
    gender_str = str(user_inputs.get("gender", "")).lower()
    gender_code = 1 if gender_str == "male" else 2
    dial_code = str(user_inputs.get("phone_country_code", "86"))

    def _to_ddmmyyyy(d_str):
        """Convert YYYY-MM-DD to DD/MM/YYYY; pass other values through as text."""
        try:
            return datetime.strptime(str(d_str), "%Y-%m-%d").strftime("%d/%m/%Y")
        except (ValueError, TypeError):
            return str(d_str)

    dob = _to_ddmmyyyy(user_inputs.get("birthday", ""))
    ppt_exp = _to_ddmmyyyy(user_inputs.get("passport_expiry_date", ""))

    applicant = {
        "urn": "",
        "arn": "",
        "loginUser": self.config.account.username,
        "firstName": str(user_inputs.get("first_name", "")).upper(),
        "middleName": "",
        "lastName": str(user_inputs.get("last_name", "")).upper(),
        "employerFirstName": "",
        "employerLastName": "",
        "salutation": "",
        "gender": gender_code,
        "contactNumber": str(user_inputs.get("phone", "")),
        "dialCode": dial_code,
        "employerContactNumber": "",
        "employerDialCode": "",
        "emailId": str(user_inputs.get("alias_email", "")).upper(),
        "employerEmailId": "",
        "passportNumber": str(user_inputs.get("passport_no", "")).upper(),
        "confirmPassportNumber": "",
        "passportExpirtyDate": ppt_exp,
        "dateOfBirth": dob,
        "nationalId": None,
        "nationalityCode": get_country_iso3(str(user_inputs.get("nationality", ""))),
        "state": None, "city": None, "addressline1": None, "addressline2": None, "pincode": None,
        "isEndorsedChild": False, "applicantType": 0, "vlnNumber": None, "applicantGroupId": 0,
        "parentPassportNumber": "", "parentPassportExpiry": "", "dateOfDeparture": None,
        "entryType": "", "eoiVisaType": "", "passportType": "", "vfsReferenceNumber": "",
        "familyReunificationCerificateNumber": "", "PVRequestRefNumber": "", "PVStatus": "",
        "PVStatusDescription": "", "PVCanAllowRetry": True, "PVisVerified": False,
        "eefRegistrationNumber": "", "isAutoRefresh": True, "helloVerifyNumber": "",
        "OfflineCClink": "", "idenfystatuscheck": False, "vafStatus": None,
        "SpecialAssistance": "", "AdditionalRefNo": None, "juridictionCode": "",
        "canInitiateVAF": False, "canEditVAF": False, "canDeleteVAF": False,
        "canDownloadVAF": False, "Retryleft": "",
        # NOTE(review): presumably populated during session creation - confirm.
        "ipAddress": self.real_ip
    }
    applicant["referenceNumber"] = str(user_inputs.get("cover_letter", "")) if enable_ref else None
    if ocr_enabled:
        applicant["applicantImage"] = str(user_inputs.get("applicant_image", ""))
        applicant["applicantImageData"] = str(user_inputs.get("applicant_image_data", ""))
        applicant["GUID"] = str(user_inputs.get("guid", ""))

    payload = {
        "countryCode": self.free_config.get("country_code"),
        "missionCode": self.free_config.get("mission_code"),
        "centerCode": apt_config.get("vac_code"),
        "loginUser": self.config.account.username,
        "visaCategoryCode": apt_config.get("subcategory_code"),
        "applicantList": [applicant],
        "languageCode": "en-US",
        "isWaitlist": is_waitlist,
        "isEdit": False,
        "feeEntryTypeCode": None, "feeExemptionTypeCode": None,
        "feeExemptionDetailsCode": None, "juridictionCode": None, "regionCode": None
    }
    resp = self._perform_request("POST", url, headers=headers, json_data=payload)
    return resp.json().get("urn")
-
def _applicant_otp_send(self, apt_config, urn) -> bool:
    """
    Ask VFS to generate the applicant OTP for *urn*.

    Returns:
        The ``isOTPGenerated`` flag of the response (False when absent).
    """
    request_headers = self._get_common_headers(with_auth=True)
    settings = self.free_config
    body = {
        "urn": urn,
        "loginUser": self.config.account.username,
        "missionCode": settings.get("mission_code"),
        "countryCode": settings.get("country_code"),
        "centerCode": apt_config.get("vac_code"),
        "OTP": "",
        "otpAction": "GENERATE",
        "languageCode": "en-US",
    }
    reply = self._perform_request(
        "POST",
        "https://lift-api.vfsglobal.com/appointment/applicantotp",
        headers=request_headers,
        json_data=body,
    )
    return reply.json().get("isOTPGenerated", False)
def _applicant_otp_verify(self, apt_config, urn, otp) -> bool:
    """
    Submit the applicant OTP for validation.

    Returns:
        The ``isOTPValidated`` flag of the response (False when absent).
    """
    request_headers = self._get_common_headers(with_auth=True)
    # Carried over from the earlier implementation: this call also sends a
    # ``datacenter`` header.
    request_headers["datacenter"] = "GERMANY"
    settings = self.free_config
    body = {
        "urn": urn,
        "loginUser": self.config.account.username,
        "missionCode": settings.get("mission_code"),
        "countryCode": settings.get("country_code"),
        "centerCode": apt_config.get("vac_code"),
        "OTP": otp,
        "otpAction": "VALIDATE",
        "languageCode": "en-US",
    }
    reply = self._perform_request(
        "POST",
        "https://lift-api.vfsglobal.com/appointment/applicantotp",
        headers=request_headers,
        json_data=body,
    )
    return reply.json().get("isOTPValidated", False)
-
def _query_slot_calendar(self, apt_config, urn, from_date) -> List:
    """
    Query the appointment calendar starting at *from_date*.

    Args:
        apt_config: appointment config; ``vac_code`` and ``subcategory_code`` are read.
        urn: applicant reference returned when the applicant was added.
        from_date: ``YYYY-MM-DD`` string; sent to the API as ``DD/MM/YYYY``.

    Returns:
        One entry per item in the response's ``calendars`` list, each produced
        by ``to_yyyymmdd(date, "%m/%d/%Y")``; an empty list when there are none.
    """
    request_headers = self._get_common_headers(with_auth=True)

    api_from_date = datetime.strptime(from_date, "%Y-%m-%d").strftime("%d/%m/%Y")
    settings = self.free_config
    body = {
        "missionCode": settings.get("mission_code"),
        "countryCode": settings.get("country_code"),
        "centerCode": apt_config.get("vac_code"),
        "loginUser": self.config.account.username,
        "visaCategoryCode": apt_config.get("subcategory_code"),
        "fromDate": api_from_date,
        "urn": urn,
        "payCode": "",
    }

    reply = self._perform_request(
        "POST",
        "https://lift-api.vfsglobal.com/appointment/calendar",
        headers=request_headers,
        json_data=body,
    )
    entries = reply.json().get("calendars")
    # The original author notes the API may answer in MM/DD/YYYY or
    # DD/MM/YYYY; MM/DD/YYYY is what gets parsed here.
    return [to_yyyymmdd(entry.get("date"), "%m/%d/%Y") for entry in (entries or [])]
-
def _query_slot_time(self, apt_config, urn, slot_date) -> List:
    """
    Query the time slots offered on *slot_date*.

    Args:
        apt_config: appointment config; ``vac_code`` and ``subcategory_code`` are read.
        urn: applicant reference.
        slot_date: ``YYYY-MM-DD`` string; sent to the API as ``DD/MM/YYYY``.

    Returns:
        The ``slots`` value of the response, or ``[]`` when the key is absent.
    """
    request_headers = self._get_common_headers(with_auth=True)

    api_slot_date = datetime.strptime(slot_date, "%Y-%m-%d").strftime("%d/%m/%Y")
    settings = self.free_config
    body = {
        "missionCode": settings.get("mission_code"),
        "countryCode": settings.get("country_code"),
        "centerCode": apt_config.get("vac_code"),
        "loginUser": self.config.account.username,
        "visaCategoryCode": apt_config.get("subcategory_code"),
        "slotDate": api_slot_date,
        "urn": urn,
    }
    reply = self._perform_request(
        "POST",
        "https://lift-api.vfsglobal.com/appointment/timeslot",
        headers=request_headers,
        json_data=body,
    )
    return reply.json().get("slots", [])
def _saveuseractionaudit(self, apt_config, urn, earliest_date) -> bool:
    """
    Send the "schedule" user-action audit record for a chosen date.

    Args:
        apt_config: appointment config; ``vac_code`` is read.
        urn: applicant reference.
        earliest_date: ``YYYY-MM-DD`` string of the date being attempted.

    Returns:
        The ``isSavedSuccess`` flag of the response (False when absent).
    """
    request_headers = self._get_common_headers(with_auth=True)

    chosen_day = datetime.strptime(earliest_date, "%Y-%m-%d")
    settings = self.free_config
    body = {
        "missionCode": settings.get("mission_code"),
        "countryCode": settings.get("country_code"),
        "centerCode": apt_config.get("vac_code"),
        "loginUser": self.config.account.username,
        "urn": urn,
        # The same date is sent twice, in two different formats.
        "firstEarliestSlotDate": chosen_day.strftime("%d/%m/%Y"),
        "action": "schedule",
        "ipAddress": self.real_ip,
        "eadAppointmentDetail": chosen_day.strftime("%Y-%m-%dT%H:%M:%S"),
    }
    reply = self._perform_request(
        "POST",
        "https://lift-api.vfsglobal.com/appointment/saveuseractionaudit",
        headers=request_headers,
        json_data=body,
    )
    return reply.json().get("isSavedSuccess", False)
-
def _submit_no_addition_service(self, urn):
    """
    Submit an empty value-added-services selection for *urn*.

    The response body is not inspected; the call only has to complete
    without ``_perform_request`` raising.
    """
    request_headers = self._get_common_headers(with_auth=True)
    settings = self.free_config
    body = {
        "loginUser": self.config.account.username,
        "missionCode": settings.get("mission_code"),
        "countryCode": settings.get("country_code"),
        "urn": urn,
        "applicants": [],
    }
    self._perform_request(
        "POST",
        "https://lift-api.vfsglobal.com/vas/mapvas",
        headers=request_headers,
        json_data=body,
    )
def _query_fee(self, apt_config, urn) -> Tuple[float, str]:
    """
    Query the fee for the appointment identified by *urn*.

    Returns:
        ``(total, currency)``. ``total`` is the response's ``totalamount``
        (0.0 when absent); ``currency`` comes from the first ``feeDetails``
        entry and defaults to ``"EUR"``.
    """
    request_headers = self._get_common_headers(with_auth=True)
    settings = self.free_config
    body = {
        "missionCode": settings.get("mission_code"),
        "countryCode": settings.get("country_code"),
        "centerCode": apt_config.get("vac_code"),
        "loginUser": self.config.account.username,
        "urn": urn,
        "languageCode": "en-US",
    }
    reply = self._perform_request(
        "POST",
        "https://lift-api.vfsglobal.com/appointment/fees",
        headers=request_headers,
        json_data=body,
    )
    fee_info = reply.json()
    breakdown = fee_info.get("feeDetails")
    fee_currency = breakdown[0].get("currency", "EUR") if breakdown else "EUR"
    return fee_info.get("totalamount", 0.0), fee_currency
def _schedule(self, apt_config, urn, amount, currency, slot_id) -> Dict:
    """
    Submit the final schedule request for the selected slot.

    Args:
        apt_config: appointment config; ``vac_code`` is read.
        urn: applicant reference.
        amount: fee amount, forwarded unchanged.
        currency: fee currency code.
        slot_id: allocation id of the chosen time slot; sent as a string.

    Returns:
        The decoded JSON response, unchanged.
    """
    request_headers = self._get_common_headers(with_auth=True)
    payment_details = {
        "paymentmode": "Online",
        "RequestRefNo": "",
        "clientId": "",
        "merchantId": "",
        "amount": amount,
        "currency": currency,
    }
    settings = self.free_config
    body = {
        "missionCode": settings.get("mission_code"),
        "countryCode": settings.get("country_code"),
        "centerCode": apt_config.get("vac_code"),
        "loginUser": self.config.account.username,
        "urn": urn,
        "notificationType": "none",
        "paymentdetails": payment_details,
        "allocationId": str(slot_id),
        "CanVFSReachoutToApplicant": True,
    }
    reply = self._perform_request(
        "POST",
        "https://lift-api.vfsglobal.com/appointment/schedule",
        headers=request_headers,
        json_data=body,
    )
    return reply.json()
def _pay_request(self, payload) -> str:
    """
    Resolve the payment redirect in a separate browser tab.

    The PayRequest URL is opened in a new tab so the current page keeps its
    state. The method waits up to 15 s for the tab to leave the PayRequest
    path and then returns whatever URL the tab is on.

    Args:
        payload: the ``payLoad`` value returned by the schedule call; it is
            inserted into the query string as-is.

    Returns:
        The tab's URL after the wait, or ``""`` when opening or reading the
        tab raised. If no redirect happened in time, the PayRequest URL
        itself is returned.
    """
    pay_path = "/PG-Component/Payment/PayRequest"
    start_url = f"https://online.vfsglobal.com{pay_path}?payLoad={payload}"
    final_url = ""
    pay_tab = None

    try:
        self._log("Resolving payment redirect...")
        pay_tab = self.page.new_tab(start_url)

        # exclude=True waits until the URL NO LONGER contains the text.
        # The default (exclude=False) waits for the URL to contain it, which
        # is true before any redirect has happened.
        redirected = pay_tab.wait.url_change(pay_path, exclude=True, timeout=15)
        if not redirected:
            self._log("[WARN] Payment redirect did not complete within 15s")

        final_url = pay_tab.url
        self._log(f"Payment URL resolved: {final_url}")
    except Exception as e:
        self._log(f"[WARN] Failed to resolve payment URL: {e}")
    finally:
        # Always release the tab, whether or not the redirect was resolved.
        if pay_tab is not None:
            try:
                pay_tab.close()
            except Exception:
                pass

    return final_url
def _confirm_waitlist(self, apt_config: Dict[str, Any], urn: str) -> bool:
    """
    Confirm a waitlist registration for *urn*.

    Returns:
        The ``isConfirmed`` flag of the response (False when absent).
    """
    request_headers = self._get_common_headers(with_auth=True)
    settings = self.free_config
    body = {
        "missionCode": settings.get("mission_code"),
        "countryCode": settings.get("country_code"),
        "centerCode": apt_config.get("vac_code"),
        "loginUser": self.config.account.username,
        "urn": urn,
        "notificationType": "none",
        "CanVFSReachoutToApplicant": True,
    }
    reply = self._perform_request(
        "POST",
        "https://lift-api.vfsglobal.com/appointment/ConfirmWaitlist",
        headers=request_headers,
        json_data=body,
    )
    return reply.json().get("isConfirmed", False)
def _filter_dates(self, dates: List[str], start_str: str, end_str: str) -> List[str]:
    """
    Keep the candidate dates that fall inside the expected window.

    Args:
        dates: candidate dates as ``YYYY-MM-DD`` strings.
        start_str: window start; only its first 10 characters are parsed.
        end_str: window end; only its first 10 characters are parsed.

    Returns:
        * ``dates`` unchanged when either bound is empty or cannot be parsed
          (no filtering is possible).
        * Otherwise a new, randomly shuffled list of the candidates within
          ``[start, end]`` inclusive. Candidates that are not valid
          ``YYYY-MM-DD`` strings are dropped, so one malformed entry cannot
          disable the window for every other date.
    """
    if not start_str or not end_str:
        return dates

    try:
        window_start = datetime.strptime(start_str[:10], "%Y-%m-%d")
        window_end = datetime.strptime(end_str[:10], "%Y-%m-%d")
    except (ValueError, TypeError):
        # Unusable bounds: fall back to "no filter".
        return dates

    valid_dates = []
    for date_str in dates:
        try:
            current = datetime.strptime(date_str, "%Y-%m-%d")
        except (ValueError, TypeError):
            # An unparseable candidate is skipped rather than trusted.
            continue
        if window_start <= current <= window_end:
            valid_dates.append(date_str)

    # Shuffle so repeated runs do not all target the same date.
    random.shuffle(valid_dates)
    return valid_dates
def _get_filtered_covered_months(self, start_date, end_date, from_date) -> List[str]:
    """
    List the first day of every month that has to be scanned.

    The scan runs from the later of ``start_date`` and ``from_date`` through
    ``end_date``, all snapped to the first of their month.

    Args:
        start_date: ``YYYY-MM-DD`` or empty; empty means the current month.
        end_date: ``YYYY-MM-DD`` or empty; empty means the same month next year.
        from_date: ``YYYY-MM-DD``; anything unparseable means the current month.

    Returns:
        Month starts as ``YYYY-MM-01`` strings in ascending order. Empty when
        ``start_date`` or ``end_date`` is non-empty but unparseable, or when
        the range is empty.
    """
    fmt = "%Y-%m-%d"
    # Midnight on the 1st of the current month. Dropping the time of day
    # keeps it comparable with parsed dates (all at 00:00); otherwise the
    # final month is lost. Day 1 also makes the "+1 year" step safe on
    # 29 February.
    this_month = datetime.now().replace(day=1, hour=0, minute=0, second=0, microsecond=0)

    def _month_start(value, fallback):
        """Parse *value* and snap it to day 1, or use *fallback* when empty."""
        if not value:
            return fallback
        return datetime.strptime(value, fmt).replace(day=1)

    try:
        dt_start = _month_start(start_date, this_month)
        dt_end = _month_start(end_date, this_month.replace(year=this_month.year + 1))
    except (ValueError, TypeError):
        return []

    try:
        dt_from = datetime.strptime(from_date, fmt).replace(day=1)
    except (ValueError, TypeError):
        dt_from = this_month

    curr = max(dt_start, dt_from)

    months = []
    while curr <= dt_end:
        months.append(curr.strftime(fmt))
        if curr.month == 12:
            curr = curr.replace(year=curr.year + 1, month=1)
        else:
            curr = curr.replace(month=curr.month + 1)
    return months
-
- # --- 资源清理核心方法 ---
def cleanup(self):
    """
    Quit the browser, delete the temporary workspace and stop the proxy tunnel.

    Safe to call more than once, and on an object whose initialisation did
    not finish: attributes that were never set are skipped.
    """
    # 1. Close the browser.
    page = getattr(self, "page", None)
    if page:
        try:
            page.quit()
        except Exception:
            pass  # already closed
        self.page = None

    # 2. Delete the workspace. The browser may hold file locks briefly after
    #    quitting, so wait a little and retry. rmtree(ignore_errors=True)
    #    never raises, so success is judged by whether the path still exists.
    workspace = getattr(self, "root_workspace", None)
    if workspace and os.path.exists(workspace):
        max_attempts = 3
        for attempt in range(1, max_attempts + 1):
            time.sleep(0.2)
            shutil.rmtree(workspace, ignore_errors=True)
            if not os.path.exists(workspace):
                break
            if attempt < max_attempts:
                self._log(f"Cleanup retry: workspace still present ({attempt}/{max_attempts})")
                time.sleep(0.5)
        else:
            self._log(f"[WARN] Failed to fully remove workspace: {workspace}")

    # 3. Stop the proxy tunnel.
    tunnel = getattr(self, "tunnel", None)
    if tunnel:
        try:
            tunnel.stop()
        except Exception:
            pass
        self.tunnel = None
-
def __del__(self):
    """
    Finalizer: release the browser, workspace and tunnel when the object is
    garbage-collected.

    Errors are suppressed because a finalizer may run during interpreter
    shutdown, when modules used by ``cleanup`` can already be torn down, and
    an exception here could only be reported as "Exception ignored".
    """
    try:
        self.cleanup()
    except Exception:
        pass
|