# plugins/vfs_global_plugin.py import time import json import random import base64 import re import urllib.parse from datetime import datetime from typing import Dict, Any, Optional, List, Tuple, Callable from curl_cffi import requests, const # 加密库 from cryptography.hazmat.primitives import serialization, hashes from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.backends import default_backend from vs_plg import IVSPlg from vs_types import VSPlgConfig, AppointmentType, VSQueryResult, VSBookResult, DateAvailability, AvailabilityStatus, NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError from toolkit.vs_cloud_api import VSCloudApi # ----------------- 静态常量与辅助数据 ----------------- VFS_PUBLIC_KEY_PEM = """-----BEGIN PUBLIC KEY----- MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAuupFgB+lYIOtSxrRoHzc LmCZKJ6+oSbgqgOPzFMM0TasOeLw0NXEn1XfIzXdx75+tegNKwyIZumoh0yhubKs t59GV321kN0iquYRHrdh3ygfDDHlS9rROQeBqRga0ncSADtbLMrBPqXJjPCoV76y t92towriKoH75BhiazY0mghm4LjmAWrV0u/GNpV3tk9bxbtHEXGaFmxCJqjg+7x6 1e5wXLfvpj9w1QsiSWOSJxLOyICz/9ByxXycQQFdNmjnnnwco9Gt/Mi33NYH71j0 5oXIjklFC4lvJqaqSY5lS7Vwb9oCt9zX9J0Yz4z4e/3V+0jgRnWOFGofyks4FKe2 GQIDAQAB -----END PUBLIC KEY-----""" COUNTRY_MAP = { "afghanistan": "AFG", "albania": "ALB", "algeria": "DZA", "andorra": "AND", "angola": "AGO", "antigua and barbuda": "ATG", "argentina": "ARG", "armenia": "ARM", "australia": "AUS", "austria": "AUT", "azerbaijan": "AZE", "bahamas": "BHS", "bahrain": "BHR", "bangladesh": "BGD", "barbados": "BRB", "belarus": "BLR", "belgium": "BEL", "belize": "BLZ", "benin": "BEN", "bhutan": "BTN", "bolivia": "BOL", "bosnia and herzegovina": "BIH", "botswana": "BWA", "brazil": "BRA", "brunei": "BRN", "bulgaria": "BGR", "burkina faso": "BFA", "burundi": "BDI", "cabo verde": "CPV", "cambodia": "KHM", "cameroon": "CMR", "canada": "CAN", "central african republic": "CAF", "chad": "TCD", "chile": "CHL", "china": "CHN", "colombia": "COL", "comoros": "COM", "congo (brazzaville)": "COG", "congo (kinshasa)": "COD", "costa rica": "CRI", "croatia": "HRV", "cuba": "CUB", "cyprus": "CYP", "czech republic": "CZE", "denmark": "DNK", "djibouti": "DJI", "dominica": "DMA", "dominican republic": "DOM", "ecuador": "ECU", "egypt": "EGY", "el salvador": "SLV", "equatorial guinea": "GNQ", "eritrea": "ERI", "estonia": "EST", "eswatini": "SWZ", "ethiopia": "ETH", "fiji": "FJI", "finland": "FIN", "france": "FRA", "gabon": "GAB", "gambia": "GMB", "georgia": "GEO", "germany": "DEU", "ghana": "GHA", "greece": "GRC", "grenada": "GRD", "guatemala": "GTM", "guinea": "GIN", "guinea-bissau": "GNB", "guyana": "GUY", "haiti": "HTI", "honduras": "HND", "hungary": "HUN", "iceland": "ISL", "india": "IND", "indonesia": "IDN", "iran": "IRN", "iraq": "IRQ", "ireland": "IRL", "israel": "ISR", "italy": "ITA", "jamaica": "JAM", "japan": "JPN", "jordan": "JOR", "kazakhstan": "KAZ", "kenya": "KEN", "kiribati": "KIR", "korea, north": "PRK", "korea, south": "KOR", "kuwait": "KWT", "kyrgyzstan": "KGZ", "laos": "LAO", "latvia": "LVA", "lebanon": "LBN", "lesotho": "LSO", "liberia": "LBR", "libya": "LBY", "liechtenstein": "LIE", "lithuania": "LTU", "luxembourg": "LUX", "madagascar": "MDG", "malawi": "MWI", "malaysia": "MYS", "maldives": "MDV", "mali": "MLI", "malta": "MLT", "marshall islands": "MHL", "mauritania": "MRT", "mauritius": "MUS", "mexico": "MEX", "micronesia": "FSM", "moldova": "MDA", "monaco": "MCO", "mongolia": "MNG", "montenegro": "MNE", "morocco": "MAR", "mozambique": "MOZ", "myanmar": "MMR", "namibia": "NAM", "nauru": "NRU", "nepal": "NPL", "netherlands": "NLD", "new zealand": "NZL", "nicaragua": "NIC", "niger": "NER", "nigeria": "NGA", "north macedonia": "MKD", "norway": "NOR", "oman": "OMN", "pakistan": "PAK", "palau": "PLW", "panama": "PAN", "papua new guinea": "PNG", "paraguay": "PRY", "peru": "PER", "philippines": "PHL", "poland": "POL", "portugal": "PRT", "qatar": "QAT", "romania": "ROU", "russia": "RUS", "rwanda": "RWA", "saudi arabia": "SAU", "senegal": "SEN", "serbia": "SRB", "seychelles": "SYC", "sierra leone": "SLE", "singapore": "SGP", "slovakia": "SVK", "slovenia": "SVN", "solomon islands": "SLB", "somalia": "SOM", "south africa": "ZAF", "spain": "ESP", "sri lanka": "LKA", "sudan": "SDN", "suriname": "SUR", "sweden": "SWE", "switzerland": "CHE", "syria": "SYR", "tajikistan": "TJK", "tanzania": "TZA", "thailand": "THA", "timor-leste": "TLS", "togo": "TGO", "tonga": "TON", "tunisia": "TUN", "turkey": "TUR", "turkmenistan": "TKM", "uganda": "UGA", "ukraine": "UKR", "united arab emirates": "ARE", "united kingdom": "GBR", "united states": "USA", "uruguay": "URY", "uzbekistan": "UZB", "vanuatu": "VUT", "venezuela": "VEN", "vietnam": "VNM", "yemen": "YEM", "zambia": "ZMB", "zimbabwe": "ZWE" } def get_country_iso3(name: str) -> str: return COUNTRY_MAP.get(name.lower(), "CHN") def get_alias_email(email: str, new_domain: str = "gmail-app.com") -> str: """ 将邮箱域名替换为指定域名(默认 gmail-app.com) """ if "@" not in email: raise ValueError(f"Invalid email: {email}") local_part, _ = email.rsplit("@", 1) return f"{local_part}@{new_domain}" def to_yyyymmdd(data_str: str, date_str_format: str, target_format: str="%Y-%m-%d"): # 转换日期到YYYY-MM-DD 固定格式 dt = datetime.strptime(data_str, date_str_format) return dt.strftime("%Y-%m-%d") class VfsPlugin(IVSPlg): def __init__(self, group_id: str): self.group_id = group_id self.config: Optional[VSPlgConfig] = None self.free_config: Dict[str, Any] = {} self.logger = None self.session: Optional[requests.Session] = None self.jwt_token: str = "" self.user_agent: str = "" self.real_ip: str = "" self.is_healthy: bool = True # 缓存配置 self.center_conf = None self.category_conf: Dict = {} self.subcategory_conf: Dict = {} # 加载公钥 self.public_key = serialization.load_pem_public_key( VFS_PUBLIC_KEY_PEM.encode(), backend=default_backend() ) self.session_create_time: float = 0 def get_group_id(self) -> str: return self.group_id def set_config(self, config: VSPlgConfig): self.config = config self.free_config = config.free_config or {} def set_log(self, logger: Callable[[str], None]) -> None: self.logger = logger def health_check(self) -> bool: if not self.is_healthy: return False if self.session is None: return False if self.config.session_max_life > 0: current_time = time.time() elapsed_time = current_time - self.session_create_time if elapsed_time > self.config.session_max_life * 60: self._log(f"Session Life ({int(elapsed_time)}s) out of max life limit ({self.config.session_max_life * 60}s), mark as unhealth session") return False return True def create_session(self) -> None: # 初始化 Session curlopt = { const.CurlOpt.MAXAGE_CONN: 1800, const.CurlOpt.MAXLIFETIME_CONN: 1800, const.CurlOpt.VERBOSE: self.config.debug, } self.session = requests.Session( proxy=self._get_proxy_url(), impersonate="chrome124", curl_options=curlopt, use_thread_local_curl=False, http_version=const.CurlHttpVersion.V2TLS ) # 获取真实IP self.real_ip = self._get_realnetwork_ip() # 1. Cloudflare Turnstile cf_token = self._handle_cloudflare_challenge() # 2. 准备参数 email = self.config.account.username password = self.config.account.password enc_password = self._encrypt_password(password) mission_code = self.free_config.get("mission_code", "") country_code = self.free_config.get("country_code", "") client_src = self._get_client_source() orange_src = self._get_orange_source(email) url = "https://lift-api.vfsglobal.com/user/login" headers = self._get_common_headers(with_auth=False) headers.update({ "clientsource": client_src, "orangex": orange_src, "content-type": "application/x-www-form-urlencoded" }) data = { "username": email, "password": enc_password, "missioncode": mission_code, "countrycode": country_code, "languageCode": "en-US", "captcha_version": "cloudflare-v1", "captcha_api_key": cf_token } # 3. 发送登录请求 resp = self._perform_request("POST", url, headers=headers, data=data) resp_json = resp.json() # 分支 1: 登录直接成功,获取到 Token if resp_json.get('accessToken'): self.jwt_token = resp_json["accessToken"] self._log("Login successful, JWT obtained.") # 分支 2: 需要 OTP 验证 elif resp_json.get("enableOTPAuthentication"): self._log("Login requires OTP.") otp = self._read_otp_email() # 提交 OTP,如果失败该函数内部应抛出异常 self._submit_login_otp(None, otp) # 分支 3: 异常情况(既无 Token 也无 OTP) else: # 在分支内部抛出异常,包含响应内容方便调试 raise BizLogicError(message=f"Login failed: No access token or OTP flow. Response: {resp_json}") self.session_create_time = time.time() self._log("Session created successfully.") def query(self, apt_type: AppointmentType) -> VSQueryResult: """查询可预约 Slot""" result = VSQueryResult() apt_config = self.free_config.get("app_configs", {}).get(apt_type.routing_key) self._fetch_configurations(apt_config) earliest_date = self._query_earliest_slot(apt_config) result.success = False result.availability_status = AvailabilityStatus.NoneAvailable result.apt_type = apt_type if earliest_date: result.success = True if "WaitList" in earliest_date: result.availability_status = AvailabilityStatus.Waitlist else: result.availability_status = AvailabilityStatus.Available result.earliest_date = earliest_date result.availability = [ DateAvailability( date=earliest_date, times=[], ) ] return result def book(self, slot_info: VSQueryResult, user_inputs) -> VSBookResult: """ 执行完整的预约流程,包含:上传文档 -> 添加申请人 -> OTP -> 选时间 -> 锁定 -> 支付 """ user_email = user_inputs.get('email') user_inputs['alias_email'] = get_alias_email(user_email, new_domain="gmail-app.com") res = VSBookResult() app_type = slot_info.apt_type from_date = slot_info.earliest_date if slot_info.earliest_date else datetime.now().strftime("%Y-%m-%d") apt_config = self.free_config.get("app_configs", {}).get(app_type.routing_key) if not apt_config: raise NotFoundError(message="Book: Config missing.") self._fetch_configurations(apt_config) sub_cc = apt_config.get("subcategory_code") sub_conf = self.subcategory_conf.get(sub_cc, {}) # OCR 识别 / 文档上传 ocr_enabled = sub_conf.get("isOCREnable", False) if ocr_enabled: self._log("OCR Enabled, uploading documents...") upload_res = self._upload_applicant_documents(apt_config, user_inputs, upload_res) user_inputs["applicant_image"] = upload_res.get("passportImageFilename") user_inputs["applicant_image_data"] = upload_res.get("passportImageFileBytes") # Base64 user_inputs["guid"] = upload_res.get("uploadDocumentGUID") # 需要提供申请号 (Cover Letter) enable_reference_number = sub_conf.get("enableReferenceNumber", False) # 添加申请人 (核心步骤 1) final_urn = None is_waitlist = (slot_info.availability_status == AvailabilityStatus.Waitlist) add_primary_retry = 0 MAX_RETRY = 6 while add_primary_retry < MAX_RETRY: try: final_urn = self._add_primary_applicant(apt_config, user_inputs, is_waitlist, ocr_enabled, enable_reference_number) if not final_urn: raise NotFoundError(message="URN not found") break except Exception as e: self._log(f"Add Applicant retry {add_primary_retry}...") time.sleep(10) add_primary_retry += 1 if not final_urn: raise BizLogicError(message="Failed to add primary applicant (Slot likely taken)") self._log(f"Applicant Added. URN: {final_urn}") # 申请人 OTP 验证 (核心步骤 2) otp_enabled = sub_conf.get("isApplicantOTPEnabled", False) if otp_enabled: self._log("Applicant OTP Required.") if not self._applicant_otp_send(apt_config, final_urn): raise BizLogicError(message='applicant otp send failed') # 复用之前的读邮件逻辑 otp_code = self._read_otp_email() if not self._applicant_otp_verify(apt_config, final_urn, otp_code): raise BizLogicError(message='applicant otp verify failed') # 如果是 Waitlist 模式,直接确认并返回 if is_waitlist: if self._confirm_waitlist(apt_config, final_urn): res.success = True res.account = self.config.account.username res.urn = final_urn return res raise BizLogicError(message='confirm waitlist failed') # 规则引擎与日期筛选 (核心步骤 3) expected_start = user_inputs.get("expected_start_date", "") expected_end = user_inputs.get("expected_end_date", "") # 计算需要扫描的月份, 如果 expected_start/end 为空,默认使用 from_date 所在月 months = self._get_filtered_covered_months(expected_start, expected_end, from_date) self._log(f"Scanning months: {months} (From: {from_date})") selected_slot_id = "" selected_slot_date = "" selected_slot_time_range = "" # 记录所有有号日期,避免重复处理 all_ads = set() forbidden_dates = set() found_slot = False # 遍历月份寻找 Slot for m_str in months: ads = self._query_slot_calendar(apt_config, final_urn, m_str) # 过滤已知的 slots new_ads = [d for d in ads if d not in all_ads] all_ads.update(new_ads) # 尝试 3 次选择 for _ in range(3): # 排除 forbidden avail_candidates = [d for d in list(all_ads) if d not in forbidden_dates] # 规则筛选 sel_dates = self._filter_dates(avail_candidates, expected_start, expected_end) print(f'avail_candidates={avail_candidates}, sel_dates={sel_dates}') if not sel_dates: break tmp_date = sel_dates[0] forbidden_dates.add(tmp_date) # 审计日志 if not self._saveuseractionaudit(apt_config, final_urn, tmp_date): time.sleep(3) continue # 查询具体时间 ats = self._query_slot_time(apt_config, final_urn, tmp_date) if not ats: time.sleep(3) continue # 随机选择一个时间段 sel_tm = random.choice(ats) selected_slot_id = sel_tm.get("allocationId") selected_slot_date = tmp_date selected_slot_time_range = sel_tm.get("slot") found_slot = True break if found_slot: break if not found_slot: self._log("No valid slots found.") res.success = False return res self._log(f"Slot Selected: {selected_slot_date} {selected_slot_time_range} (ID: {selected_slot_id})") # 服务、费用、最终预约 (核心步骤 4) self._submit_no_addition_service(final_urn) amount, currency = self._query_fee(apt_config, final_urn) schedule_res = self._schedule(apt_config, final_urn, amount, currency, selected_slot_id) if not schedule_res.get("IsAppointmentBooked"): self._log(f"IsAppointmentBooked is false") res.success = False return res # 构造返回结果 res.success = True res.account = self.config.account.username res.book_date = selected_slot_date res.book_time = selected_slot_time_range res.urn = final_urn res.fee_amount = int(amount * 100) res.fee_currency = currency # 处理支付链接 if schedule_res.get("IsPaymentRequired", False): payload = schedule_res.get("payLoad", "") payment_url = self._pay_request(payload) if payment_url: res.payment_link = payment_url # 保存 Session saved_session = self._save_http_session(payment_url) if saved_session: res.session_id = saved_session['session_id'] return res def _log(self, message): if self.logger: self.logger(f'[VfsPlugin] [{self.group_id}] {message}') else: print(f'[VfsPlugin] [{self.group_id}] {message}') def _get_proxy_url(self): # 构造代理 proxy_url = "" if self.config.proxy.ip: s = self.config.proxy if s.username: proxy_url = f"{s.scheme}://{s.username}:{s.password}@{s.ip}:{s.port}" else: proxy_url = f"{s.scheme}://{s.ip}:{s.port}" return proxy_url def _get_filtered_covered_months(self, start_date, end_date, from_date) -> List[str]: """ 计算需要查询的月份列表,格式 YYYY-MM-DD (每月1号) """ fmt = "%Y-%m-%d" # 默认值处理 try: dt_start = datetime.strptime(start_date, fmt) if start_date else datetime.now() dt_end = datetime.strptime(end_date, fmt) if end_date else datetime.now().replace(year=datetime.now().year + 1) try: dt_from = datetime.strptime(from_date, fmt) except: dt_from = datetime.now() except: return [] # 归一化到月初 dt_start = dt_start.replace(day=1) dt_end = dt_end.replace(day=1) dt_from = dt_from.replace(day=1) # 起始点取 max(start, from) curr = max(dt_start, dt_from) months = [] while curr <= dt_end: months.append(curr.strftime(fmt)) # 下个月 if curr.month == 12: curr = curr.replace(year=curr.year + 1, month=1) else: curr = curr.replace(month=curr.month + 1) return months def _get_realnetwork_ip(self): url = "https://api.ipify.org/?format=json" headers = { 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7', 'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6', } resp = self._perform_request('GET', url, headers=headers) return resp.json()['ip'] def _confirm_waitlist(self, apt_config: Dict[str, Any], urn: str) -> bool: """ 确认加入候补名单 (对应 C++ VFSApi::confirm_waitlist) """ url = "https://lift-api.vfsglobal.com/appointment/ConfirmWaitlist" headers = self._get_common_headers(with_auth=True) headers["content-type"] = "application/json;charset=UTF-8" data = { "missionCode": self.free_config.get("mission_code"), "countryCode": self.free_config.get("country_code"), "centerCode": apt_config.get("vac_code"), "loginUser": self.config.account.username, "urn": urn, "notificationType": "none", "CanVFSReachoutToApplicant": True } resp = self._perform_request("POST", url, headers=headers, json_data=data) return resp.json().get("isConfirmed") def _upload_applicant_documents(self, apt_config, user_inputs) -> Dict: """上传护照图片""" url = "https://lift-api.vfsglobal.com/appointment/UploadApplicantDocument" passport_url = user_inputs.get("passport_image_url") if not passport_url: raise NotFoundError(message="Missing passport_image_url") img_resp = requests.get(passport_url, timeout=30) if img_resp.status_code != 200: raise BizLogicError(message="Failed to download passport image") b64_str = base64.b64encode(img_resp.content).decode('utf-8') headers = self._get_common_headers(with_auth=True) headers["content-type"] = "application/json;charset=UTF-8" data = { "missioncode": self.free_config.get("mission_code"), "countryCode": self.free_config.get("country_code"), "centerCode": apt_config.get("vac_code"), "loginUser": self.config.account.username, "languageCode": "en-US", "visaCategoryCode": apt_config.get("subcategory_code"), "fileBytes": b64_str, "selfiImageFileBytes": "" } resp = self._perform_request("POST", url, headers=headers, json_data=data) result = resp.json() result["passportImageFilename"] = "passport_img.jpg" result["passportImageFileBytes"] = b64_str return result def _add_primary_applicant(self, apt_config: Dict[str, Any], user_inputs: Dict[str, Any], is_waitlist: bool, ocr_enabled: bool, enable_ref: bool) -> str: """ 构造复杂的申请人 JSON payload 并提交 """ url = "https://lift-api.vfsglobal.com/appointment/applicants" headers = self._get_common_headers(with_auth=True) headers["content-type"] = "application/json;charset=UTF-8" #male/Male -> 1, 否则 -> 2 gender_str = str(user_inputs.get("gender", "")).lower() gender_code = 1 if gender_str == "male" else 2 raw_dial = user_inputs.get("phone_country_code", "86") dial_code = str(raw_dial) # to_ddmmyyyy (YYYY-MM-DD -> DD/MM/YYYY) def _to_ddmmyyyy(d_str): try: # 假设输入是 YYYY-MM-DD return datetime.strptime(d_str, "%Y-%m-%d").strftime("%d/%m/%Y") except: return d_str # 原样返回 dob = _to_ddmmyyyy(str(user_inputs.get("birthday", ""))) ppt_exp = _to_ddmmyyyy(str(user_inputs.get("passport_expiry_date", ""))) # --- 构造单个 Applicant 对象 --- applicant = { "urn": "", "arn": "", "loginUser": self.config.account.username, # 基本信息 (全部大写) "firstName": str(user_inputs.get("first_name", "")).upper(), "middleName": "", "lastName": str(user_inputs.get("last_name", "")).upper(), "employerFirstName": "", "employerLastName": "", "salutation": "", "gender": gender_code, # 联系信息 "contactNumber": str(user_inputs.get("phone", "")), "dialCode": dial_code, "employerContactNumber": "", "employerDialCode": "", "emailId": str(user_inputs.get("alias_email", "")).upper(), "employerEmailId": "", # 证件信息 "passportNumber": str(user_inputs.get("passport_no", "")).upper(), "confirmPassportNumber": "", "passportExpirtyDate": ppt_exp, "dateOfBirth": dob, "nationalId": None, # 国籍 (使用全局辅助函数 get_country_iso3) "nationalityCode": get_country_iso3(str(user_inputs.get("nationality", ""))), # 地址与其它 (大部分为空) "state": None, "city": None, "addressline1": None, "addressline2": None, "pincode": None, "isEndorsedChild": False, "applicantType": 0, "vlnNumber": None, "applicantGroupId": 0, "parentPassportNumber": "", "parentPassportExpiry": "", "dateOfDeparture": None, "entryType": "", "eoiVisaType": "", "passportType": "", "vfsReferenceNumber": "", "familyReunificationCerificateNumber": "", "PVRequestRefNumber": "", "PVStatus": "", "PVStatusDescription": "", "PVCanAllowRetry": True, "PVisVerified": False, "eefRegistrationNumber": "", "isAutoRefresh": True, "helloVerifyNumber": "", "OfflineCClink": "", "idenfystatuscheck": False, "vafStatus": None, "SpecialAssistance": "", "AdditionalRefNo": None, "juridictionCode": "", "canInitiateVAF": False, "canEditVAF": False, "canDeleteVAF": False, "canDownloadVAF": False, "Retryleft": "", # 真实 IP 注入 "ipAddress": self.real_ip } # --- 处理 Reference Number (Cover Letter) --- if enable_ref: applicant["referenceNumber"] = str(user_inputs.get("cover_letter", "")) else: applicant["referenceNumber"] = None # --- 处理 OCR 数据 --- if ocr_enabled: applicant["applicantImage"] = str(user_inputs.get("applicant_image", "")) applicant["applicantImageData"] = str(user_inputs.get("applicant_image_data", "")) applicant["GUID"] = str(user_inputs.get("guid", "")) # --- 构造最外层 Payload --- payload = { "countryCode": self.free_config.get("country_code"), "missionCode": self.free_config.get("mission_code"), "centerCode": apt_config.get("vac_code"), "loginUser": self.config.account.username, "visaCategoryCode": apt_config.get("subcategory_code"), "applicantList": [applicant], # 数组形式 "languageCode": "en-US", "isWaitlist": is_waitlist, "isEdit": False, "feeEntryTypeCode": None, "feeExemptionTypeCode": None, "feeExemptionDetailsCode": None, "juridictionCode": None, "regionCode": None } # --- 发送请求 --- resp = self._perform_request("POST", url, headers=headers, json_data=payload) # --- 处理响应 --- return resp.json()["urn"] def _applicant_otp_send(self, apt_config, urn) -> bool: url = "https://lift-api.vfsglobal.com/appointment/applicantotp" headers = self._get_common_headers(with_auth=True) headers["content-type"] = "application/json;charset=UTF-8" data = { "urn": urn, "loginUser": self.config.account.username, "missionCode": self.free_config.get("mission_code"), "countryCode": self.free_config.get("country_code"), "centerCode": apt_config.get("vac_code"), "OTP": "", "otpAction": "GENERATE", "languageCode": "en-US" } resp = self._perform_request("POST", url, headers=headers, json_data=data) return resp.json().get("isOTPGenerated") def _applicant_otp_verify(self, apt_config, urn, otp) -> bool: url = "https://lift-api.vfsglobal.com/appointment/applicantotp" headers = self._get_common_headers(with_auth=True) headers["datacenter"] = "GERMANY" headers["content-type"] = "application/json;charset=UTF-8" data = { "urn": urn, "loginUser": self.config.account.username, "missionCode": self.free_config.get("mission_code"), "countryCode": self.free_config.get("country_code"), "centerCode": apt_config.get("vac_code"), "OTP": otp, "otpAction": "VALIDATE", "languageCode": "en-US" } resp = self._perform_request("POST", url, headers=headers, json_data=data) return resp.json().get("isOTPValidated") def _query_slot_calendar(self, apt_config, urn, from_date) -> List: url = "https://lift-api.vfsglobal.com/appointment/calendar" headers = self._get_common_headers(with_auth=True) headers["content-type"] = "application/json;charset=UTF-8" dt_m = datetime.strptime(from_date, "%Y-%m-%d") converted_date = dt_m.strftime("%d/%m/%Y") data = { "missionCode": self.free_config.get("mission_code"), "countryCode": self.free_config.get("country_code"), "centerCode": apt_config.get("vac_code"), "loginUser": self.config.account.username, "visaCategoryCode": apt_config.get("subcategory_code"), "fromDate": converted_date, "urn": urn, "payCode": "" } resp = self._perform_request("POST", url, headers=headers, json_data=data) calendars = resp.json().get("calendars") if calendars: ads_out = [] for item in calendars: # "MM/DD/YYYY" -> "YYYY-MM-DD" raw = item.get("date") ads_out.append(to_yyyymmdd(raw, "%m/%d/%Y")) return ads_out return [] def _query_slot_time(self, apt_config, urn, slot_date) -> List: url = "https://lift-api.vfsglobal.com/appointment/timeslot" headers = self._get_common_headers(with_auth=True) headers["content-type"] = "application/json;charset=UTF-8" dt_m = datetime.strptime(slot_date, "%Y-%m-%d") converted_date = dt_m.strftime("%d/%m/%Y") data = { "missionCode": self.free_config.get("mission_code"), "countryCode": self.free_config.get("country_code"), "centerCode": apt_config.get("vac_code"), "loginUser": self.config.account.username, "visaCategoryCode": apt_config.get("subcategory_code"), "slotDate": converted_date, "urn": urn } resp = self._perform_request("POST", url, headers=headers, json_data=data) return resp.json().get("slots") def _saveuseractionaudit(self, apt_config, urn, earliest_date) -> bool: url = "https://lift-api.vfsglobal.com/appointment/saveuseractionaudit" headers = self._get_common_headers(with_auth=True) headers["content-type"] = "application/json;charset=UTF-8" # ISO format conversion dt = datetime.strptime(earliest_date, "%Y-%m-%d") data = { "missionCode": self.free_config.get("mission_code"), "countryCode": self.free_config.get("country_code"), "centerCode": apt_config.get("vac_code"), "loginUser": self.config.account.username, "urn": urn, "firstEarliestSlotDate": dt.strftime("%d/%m/%Y"), "action": "schedule", "ipAddress": self.real_ip, "eadAppointmentDetail": dt.strftime("%Y-%m-%dT%H:%M:%S") } resp = self._perform_request("POST", url, headers=headers, json_data=data) return resp.json().get("isSavedSuccess", False) def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None): """ 统一 HTTP 请求封装,严格复刻 C++ 逻辑: 1. 发送 OPTIONS 请求 2. 发送实际请求 """ # --- 1. 发送 OPTIONS 请求 --- try: # OPTIONS 请求使用相同的 URL 和 headers (部分 header 如 content-length 会被自动处理) # 某些服务器反爬会检测 OPTIONS 请求 opt_headers = headers.copy() if headers else {} # 发送 OPTIONS self.session.request("OPTIONS", url, headers=opt_headers, timeout=10) # C++ 代码中并不检查 OPTIONS 的返回值,只检查执行是否成功 # 这里我们假设只要不抛出异常即可 except Exception as e: # 记录警告但不中断流程,防止仅仅是 OPTIONS 失败导致误判 self._log(f"OPTIONS request failed (non-fatal): {str(e)}") resp = self.session.request(method, url, headers=headers, data=data, json=json_data, params=params, timeout=30) if self.config.debug: self._log(f'[perform request] Response={resp.text}\nMethod={method}, Url={url}, Data={data}, JsonData={json_data}, Params={params}') if resp.status_code == 200: return resp elif resp.status_code == 401: self.is_healthy = False raise SessionExpiredOrInvalidError() elif resp.status_code == 403: raise PermissionDeniedError() elif resp.status_code == 429: self.is_healthy = False raise RateLimiteddError() else: raise BizLogicError(message=f"HTTP Error {resp.status_code}: {resp.text[:100]}") def _encrypt_password(self, password: str) -> str: ciphertext = self.public_key.encrypt( password.encode(), padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None ) ) return base64.b64encode(ciphertext).decode() def _get_orange_source(self, email: str) -> str: timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S") payload = f"{email};{timestamp}" return self._encrypt_password(payload) def _get_client_source(self) -> str: timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S") payload = f"GA;{timestamp}Z" return self._encrypt_password(payload) def _handle_cloudflare_challenge(self) -> str: """ 完整实现的 Cloudflare Turnstile 验证逻辑 """ mission = self.free_config.get("mission_code", "") country = self.free_config.get("country_code", "") if not mission or not country: raise NotFoundError(message="Missing mission_code or country_code in free_config") website_url = f"https://visa.vfsglobal.com/{country}/en/{mission}/login" # 构造代理字符串传给打码平台 (格式: http://user:pass@ip:port) proxy_str = self._get_proxy_url() # 2. 提交任务 self._log(f"Submitting Turnstile task for {website_url}...") task_id = VSCloudApi.Instance().create_task( command="AntiCloudflareTurnstileTask", args={ "proxy": proxy_str, "websiteUrl":website_url } ) result_data = VSCloudApi.Instance().get_task_result(task_id, timeout=60) task_result = result_data.get("result", {}) token = task_result.get("token") ua = task_result.get("userAgent") cookies_list = task_result.get("cookies", []) if not token: raise BizLogicError("Captcha solved but token is empty") # A. 设置 User-Agent if ua: self.user_agent = ua self.session.headers["User-Agent"] = ua # B. 设置 Cookies if cookies_list: self._log(f"Syncing {len(cookies_list)} cookies from Captcha solver...") for cookie in cookies_list: # 兼容不同的 cookie 格式 c_name = cookie.get("name") c_value = cookie.get("value") c_domain = cookie.get("domain", "") c_path = cookie.get("path", "/") if c_name and c_value: self.session.cookies.set( name=c_name, value=c_value, domain=c_domain, path=c_path ) self._log("Cloudflare challenge passed.") return token def _get_common_headers(self, with_auth=True) -> Dict[str, str]: mission = self.free_config.get("mission_code", "") country = self.free_config.get("country_code", "") lang = self.free_config.get("language", "en") route = f"{country}/{lang}/{mission}" h = { "accept": "application/json, text/plain, */*", "accept-language": "en-US,en;q=0.9", "origin": "https://visa.vfsglobal.com", "referer": "https://visa.vfsglobal.com/", "route": route } client_src = self._get_client_source() h["clientsource"] = client_src if with_auth and self.jwt_token: h["authorize"] = self.jwt_token return h def _query_earliest_slot(self, apt_config) -> Optional[str]: """ 查询最早 Slot(403 自动绕盾 + 重试) """ url = "https://lift-api.vfsglobal.com/appointment/CheckIsSlotAvailable" max_retries = self.free_config.get("slot_query_max_retries", 2) data = { "missioncode": self.free_config.get("mission_code"), "countrycode": self.free_config.get("country_code"), "vacCode": apt_config.get("vac_code"), "visaCategoryCode": apt_config.get("subcategory_code"), "roleName": "Individual", "loginUser": self.config.account.username, "payCode": "" } headers = self._get_common_headers(with_auth=True) headers["content-type"] = "application/json;charset=UTF-8" for attempt in range(1, max_retries + 1): try: resp = self._perform_request( "POST", url, headers=headers, json_data=data ) break # ✅ 请求成功,跳出重试循环 except PermissionDeniedError: self._log(f"Earliest slot blocked (403), attempt {attempt}/{max_retries}") # 最后一次就不再绕盾了 if attempt >= max_retries: raise PermissionDeniedError() self._handle_cloudflare_challenge() self._log("Cloudflare bypass success, retrying...") continue # ====== 正常解析响应 ====== if "WaitList" in resp.text: return "WaitList" j = resp.json() if j.get("earliestSlotLists"): raw_date = j["earliestSlotLists"][0]["date"] return to_yyyymmdd(raw_date, "%m/%d/%Y %H:%M:%S") return "" def _fetch_configurations(self, apt_config: Dict[str, Any]): # 1. 获取所有中心配置 (query_center) if not self.center_conf: self.center_conf = self._query_center() # 2. 获取 Visa Category 配置 vac_code = apt_config.get("vac_code") category_code = apt_config.get("category_code") # 检查目标 category_code 是否已在缓存中 if category_code not in self.category_conf: visa_categories = [] visa_categories = self._query_visa_category(vac_code) found = False for vc in visa_categories: if vc.get("code") == category_code: self.category_conf[category_code] = vc found = True break # 如果没找到,可能配置错误,但 C++ 没报错,只继续 if not found: raise NotFoundError(message=f"{self.group_id} Category code {category_code} not found in VAC {vac_code}") # 3. 获取 Visa SubCategory 配置 sub_category_code = apt_config.get("subcategory_code") if sub_category_code not in self.subcategory_conf: visa_subcategories = self._query_visa_sub_category(vac_code, category_code) found = False for svc in visa_subcategories: if svc.get("code") == sub_category_code: self.subcategory_conf[sub_category_code] = svc found = True break if not found: raise NotFoundError(message=f"{self.group_id} SubCategory code {sub_category_code} not found") def _query_center(self) -> List: mission = self.free_config.get("mission_code") country = self.free_config.get("country_code") url = f"https://lift-api.vfsglobal.com/master/center/{mission}/{country}/en-US" headers = self._get_common_headers(with_auth=False) resp = self._perform_request("GET", url, headers=headers) return resp.json() def _query_visa_category(self, center_code: str) -> List: mission = self.free_config.get("mission_code") country = self.free_config.get("country_code") enc_center = urllib.parse.quote(center_code) url = f"https://lift-api.vfsglobal.com/master/visacategory/{mission}/{country}/{enc_center}/en-US" headers = self._get_common_headers(with_auth=False) resp = self._perform_request("GET", url, headers=headers) return resp.json() def _query_visa_sub_category(self, center_code: str, category_code: str) -> List: mission = self.free_config.get("mission_code") country = self.free_config.get("country_code") enc_center = urllib.parse.quote(center_code) enc_cat = urllib.parse.quote(category_code) url = f"https://lift-api.vfsglobal.com/master/subvisacategory/{mission}/{country}/{enc_center}/{enc_cat}/en-US" headers = self._get_common_headers(with_auth=False) resp = self._perform_request("GET", url, headers=headers) return resp.json() def _read_otp_email(self) -> str: """ 读取 OTP 邮件 """ master_email = "visafly666@gmail.com" recipient = self.config.account.username sender = "donotreply at vfshelpline.com" subject_keywords = "One Time Password" body_keywords = "OTP" now_utc = datetime.utcnow() formatted_utc_time = now_utc.strftime("%Y-%m-%d %H:%M:%S") self._log(f"Waiting for OTP email sent after {formatted_utc_time}...") # 3. 轮询查收 for i in range(12): content_out = VSCloudApi.Instance().fetch_mail_content( master_email, sender, recipient, subject_keywords, body_keywords, formatted_utc_time, 300 ) if content_out: match = re.search(r'\b\d{6}\b', content_out) if match: otp = match.group(0) self._log(f"OTP code found: {otp}") return otp time.sleep(5) raise NotFoundError(message="OTP email not found (timeout)") def _submit_login_otp(self, cf_token: str, otp: str): """ 提交 OTP 验证码进行登录 """ self._log("Submitting Login OTP...") # 1. 准备基础数据 email = self.config.account.username password = self.config.account.password enc_password = self._encrypt_password(password) mission_code = self.free_config.get("mission_code", "") country_code = self.free_config.get("country_code", "") # 2. 生成加密 Source (每次请求时间戳不同,建议重新生成) client_src = self._get_client_source() orange_src = self._get_orange_source(email) # 3. 构造请求 url = "https://lift-api.vfsglobal.com/user/login" headers = self._get_common_headers(with_auth=False) headers.update({ "clientsource": client_src, "orangex": orange_src, "content-type": "application/x-www-form-urlencoded" }) # 为了稳健,如果传入为空,尝试重新获取。 if not cf_token: self._log("CF Token is empty, regenerating for OTP...") cf_token = self._handle_cloudflare_challenge() data = { "username": email, "password": enc_password, "missioncode": mission_code, "countrycode": country_code, "languageCode": "en-US", "captcha_version": "cloudflare-v1", "captcha_api_key": cf_token, "otp": otp # 关键字段:OTP 验证码 } # 4. 发送请求 (POST form-urlencoded) resp = self._perform_request("POST", url, headers=headers, data=data) resp_json = resp.json() if resp_json["accessToken"]: self.jwt_token = resp_json["accessToken"] self._log("OTP Login successful, JWT obtained.") return raise PermissionDeniedError(message=resp.text) def _submit_no_addition_service(self, urn): url = "https://lift-api.vfsglobal.com/vas/mapvas" headers = self._get_common_headers(with_auth=True) headers["content-type"] = "application/json;charset=UTF-8" data = { "loginUser": self.config.account.username, "missionCode": self.free_config.get("mission_code"), "countryCode": self.free_config.get("country_code"), "urn": urn, "applicants": [] } # C++ 只请求不检查结果,或者只要200就行 self._perform_request("POST", url, headers=headers, json_data=data) def _query_fee(self, apt_config, urn) -> Tuple[float, str]: url = "https://lift-api.vfsglobal.com/appointment/fees" headers = self._get_common_headers(with_auth=True) headers["content-type"] = "application/json;charset=UTF-8" data = { "missionCode": self.free_config.get("mission_code"), "countryCode": self.free_config.get("country_code"), "centerCode": apt_config.get("vac_code"), "loginUser": self.config.account.username, "urn": urn, "languageCode": "en-US" } resp = self._perform_request("POST", url, headers=headers, json_data=data) j = resp.json() return j.get("totalamount"), j["feeDetails"][0].get("currency") def _schedule(self, apt_config, urn, amount, currency, slot_id) -> Dict: url = "https://lift-api.vfsglobal.com/appointment/schedule" headers = self._get_common_headers(with_auth=True) headers["content-type"] = "application/json;charset=UTF-8" data = { "missionCode": self.free_config.get("mission_code"), "countryCode": self.free_config.get("country_code"), "centerCode": apt_config.get("vac_code"), "loginUser": self.config.account.username, "urn": urn, "notificationType": "none", "paymentdetails": { "paymentmode": "Online", "RequestRefNo": "", "clientId": "", "merchantId": "", "amount": amount, "currency": currency }, "allocationId": str(slot_id), "CanVFSReachoutToApplicant": True } resp = self._perform_request("POST", url, headers=headers, json_data=data) return resp.json() def _pay_request(self, payload) -> str: # C++: 检查 301/302 Redirect Location url = f"https://online.vfsglobal.com/PG-Component/Payment/PayRequest?payLoad={payload}" headers = { "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "referer": "https://visa.vfsglobal.com/", "user-agent": self.user_agent } # allow_redirects=False 以捕获 Location header resp = self.session.get(url, headers=headers, allow_redirects=False) if resp.status_code in [301, 302]: loc = resp.headers.get("Location", "") if loc.startswith("http"): return loc else: return "https://online.vfsglobal.com" + loc if loc.startswith("/") else "https://online.vfsglobal.com/" + loc else: raise NotFoundError(message='payment link not found') def _filter_dates(self, dates: List[str], start_str: str, end_str: str) -> List[str]: """ 根据用户的期望范围筛选可用日期 :param dates: API 返回的可用日期列表 (YYYY-MM-DD) :param start_str: 用户期望开始日期 (YYYY-MM-DD) :param end_str: 用户期望结束日期 (YYYY-MM-DD) :return: 符合要求的日期列表 """ # 如果没有设置范围,则不过滤,返回所有日期 if not start_str or not end_str: return dates valid_dates = [] # 截取前10位以防带有时分秒 s_date = datetime.strptime(start_str[:10], "%Y-%m-%d") e_date = datetime.strptime(end_str[:10], "%Y-%m-%d") for date_str in dates: curr_date = datetime.strptime(date_str, "%Y-%m-%d") # 比较范围 (闭区间) if s_date <= curr_date <= e_date: valid_dates.append(date_str) random.shuffle(valid_dates) return valid_dates def _save_http_session(self, page_url): """ 提取 cookies, local_storage, 存入 VSCloudApi """ cookies_dict = {} # 方式 1: curl_cffi 的 cookies 对象通常支持 get_dict() if hasattr(self.session.cookies, "get_dict"): cookies_dict = self.session.cookies.get_dict() else: # 方式 2: 迭代 (兼容标准 CookieJar) for c in self.session.cookies: cookies_dict[c.name] = c.value cookies_str = json.dumps(cookies_dict) # 简单生成 SessionID hash ua_str = self.user_agent or "unknown_ua" raw = cookies_str + ua_str + page_url session_id = hashes.Hash(hashes.SHA256(), backend=default_backend()) session_id.update(raw.encode()) sid = session_id.finalize().hex() proxy_str = "" if self.config.proxy.ip: proxy_str = f"{self.config.proxy.scheme}://" if self.config.proxy.username: proxy_str += f"{self.config.proxy.username}:{self.config.proxy.password}@" proxy_str += f"{self.config.proxy.ip}:{self.config.proxy.port}" return VSCloudApi.Instance().create_http_session( sid, cookies_str, "", ua_str, proxy_str, page_url )