# plugins/vfs_global_plugin.py import time import json import random import base64 import re import urllib.parse from datetime import datetime from typing import Dict, Any, Optional, List, Tuple # 使用 curl_cffi 模拟浏览器 TLS 指纹,这是 VFS 必须的 try: from curl_cffi import requests except ImportError: raise ImportError("Please install curl-cffi: pip install curl-cffi") # 加密库 from cryptography.hazmat.primitives import serialization, hashes from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.backends import default_backend from vs_plg import IVSPlg, VSError # type: ignore from vs_types import VSPlgConfig, VSQueryResult, VSBookResult, AvailabilityStatus, QueryWaitMode # type: ignore from vs_log_macros import VSC_INFO, VSC_ERROR, VSC_DEBUG, VSC_WARN # type: ignore from toolkit.vs_cloud_api import VSCloudApi # type: ignore from toolkit.rule_engine import RuleEngine # type: ignore # ----------------- 静态常量与辅助数据 ----------------- VFS_PUBLIC_KEY_PEM = """-----BEGIN PUBLIC KEY----- MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAuupFgB+lYIOtSxrRoHzc LmCZKJ6+oSbgqgOPzFMM0TasOeLw0NXEn1XfIzXdx75+tegNKwyIZumoh0yhubKs t59GV321kN0iquYRHrdh3ygfDDHlS9rROQeBqRga0ncSADtbLMrBPqXJjPCoV76y t92towriKoH75BhiazY0mghm4LjmAWrV0u/GNpV3tk9bxbtHEXGaFmxCJqjg+7x6 1e5wXLfvpj9w1QsiSWOSJxLOyICz/9ByxXycQQFdNmjnnnwco9Gt/Mi33NYH71j0 5oXIjklFC4lvJqaqSY5lS7Vwb9oCt9zX9J0Yz4z4e/3V+0jgRnWOFGofyks4FKe2 GQIDAQAB -----END PUBLIC KEY-----""" COUNTRY_MAP = { "china": "CHN", "france": "FRA", "germany": "DEU", "italy": "ITA", "united kingdom": "GBR", "united states": "USA", "india": "IND", "russia": "RUS", "turkey": "TUR", "vietnam": "VNM" } def get_country_iso3(name: str) -> str: return COUNTRY_MAP.get(name.lower(), "CHN") def get_alias_email(email: str, new_domain: str = "gmail-app.com") -> str: """ 将邮箱域名替换为指定域名(默认 gmail-app.com) """ if "@" not in email: raise ValueError(f"Invalid email: {email}") local_part, _ = email.rsplit("@", 1) return f"{local_part}@{new_domain}" # ----------------- VfsPlugin 类 ----------------- class VfsPlugin(IVSPlg): def __init__(self, group_id: str): self.group_id = group_id self.config: Optional[VSPlgConfig] = None self.free_config: Dict[str, Any] = {} self.session = requests.Session() # 模拟 Chrome 124 self.session.impersonate = "chrome124" self.jwt_token = "" self.user_agent = "" self.last_error = VSError(0, "OK") self.real_ip = "" # 缓存配置 self.center_conf = None self.category_conf = {} self.subcategory_conf = {} # 加载公钥 self.public_key = serialization.load_pem_public_key( VFS_PUBLIC_KEY_PEM.encode(), backend=default_backend() ) # --- IVSPlg 接口实现 --- def get_group_id(self) -> str: return self.group_id def set_config(self, config: VSPlgConfig): self.config = config try: self.free_config = json.loads(config.free_config) if config.free_config else {} except: self.free_config = {} # 设置代理 if config.proxy.ip: proxy_str = f"{config.proxy.scheme}://" if config.proxy.username: proxy_str += f"{config.proxy.username}:{config.proxy.password}@" proxy_str += f"{config.proxy.ip}:{config.proxy.port}" self.session.proxies = {"http": proxy_str, "https": proxy_str} VSC_DEBUG("vfs_plg", "[%s] Proxy set: %s", self.group_id, config.proxy.ip) def health_check(self) -> bool: return True def get_last_error(self) -> VSError: return self.last_error def create_session(self) -> bool: """登录流程""" VSC_INFO("vfs_plg", "[%s] Starting login...", self.group_id) # 1. Cloudflare Turnstile cf_token = self._handle_cloudflare_challenge() if not cf_token: return False # 2. 准备参数 email = self.config.account.username password = self.config.account.password enc_password = self._encrypt_password(password) mission_code = self.free_config.get("missionCode", "") country_code = self.free_config.get("countryCode", "") client_src = self._get_client_source() orange_src = self._get_orange_source(email) url = "https://lift-api.vfsglobal.com/user/login" headers = self._get_common_headers(with_auth=False) headers.update({ "clientsource": client_src, "orangex": orange_src, "content-type": "application/x-www-form-urlencoded" }) data = { "username": email, "password": enc_password, "missioncode": mission_code, "countrycode": country_code, "languageCode": "en-US", "captcha_version": "cloudflare-v1", "captcha_api_key": cf_token } # 3. 发送登录请求 (包含 OPTIONS) if not self._perform_request("POST", url, headers=headers, data=data): return False try: resp_json = self.session.last_response.json() if "accessToken" in resp_json and resp_json["accessToken"]: self.jwt_token = resp_json["accessToken"] VSC_INFO("vfs_plg", "[%s] Login successful, JWT obtained.", self.group_id) return True # OTP 处理 if resp_json.get("enableOTPAuthentication"): VSC_INFO("vfs_plg", "[%s] Login requires OTP.", self.group_id) otp = self._read_otp_email() if not otp: self._set_error(3001, "Failed to read Login OTP") return False return self._submit_login_otp(None, otp) self._set_error(1001, "Login failed: No access token or OTP flow.") return False except Exception as e: self._set_error(9001, f"Login parse error: {str(e)}") return False def query(self) -> VSQueryResult: """查询可预约 Slot""" result = VSQueryResult() apt_config = None target_tag = self.group_id appt_types = self.free_config.get("appointmentType", []) for apt in appt_types: if apt.get("tag") == target_tag or len(appt_types) == 1: apt_config = apt break if not apt_config: self._set_error(2001, "No matching appointment configuration found.") return result if not self._fetch_configurations(apt_config): return result earliest_date = [] if not self._query_earliest_slot(apt_config, earliest_date): return result if not earliest_date: return result date_str = earliest_date[0] result.success = True result.visa_type = apt_config.get("subcategoryCode", "") result.city = apt_config.get("vacCode", "") result.country = self.free_config.get("countryCode", "") if "WaitList" in date_str: result.availability_status = AvailabilityStatus.Waitlist result.earliest_date = "WaitList" VSC_INFO("vfs_plg", "[%s] Found WaitList.", self.group_id) else: result.availability_status = AvailabilityStatus.Available result.earliest_date = date_str VSC_INFO("vfs_plg", "[%s] Found Slot: %s", self.group_id, date_str) day_info = VSQueryResult.DateAvailability() day_info.date = date_str result.availability.append(day_info) return result def _get_filtered_covered_months(self, start_date, end_date, from_date) -> List[str]: """ 计算需要查询的月份列表,格式 YYYY-MM-DD (每月1号) """ fmt = "%Y-%m-%d" # 默认值处理 try: dt_start = datetime.strptime(start_date, fmt) if start_date else datetime.now() dt_end = datetime.strptime(end_date, fmt) if end_date else datetime.now().replace(year=datetime.now().year + 1) # from_date 格式可能是 DD/MM/YYYY (从 slot_info 来) try: dt_from = datetime.strptime(from_date, "%d/%m/%Y") except: dt_from = datetime.now() except: return [] # 归一化到月初 dt_start = dt_start.replace(day=1) dt_end = dt_end.replace(day=1) dt_from = dt_from.replace(day=1) # 起始点取 max(start, from) curr = max(dt_start, dt_from) months = [] while curr <= dt_end: months.append(curr.strftime(fmt)) # 下个月 if curr.month == 12: curr = curr.replace(year=curr.year + 1, month=1) else: curr = curr.replace(month=curr.month + 1) return months def book(self, slot_info: VSQueryResult, user_inputs) -> VSBookResult: """ 执行完整的预约流程 (对应 C++ VFSApi::book) 包含:上传文档 -> 添加申请人 -> OTP -> 选时间 -> 锁定 -> 支付 """ user_email = user_inputs.get('email', 'get_visa_666@example.com') user_inputs['alias_email'] = get_alias_email(user_email, new_domain="gmail-app.com") res = VSBookResult() # 定位 Appointment Config slot_routing_key = slot_info.routing_key # C++ 中 from_date 是入参,对应 Python 的 slot_info.earliest_date from_date = slot_info.earliest_date if slot_info.earliest_date else datetime.now().strftime("%d/%m/%Y") apt_config = None appt_types = self.free_config.get("appointmentType", []) for apt in appt_types: if apt.get("routingKey") == slot_routing_key or len(appt_types) == 1: apt_config = apt break if not apt_config: self._set_error(3001, "Book: Config missing.") return res # 刷新子配置 (获取 OCR/OTP 开关) if not self._fetch_configurations(apt_config): return res sub_cc = apt_config.get("subcategoryCode") sub_conf = self.subcategory_conf.get(sub_cc, {}) # ---------------- OCR 识别 / 文档上传 ---------------- # C++: bool ocr_enabled = ... ocr_enabled = sub_conf.get("isOCREnable", False) if ocr_enabled: VSC_INFO("vfs_plg", "[%s] OCR Enabled, uploading documents...", self.group_id) upload_res = {} if not self._upload_applicant_documents(apt_config, user_inputs, upload_res): return res # 回填上传结果到 user_inputs,供 add_primary_applicant 使用 user_inputs["applicant_image"] = upload_res.get("passportImageFilename") user_inputs["applicant_image_data"] = upload_res.get("passportImageFileBytes") # Base64 user_inputs["guid"] = upload_res.get("uploadDocumentGUID") # ---------------- 需要提供申请号 (Cover Letter) ---------------- enable_reference_number = sub_conf.get("enableReferenceNumber", False) # ---------------- 添加申请人 (核心步骤 1) ---------------- urn = [] is_waitlist = (slot_info.availability_status == AvailabilityStatus.Waitlist) # C++: Retry loop for 422 Invalid Request add_primary_retry = 0 MAX_RETRY = 3 success_add = False while add_primary_retry < MAX_RETRY: urn = [] # 清空 if self._add_primary_applicant(apt_config, user_inputs, is_waitlist, ocr_enabled, enable_reference_number, urn): success_add = True break # 检查是否是 422 错误 err = self.get_last_error() # 注意:需要在 _perform_request 或 _add_primary_applicant 中正确解析并设置 error_code 为 422 # 简单起见,如果 msg 包含 Invalid request 也算 if err.error_code == 422 or "Invalid request" in err.error_message: VSC_WARN("vfs_plg", "[%s] Add Applicant 422 error, retrying in 10s...", self.group_id) time.sleep(10) add_primary_retry += 1 else: # 其他错误直接退出 return res if not success_add: self._set_error(3002, "Failed to add primary applicant after retries") return res final_urn = urn[0] VSC_INFO("vfs_plg", "[%s] Applicant Added. URN: %s", self.group_id, final_urn) # ---------------- 申请人 OTP 验证 (核心步骤 2) ---------------- otp_enabled = sub_conf.get("isApplicantOTPEnabled", False) if otp_enabled: VSC_INFO("vfs_plg", "[%s] Applicant OTP Required.", self.group_id) if not self._applicant_otp_send(apt_config, final_urn): return res otp_code = self._read_otp_email() # 复用之前的读邮件逻辑 if not otp_code: self._set_error(3003, "Failed to read Applicant OTP from email") return res if not self._applicant_otp_verify(apt_config, final_urn, otp_code): return res # ---------------- 如果是 Waitlist 模式,直接确认并返回 ---------------- if is_waitlist: if self._confirm_waitlist(apt_config, final_urn): res.success = True res.urn = final_urn res.order_id = final_urn res.message = "Joined Waitlist" return res return res # ---------------- 规则引擎与日期筛选 (核心步骤 3) ---------------- # C++: RuleEngine rule_engine(rules); rules_str = user_inputs.get("rules", "") rule_engine = RuleEngine(rules_str) expected_start = user_inputs.get("expected_start_date", "") expected_end = user_inputs.get("expected_end_date", "") rule_engine.set_date_range_start(expected_start) rule_engine.set_date_range_end(expected_end) # 计算需要扫描的月份 # 如果 expected_start/end 为空,默认使用 from_date 所在月 months = self._get_filtered_covered_months(expected_start, expected_end, from_date) VSC_INFO("vfs_plg", "[%s] Scanning months: %s (From: %s)", self.group_id, months, from_date) selected_slot_id = "" selected_slot_date = "" selected_slot_time_range = "" all_ads = set() # 记录所有有号日期,避免重复处理 forbidden_dates = set() found_slot = False # 遍历月份寻找 Slot for m_str in months: # m_str format: YYYY-MM-DD (月初) # C++ 需要 DD/MM/YYYY try: dt_m = datetime.strptime(m_str, "%Y-%m-%d") converted_date = dt_m.strftime("%d/%m/%Y") except: continue ads = [] # Available Date Strings if not self._query_slot_calendar(apt_config, final_urn, converted_date, ads): time.sleep(3) continue if not ads: time.sleep(3) continue # 过滤已知的 slots new_ads = [d for d in ads if d not in all_ads] all_ads.update(new_ads) # 尝试 3 次选择 for _ in range(3): # 排除 forbidden avail_candidates = [d for d in list(all_ads) if d not in forbidden_dates] # 规则筛选 sel_dates = rule_engine.select_date(avail_candidates, "%d/%m/%Y") if not sel_dates: break # 当前月份符合规则的都没了 tmp_date = sel_dates[0] # 取第一个符合规则的日期 forbidden_dates.add(tmp_date) # 标记为已尝试 # 审计日志 (C++ saveuseractionaudit) if not self._saveuseractionaudit(apt_config, final_urn, tmp_date): time.sleep(3) continue # 查询具体时间 (query_slot_time) ats = [] if not self._query_slot_time(apt_config, final_urn, tmp_date, ats): time.sleep(3) continue if not ats: continue # 随机选择一个时间段 sel_tm = random.choice(ats) selected_slot_id = sel_tm.get("allocationId") selected_slot_date = tmp_date selected_slot_time_range = sel_tm.get("slot") found_slot = True break if found_slot: break if not found_slot: self._set_error(3004, "No valid slots found after Rule Engine filtering.") return res VSC_INFO("vfs_plg", "[%s] Slot Selected: %s %s (ID: %s)", self.group_id, selected_slot_date, selected_slot_time_range, selected_slot_id) # ---------------- 服务、费用、最终预约 (核心步骤 4) ---------------- # 跳过附加服务 if not self._submit_no_addition_service(final_urn): return res # 查询费用 amount = 0.0 currency = "" amount, currency = self._query_fee(apt_config, final_urn) # 简单保留两位小数 amount = round(amount, 2) # 最终提交 (Schedule) schedule_res = {} # C++: schedule_with_retry (max 3) schedule_success = False for _ in range(3): if self._schedule(apt_config, final_urn, amount, currency, selected_slot_id, schedule_res): schedule_success = True break # 检查是否是被防火墙拦截 if self.get_last_error().error_code == 403: # 重新处理 CF self._handle_cloudflare_challenge() else: break # 其他错误不重试 if not schedule_success: return res # ---------------- 构造返回结果 ---------------- res.success = True res.book_date = selected_slot_date res.book_time = selected_slot_time_range res.urn = final_urn res.order_id = final_urn res.fee_amount = int(amount * 100) res.fee_currency = currency # 处理支付链接 is_pay_req = schedule_res.get("IsPaymentRequired", False) if is_pay_req: payload = schedule_res.get("payLoad", "") payment_url = self._pay_request(payload) if payment_url: res.payment_link = payment_url # 保存 Session (C++ save_http_session) saved_session = self._save_http_session(payment_url) if saved_session: res.session_id = saved_session['session_id'] else: res.message = "Booking confirmed (No payment required)" return res def _confirm_waitlist(self, apt_config: Dict[str, Any], urn: str) -> bool: """ 确认加入候补名单 (对应 C++ VFSApi::confirm_waitlist) """ url = "https://lift-api.vfsglobal.com/appointment/ConfirmWaitlist" headers = self._get_common_headers(with_auth=True) headers["content-type"] = "application/json;charset=UTF-8" data = { "missionCode": self.free_config.get("missionCode"), "countryCode": self.free_config.get("countryCode"), "centerCode": apt_config.get("vacCode"), "loginUser": self.config.account.username, "urn": urn, "notificationType": "none", "CanVFSReachoutToApplicant": True } # 发送请求 if not self._perform_request("POST", url, headers=headers, json_data=data): return False try: j = self.session.last_response.json() # 1. 检查 API 返回的 error 字段 if "error" in j and j["error"]: err_val = j["error"] # 兼容 error 为字符串或对象的情况 if isinstance(err_val, str) and err_val: self._set_error(2006, f"Confirm Waitlist API Error: {err_val}") return False elif isinstance(err_val, dict) or isinstance(err_val, list): self._set_error(2006, f"Confirm Waitlist API Error: {err_val}") return False # 2. 检查 isConfirmed 字段 if j.get("isConfirmed") is True: VSC_INFO("vfs_plg", "[%s] Waitlist confirmed successfully for URN: %s", self.group_id, urn) return True self._set_error(2007, f"Confirm Waitlist failed, response: {str(j)[:100]}") except Exception as e: self._set_error(9001, f"Confirm Waitlist parse error: {str(e)}") return False def _upload_applicant_documents(self, apt_config, user_inputs, res_out: Dict) -> bool: """上传护照图片""" url = "https://lift-api.vfsglobal.com/appointment/UploadApplicantDocument" passport_url = user_inputs.get("passport_image_url") if not passport_url: self._set_error(9007, "Missing passport_image_url") return False # 下载图片转 Base64 (C++ download_img_encode_base64) try: # 简单的下载,这里不使用 session,直接下载静态资源 img_resp = requests.get(passport_url, timeout=30) if img_resp.status_code != 200: self._set_error(9008, "Failed to download passport image") return False b64_str = base64.b64encode(img_resp.content).decode('utf-8') except Exception as e: self._set_error(9008, f"Image download exception: {e}") return False headers = self._get_common_headers(with_auth=True) headers["content-type"] = "application/json;charset=UTF-8" data = { "missioncode": self.free_config.get("missionCode"), "countryCode": self.free_config.get("countryCode"), "centerCode": apt_config.get("vacCode"), "loginUser": self.config.account.username, "languageCode": "en-US", "visaCategoryCode": apt_config.get("subcategoryCode"), "fileBytes": b64_str, "selfiImageFileBytes": "" # 暂不支持自拍 } if not self._perform_request("POST", url, headers=headers, json_data=data): return False try: j = self.session.last_response.json() if "error" in j and j["error"]: self._set_error(2005, f"Upload error: {j}") return False res_out.update(j) # 补全 C++ 逻辑中的模拟文件名 res_out["passportImageFilename"] = "passport_img.jpg" res_out["passportImageFileBytes"] = b64_str return True except: return False def _add_primary_applicant(self, apt_config: Dict[str, Any], user_inputs: Dict[str, Any], is_waitlist: bool, ocr_enabled: bool, enable_ref: bool, urn_out_list: List[str]) -> bool: """ 添加主申请人 (对应 C++ VFSApi::add_primary_applicant) 构造复杂的申请人 JSON payload 并提交 """ url = "https://lift-api.vfsglobal.com/appointment/applicants" headers = self._get_common_headers(with_auth=True) headers["content-type"] = "application/json;charset=UTF-8" # --- 辅助 Helper: 映射性别 --- # C++: male/Male -> 1, 否则 -> 2 gender_str = str(user_inputs.get("gender", "")).lower() gender_code = 1 if gender_str == "male" else 2 # --- 辅助 Helper: 获取 Dial Code --- # C++ 逻辑处理了 int 和 string raw_dial = user_inputs.get("phone_country_code", "86") dial_code = str(raw_dial) # --- 辅助 Helper: 格式化日期 --- # C++: to_ddmmyyyy (YYYY-MM-DD -> DD/MM/YYYY) def _to_ddmmyyyy(d_str): try: # 假设输入是 YYYY-MM-DD return datetime.strptime(d_str, "%Y-%m-%d").strftime("%d/%m/%Y") except: return d_str # 原样返回 dob = _to_ddmmyyyy(str(user_inputs.get("birthday", ""))) ppt_exp = _to_ddmmyyyy(str(user_inputs.get("passport_expiry_date", ""))) # --- 构造单个 Applicant 对象 --- # 对应 C++ 中庞大的 applicant JSON 构建 applicant = { "urn": "", "arn": "", "loginUser": self.config.account.username, # 基本信息 (全部大写) "firstName": str(user_inputs.get("first_name", "")).upper(), "middleName": "", "lastName": str(user_inputs.get("last_name", "")).upper(), "employerFirstName": "", "employerLastName": "", "salutation": "", "gender": gender_code, # 联系信息 "contactNumber": str(user_inputs.get("phone", "")), "dialCode": dial_code, "employerContactNumber": "", "employerDialCode": "", "emailId": str(user_inputs.get("alias_email", "")).upper(), "employerEmailId": "", # 证件信息 "passportNumber": str(user_inputs.get("passport_no", "")).upper(), "confirmPassportNumber": "", # 通常留空 "passportExpirtyDate": ppt_exp, # 注意拼写 Expirty 是 VFS API 的特征 "dateOfBirth": dob, "nationalId": None, # 国籍 (使用全局辅助函数 get_country_iso3) "nationalityCode": get_country_iso3(str(user_inputs.get("nationality", ""))), # 地址与其它 (大部分为空) "state": None, "city": None, "addressline1": None, "addressline2": None, "pincode": None, "isEndorsedChild": False, "applicantType": 0, "vlnNumber": None, "applicantGroupId": 0, "parentPassportNumber": "", "parentPassportExpiry": "", "dateOfDeparture": None, "entryType": "", "eoiVisaType": "", "passportType": "", "vfsReferenceNumber": "", "familyReunificationCerificateNumber": "", "PVRequestRefNumber": "", "PVStatus": "", "PVStatusDescription": "", "PVCanAllowRetry": True, "PVisVerified": False, "eefRegistrationNumber": "", "isAutoRefresh": True, "helloVerifyNumber": "", "OfflineCClink": "", "idenfystatuscheck": False, "vafStatus": None, "SpecialAssistance": "", "AdditionalRefNo": None, "juridictionCode": "", "canInitiateVAF": False, "canEditVAF": False, "canDeleteVAF": False, "canDownloadVAF": False, "Retryleft": "", # 真实 IP 注入 "ipAddress": self.real_ip or "127.0.0.1" } # --- 处理 Reference Number (Cover Letter) --- if enable_ref: applicant["referenceNumber"] = str(user_inputs.get("cover_letter", "")) else: applicant["referenceNumber"] = None # --- 处理 OCR 数据 --- if ocr_enabled: # 必须从 user_inputs 获取上传后返回的 metadata applicant["applicantImage"] = str(user_inputs.get("applicant_image", "")) applicant["applicantImageData"] = str(user_inputs.get("applicant_image_data", "")) applicant["GUID"] = str(user_inputs.get("guid", "")) # --- 构造最外层 Payload --- payload = { "countryCode": self.free_config.get("countryCode"), "missionCode": self.free_config.get("missionCode"), "centerCode": apt_config.get("vacCode"), "loginUser": self.config.account.username, "visaCategoryCode": apt_config.get("subcategoryCode"), "applicantList": [applicant], # 数组形式 "languageCode": "en-US", "isWaitlist": is_waitlist, "isEdit": False, "feeEntryTypeCode": None, "feeExemptionTypeCode": None, "feeExemptionDetailsCode": None, "juridictionCode": None, "regionCode": None } # --- 发送请求 --- if not self._perform_request("POST", url, headers=headers, json_data=payload): return False # --- 处理响应 --- try: j = self.session.last_response.json() # 1. 成功情况:返回了 urn if "urn" in j and j["urn"]: urn_out_list.append(j["urn"]) return True # 2. 错误处理:检查是否为 422 Invalid Request (反爬/校验失败) # C++ 逻辑依赖于捕获这个特定的错误码来进行重试 if "error" in j and j["error"]: err_data = j["error"] code = 0 desc = "" if isinstance(err_data, dict): code = err_data.get("code", 0) desc = err_data.get("description", "") or err_data.get("message", "") # 设置到 last_error 以供上层重试逻辑检查 if code == 422 or "Invalid request" in desc: self._set_error(422, f"Add Applicant 422: {desc}") else: self._set_error(3005, f"Add Applicant API Error: {desc}") return False except Exception as e: self._set_error(9001, f"Add Applicant parse error: {str(e)}") return False def _applicant_otp_send(self, apt_config, urn) -> bool: url = "https://lift-api.vfsglobal.com/appointment/applicantotp" headers = self._get_common_headers(with_auth=True) headers["content-type"] = "application/json;charset=UTF-8" data = { "urn": urn, "loginUser": self.config.account.username, "missionCode": self.free_config.get("missionCode"), "countryCode": self.free_config.get("countryCode"), "centerCode": apt_config.get("vacCode"), "OTP": "", "otpAction": "GENERATE", "languageCode": "en-US" } if not self._perform_request("POST", url, headers=headers, json_data=data): return False try: if self.session.last_response.json().get("isOTPGenerated"): return True except: pass return False def _applicant_otp_verify(self, apt_config, urn, otp) -> bool: url = "https://lift-api.vfsglobal.com/appointment/applicantotp" headers = self._get_common_headers(with_auth=True) # C++ specific: datacenter header headers["datacenter"] = "GERMANY" headers["content-type"] = "application/json;charset=UTF-8" data = { "urn": urn, "loginUser": self.config.account.username, "missionCode": self.free_config.get("missionCode"), "countryCode": self.free_config.get("countryCode"), "centerCode": apt_config.get("vacCode"), "OTP": otp, "otpAction": "VALIDATE", "languageCode": "en-US" } if not self._perform_request("POST", url, headers=headers, json_data=data): return False try: if self.session.last_response.json().get("isOTPValidated"): return True except: pass return False def _query_slot_calendar(self, apt_config, urn, from_date, ads_out: List) -> bool: url = "https://lift-api.vfsglobal.com/appointment/calendar" headers = self._get_common_headers(with_auth=True) headers["content-type"] = "application/json;charset=UTF-8" data = { "missionCode": self.free_config.get("missionCode"), "countryCode": self.free_config.get("countryCode"), "centerCode": apt_config.get("vacCode"), "loginUser": self.config.account.username, "visaCategoryCode": apt_config.get("subcategoryCode"), "fromDate": from_date, "urn": urn, "payCode": "" } if not self._perform_request("POST", url, headers=headers, json_data=data): return False try: j = self.session.last_response.json() calendars = j.get("calendars") if calendars: ads_out.clear() for item in calendars: # item["date"] is usually "MM/DD/YYYY" or "YYYY-MM-DD" depending on API version # C++ assumes "MM/DD/YYYY" -> "DD/MM/YYYY" raw = item.get("date") try: # Normalize to DD/MM/YYYY dObj = datetime.strptime(raw, "%m/%d/%Y") ads_out.append(dObj.strftime("%d/%m/%Y")) except: ads_out.append(raw) return True except: pass return False def _query_slot_time(self, apt_config, urn, slot_date, ats_out: List) -> bool: url = "https://lift-api.vfsglobal.com/appointment/timeslot" headers = self._get_common_headers(with_auth=True) headers["content-type"] = "application/json;charset=UTF-8" data = { "missionCode": self.free_config.get("missionCode"), "countryCode": self.free_config.get("countryCode"), "centerCode": apt_config.get("vacCode"), "loginUser": self.config.account.username, "visaCategoryCode": apt_config.get("subcategoryCode"), "slotDate": slot_date, "urn": urn } if not self._perform_request("POST", url, headers=headers, json_data=data): return False try: j = self.session.last_response.json() slots = j.get("slots") if slots: ats_out.extend(slots) return True except: pass return False def _saveuseractionaudit(self, apt_config, urn, earliest_date) -> bool: url = "https://lift-api.vfsglobal.com/appointment/saveuseractionaudit" headers = self._get_common_headers(with_auth=True) headers["content-type"] = "application/json;charset=UTF-8" # ISO format conversion try: dt = datetime.strptime(earliest_date, "%d/%m/%Y") iso_date = dt.strftime("%Y-%m-%dT%H:%M:%S") except: iso_date = earliest_date data = { "missionCode": self.free_config.get("missionCode"), "countryCode": self.free_config.get("countryCode"), "centerCode": apt_config.get("vacCode"), "loginUser": self.config.account.username, "urn": urn, "firstEarliestSlotDate": earliest_date, "action": "schedule", "ipAddress": self.real_ip or "127.0.0.1", "eadAppointmentDetail": iso_date } if not self._perform_request("POST", url, headers=headers, json_data=data): return False return self.session.last_response.json().get("isSavedSuccess", False) # ----------------- 内部功能函数 ----------------- def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None) -> bool: """ 统一 HTTP 请求封装,严格复刻 C++ 逻辑: 1. 发送 OPTIONS 请求 2. 发送实际请求 """ print(f'[perform request] {method} {url}') # --- 1. 发送 OPTIONS 请求 --- try: # OPTIONS 请求使用相同的 URL 和 headers (部分 header 如 content-length 会被自动处理) # 某些服务器反爬会检测 OPTIONS 请求 opt_headers = headers.copy() if headers else {} # 发送 OPTIONS self.session.request("OPTIONS", url, headers=opt_headers, timeout=10) # C++ 代码中并不检查 OPTIONS 的返回值,只检查执行是否成功 # 这里我们假设只要不抛出异常即可 except Exception as e: # 记录警告但不中断流程,防止仅仅是 OPTIONS 失败导致误判 VSC_DEBUG("vfs_plg", "OPTIONS request failed (non-fatal): %s", str(e)) # --- 2. 发送实际请求 --- try: resp = self.session.request(method, url, headers=headers, data=data, json=json_data, params=params, timeout=30) self.session.last_response = resp if resp.status_code == 200: return True elif resp.status_code == 401: self._set_error(401, "Session Invalid (401)") elif resp.status_code == 403: self._set_error(403, "Blocked by Firewall (403)") elif resp.status_code == 429: self._set_error(429, "Rate Limited (429)") else: self._set_error(resp.status_code, f"HTTP Error {resp.status_code}: {resp.text[:100]}") return False except Exception as e: self._set_error(9000, f"Network Exception: {str(e)}") return False def _encrypt_password(self, password: str) -> str: ciphertext = self.public_key.encrypt( password.encode(), padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None ) ) return base64.b64encode(ciphertext).decode() def _get_orange_source(self, email: str) -> str: timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S") payload = f"{email};{timestamp}" return self._encrypt_password(payload) def _get_client_source(self) -> str: timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S") payload = f"GA;{timestamp}Z" return self._encrypt_password(payload) def _handle_cloudflare_challenge(self) -> str: """ 完整实现的 Cloudflare Turnstile 验证逻辑 对应 C++ VFSApi::handle_cloudflare_challenge """ # 1. 准备参数 mission = self.free_config.get("missionCode", "") country = self.free_config.get("countryCode", "") if not mission or not country: self._set_error(9001, "Missing missionCode or countryCode in free_config") return "" website_url = f"https://visa.vfsglobal.com/{country}/en/{mission}/login" # 构造代理字符串传给打码平台 (格式: http://user:pass@ip:port) proxy_str = "" if self.config.proxy.ip: proxy_str = f"{self.config.proxy.scheme}://" if self.config.proxy.username: proxy_str += f"{self.config.proxy.username}:{self.config.proxy.password}@" proxy_str += f"{self.config.proxy.ip}:{self.config.proxy.port}" # 2. 提交任务 VSC_INFO("vfs_plg", "[%s] Submitting Turnstile task for %s...", self.group_id, website_url) task_out = VSCloudApi.Instance().submit_anti_turnstile_task(proxy_str, website_url) if not task_out: self._set_error(9002, "Failed to submit captcha task to Cloud API") return "" task_id = str(task_out.get("id")) if not task_id: self._set_error(9002, "Cloud API returned invalid task ID") return "" # 3. 轮询结果 (超时时间 120秒) timeout = 120 start_time = time.time() while time.time() - start_time < timeout: result_out = VSCloudApi.Instance().get_anti_turnstile_result(task_id) if not result_out: # 获取结果的网络请求失败,稍后重试 time.sleep(3) continue # status: 0=Pending, 1=Processing, 2=Success, 3=Failed status = result_out.get("status", 0) if status == 2: # Success raw_result = result_out.get("result", "") try: # 4. 解析结果 JSON # 打码平台返回的 result 通常是一个 JSON 字符串,包含 token, userAgent, cookies if isinstance(raw_result, str): data = json.loads(raw_result) else: data = raw_result # 已经是 dict token = data.get("token") ua = data.get("userAgent") cookies_list = data.get("cookies", []) if not token: self._set_error(9004, "Captcha solved but token is empty") return "" # 5. 同步环境 (User-Agent 和 Cookies) # 这是最关键的一步,必须使用通过验证时的环境进行后续请求 # A. 设置 User-Agent if ua: self.user_agent = ua self.session.headers["User-Agent"] = ua # 注意:curl_cffi 的 impersonate 可能会覆盖 header, # 如果打码平台返回的 UA 看起来像 Chrome 124,通常兼容性没问题。 # 如果非常严格,可能需要根据返回的 UA 调整 impersonate 参数,但在 VFS 场景下 # 只要 header 对了通常通过率就很高。 # B. 设置 Cookies # 这里的 cookies 是 Turnstile 验证过程中生成的 (如 cf_clearance) if cookies_list: VSC_DEBUG("vfs_plg", "[%s] Syncing %d cookies from Captcha solver...", self.group_id, len(cookies_list)) for cookie in cookies_list: # 兼容不同的 cookie 格式 c_name = cookie.get("name") c_value = cookie.get("value") c_domain = cookie.get("domain", "") c_path = cookie.get("path", "/") if c_name and c_value: self.session.cookies.set( name=c_name, value=c_value, domain=c_domain, path=c_path ) VSC_INFO("vfs_plg", "[%s] Cloudflare challenge passed.", self.group_id) return token except Exception as e: self._set_error(9005, f"Failed to parse captcha result JSON: {str(e)}") return "" elif status == 3: # Failed err_msg = result_out.get("result", "Unknown error") self._set_error(9003, f"Captcha task failed: {err_msg}") return "" else: # Pending / Processing time.sleep(3) self._set_error(9003, "Captcha task timeout (120s)") return "" def _get_common_headers(self, with_auth=True) -> Dict[str, str]: mission = self.free_config.get("missionCode", "") country = self.free_config.get("countryCode", "") lang = self.free_config.get("language", "en") route = f"{country}/{lang}/{mission}" h = { "accept": "application/json, text/plain, */*", "accept-language": "en-US,en;q=0.9", "origin": "https://visa.vfsglobal.com", "referer": "https://visa.vfsglobal.com/", "route": route } client_src = self._get_client_source() h["clientsource"] = client_src if with_auth and self.jwt_token: h["authorize"] = self.jwt_token return h def _query_earliest_slot(self, apt_config, earliest_date_out: List[str]) -> bool: """ 查询最早 Slot (对应 C++ VFSApi::query_earliest_slot_with_retry) 增加了 403 被拦截时的 Cloudflare 自动绕过机制 """ url = "https://lift-api.vfsglobal.com/appointment/CheckIsSlotAvailable" data = { "missioncode": self.free_config.get("missionCode"), "countrycode": self.free_config.get("countryCode"), "vacCode": apt_config.get("vacCode"), "visaCategoryCode": apt_config.get("subcategoryCode"), "roleName": "Individual", "loginUser": self.config.account.username, "payCode": "" } max_retries = 3 for attempt in range(max_retries): # 每次重试前重新获取 header,因为 handle_cloudflare_challenge 可能会更新 Token/UA headers = self._get_common_headers(with_auth=True) headers["content-type"] = "application/json;charset=UTF-8" # 发送请求 if self._perform_request("POST", url, headers=headers, json_data=data): # --- 请求成功 (HTTP 200) --- resp_text = self.session.last_response.text # 1. 检查是否是 WaitList if "WaitList" in resp_text: earliest_date_out.append("WaitList") return True # 2. 解析日期 try: j = self.session.last_response.json() if j.get("earliestSlotLists"): raw_date = j["earliestSlotLists"][0]["date"] # raw_date 示例: "09/10/2025 00:00:00" (表示 2025年9月10日) try: # 1. 按 MM/DD/YYYY HH:MM:SS 解析 dt = datetime.strptime(raw_date, "%m/%d/%Y %H:%M:%S") # 2. 转为 DD/MM/YYYY std_date = dt.strftime("%d/%m/%Y") earliest_date_out.append(std_date) return True except ValueError: # 备用:万一格式变了 (比如变成了 YYYY-MM-DD),尝试其他解析或原样返回 # 这里记录警告,防止 silently fail VSC_WARN("vfs_plg", "[%s] Date parse warning: '%s' not matching %%m/%%d/%%Y", self.group_id, raw_date) # 尝试直接分割,虽然可能格式不对,但总比崩溃好 earliest_date_out.append(raw_date.split(" ")[0]) return True except Exception as e: VSC_DEBUG("vfs_plg", f"Parse earliest slot error: {e}") pass # 虽然 HTTP 200 但没有 slot 数据 self._set_error(2002, "No slots found in response") return False else: # --- 请求失败 (HTTP != 200) --- err = self.get_last_error() # 关键逻辑:如果被防火墙拦截 (403),尝试过盾 if err.error_code == 403: VSC_WARN("vfs_plg", "[%s] Query Blocked (403) - Attempt %d/%d. Solving Cloudflare...", self.group_id, attempt + 1, max_retries) # 调用过盾逻辑 cf_token = self._handle_cloudflare_challenge() if cf_token: # 过盾成功,随机冷却 1-3 秒后重试 time.sleep(random.uniform(1, 3)) continue else: # 过盾失败,无法继续 self._set_error(403, "Cloudflare challenge failed during query retry") return False elif err.error_code == 401: # Session 失效,通常需要重新登录,这里不重试,直接返回失败让上层处理 return False else: # 其他错误 (500, 404 等),不立即重试 return False # 超过最大重试次数 self._set_error(403, "Query blocked by firewall after max retries") return False def _set_error(self, code, msg): self.last_error = VSError(code, msg) VSC_ERROR("vfs_plg", "[%s] Error %d: %s", self.group_id, code, msg) def _fmt_date(self, yyyy_mm_dd): try: return datetime.strptime(yyyy_mm_dd, "%Y-%m-%d").strftime("%d/%m/%Y") except: return yyyy_mm_dd def _fetch_configurations(self, apt_config: Dict[str, Any]) -> bool: """ 获取并缓存签证中心、类别、子类别配置 对应 C++ VFSApi::fetch_configurations """ # 1. 获取所有中心配置 (query_center) if not self.center_conf: centers = [] if not self._query_center(centers): return False self.center_conf = centers # 2. 获取 Visa Category 配置 vac_code = apt_config.get("vacCode") category_code = apt_config.get("categoryCode") # 检查是否已缓存该 VAC 的 category 配置 # C++ 逻辑是: _category_configuration[category_code] = vc # 但这里逻辑似乎是按 category_code 索引。为了保险,我们按 C++ 逻辑实现。 # 注意:C++ map key 是 vac_code 还是 category_code? # C++ 代码中: if (!_category_configuration.contains(vac_code)...) # 但存进去是用 category_code 作为 key: _category_configuration[category_code] = vc; # 这在 Python 中有点奇怪,我们这里使用字典:self.category_conf[category_code] = config # 为了避免重复查询,我们需要知道是否已经查询过这个 VAC。 # 简单起见,检查目标 category_code 是否已在缓存中 if category_code not in self.category_conf: visa_categories = [] if not self._query_visa_category(vac_code, visa_categories): return False found = False for vc in visa_categories: if vc.get("code") == category_code: self.category_conf[category_code] = vc found = True break # 如果没找到,可能配置错误,但 C++ 没报错,只继续 if not found: VSC_WARN("vfs_plg", "[%s] Category code '%s' not found in VAC '%s'", self.group_id, category_code, vac_code) # 3. 获取 Visa SubCategory 配置 sub_category_code = apt_config.get("subcategoryCode") if sub_category_code not in self.subcategory_conf: visa_subcategories = [] if not self._query_visa_sub_category(vac_code, category_code, visa_subcategories): return False found = False for svc in visa_subcategories: if svc.get("code") == sub_category_code: self.subcategory_conf[sub_category_code] = svc found = True break if not found: VSC_WARN("vfs_plg", "[%s] SubCategory code '%s' not found", self.group_id, sub_category_code) return True def _query_center(self, centers_out: List) -> bool: """对应 C++ VFSApi::query_center""" mission = self.free_config.get("missionCode") country = self.free_config.get("countryCode") url = f"https://lift-api.vfsglobal.com/master/center/{mission}/{country}/en-US" headers = self._get_common_headers(with_auth=False) # 通常 Master API 不需要 Auth 或者是独立的 # 实际上根据 C++ 代码,这里使用的是 _get_client_source 生成的 headers,且不需要 authorize token # 但 C++ 代码中也没有明确加 authorize header,除非 _jwt_token 不为空 # 保险起见,如果有了 Token 就带上,没有就不带 (get_common_headers 默认逻辑) if not self._perform_request("GET", url, headers=headers): return False try: j = self.session.last_response.json() if isinstance(j, list): centers_out.extend(j) return True else: self._set_error(2003, "query_center response is not a list") except Exception as e: self._set_error(9001, f"query_center parse error: {e}") return False def _query_visa_category(self, center_code: str, visa_category_out: List) -> bool: """对应 C++ VFSApi::query_visa_category""" mission = self.free_config.get("missionCode") country = self.free_config.get("countryCode") # URL Encode enc_center = urllib.parse.quote(center_code) url = f"https://lift-api.vfsglobal.com/master/visacategory/{mission}/{country}/{enc_center}/en-US" headers = self._get_common_headers(with_auth=False) if not self._perform_request("GET", url, headers=headers): return False try: j = self.session.last_response.json() # C++ 增加了错误检查 if isinstance(j, list): if j and "error" in j[0] and j[0]["error"]: self._set_error(2004, f"API Error in query_visa_category: {j[0]}") return False visa_category_out.extend(j) return True else: self._set_error(2003, "query_visa_category response is not a list") except Exception as e: self._set_error(9001, f"query_visa_category parse error: {e}") return False def _query_visa_sub_category(self, center_code: str, category_code: str, visa_sub_category_out: List) -> bool: """对应 C++ VFSApi::query_visa_sub_category""" mission = self.free_config.get("missionCode") country = self.free_config.get("countryCode") enc_center = urllib.parse.quote(center_code) enc_cat = urllib.parse.quote(category_code) url = f"https://lift-api.vfsglobal.com/master/subvisacategory/{mission}/{country}/{enc_center}/{enc_cat}/en-US" headers = self._get_common_headers(with_auth=False) if not self._perform_request("GET", url, headers=headers): return False try: j = self.session.last_response.json() if isinstance(j, list): if j and "error" in j[0] and j[0]["error"]: self._set_error(2004, f"API Error in query_visa_sub_category: {j[0]}") return False visa_sub_category_out.extend(j) return True else: self._set_error(2003, "query_visa_sub_category response is not a list") except Exception as e: self._set_error(9001, f"query_visa_sub_category parse error: {e}") return False def _read_otp_email(self) -> str: """ 读取 OTP 邮件 (对应 C++ VFSApi::read_otp_code) """ # 1. 定义 C++ 代码中的常量 master_email = "visafly666@gmail.com" recipient = self.config.account.username # 注意:C++ 代码中 sender 是 "donotreply at vfshelpline.com" sender = "donotreply at vfshelpline.com" subject_keywords = "One Time Password" body_keywords = "OTP" # 2. 获取当前 UTC 时间并格式化 (对应 C++ std::put_time "%Y-%m-%d %H:%M:%S") # 用于告诉云端只查询这个时间点之后收到的邮件 now_utc = datetime.utcnow() formatted_utc_time = now_utc.strftime("%Y-%m-%d %H:%M:%S") VSC_INFO("vfs_plg", "[%s] Waiting for OTP email sent after %s...", self.group_id, formatted_utc_time) # 3. 轮询查收 (C++ 逻辑通常由外部调度,Python 插件内部实现轮询更稳健) # 尝试 12 次,每次间隔 5 秒,共 60 秒 for i in range(12): content_out = [] # 用于接收结果的容器 # 对应 C++: expiry = 5 * 60 (300秒) content_out = VSCloudApi.Instance().fetch_mail_content( master_email, sender, recipient, subject_keywords, body_keywords, formatted_utc_time, 300 ) if content_out: # 4. 正则匹配 6位数字 (对应 C++ std::regex otp_pattern(R"(\b\d{6}\b)")) match = re.search(r'\b\d{6}\b', content_out) if match: otp = match.group(0) VSC_INFO("vfs_plg", "[%s] OTP code found: %s", self.group_id, otp) return otp # 未找到,等待重试 time.sleep(5) if i % 2 == 0: VSC_DEBUG("vfs_plg", "[%s] OTP not found yet, retrying...", self.group_id) # 5. 超时处理 self._set_error(4004, "OTP email not found (timeout)") return "" def _submit_login_otp(self, cf_token: str, otp: str) -> bool: """ 提交 OTP 验证码进行登录 (对应 C++ VFSApi::submit_otp_code) """ VSC_INFO("vfs_plg", "[%s] Submitting Login OTP...", self.group_id) # 1. 准备基础数据 email = self.config.account.username password = self.config.account.password enc_password = self._encrypt_password(password) mission_code = self.free_config.get("missionCode", "") country_code = self.free_config.get("countryCode", "") # 2. 生成加密 Source (每次请求时间戳不同,建议重新生成) client_src = self._get_client_source() orange_src = self._get_orange_source(email) # 3. 构造请求 url = "https://lift-api.vfsglobal.com/user/login" headers = self._get_common_headers(with_auth=False) headers.update({ "clientsource": client_src, "orangex": orange_src, "content-type": "application/x-www-form-urlencoded" }) # 注意:C++ 代码中在此处会再次调用 handle_cloudflare_challenge 获取新 token。 # 如果传入的 cf_token 已经在上一步(密码登录)中被消耗,这里可能需要重新获取。 # 为了稳健,如果传入为空,尝试重新获取。 if not cf_token: VSC_DEBUG("vfs_plg", "[%s] CF Token is empty, regenerating for OTP...", self.group_id) cf_token = self._handle_cloudflare_challenge() if not cf_token: return False data = { "username": email, "password": enc_password, "missioncode": mission_code, "countrycode": country_code, "languageCode": "en-US", "captcha_version": "cloudflare-v1", "captcha_api_key": cf_token, "otp": otp # 关键字段:OTP 验证码 } # 4. 发送请求 (POST form-urlencoded) if not self._perform_request("POST", url, headers=headers, data=data): return False # 5. 处理响应 try: resp_json = self.session.last_response.json() # A. 检查错误 if "error" in resp_json and resp_json["error"]: err_obj = resp_json["error"] # 兼容不同的错误结构 (message 或 description) msg = "" if isinstance(err_obj, dict): msg = err_obj.get("message") or err_obj.get("description") or "Unknown error" else: msg = str(err_obj) if "locked" in str(msg).lower(): self._set_error(4005, f"Account Locked during OTP: {msg}") else: self._set_error(4006, f"OTP Login Error: {msg}") return False # B. 检查 AccessToken (登录成功) if "accessToken" in resp_json and resp_json["accessToken"]: self.jwt_token = resp_json["accessToken"] VSC_INFO("vfs_plg", "[%s] OTP Login successful, JWT obtained.", self.group_id) return True self._set_error(9006, f"OTP Login failed, unknown response: {str(resp_json)[:100]}") return False except Exception as e: self._set_error(9001, f"OTP Login parse error: {str(e)}") return False def _submit_no_addition_service(self, urn) -> bool: url = "https://lift-api.vfsglobal.com/vas/mapvas" headers = self._get_common_headers(with_auth=True) headers["content-type"] = "application/json;charset=UTF-8" data = { "loginUser": self.config.account.username, "missionCode": self.free_config.get("missionCode"), "countryCode": self.free_config.get("countryCode"), "urn": urn, "applicants": [] } # C++ 只请求不检查结果,或者只要200就行 return self._perform_request("POST", url, headers=headers, json_data=data) def _query_fee(self, apt_config, urn) -> Tuple[float, str]: url = "https://lift-api.vfsglobal.com/appointment/fees" headers = self._get_common_headers(with_auth=True) headers["content-type"] = "application/json;charset=UTF-8" data = { "missionCode": self.free_config.get("missionCode"), "countryCode": self.free_config.get("countryCode"), "centerCode": apt_config.get("vacCode"), "loginUser": self.config.account.username, "urn": urn, "languageCode": "en-US" } if self._perform_request("POST", url, headers=headers, json_data=data): try: j = self.session.last_response.json() amt = j.get("totalamount", 0.0) curr = "" if j.get("feeDetails"): curr = j["feeDetails"][0].get("currency", "") return float(amt), curr except: pass return 0.0, "" def _schedule(self, apt_config, urn, amount, currency, slot_id, res_out: Dict) -> bool: url = "https://lift-api.vfsglobal.com/appointment/schedule" headers = self._get_common_headers(with_auth=True) headers["content-type"] = "application/json;charset=UTF-8" data = { "missionCode": self.free_config.get("missionCode"), "countryCode": self.free_config.get("countryCode"), "centerCode": apt_config.get("vacCode"), "loginUser": self.config.account.username, "urn": urn, "notificationType": "none", "paymentdetails": { "paymentmode": "Online", "RequestRefNo": "", "clientId": "", "merchantId": "", "amount": amount, "currency": currency }, "allocationId": slot_id, "CanVFSReachoutToApplicant": True } if not self._perform_request("POST", url, headers=headers, json_data=data): return False try: j = self.session.last_response.json() if j.get("IsAppointmentBooked"): res_out.update(j) return True except: pass return False def _pay_request(self, payload) -> str: # C++: 检查 301/302 Redirect Location url = f"https://online.vfsglobal.com/PG-Component/Payment/PayRequest?payLoad={payload}" headers = { "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "referer": "https://visa.vfsglobal.com/", "user-agent": self.user_agent } try: # allow_redirects=False 以捕获 Location header resp = self.session.get(url, headers=headers, allow_redirects=False) if resp.status_code in [301, 302]: loc = resp.headers.get("Location", "") if loc.startswith("http"): return loc else: return "https://online.vfsglobal.com" + loc if loc.startswith("/") else "https://online.vfsglobal.com/" + loc except: pass return "" def _save_http_session(self, page_url): """ 提取 cookies, local_storage, 存入 VSCloudApi 修复:curl_cffi 没有 requests.utils.dict_from_cookiejar,需手动提取 """ cookies_dict = {} try: # 方式 1: curl_cffi 的 cookies 对象通常支持 get_dict() if hasattr(self.session.cookies, "get_dict"): cookies_dict = self.session.cookies.get_dict() else: # 方式 2: 迭代 (兼容标准 CookieJar) for c in self.session.cookies: cookies_dict[c.name] = c.value except Exception as e: VSC_WARN("vfs_plg", "[%s] Failed to extract cookies: %s", self.group_id, str(e)) cookies_str = json.dumps(cookies_dict) # 简单生成 SessionID hash ua_str = self.user_agent or "unknown_ua" raw = cookies_str + ua_str + page_url try: session_id = hashes.Hash(hashes.SHA256(), backend=default_backend()) session_id.update(raw.encode()) sid = session_id.finalize().hex() proxy_str = "" if self.config.proxy.ip: proxy_str = f"{self.config.proxy.scheme}://" if self.config.proxy.username: proxy_str += f"{self.config.proxy.username}:{self.config.proxy.password}@" proxy_str += f"{self.config.proxy.ip}:{self.config.proxy.port}" saved_session = VSCloudApi.Instance().create_http_session( sid, cookies_str, "", ua_str, proxy_str, page_url ) if saved_session: VSC_INFO("vfs_plg", "[%s] Session saved. ID: %s", self.group_id, sid) else: VSC_ERROR("vfs_plg", "[%s] Session save failed. ID: %s", self.group_id, sid) return saved_session except Exception as e: # 捕获异常,确保即使保存 Session 失败,也不影响主流程返回预订结果 VSC_WARN("vfs_plg", "[%s] Failed to save session to cloud: %s", self.group_id, str(e)) return None