| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445 |
- # plugins/vfs_plugin2.py
- import os
- import time
- import json
- import random
- import base64
- import uuid
- import shutil
- import re
- import socket
- import urllib.parse
- from datetime import datetime
- from typing import Dict, Any, Optional, List, Tuple, Callable
- from DrissionPage import ChromiumPage, ChromiumOptions
- from DrissionPage.common import Settings
- from cryptography.hazmat.primitives import serialization, hashes
- from cryptography.hazmat.primitives.asymmetric import padding
- from cryptography.hazmat.backends import default_backend
- import configure
- from vs_plg import IVSPlg
- from vs_types import VSPlgConfig, AppointmentType, VSQueryResult, VSBookResult, DateAvailability, AvailabilityStatus, NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError
- from toolkit.vs_cloud_api import VSCloudApi
- from toolkit.mihomo_tunnel import MihomoTunnel
- from utils.cloudflare_bypass_for_scraping import CloudflareBypasser
# PEM-encoded public key. VfsPlugin.__init__ loads it with
# serialization.load_pem_public_key and _encrypt_password uses the loaded key
# to encrypt the login password and the clientsource / orangex header payloads.
VFS_PUBLIC_KEY_PEM = """-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAuupFgB+lYIOtSxrRoHzc
LmCZKJ6+oSbgqgOPzFMM0TasOeLw0NXEn1XfIzXdx75+tegNKwyIZumoh0yhubKs
t59GV321kN0iquYRHrdh3ygfDDHlS9rROQeBqRga0ncSADtbLMrBPqXJjPCoV76y
t92towriKoH75BhiazY0mghm4LjmAWrV0u/GNpV3tk9bxbtHEXGaFmxCJqjg+7x6
1e5wXLfvpj9w1QsiSWOSJxLOyICz/9ByxXycQQFdNmjnnnwco9Gt/Mi33NYH71j0
5oXIjklFC4lvJqaqSY5lS7Vwb9oCt9zX9J0Yz4z4e/3V+0jgRnWOFGofyks4FKe2
GQIDAQAB
-----END PUBLIC KEY-----"""
# Lookup table: lower-cased English country name -> three-letter country code.
# Read by get_country_iso3, which lower-cases its argument before the lookup,
# so every key here must stay lower-case.
COUNTRY_MAP = {
    "afghanistan": "AFG", "albania": "ALB", "algeria": "DZA", "andorra": "AND", "angola": "AGO",
    "antigua and barbuda": "ATG", "argentina": "ARG", "armenia": "ARM", "australia": "AUS", "austria": "AUT",
    "azerbaijan": "AZE", "bahamas": "BHS", "bahrain": "BHR", "bangladesh": "BGD", "barbados": "BRB", "belarus": "BLR",
    "belgium": "BEL", "belize": "BLZ", "benin": "BEN", "bhutan": "BTN", "bolivia": "BOL", "bosnia and herzegovina": "BIH",
    "botswana": "BWA", "brazil": "BRA", "brunei": "BRN", "bulgaria": "BGR", "burkina faso": "BFA", "burundi": "BDI",
    "cabo verde": "CPV", "cambodia": "KHM", "cameroon": "CMR", "canada": "CAN", "central african republic": "CAF",
    "chad": "TCD", "chile": "CHL", "china": "CHN", "colombia": "COL", "comoros": "COM", "congo (brazzaville)": "COG",
    "congo (kinshasa)": "COD", "costa rica": "CRI", "croatia": "HRV", "cuba": "CUB", "cyprus": "CYP", "czech republic": "CZE",
    "denmark": "DNK", "djibouti": "DJI", "dominica": "DMA", "dominican republic": "DOM", "ecuador": "ECU", "egypt": "EGY",
    "el salvador": "SLV", "equatorial guinea": "GNQ", "eritrea": "ERI", "estonia": "EST", "eswatini": "SWZ", "ethiopia": "ETH",
    "fiji": "FJI", "finland": "FIN", "france": "FRA", "gabon": "GAB", "gambia": "GMB", "georgia": "GEO", "germany": "DEU",
    "ghana": "GHA", "greece": "GRC", "grenada": "GRD", "guatemala": "GTM", "guinea": "GIN", "guinea-bissau": "GNB", "guyana": "GUY",
    "haiti": "HTI", "honduras": "HND", "hungary": "HUN", "iceland": "ISL", "india": "IND", "indonesia": "IDN", "iran": "IRN",
    "iraq": "IRQ", "ireland": "IRL", "israel": "ISR", "italy": "ITA", "jamaica": "JAM", "japan": "JPN", "jordan": "JOR",
    "kazakhstan": "KAZ", "kenya": "KEN", "kiribati": "KIR", "korea, north": "PRK", "korea, south": "KOR", "kuwait": "KWT",
    "kyrgyzstan": "KGZ", "laos": "LAO", "latvia": "LVA", "lebanon": "LBN", "lesotho": "LSO", "liberia": "LBR", "libya": "LBY",
    "liechtenstein": "LIE", "lithuania": "LTU", "luxembourg": "LUX", "madagascar": "MDG", "malawi": "MWI", "malaysia": "MYS",
    "maldives": "MDV", "mali": "MLI", "malta": "MLT", "marshall islands": "MHL", "mauritania": "MRT", "mauritius": "MUS",
    "mexico": "MEX", "micronesia": "FSM", "moldova": "MDA", "monaco": "MCO", "mongolia": "MNG", "montenegro": "MNE", "morocco": "MAR",
    "mozambique": "MOZ", "myanmar": "MMR", "namibia": "NAM", "nauru": "NRU", "nepal": "NPL", "netherlands": "NLD", "new zealand": "NZL",
    "nicaragua": "NIC", "niger": "NER", "nigeria": "NGA", "north macedonia": "MKD", "norway": "NOR", "oman": "OMN", "pakistan": "PAK",
    "palau": "PLW", "panama": "PAN", "papua new guinea": "PNG", "paraguay": "PRY", "peru": "PER", "philippines": "PHL", "poland": "POL",
    "portugal": "PRT", "qatar": "QAT", "romania": "ROU", "russia": "RUS", "rwanda": "RWA", "saudi arabia": "SAU", "senegal": "SEN",
    "serbia": "SRB", "seychelles": "SYC", "sierra leone": "SLE", "singapore": "SGP", "slovakia": "SVK", "slovenia": "SVN",
    "solomon islands": "SLB", "somalia": "SOM", "south africa": "ZAF", "spain": "ESP", "sri lanka": "LKA", "sudan": "SDN",
    "suriname": "SUR", "sweden": "SWE", "switzerland": "CHE", "syria": "SYR", "tajikistan": "TJK", "tanzania": "TZA", "thailand": "THA",
    "timor-leste": "TLS", "togo": "TGO", "tonga": "TON", "tunisia": "TUN", "turkey": "TUR", "turkmenistan": "TKM", "uganda": "UGA",
    "ukraine": "UKR", "united arab emirates": "ARE", "united kingdom": "GBR", "united states": "USA", "uruguay": "URY", "uzbekistan": "UZB",
    "vanuatu": "VUT", "venezuela": "VEN", "vietnam": "VNM", "yemen": "YEM", "zambia": "ZMB", "zimbabwe": "ZWE"
}
def get_country_iso3(name: str) -> str:
    """Return the three-letter code registered in COUNTRY_MAP for *name*.

    The lookup ignores case and surrounding whitespace. Unknown, empty or
    ``None`` names fall back to ``"CHN"`` (the historical default).
    """
    # Tolerate None and padded input instead of raising AttributeError or
    # silently missing the table entry.
    key = (name or "").strip().lower()
    return COUNTRY_MAP.get(key, "CHN")
def to_yyyymmdd(data_str: str, date_str_format: str, target_format: str = "%Y-%m-%d"):
    """Re-format a date string, returning the input unchanged when it cannot be parsed.

    Args:
        data_str: Date text to convert.
        date_str_format: ``strptime`` format describing *data_str*.
        target_format: ``strftime`` format of the result (default ``%Y-%m-%d``).

    Returns:
        The re-formatted date string, or *data_str* itself if parsing or
        formatting fails.
    """
    try:
        return datetime.strptime(data_str, date_str_format).strftime(target_format)
    except (ValueError, TypeError):
        # ValueError: text does not match the format; TypeError: non-string input.
        return data_str
-
def get_alias_email(email: str, new_domain: str = "gmail-app.com") -> str:
    """Replace the domain of *email* with *new_domain* (default ``gmail-app.com``).

    Args:
        email: Address whose local part is kept.
        new_domain: Domain to put after the ``@``.

    Returns:
        ``"<local part>@<new_domain>"``; the split happens at the last ``@``.

    Raises:
        ValueError: If *email* is not a string or contains no ``@``.
    """
    # Check the type first so that None (e.g. a missing form field) yields the
    # documented ValueError rather than a TypeError from the ``in`` test.
    if not isinstance(email, str) or "@" not in email:
        raise ValueError(f"Invalid email: {email}")
    local_part, _ = email.rsplit("@", 1)
    return f"{local_part}@{new_domain}"
# --- Minimal stand-in for a requests.Response object ---
class BrowserResponse:
    """Response wrapper built from the dict returned by the in-browser fetch.

    Exposes ``status_code``, ``text``, ``headers``, ``url``, ``json()`` and
    ``content`` so calling code can treat it like a ``requests`` response.
    """

    def __init__(self, result_dict):
        """Populate the response from *result_dict* (``None`` means "no result").

        Recognised keys: ``status``, ``body``, ``headers``, ``url``.
        """
        result_dict = result_dict or {}
        self.status_code = result_dict.get('status', 0)
        # A null body / headers value would otherwise break ``content`` and
        # header lookups, so normalise them to empty values.
        self.text = result_dict.get('body') or ''
        self.headers = result_dict.get('headers') or {}
        self.url = result_dict.get('url', '')
        self._json = None

    def json(self):
        """Return the body parsed as JSON; ``{}`` for an empty or invalid body.

        A successful parse is cached on the instance.
        """
        if self._json is None:
            if not self.text:
                return {}
            try:
                self._json = json.loads(self.text)
            except (ValueError, TypeError):
                # json.JSONDecodeError is a ValueError; TypeError covers a
                # body that is not str/bytes.
                self._json = {}
        return self._json

    @property
    def content(self):
        """The body encoded as UTF-8 bytes."""
        return self.text.encode('utf-8')
- class VfsPlugin(IVSPlg):
def __init__(self, group_id: str):
    """Initialise plugin state and create the per-instance browser workspace.

    Args:
        group_id: Identifier of the group this plugin instance serves; it is
            used in log prefixes and in the workspace directory name.
    """
    self.group_id = group_id
    self.config: Optional[VSPlgConfig] = None
    self.free_config: Dict[str, Any] = {}
    self.logger = None

    self.page: Optional[ChromiumPage] = None

    self.jwt_token: str = ""
    self.real_ip: str = ""
    self.is_healthy: bool = True

    self.center_conf = None
    self.category_conf: Dict = {}
    self.subcategory_conf: Dict = {}

    self.booking_wait_applied = False

    self.public_key = serialization.load_pem_public_key(
        VFS_PUBLIC_KEY_PEM.encode(),
        backend=default_backend()
    )

    # --- Concurrency isolation and resource management ---
    # Unique id per instance so concurrent plugins never share a browser profile.
    self.instance_id = uuid.uuid4().hex[:8]
    self.root_workspace = os.path.abspath(
        os.path.join("data/temp_browser_data", f"{self.group_id}.{self.instance_id}")
    )
    self.user_data_path = os.path.join(self.root_workspace, "user_data")

    # Holds the proxy tunnel instance once create_session starts one.
    self.tunnel = None

    # Make sure the root directory exists (sub-directories are created by the
    # code that needs them). exist_ok avoids the check-then-create race.
    os.makedirs(self.root_workspace, exist_ok=True)

    self.session_create_time: float = 0
def get_group_id(self) -> str:
    """Return the group identifier this plugin instance was created with."""
    group_id = self.group_id
    return group_id
def set_config(self, config: VSPlgConfig):
    """Store *config* and cache its free-form section (``{}`` when it is empty)."""
    self.config = config
    extra = config.free_config
    self.free_config = extra if extra else {}
-
def set_log(self, logger: Callable[[str], None]) -> None:
    """Install the callable that will receive this plugin's log lines."""
    self.logger = logger
-
- def _log(self, message):
- if self.logger:
- self.logger(f'[VfsPlugin] [{self.group_id}] {message}')
- else:
- print(f'[VfsPlugin] [{self.group_id}] {message}')
-
def keep_alive(self):
    """Ping the backend by fetching the applicant list to keep the session warm.

    Any failure marks the session unhealthy (``is_healthy = False``) and is
    logged; nothing is raised to the caller.
    """
    try:
        applicants = self._get_application()
        self._log(f'keep_alive request, get applicant resp={applicants}')
    except Exception as e:
        # Flag first so the session is retired even if logging itself fails.
        self.is_healthy = False
        self._log(f'keep_alive failed, marking session unhealthy: {e}')
def health_check(self) -> bool:
    """Report whether the current browser session is still usable.

    Returns:
        ``False`` when the session was flagged unhealthy, no page exists, the
        page does not answer a trivial script, or the session is older than
        ``config.session_max_life`` seconds (only checked when that limit is
        greater than zero); ``True`` otherwise.
    """
    if not self.is_healthy or self.page is None:
        return False

    # Check whether the page is still alive.
    try:
        if not self.page.run_js("return 1;"):
            return False
    except Exception:
        return False

    # A missing config or a None limit is treated as "no lifetime limit"
    # instead of crashing the health probe.
    max_life = getattr(self.config, "session_max_life", 0) or 0
    if max_life > 0:
        elapsed_time = time.time() - self.session_create_time
        if elapsed_time > max_life:
            self._log("Session expired.")
            return False
    return True
def create_session(self) -> None:
    """
    Create a logged-in session with a DrissionPage-driven browser:
    1. Start the browser (through a proxy / Mihomo tunnel when configured)
    2. Navigate to the login page
    3. Pass the Cloudflare challenge and extract the Turnstile token
       (using CloudflareBypasser)
    4. Log in through an in-page JS fetch, handling the e-mail OTP step

    On success ``jwt_token``, ``session_create_time`` and ``real_ip`` are set.

    Raises:
        BizLogicError: Missing configuration, no Turnstile token, or a login
            response that carries neither a token nor an OTP request.
        Exception: Any other failure is logged and re-raised after cleanup.
    """
    self._log(f"Initializing Session (ID: {self.instance_id})...")
    co = ChromiumOptions()

    def get_free_port():
        # Binding to port 0 lets the OS pick a free port; the probe socket is
        # closed when the ``with`` block exits.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(('', 0))
            return s.getsockname()[1]

    debug_port = get_free_port()
    self._log(f"Assigned Debug Port: {debug_port}")

    co.set_local_port(debug_port)
    co.set_user_data_path(self.user_data_path)

    chrome_path = configure.CHROME_PATH
    if not chrome_path:
        chrome_path = os.getenv("CHROME_BIN")
    if chrome_path and os.path.exists(chrome_path):
        co.set_paths(browser_path=chrome_path)

    if self.config.proxy and self.config.proxy.ip:
        p = self.config.proxy
        self._log(f'Current proxy id={p.id}')
        if p.username and p.password:
            # Authenticated proxy: route the browser through a local tunnel.
            self._log(f"Starting Proxy Tunnel for {p.ip}...")
            exit_node = {
                "name": "ExitNode",
                "type": p.proto,
                "server": p.ip,
                "port": p.port,
                "username": p.username,
                "password": p.password
            }
            relay_node = None
            if configure.MIHOMO_RELAY_NODES:
                relay_node = random.choice(configure.MIHOMO_RELAY_NODES)
            mihomo_path = configure.MIHOMO_BIN_PATH
            if not mihomo_path:
                mihomo_path = os.getenv("MIHOMO_BIN")
            if not mihomo_path:
                raise BizLogicError(message='Mihomo path is null, You need set mihomo bin path in configure or os env')
            self.tunnel = MihomoTunnel(mihomo_path, exit_node=exit_node, relay_node=relay_node)
            local_proxy = self.tunnel.start()
            self._log(f"Tunnel started at {local_proxy}")
            co.set_argument(f'--proxy-server={local_proxy}')
        else:
            proxy_str = f"{p.proto}://{p.ip}:{p.port}"
            co.set_argument(f'--proxy-server={proxy_str}')
    else:
        self._log("[WARN] No proxy configured!")

    co.headless(False)
    co.set_argument('--no-sandbox')
    co.set_argument('--disable-gpu')
    co.set_argument('--disable-dev-shm-usage')
    co.set_argument('--window-size=1920,1080')
    co.set_argument('--disable-blink-features=AutomationControlled')
    try:
        self.page = ChromiumPage(co)
        mission = self.free_config.get("mission_code", "")
        country = self.free_config.get("country_code", "")
        lang = self.free_config.get("language", "en")

        if not mission or not country:
            raise BizLogicError("Missing mission/country code config")
        login_page_url = f"https://visa.vfsglobal.com/{country}/{lang}/{mission}/login"
        self._log(f"Navigating to {login_page_url}...")

        self.page.get(login_page_url)

        self._log("Handling Cloudflare challenge...")

        cf_bypasser = CloudflareBypasser(self.page, log=self.config.debug)
        cf_token = ""

        # Poll for the Turnstile token for up to 40 rounds.
        for i in range(40):
            time.sleep(1)
            self._handle_cookie_banner()
            try:
                ele = self.page.ele('@name=cf-turnstile-response', timeout=1)
                if ele and ele.value:
                    cf_token = ele.value
                    self._log("Cloudflare Turnstile token extracted.")
                    break
            except Exception:
                # Element not ready yet; keep polling.
                pass

            if i > 2:
                # After a few rounds, try clicking the verification widget.
                try:
                    use_dfs = False
                    cf_bypasser.click_verification_button(use_dfs)
                except Exception:
                    pass

            if self.page.ele('tag:form', timeout=0.5) or self.page.ele('#mat-input-0', timeout=0.5):
                self._log("Login form detected.")
                if i > 5 and not cf_token:
                    self._log("Form visible but token not found yet...")

        # -------------------------------------------------------------

        if not cf_token:
            # One last attempt after the polling loop.
            try:
                cf_token = self.page.ele('@name=cf-turnstile-response').value
            except Exception:
                pass

        if not cf_token:
            self._log("[WARN] Could not extract Turnstile token.")
            raise BizLogicError("Could not extract Turnstile token.")
        email = self.config.account.username
        password = self.config.account.password
        enc_password = self._encrypt_password(password)

        client_src = self._get_client_source()
        orange_src = self._get_orange_source(email)

        url = "https://lift-api.vfsglobal.com/user/login"
        headers = self._get_common_headers(with_auth=False)
        headers.update({
            "clientsource": client_src,
            "orangex": orange_src
        })

        data = {
            "username": email,
            "password": enc_password,
            "missioncode": mission,
            "countrycode": country,
            "languageCode": "en-US",
            "captcha_version": "cloudflare-v1",
            "captcha_api_key": cf_token
        }

        # Timestamp taken before the login call; passed to _read_otp_email
        # as the mail search start time.
        now_utc = datetime.utcnow()
        sent_at = now_utc.strftime("%Y-%m-%d %H:%M:%S")
        self._log("Sending Login Request via Browser Fetch...")
        resp = self._perform_request("POST", url, headers=headers, json_data=data)
        resp_json = resp.json()
        if resp_json.get('accessToken'):
            self.jwt_token = resp_json["accessToken"]
            self._log("Login successful, JWT obtained.")

        elif resp_json.get("enableOTPAuthentication"):
            self._log("Login requires OTP.")
            otp = self._read_otp_email(sent_at=sent_at)
            self._submit_login_otp(cf_token, otp)

        else:
            raise BizLogicError(f"Login failed: {resp.text[:200]}")
        self.session_create_time = time.time()
        try:
            self.real_ip = self._get_realnetwork_ip()
        except Exception:
            self.real_ip = "0.0.0.0"

    except Exception as e:
        self._log(f"Create Session Failed: {e}")
        try:
            # NOTE(review): this blocks the caller for one hour before the
            # browser/tunnel are released. It looks like either a debugging
            # hold or a crude back-off - confirm the intent and shorten it or
            # make it configurable. Behaviour is kept as-is here.
            time.sleep(3600)
        finally:
            # Always release the browser and tunnel, even if the hold above
            # is interrupted.
            self.cleanup()
        raise
def query(self, apt_type: AppointmentType) -> VSQueryResult:
    """Query the earliest bookable slot for *apt_type*.

    Args:
        apt_type: Appointment type; its ``routing_key`` selects the entry in
            ``free_config["apt_configs"]``.

    Returns:
        A VSQueryResult. ``success`` is True when a date or a waitlist is
        offered; ``availability_status`` is ``Available``, ``Waitlist`` or
        ``NoneAvailable``. For a date, ``earliest_date`` and ``availability``
        are filled in.

    Raises:
        NotFoundError: No configuration exists for the routing key.
        Exception: Errors from the underlying requests are logged and re-raised.
    """
    result = VSQueryResult()
    apt_config = self.free_config.get("apt_configs", {}).get(apt_type.routing_key)
    try:
        if not apt_config:
            # Same guard as book(): fail with a clear error instead of an
            # AttributeError deep inside _fetch_configurations.
            raise NotFoundError(message="Query: Config missing for this routing key.")
        self._fetch_configurations(apt_config)
        query_result = self._query_earliest_slot(apt_config)
        result.success = False
        result.availability_status = AvailabilityStatus.NoneAvailable
        if query_result:
            result.success = True
            if "WaitList" in query_result:
                result.availability_status = AvailabilityStatus.Waitlist
            else:
                earliest_dt = datetime.strptime(query_result, "%Y-%m-%d")
                result.availability_status = AvailabilityStatus.Available
                result.earliest_date = earliest_dt
                result.availability = [DateAvailability(date=earliest_dt, times=[])]
            self._log(f"Slot Found! -> {query_result}")
        else:
            self._log("No slots available.")

    except Exception as e:
        self._log(f"Query Error: {e}")
        raise

    return result
def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None, retry_count=0):
    """
    Core transport: run a ``fetch`` inside the DrissionPage browser context
    and log the request/response traffic for analysis.

    Args:
        method: HTTP method name (upper-cased before use).
        url: Target URL; *params* are appended as a query string.
        headers: Optional request headers (not modified).
        data: Optional body; a dict is form-encoded, anything else is sent as-is.
        json_data: Optional JSON body; takes precedence over *data*.
        params: Optional query parameters.
        retry_count: Number of Cloudflare re-verification retries already
            used; retries stop once it reaches 3.

    Returns:
        A BrowserResponse for HTTP 200, and for HTTP 400 when *url* ends with
        ``/login``.

    Raises:
        SessionExpiredOrInvalidError: HTTP 401 (session is marked unhealthy).
        PermissionDeniedError: HTTP 403 that could not be recovered.
        RateLimiteddError: HTTP 429 (session is marked unhealthy).
        BizLogicError: No browser session, JS execution failure, fetch
            failure (status 0) or any other HTTP status.
    """
    if not self.page:
        raise BizLogicError("Browser session not initialized")
    req_url = url
    if params:
        sep = '&' if '?' in req_url else '?'
        req_url += sep + urllib.parse.urlencode(params)

    # Work on a copy so the Content-Type added below never leaks into the
    # caller's dict.
    final_headers = dict(headers or {})

    fetch_options = {
        "method": method.upper(),
        "headers": final_headers,
        "credentials": "include"  # key point: send the browser's cookies
    }

    log_body = "None"
    if json_data:
        json_str = json.dumps(json_data)
        fetch_options['body'] = json_str
        fetch_options['headers']['Content-Type'] = 'application/json'
        log_body = json_str
    elif data:
        if isinstance(data, dict):
            encoded_data = urllib.parse.urlencode(data)
            fetch_options['body'] = encoded_data
            fetch_options['headers']['Content-Type'] = 'application/x-www-form-urlencoded'
            log_body = encoded_data
        else:
            fetch_options['body'] = data
            log_body = str(data)
    self._log(f"┌── [TRAFFIC REQUEST] {method} {req_url}")
    self._log(f"├── Headers: {json.dumps(final_headers)}")
    self._log(f"└── Body: {log_body}")
    # json.dumps emits a correctly escaped JS string literal, so quotes or
    # backslashes in the URL can no longer break (or inject into) the script.
    js_script = f"""
        const url = {json.dumps(req_url)};
        const options = {json.dumps(fetch_options)};

        const startTime = Date.now();

        return fetch(url, options)
            .then(async response => {{
                const text = await response.text();
                const headers = {{}};
                response.headers.forEach((value, key) => headers[key] = value);
                const endTime = Date.now();

                return {{
                    status: response.status,
                    body: text,
                    headers: headers,
                    url: response.url,
                    duration: endTime - startTime
                }};
            }})
            .catch(error => {{
                return {{
                    status: 0,
                    body: error.toString(),
                    headers: {{}},
                    url: url,
                    duration: Date.now() - startTime
                }};
            }});
    """

    try:
        res_dict = self.page.run_js(js_script, timeout=60)
    except Exception as e:
        self._log(f"[TRAFFIC ERROR] JS Execution failed: {e}")
        raise BizLogicError(f"Browser JS Execution Error: {e}")
    resp = BrowserResponse(res_dict)

    # run_js may hand back None; BrowserResponse copes with that and so must
    # the duration lookup.
    duration = (res_dict or {}).get('duration', 0)
    resp_preview = resp.text[:1000] + "..." if len(resp.text) > 1000 else resp.text

    self._log(f"┌── [TRAFFIC RESPONSE] Status: {resp.status_code} | Time: {duration}ms")
    self._log(f"└── Body: {resp_preview}")
    if resp.status_code == 200:
        return resp

    elif resp.status_code == 401:
        self.is_healthy = False
        raise SessionExpiredOrInvalidError(f"401 Unauthorized: {resp.text[:100]}")

    elif resp.status_code == 403:
        if "Just a moment" in resp.text or "cloudflare" in resp.text.lower():
            self._log(f"[TRAFFIC] HTTP 403 (Cloudflare) detected. Re-verifying (Try {retry_count+1}/3)...")

            if retry_count < 3:
                new_token = self._refresh_turnstile_token()

                if new_token:
                    self._log("[TRAFFIC] In-page verification success. Retrying...")

                    # Login-style payloads carry the token; swap in the fresh one.
                    if json_data and "captcha_api_key" in json_data:
                        json_data["captcha_api_key"] = new_token

                    return self._perform_request(method, url, headers, data, json_data, params, retry_count+1)

        raise PermissionDeniedError(f"HTTP 403 Forbidden: {resp.text[:100]}")

    elif resp.status_code == 429:
        self.is_healthy = False
        raise RateLimiteddError(f"429 Rate Limit: {resp.text[:100]}")

    elif resp.status_code == 0:
        raise BizLogicError(f"Network Error (Fetch Failed): {resp.text}")

    else:
        # The login endpoint reports business errors with HTTP 400; let the
        # caller inspect the body.
        if url.endswith("/login") and resp.status_code == 400:
            return resp

        raise BizLogicError(message=f"HTTP Error {resp.status_code}: {resp.text[:100]}")
- def _handle_cookie_banner(self):
- """
- 处理 OneTrust Cookie 遮挡
- 策略:尝试点击“接受所有”,如果点不到就直接移除 DOM
- """
- try:
- # 使用 JS 处理最快,且不会因为元素运动报错
- js = """
- try {
- // 1. 尝试点击 '接受所有' 按钮
- var acceptBtn = document.getElementById('onetrust-accept-btn-handler');
- if (acceptBtn) {
- acceptBtn.click();
- return true;
- }
-
- // 2. 如果没有按钮,或者还在遮挡,直接把整个 banner 删掉
- var banner = document.getElementById('onetrust-banner-sdk');
- if (banner) {
- banner.style.display = 'none'; // 隐藏
- banner.remove(); // 或者移除
- return true;
- }
- } catch(e) {}
- return false;
- """
- self.page.run_js(js)
- except:
- pass
- def _get_proxy_url(self):
- if self.config.proxy and self.config.proxy.ip:
- s = self.config.proxy
- if s.username:
- return f"{s.proto}://{s.username}:{s.password}@{s.ip}:{s.port}"
- else:
- return f"{s.proto}://{s.ip}:{s.port}"
- return None
-
- def _get_realnetwork_ip(self):
- """
- 通过新建标签页获取 IP
- 解决 CORS 403 问题:新标签页请求属于 Top-Level Navigation,
- 不带 Origin: visa.vfsglobal.com,也不带 credentials,符合 ipify 规则。
- """
- try:
- tab = self.page.new_tab("https://api.ipify.org/?format=json")
-
- if tab.ele('tag:pre'):
- json_text = tab.ele('tag:pre').text
- else:
- json_text = tab.ele('tag:body').text
-
- ip = json.loads(json_text)['ip']
-
- tab.close()
-
- self._log(f"Real Network IP: {ip}")
- return ip
-
- except Exception as e:
- self._log(f"[WARN] Failed to check IP via new tab: {e}")
- try:
- if self.page.tabs_count > 1:
- tab.close()
- except:
- pass
- return "0.0.0.0"
- def _get_common_headers(self, with_auth=True) -> Dict[str, str]:
- # DrissionPage 浏览器会自动带上 Origin, Referer, User-Agent, Sec-CH-UA 等
- # 这里只需要补充业务特定的 Headers
- mission = self.free_config.get("mission_code", "")
- country = self.free_config.get("country_code", "")
- lang = self.free_config.get("language", "en")
- route = f"{country}/{lang}/{mission}"
-
- h = {
- "accept": "application/json, text/plain, */*",
- "route": route
- }
-
- h["clientsource"] = self._get_client_source()
-
- if with_auth and self.jwt_token:
- h["authorize"] = self.jwt_token
-
- return h
def _encrypt_password(self, password: str) -> str:
    """Encrypt *password* with the loaded public key and return it base64-encoded.

    Uses OAEP padding with SHA-256 for both the hash and the MGF1 mask, and
    no label.
    """
    plaintext = password.encode()
    oaep = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA256()),
        algorithm=hashes.SHA256(),
        label=None,
    )
    encrypted = self.public_key.encrypt(plaintext, oaep)
    encoded = base64.b64encode(encrypted)
    return encoded.decode()
- def _get_orange_source(self, email: str) -> str:
- timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
- payload = f"{email};{timestamp}"
- return self._encrypt_password(payload)
- def _get_client_source(self) -> str:
- timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
- payload = f"GA;{timestamp}Z"
- return self._encrypt_password(payload)
- def _query_earliest_slot(self, apt_config) -> Optional[str]:
- url = "https://lift-api.vfsglobal.com/appointment/CheckIsSlotAvailable"
- data = {
- "missioncode": self.free_config.get("mission_code"),
- "countrycode": self.free_config.get("country_code"),
- "vacCode": apt_config.get("vac_code"),
- "visaCategoryCode": apt_config.get("subcategory_code"),
- "roleName": "Individual",
- "loginUser": self.config.account.username,
- "payCode": ""
- }
- headers = self._get_common_headers(with_auth=True)
- resp = self._perform_request("POST", url, headers=headers, json_data=data, retry_count=2)
-
- if "WaitList" in resp.text:
- return "WaitList"
- j = resp.json()
- if j.get("earliestSlotLists"):
- raw_date = j["earliestSlotLists"][0]["date"]
- return to_yyyymmdd(raw_date, "%m/%d/%Y %H:%M:%S")
- return ""
- def _fetch_configurations(self, apt_config: Dict[str, Any]):
- if not self.center_conf:
- self.center_conf = self._query_center()
- vac_code = apt_config.get("vac_code")
- category_code = apt_config.get("category_code")
-
- if category_code not in self.category_conf:
- visa_categories = self._query_visa_category(vac_code)
- found = False
- for vc in visa_categories:
- if vc.get("code") == category_code:
- self.category_conf[category_code] = vc
- found = True
- break
- if not found:
- self._log(f"WARN: Category {category_code} not found")
- sub_category_code = apt_config.get("subcategory_code")
- if sub_category_code not in self.subcategory_conf:
- visa_subcategories = self._query_visa_sub_category(vac_code, category_code)
- found = False
- for svc in visa_subcategories:
- if svc.get("code") == sub_category_code:
- self.subcategory_conf[sub_category_code] = svc
- found = True
- break
- if not found:
- self._log(f"WARN: SubCategory {sub_category_code} not found")
- def _query_center(self) -> List:
- mission = self.free_config.get("mission_code")
- country = self.free_config.get("country_code")
- url = f"https://lift-api.vfsglobal.com/master/center/{mission}/{country}/en-US"
- headers = self._get_common_headers(with_auth=False)
- resp = self._perform_request("GET", url, headers=headers)
- return resp.json()
-
- def _query_visa_category(self, center_code: str) -> List:
- mission = self.free_config.get("mission_code")
- country = self.free_config.get("country_code")
- enc_center = urllib.parse.quote(center_code)
- url = f"https://lift-api.vfsglobal.com/master/visacategory/{mission}/{country}/{enc_center}/en-US"
- headers = self._get_common_headers(with_auth=False)
- resp = self._perform_request("GET", url, headers=headers)
- return resp.json()
-
- def _query_visa_sub_category(self, center_code: str, category_code: str) -> List:
- mission = self.free_config.get("mission_code")
- country = self.free_config.get("country_code")
- enc_center = urllib.parse.quote(center_code)
- enc_cat = urllib.parse.quote(category_code)
- url = f"https://lift-api.vfsglobal.com/master/subvisacategory/{mission}/{country}/{enc_center}/{enc_cat}/en-US"
- headers = self._get_common_headers(with_auth=False)
- resp = self._perform_request("GET", url, headers=headers)
- return resp.json()
def _read_otp_email(self, sent_at='', sender='VFS Global') -> str:
    """Fetch the OTP e-mail through the cloud API and return its 6-digit code.

    Mail is read through VSCloudApi rather than a local network library.

    Args:
        sent_at: Timestamp string (``YYYY-MM-DD HH:MM:SS``) forwarded to the
            mail lookup; defaults to the current UTC time.
        sender: Sender name forwarded to the mail lookup.

    Raises:
        BizLogicError: No mail content was returned or it holds no
            stand-alone 6-digit number.
    """
    master_email = "visafly666@gmail.com"
    recipient = self.config.account.username
    subject_keywords = "One Time Password"
    body_keywords = "OTP"
    if not sent_at:
        sent_at = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")

    self._log("Waiting for OTP email...")
    mail_body = VSCloudApi.Instance().fetch_mail_content(
        master_email, sender, recipient, subject_keywords, body_keywords, sent_at, 300
    )
    found = re.search(r'\b\d{6}\b', mail_body) if mail_body else None
    if found:
        return found.group(0)
    raise BizLogicError(message="OTP code not found")
- def _submit_login_otp(self, old_cf_token: str, otp: str):
- self._log("Submitting Login OTP...")
-
- new_cf_token = self._refresh_turnstile_token()
- # ---------------------------
- email = self.config.account.username
- password = self.config.account.password
- enc_password = self._encrypt_password(password)
- mission = self.free_config.get("mission_code", "")
- country = self.free_config.get("country_code", "")
-
- client_src = self._get_client_source()
- orange_src = self._get_orange_source(email)
-
- url = "https://lift-api.vfsglobal.com/user/login"
- headers = self._get_common_headers(with_auth=False)
- headers.update({
- "clientsource": client_src,
- "orangex": orange_src
- })
-
- data = {
- "username": email,
- "password": enc_password,
- "missioncode": mission,
- "countrycode": country,
- "languageCode": "en-US",
- "captcha_version": "cloudflare-v1",
- "captcha_api_key": new_cf_token,
- "otp": otp
- }
-
- resp = self._perform_request("POST", url, headers=headers, json_data=data)
- resp_json = resp.json()
-
- if resp_json.get("accessToken"):
- self.jwt_token = resp_json["accessToken"]
- self._log("OTP Login successful.")
- return
- error_desc = resp_json.get("description", resp.text)
- raise PermissionDeniedError(message=f"OTP Login Failed: {error_desc}")
-
def _refresh_turnstile_token(self) -> str:
    """
    Force a Cloudflare Turnstile refresh and return the new token
    (CloudflareBypasser-based version).

    The widget is reset in-page, then the hidden ``cf-turnstile-response``
    input is polled for up to 60 rounds. From the sixth round on, the cookie
    banner is dismissed and the verification widget is clicked on every round.

    Returns:
        The new, non-empty Turnstile token.

    Raises:
        BizLogicError: No token appeared within the polling window.
    """
    self._log("Refreshing Cloudflare Turnstile token...")

    js_reset = """
        try {
            var input = document.querySelector('input[name="cf-turnstile-response"]');
            if (input) input.value = "";
            window.turnstile.reset();
        } catch(e) {
            console.log("Turnstile reset error:", e);
        }
    """
    self.page.run_js(js_reset)

    cf_bypasser = CloudflareBypasser(self.page, log=self.config.debug)
    for i in range(60):
        time.sleep(0.5)

        try:
            ele = self.page.ele('@name=cf-turnstile-response', timeout=1)
            if ele and ele.value:
                self._log("Turnstile token refreshed successfully.")
                return ele.value
        except Exception:
            # Element not available yet; keep polling.
            pass

        if i > 4:
            self._handle_cookie_banner()

            try:
                use_dfs = False
                cf_bypasser.click_verification_button(use_dfs)
            except Exception:
                # A failed click must not interrupt the polling loop.
                pass

    raise BizLogicError("Failed to refresh Cloudflare Turnstile token (Timeout)")
def book(self, slot_info: VSQueryResult, user_inputs) -> VSBookResult:
    """Run the complete booking flow for one applicant.

    Steps, in order: resolve the appointment config for the slot's routing
    key, optionally upload the passport image (when the sub-category has
    ``isOCREnable``), add the primary applicant, optionally pass the
    applicant OTP check, then either confirm a waitlist entry or pick a
    date/time slot, submit the schedule and resolve the payment link.

    Args:
        slot_info: Query result describing the slot to book; ``apt_type``,
            ``earliest_date`` and ``availability_status`` are read.
        user_inputs: Mutable mapping of applicant data. This method adds
            ``alias_email`` and, when OCR is enabled, ``applicant_image``,
            ``applicant_image_data`` and ``guid`` to it.

    Returns:
        A ``VSBookResult``. ``success`` is ``False`` when no slot could be
        selected or the schedule call did not book the appointment.

    Raises:
        NotFoundError: If no config exists for the routing key.
        BizLogicError: If adding the applicant, the OTP exchange or the
            waitlist confirmation fails.
    """
    self._log("Starting booking process...")

    user_email = user_inputs.get('email')
    user_inputs['alias_email'] = get_alias_email(user_email, new_domain="gmail-app.com")

    res = VSBookResult()
    app_type = slot_info.apt_type
    if slot_info.earliest_date:
        from_date = slot_info.earliest_date.strftime("%Y-%m-%d")
    else:
        from_date = datetime.now().strftime("%Y-%m-%d")

    apt_config = self.free_config.get("apt_configs", {}).get(app_type.routing_key)
    if not apt_config:
        raise NotFoundError(message="Book: Config missing for this routing key.")

    self._fetch_configurations(apt_config)
    sub_cc = apt_config.get("subcategory_code")
    sub_conf = self.subcategory_conf.get(sub_cc, {})

    ocr_enabled = sub_conf.get("isOCREnable", False)
    if ocr_enabled:
        self._log("OCR Enabled, uploading documents...")
        upload_res = self._upload_applicant_documents(apt_config, user_inputs)
        user_inputs["applicant_image"] = upload_res.get("passportImageFilename")
        user_inputs["applicant_image_data"] = upload_res.get("passportImageFileBytes")
        user_inputs["guid"] = upload_res.get("uploadDocumentGUID")

    enable_reference_number = sub_conf.get("enableReferenceNumber", False)
    final_urn = None
    is_waitlist = (slot_info.availability_status == AvailabilityStatus.Waitlist)

    # One-off pause before the first booking attempt of this session.
    if not self.booking_wait_applied:
        self._log("pre-booking wait: sleeping 20s before booking to avoid risk control")
        time.sleep(20)
        self.booking_wait_applied = True

    MAX_RETRY = 2
    for attempt in range(1, MAX_RETRY + 1):
        try:
            final_urn = self._add_primary_applicant(
                apt_config, user_inputs, is_waitlist, ocr_enabled, enable_reference_number
            )
            break
        except BizLogicError as e:
            err_msg = str(e)
            self._log(f"Add Applicant retry {attempt}/{MAX_RETRY}: {err_msg}")
            if 'Capping has exceeded' in err_msg:
                # Not retryable; re-raise with the original traceback.
                raise
            # Back off only when another attempt will actually follow.
            if attempt < MAX_RETRY:
                time.sleep(10.0)

    if not final_urn:
        raise BizLogicError(message="Failed to add primary applicant (Slot likely taken or API error)")
    self._log(f"Applicant Added. URN: {final_urn}")

    if sub_conf.get("isApplicantOTPEnabled", False):
        self._log("Applicant OTP Required.")
        # Timestamp taken before sending so the mail lookup can be bounded by it.
        sent_at = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
        if not self._applicant_otp_send(apt_config, final_urn):
            raise BizLogicError(message='Applicant OTP send failed')

        otp_code = self._read_otp_email(sent_at=sent_at)
        if not self._applicant_otp_verify(apt_config, final_urn, otp_code):
            raise BizLogicError(message='Applicant OTP verify failed')

    if is_waitlist:
        if self._confirm_waitlist(apt_config, final_urn):
            res.success = True
            res.urn = final_urn
            res.account = self.config.account.username
            self._log("Waitlist confirmed.")
            return res
        raise BizLogicError(message='Confirm waitlist failed')

    expected_start = user_inputs.get("expected_start_date", "")
    expected_end = user_inputs.get("expected_end_date", "")

    months = self._get_filtered_covered_months(expected_start, expected_end, from_date)
    self._log(f"Scanning months: {months} (Start looking from: {from_date})")

    selected_slot_id = ""
    selected_slot_date = ""
    selected_slot_time_range = ""

    all_ads = set()          # every available date seen so far, across months
    forbidden_dates = set()  # dates already tried (audit failed or no timeslots)
    found_slot = False

    for m_str in months:
        self._log(f"Checking calendar for {m_str}...")
        all_ads.update(self._query_slot_calendar(apt_config, final_urn, m_str))

        # Up to three date attempts per month before moving on.
        for _ in range(3):
            avail_candidates = [d for d in all_ads if d not in forbidden_dates]
            sel_dates = self._filter_dates(avail_candidates, expected_start, expected_end)
            if not sel_dates:
                break

            tmp_date = sel_dates[0]
            forbidden_dates.add(tmp_date)
            if not self._saveuseractionaudit(apt_config, final_urn, tmp_date):
                self._log(f"Audit failed for {tmp_date}, skipping...")
                time.sleep(1)
                continue

            ats = self._query_slot_time(apt_config, final_urn, tmp_date)
            if not ats:
                self._log(f"No timeslots for {tmp_date}")
                continue

            sel_tm = random.choice(ats)
            selected_slot_id = sel_tm.get("allocationId")
            selected_slot_date = tmp_date
            selected_slot_time_range = sel_tm.get("slot")
            found_slot = True
            break

        if found_slot:
            break

    if not found_slot:
        self._log("No valid slots found after scanning.")
        res.success = False
        return res

    self._log(f"Slot Selected: {selected_slot_date} {selected_slot_time_range} (ID: {selected_slot_id})")
    self._submit_no_addition_service(final_urn)
    amount, currency = self._query_fee(apt_config, final_urn)

    self._log("Submitting schedule...")
    schedule_res = self._schedule(apt_config, final_urn, amount, currency, selected_slot_id)

    if not schedule_res.get("IsAppointmentBooked"):
        self._log(f"Booking failed: {schedule_res}")
        res.success = False
        return res

    res.success = True
    res.account = self.config.account.username
    res.book_date = selected_slot_date
    res.book_time = selected_slot_time_range
    res.urn = final_urn
    # Round before truncating: 19.99 * 100 is 1998.9999999999998 in binary
    # floating point, which a bare int() would turn into 1998.
    res.fee_amount = int(round((amount or 0) * 100))
    res.fee_currency = currency

    if schedule_res.get("IsPaymentRequired", False):
        payload = schedule_res.get("payLoad", "")
        if payload:
            self._log("Processing payment link...")
            payment_url = self._pay_request(payload)
            if payment_url:
                res.payment_link = payment_url

    return res
def _get_application(self):
    """Fetch the applications registered for the logged-in account.

    Posts the configured country/mission codes and the account username to
    the ``appointment/application`` endpoint.

    Returns:
        The decoded JSON body of the response.
    """
    auth_headers = self._get_common_headers(with_auth=True)
    cfg = self.free_config
    body = {
        'countryCode': cfg.get("country_code"),
        'missionCode': cfg.get("mission_code"),
        'loginUser': self.config.account.username,
        'languageCode': 'en-US',
    }
    reply = self._perform_request(
        "POST",
        'https://lift-api.vfsglobal.com/appointment/application',
        headers=auth_headers,
        json_data=body,
    )
    return reply.json()
def _upload_applicant_documents(self, apt_config, user_inputs) -> Dict:
    """Download the passport image from an external URL and upload it to VFS.

    Args:
        apt_config: Appointment config; ``vac_code`` and
            ``subcategory_code`` are read.
        user_inputs: Applicant data; ``passport_image_url`` is required.

    Returns:
        The decoded JSON response of the upload call, extended with
        ``passportImageFilename`` (always ``"passport_img.jpg"``) and
        ``passportImageFileBytes`` (the base64 text of the image).

    Raises:
        NotFoundError: If ``passport_image_url`` is missing or empty.
        BizLogicError: If the image cannot be downloaded or the download
            does not answer with HTTP 200.
    """
    # Plain `requests` is used for the external image host instead of the
    # session's own request path.
    import requests as standard_requests

    url = "https://lift-api.vfsglobal.com/appointment/UploadApplicantDocument"
    passport_url = user_inputs.get("passport_image_url")
    if not passport_url:
        raise NotFoundError(message="Missing passport_image_url")

    # Only the network call is guarded, so the status-code error below is
    # raised as-is instead of being re-wrapped as a generic download error.
    try:
        img_resp = standard_requests.get(passport_url, timeout=30)
    except Exception as e:
        raise BizLogicError(message=f"Image download error: {e}") from e
    if img_resp.status_code != 200:
        raise BizLogicError(message=f"Failed to download passport image: {img_resp.status_code}")
    b64_str = base64.b64encode(img_resp.content).decode('utf-8')

    headers = self._get_common_headers(with_auth=True)

    data = {
        "missioncode": self.free_config.get("mission_code"),
        "countryCode": self.free_config.get("country_code"),
        "centerCode": apt_config.get("vac_code"),
        "loginUser": self.config.account.username,
        "languageCode": "en-US",
        "visaCategoryCode": apt_config.get("subcategory_code"),
        "fileBytes": b64_str,
        "selfiImageFileBytes": "",
    }

    resp = self._perform_request("POST", url, headers=headers, json_data=data)
    result = resp.json()

    # Echo the image back so the caller can attach it to the applicant payload.
    result["passportImageFilename"] = "passport_img.jpg"
    result["passportImageFileBytes"] = b64_str
    return result
-
def _add_primary_applicant(self, apt_config: Dict[str, Any], user_inputs: Dict[str, Any],
                           is_waitlist: bool, ocr_enabled: bool, enable_ref: bool) -> str:
    """Build the primary-applicant payload and submit it.

    Args:
        apt_config: Appointment config; ``vac_code`` and
            ``subcategory_code`` are read.
        user_inputs: Applicant data (names, gender, phone, passport, ...).
        is_waitlist: Sent as ``isWaitlist`` in the request.
        ocr_enabled: When true, the uploaded image fields
            (``applicantImage``, ``applicantImageData``, ``GUID``) are added.
        enable_ref: When true, ``referenceNumber`` is taken from
            ``cover_letter_id``; otherwise it is sent as ``None``.

    Returns:
        The ``urn`` from the response.

    Raises:
        BizLogicError: If the response carries no ``urn``; the message is
            the stringified ``error`` field of the response.
    """
    url = "https://lift-api.vfsglobal.com/appointment/applicants"
    headers = self._get_common_headers(with_auth=True)

    gender_str = str(user_inputs.get("gender", "")).lower()
    # Anything other than "male" is sent as 2.
    gender_code = 1 if gender_str == "male" else 2
    dial_code = str(user_inputs.get("phone_country_code", "86"))

    def _to_ddmmyyyy(d_str):
        """Convert YYYY-MM-DD to DD/MM/YYYY; pass unparsable input through as text."""
        try:
            return datetime.strptime(str(d_str), "%Y-%m-%d").strftime("%d/%m/%Y")
        except ValueError:
            return str(d_str)

    dob = _to_ddmmyyyy(user_inputs.get("birthday", ""))
    ppt_exp = _to_ddmmyyyy(user_inputs.get("passport_expiry_date", ""))

    applicant = {
        "urn": "",
        "arn": "",
        "loginUser": self.config.account.username,
        "firstName": str(user_inputs.get("first_name", "")).strip().upper(),
        "middleName": "",
        "lastName": str(user_inputs.get("last_name", "")).strip().upper(),
        "employerFirstName": "",
        "employerLastName": "",
        "salutation": "",
        "Subclasscode": None,
        "VisaToken": None,
        "centerClassCode": None,
        "dateOfApplication": None,
        "selectedSubvisaCategory": None,
        "gender": gender_code,
        "contactNumber": str(user_inputs.get("phone", "")).strip().lstrip("0"),
        "dialCode": dial_code,
        "employerContactNumber": "",
        "employerDialCode": "",
        "emailId": str(user_inputs.get("alias_email", "")).strip().upper(),
        "employerEmailId": "",
        "passportNumber": str(user_inputs.get("passport_no", "")).strip().upper(),
        "confirmPassportNumber": "",
        "passportExpirtyDate": ppt_exp,
        "dateOfBirth": dob,
        "nationalId": None,
        "nationalityCode": get_country_iso3(str(user_inputs.get("nationality", ""))),
        "state": user_inputs.get("state"),
        "city": user_inputs.get("city"),
        "addressline1": user_inputs.get("addressline1"),
        "addressline2": user_inputs.get("addressline2"),
        "pincode": None,
        "isEndorsedChild": False,
        "applicantType": 0,
        "vlnNumber": None,
        "applicantGroupId": 0,
        "parentPassportNumber": "",
        "parentPassportExpiry": "",
        "dateOfDeparture": None,
        "entryType": "",
        "eoiVisaType": "",
        "passportType": "",
        "vfsReferenceNumber": "",
        "familyReunificationCerificateNumber": "",
        "PVRequestRefNumber": "",
        "PVStatus": "",
        "PVStatusDescription": "",
        "PVCanAllowRetry": True,
        "PVisVerified": False,
        "eefRegistrationNumber": "",
        "isAutoRefresh": True,
        "helloVerifyNumber": "",
        "OfflineCClink": "",
        "idenfystatuscheck": False,
        "vafStatus": None,
        "SpecialAssistance": "",
        "AdditionalRefNo": None,
        "juridictionCode": "",
        "canInitiateVAF": False,
        "canEditVAF": False,
        "canDeleteVAF": False,
        "canDownloadVAF": False,
        "Retryleft": "",
        # The IP is expected to have been obtained already in create_session.
        "ipAddress": self.real_ip,
    }

    if enable_ref:
        applicant["referenceNumber"] = str(user_inputs.get("cover_letter_id", "")).strip()
    else:
        applicant["referenceNumber"] = None

    if ocr_enabled:
        applicant["applicantImage"] = str(user_inputs.get("applicant_image", ""))
        applicant["applicantImageData"] = str(user_inputs.get("applicant_image_data", ""))
        applicant["GUID"] = str(user_inputs.get("guid", ""))

    payload = {
        "countryCode": self.free_config.get("country_code"),
        "missionCode": self.free_config.get("mission_code"),
        "centerCode": apt_config.get("vac_code"),
        "loginUser": self.config.account.username,
        "visaCategoryCode": apt_config.get("subcategory_code"),
        "applicantList": [applicant],
        "languageCode": "en-US",
        "isWaitlist": is_waitlist,
        "isEdit": False,
        "feeEntryTypeCode": None,
        "feeExemptionTypeCode": None,
        "feeExemptionDetailsCode": None,
        "juridictionCode": None,
        "regionCode": None,
    }

    resp = self._perform_request("POST", url, headers=headers, json_data=payload)
    # Decode the body once and reuse it for both the URN and the error text.
    body = resp.json()
    urn = body.get("urn")
    if not urn:
        raise BizLogicError(message=str(body.get('error')))
    return urn
-
def _applicant_otp_send(self, apt_config, urn) -> bool:
    """Ask the API to generate an applicant OTP for ``urn``.

    Posts ``otpAction: GENERATE`` with an empty ``OTP`` to the
    ``applicantotp`` endpoint.

    Returns:
        The ``isOTPGenerated`` field of the response, ``False`` if absent.
    """
    auth_headers = self._get_common_headers(with_auth=True)
    cfg = self.free_config
    body = {
        "urn": urn,
        "loginUser": self.config.account.username,
        "missionCode": cfg.get("mission_code"),
        "countryCode": cfg.get("country_code"),
        "centerCode": apt_config.get("vac_code"),
        "OTP": "",
        "otpAction": "GENERATE",
        "languageCode": "en-US",
    }
    reply = self._perform_request(
        "POST",
        "https://lift-api.vfsglobal.com/appointment/applicantotp",
        headers=auth_headers,
        json_data=body,
    )
    return reply.json().get("isOTPGenerated", False)
def _applicant_otp_verify(self, apt_config, urn, otp) -> bool:
    """Submit the applicant OTP for validation.

    Posts ``otpAction: VALIDATE`` together with ``otp`` to the
    ``applicantotp`` endpoint, adding a ``datacenter: GERMANY`` header.

    Returns:
        The ``isOTPValidated`` field of the response, ``False`` if absent.
    """
    auth_headers = self._get_common_headers(with_auth=True)
    # This call sometimes needs the datacenter header; it is always added here.
    auth_headers["datacenter"] = "GERMANY"
    cfg = self.free_config
    body = {
        "urn": urn,
        "loginUser": self.config.account.username,
        "missionCode": cfg.get("mission_code"),
        "countryCode": cfg.get("country_code"),
        "centerCode": apt_config.get("vac_code"),
        "OTP": otp,
        "otpAction": "VALIDATE",
        "languageCode": "en-US",
    }
    reply = self._perform_request(
        "POST",
        "https://lift-api.vfsglobal.com/appointment/applicantotp",
        headers=auth_headers,
        json_data=body,
    )
    return reply.json().get("isOTPValidated", False)
-
def _query_slot_calendar(self, apt_config, urn, from_date) -> List:
    """Query the appointment calendar starting at ``from_date``.

    Args:
        apt_config: Appointment config; ``vac_code`` and
            ``subcategory_code`` are read.
        urn: Applicant URN to query for.
        from_date: Start date as ``YYYY-MM-DD``.

    Returns:
        One entry per item in the response's ``calendars`` list, each
        passed through ``to_yyyymmdd``; an empty list when the response has
        no calendars.
    """
    auth_headers = self._get_common_headers(with_auth=True)

    # The request wants DD/MM/YYYY while callers pass YYYY-MM-DD.
    api_from_date = datetime.strptime(from_date, "%Y-%m-%d").strftime("%d/%m/%Y")

    cfg = self.free_config
    body = {
        "missionCode": cfg.get("mission_code"),
        "countryCode": cfg.get("country_code"),
        "centerCode": apt_config.get("vac_code"),
        "loginUser": self.config.account.username,
        "visaCategoryCode": apt_config.get("subcategory_code"),
        "fromDate": api_from_date,
        "urn": urn,
        "payCode": "",
    }

    reply = self._perform_request(
        "POST",
        "https://lift-api.vfsglobal.com/appointment/calendar",
        headers=auth_headers,
        json_data=body,
    )
    calendar_entries = reply.json().get("calendars")
    if not calendar_entries:
        return []

    # The API is inconsistent and may answer in MM/DD/YYYY or DD/MM/YYYY;
    # it is usually MM/DD/YYYY, which is what is parsed here.
    return [to_yyyymmdd(entry.get("date"), "%m/%d/%Y") for entry in calendar_entries]
-
def _query_slot_time(self, apt_config, urn, slot_date) -> List:
    """Query the time slots offered on ``slot_date``.

    Args:
        apt_config: Appointment config; ``vac_code`` and
            ``subcategory_code`` are read.
        urn: Applicant URN to query for.
        slot_date: Day to inspect, as ``YYYY-MM-DD``.

    Returns:
        The ``slots`` field of the response, or an empty list if absent.
    """
    auth_headers = self._get_common_headers(with_auth=True)

    # Convert YYYY-MM-DD into the DD/MM/YYYY form sent to the API.
    api_slot_date = datetime.strptime(slot_date, "%Y-%m-%d").strftime("%d/%m/%Y")

    cfg = self.free_config
    body = {
        "missionCode": cfg.get("mission_code"),
        "countryCode": cfg.get("country_code"),
        "centerCode": apt_config.get("vac_code"),
        "loginUser": self.config.account.username,
        "visaCategoryCode": apt_config.get("subcategory_code"),
        "slotDate": api_slot_date,
        "urn": urn,
    }
    reply = self._perform_request(
        "POST",
        "https://lift-api.vfsglobal.com/appointment/timeslot",
        headers=auth_headers,
        json_data=body,
    )
    return reply.json().get("slots", [])
def _saveuseractionaudit(self, apt_config, urn, earliest_date) -> bool:
    """Record a ``schedule`` user-action audit entry for ``earliest_date``.

    Args:
        apt_config: Appointment config; ``vac_code`` is read.
        urn: Applicant URN.
        earliest_date: Chosen date as ``YYYY-MM-DD``.

    Returns:
        The ``isSavedSuccess`` field of the response, ``False`` if absent.
    """
    auth_headers = self._get_common_headers(with_auth=True)

    chosen_day = datetime.strptime(earliest_date, "%Y-%m-%d")
    cfg = self.free_config
    body = {
        "missionCode": cfg.get("mission_code"),
        "countryCode": cfg.get("country_code"),
        "centerCode": apt_config.get("vac_code"),
        "loginUser": self.config.account.username,
        "urn": urn,
        # The same date is sent twice in two different formats.
        "firstEarliestSlotDate": chosen_day.strftime("%d/%m/%Y"),
        "action": "schedule",
        "ipAddress": self.real_ip,
        "eadAppointmentDetail": chosen_day.strftime("%Y-%m-%dT%H:%M:%S"),
    }
    reply = self._perform_request(
        "POST",
        "https://lift-api.vfsglobal.com/appointment/saveuseractionaudit",
        headers=auth_headers,
        json_data=body,
    )
    return reply.json().get("isSavedSuccess", False)
-
def _submit_no_addition_service(self, urn):
    """Submit an empty value-added-service selection for ``urn``.

    Posts to ``vas/mapvas`` with an empty ``applicants`` list; the response
    is not inspected.
    """
    auth_headers = self._get_common_headers(with_auth=True)
    cfg = self.free_config
    body = {
        "loginUser": self.config.account.username,
        "missionCode": cfg.get("mission_code"),
        "countryCode": cfg.get("country_code"),
        "urn": urn,
        "applicants": [],
    }
    self._perform_request(
        "POST",
        "https://lift-api.vfsglobal.com/vas/mapvas",
        headers=auth_headers,
        json_data=body,
    )
- def _query_fee(self, apt_config, urn) -> Tuple[float, str]:
- url = "https://lift-api.vfsglobal.com/appointment/fees"
- headers = self._get_common_headers(with_auth=True)
- data = {
- "missionCode": self.free_config.get("mission_code"),
- "countryCode": self.free_config.get("country_code"),
- "centerCode": apt_config.get("vac_code"),
- "loginUser": self.config.account.username,
- "urn": urn,
- "languageCode": "en-US"
- }
- resp = self._perform_request("POST", url, headers=headers, json_data=data)
- j = resp.json()
- total = j.get("totalamount", 0.0)
- currency = "EUR"
- if j.get("feeDetails"):
- currency = j["feeDetails"][0].get("currency", "EUR")
- return total, currency
def _schedule(self, apt_config, urn, amount, currency, slot_id) -> Dict:
    """Submit the appointment schedule request for the chosen slot.

    Args:
        apt_config: Appointment config; ``vac_code`` is read.
        urn: Applicant URN.
        amount: Fee amount placed in ``paymentdetails``.
        currency: Fee currency placed in ``paymentdetails``.
        slot_id: Allocation id of the chosen slot; sent as a string.

    Returns:
        The decoded JSON body of the response.
    """
    auth_headers = self._get_common_headers(with_auth=True)
    cfg = self.free_config
    payment = {
        "paymentmode": "Online",
        "RequestRefNo": "",
        "clientId": "",
        "merchantId": "",
        "amount": amount,
        "currency": currency,
    }
    body = {
        "missionCode": cfg.get("mission_code"),
        "countryCode": cfg.get("country_code"),
        "centerCode": apt_config.get("vac_code"),
        "loginUser": self.config.account.username,
        "urn": urn,
        "notificationType": "none",
        "paymentdetails": payment,
        "allocationId": str(slot_id),
        "CanVFSReachoutToApplicant": True,
    }
    reply = self._perform_request(
        "POST",
        "https://lift-api.vfsglobal.com/appointment/schedule",
        headers=auth_headers,
        json_data=body,
    )
    return reply.json()
def _pay_request(self, payload) -> str:
    """Resolve the payment gateway URL behind a VFS ``PayRequest`` link.

    Opens the ``PayRequest`` URL in a new browser tab (so the main tab's
    session state is untouched), waits up to 15 seconds for the tab to
    leave that URL, and reads the tab's final URL.

    Args:
        payload: The ``payLoad`` value returned by the schedule call.

    Returns:
        The tab's URL after the wait, or ``""`` if anything failed.
    """
    start_url = f"https://online.vfsglobal.com/PG-Component/Payment/PayRequest?payLoad={payload}"
    final_url = ""
    pay_tab = None  # bound up front so the cleanup below is safe if new_tab fails

    try:
        self._log("Resolving payment redirect...")
        pay_tab = self.page.new_tab(start_url)

        # Wait until the URL is no longer the PayRequest one. exclude=True
        # makes url_change wait for the text to disappear from the URL;
        # without it the call waits for the URL to *contain* start_url.
        pay_tab.wait.url_change(start_url, exclude=True, timeout=15)

        final_url = pay_tab.url
        self._log(f"Payment URL resolved: {final_url}")
    except Exception as e:
        self._log(f"[WARN] Failed to resolve payment URL: {e}")
    finally:
        if pay_tab is not None:
            try:
                pay_tab.close()
            except Exception:
                # Closing is best-effort; the tab may already be gone.
                pass

    return final_url
def _confirm_waitlist(self, apt_config: Dict[str, Any], urn: str) -> bool:
    """Confirm the waitlist entry for ``urn``.

    Args:
        apt_config: Appointment config; ``vac_code`` is read.
        urn: Applicant URN.

    Returns:
        The ``isConfirmed`` field of the response, ``False`` if absent.
    """
    auth_headers = self._get_common_headers(with_auth=True)
    cfg = self.free_config
    body = {
        "missionCode": cfg.get("mission_code"),
        "countryCode": cfg.get("country_code"),
        "centerCode": apt_config.get("vac_code"),
        "loginUser": self.config.account.username,
        "urn": urn,
        "notificationType": "none",
        "CanVFSReachoutToApplicant": True,
    }
    reply = self._perform_request(
        "POST",
        "https://lift-api.vfsglobal.com/appointment/ConfirmWaitlist",
        headers=auth_headers,
        json_data=body,
    )
    return reply.json().get("isConfirmed", False)
- def _filter_dates(self, dates: List[str], start_str: str, end_str: str) -> List[str]:
- if not start_str or not end_str:
- return dates
- valid_dates = []
- try:
- s_date = datetime.strptime(start_str[:10], "%Y-%m-%d")
- e_date = datetime.strptime(end_str[:10], "%Y-%m-%d")
- for date_str in dates:
- curr_date = datetime.strptime(date_str, "%Y-%m-%d")
- if s_date <= curr_date <= e_date:
- valid_dates.append(date_str)
- random.shuffle(valid_dates)
- return valid_dates
- except:
- return dates
- def _get_filtered_covered_months(self, start_date, end_date, from_date) -> List[str]:
- fmt = "%Y-%m-%d"
- try:
- dt_start = datetime.strptime(start_date, fmt) if start_date else datetime.now()
- dt_end = datetime.strptime(end_date, fmt) if end_date else datetime.now().replace(year=datetime.now().year + 1)
- try:
- dt_from = datetime.strptime(from_date, fmt)
- except:
- dt_from = datetime.now()
- except:
- return []
- dt_start = dt_start.replace(day=1)
- dt_end = dt_end.replace(day=1)
- dt_from = dt_from.replace(day=1)
- curr = max(dt_start, dt_from)
-
- months = []
- while curr <= dt_end:
- months.append(curr.strftime(fmt))
- if curr.month == 12:
- curr = curr.replace(year=curr.year + 1, month=1)
- else:
- curr = curr.replace(month=curr.month + 1)
- return months
-
- # --- 资源清理核心方法 ---
def cleanup(self):
    """Shut down the browser, delete the temporary workspace, stop the tunnel.

    Every step is best-effort and the method can be called repeatedly. It
    also tolerates a partially initialised instance (attributes missing),
    which matters because ``__del__`` calls it.
    """
    # 1. Close the browser (this terminates the Chrome process).
    page = getattr(self, "page", None)
    if page:
        try:
            page.quit()
        except Exception:
            pass  # already closed
        self.page = None

    # 2. Delete the workspace. Chrome may need a few hundred milliseconds to
    #    release its file locks, so wait briefly before each attempt.
    #    rmtree(ignore_errors=True) never raises, so success is detected by
    #    checking whether the directory is still there.
    workspace = getattr(self, "root_workspace", None)
    if workspace and os.path.exists(workspace):
        for attempt in range(3):
            if attempt:
                self._log(f"Cleanup retry {attempt}/2: workspace still present")
                time.sleep(0.5)
            time.sleep(0.2)
            shutil.rmtree(workspace, ignore_errors=True)
            if not os.path.exists(workspace):
                break
        else:
            self._log(f"[WARN] Failed to fully remove workspace: {workspace}")

    # 3. Stop the proxy tunnel.
    tunnel = getattr(self, "tunnel", None)
    if tunnel:
        try:
            tunnel.stop()
        except Exception:
            pass
        self.tunnel = None
-
- def __del__(self):
- """
- 析构函数:当对象被垃圾回收时自动调用
- """
- self.cleanup()
|