| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256 |
- # plugins/vfs_global_plugin.py
- import time
- import json
- import random
- import base64
- import re
- import urllib.parse
- from datetime import datetime
- from typing import Dict, Any, Optional, List, Tuple
- from curl_cffi import requests, const
- # 加密库
- from cryptography.hazmat.primitives import serialization, hashes
- from cryptography.hazmat.primitives.asymmetric import padding
- from cryptography.hazmat.backends import default_backend
- from vs_plg import IVSPlg
- from vs_types import VSPlgConfig, VSQueryResult, VSBookResult, AvailabilityStatus, NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError
- from vs_log_macros import VSC_INFO, VSC_ERROR, VSC_DEBUG, VSC_WARN
- from toolkit.vs_cloud_api import VSCloudApi
# ----------------- Static constants and helper data -----------------
# PEM-encoded public key. VfsPlugin loads it in its constructor and uses it to
# encrypt the login password and the `clientsource` / `orangex` header values.
VFS_PUBLIC_KEY_PEM = """-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAuupFgB+lYIOtSxrRoHzc
LmCZKJ6+oSbgqgOPzFMM0TasOeLw0NXEn1XfIzXdx75+tegNKwyIZumoh0yhubKs
t59GV321kN0iquYRHrdh3ygfDDHlS9rROQeBqRga0ncSADtbLMrBPqXJjPCoV76y
t92towriKoH75BhiazY0mghm4LjmAWrV0u/GNpV3tk9bxbtHEXGaFmxCJqjg+7x6
1e5wXLfvpj9w1QsiSWOSJxLOyICz/9ByxXycQQFdNmjnnnwco9Gt/Mi33NYH71j0
5oXIjklFC4lvJqaqSY5lS7Vwb9oCt9zX9J0Yz4z4e/3V+0jgRnWOFGofyks4FKe2
GQIDAQAB
-----END PUBLIC KEY-----"""
# Lower-cased English country name -> three-letter country code.
# Looked up by get_country_iso3(); names missing here fall back to "CHN" there.
COUNTRY_MAP = {
    "china": "CHN", "france": "FRA", "germany": "DEU", "italy": "ITA",
    "united kingdom": "GBR", "united states": "USA", "india": "IND",
    "russia": "RUS", "turkey": "TUR", "vietnam": "VNM"
}
def get_country_iso3(name: str) -> str:
    """Map a country name to its three-letter code via COUNTRY_MAP.

    The lookup is case-insensitive. Names that are not present in
    COUNTRY_MAP yield the fallback code "CHN".
    """
    lookup_key = name.lower()
    if lookup_key in COUNTRY_MAP:
        return COUNTRY_MAP[lookup_key]
    return "CHN"
def get_alias_email(email: str, new_domain: str = "gmail-app.com") -> str:
    """Return *email* with its domain part swapped for *new_domain*.

    Only the text after the last "@" is replaced, so any earlier "@"
    characters stay in the local part.

    Raises:
        ValueError: if *email* contains no "@".
    """
    if "@" in email:
        # Split once from the right: everything before the last "@" is kept.
        local_part = email.rsplit("@", 1)[0]
        return local_part + "@" + new_domain
    raise ValueError(f"Invalid email: {email}")
def to_yyyymmdd(data_str: str, date_str_format: str, target_format: str="%Y-%m-%d"):
    """Re-format a date string.

    Args:
        data_str: the date text to convert.
        date_str_format: ``strptime`` format describing *data_str*.
        target_format: ``strftime`` format of the result; defaults to
            ``YYYY-MM-DD``.

    Returns:
        The date rendered with *target_format*.

    Raises:
        ValueError: if *data_str* does not match *date_str_format*.
    """
    dt = datetime.strptime(data_str, date_str_format)
    # Honour the caller-supplied output format (it used to be ignored).
    return dt.strftime(target_format)
- class VfsPlugin(IVSPlg):
def __init__(self, group_id: str):
    """Create a plugin instance bound to *group_id* with empty session state."""
    # Identity and configuration (populated later through set_config()).
    self.group_id = group_id
    self.config: Optional[VSPlgConfig] = None
    self.free_config: Dict[str, Any] = {}

    # HTTP / login state (populated by create_session()).
    self.session: Optional[requests.Session] = None
    self.jwt_token = ""
    self.user_agent = ""
    self.real_ip = ""
    self.is_healthy = True

    # Cached remote configuration.
    self.center_conf = None
    self.category_conf = {}
    self.subcategory_conf = {}

    # Parse the embedded PEM once so later encryption calls can reuse the key.
    pem_bytes = VFS_PUBLIC_KEY_PEM.encode()
    self.public_key = serialization.load_pem_public_key(pem_bytes, backend=default_backend())
def get_group_id(self) -> str:
    """Return the group identifier this plugin instance was constructed with."""
    return self.group_id
def set_config(self, config: VSPlgConfig):
    """Store *config* and decode its JSON ``free_config`` into a dict.

    ``self.free_config`` always ends up as a dict: a missing, empty,
    malformed, or non-object JSON value results in ``{}`` so that later
    ``self.free_config.get(...)`` calls cannot fail.
    """
    self.config = config
    raw_free_config = getattr(config, "free_config", None)
    parsed = {}
    if raw_free_config:
        try:
            parsed = json.loads(raw_free_config)
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError; a bad payload is treated
            # as "no free config" rather than aborting plugin setup.
            parsed = {}
    # A JSON array/scalar is valid JSON but unusable as a config mapping.
    self.free_config = parsed if isinstance(parsed, dict) else {}
def health_check(self) -> bool:
    """Return the current health flag (cleared by _perform_request on HTTP 401/429)."""
    return self.is_healthy
def create_session(self) -> None:
    """Open an HTTP session and log in to the VFS API.

    Steps, in order: build the curl_cffi session (through the configured
    proxy), look up the outward-facing IP, obtain a Cloudflare Turnstile
    token, then POST the login form. On success ``self.jwt_token`` is set
    from the response's ``accessToken``. If the response asks for OTP
    authentication instead, the OTP is read and submitted through helpers
    defined outside this view (NOTE(review): presumably those store the
    token -- confirm).

    Raises:
        BizLogicError: if the login response carries neither an access
            token nor the OTP flag.
    """
    # Initialise the session
    curlopt = {
        const.CurlOpt.MAXAGE_CONN: 1800,
        const.CurlOpt.MAXLIFETIME_CONN: 1800,
        const.CurlOpt.VERBOSE: False,
    }
    self.session = requests.Session(
        proxy=self._get_proxy_url(),
        impersonate="chrome124",
        curl_options=curlopt,
        use_thread_local_curl=False,
        http_version=const.CurlHttpVersion.V2TLS
    )

    # Look up the real outward-facing IP (sent later in applicant payloads)
    self.real_ip = self._get_realnetwork_ip()

    # 1. Cloudflare Turnstile
    cf_token = self._handle_cloudflare_challenge()
    # 2. Prepare the login parameters
    email = self.config.account.username
    password = self.config.account.password
    enc_password = self._encrypt_password(password)

    mission_code = self.free_config.get("mission_code", "")
    country_code = self.free_config.get("country_code", "")

    client_src = self._get_client_source()
    orange_src = self._get_orange_source(email)

    url = "https://lift-api.vfsglobal.com/user/login"
    headers = self._get_common_headers(with_auth=False)
    headers.update({
        "clientsource": client_src,
        "orangex": orange_src,
        "content-type": "application/x-www-form-urlencoded"
    })

    data = {
        "username": email,
        "password": enc_password,
        "missioncode": mission_code,
        "countrycode": country_code,
        "languageCode": "en-US",
        "captcha_version": "cloudflare-v1",
        "captcha_api_key": cf_token
    }

    # 3. Send the login request (_perform_request also issues an OPTIONS request first)
    resp = self._perform_request("POST", url, headers=headers, data=data)
    resp_json = resp.json()
    if resp_json.get('accessToken', ''):
        self.jwt_token = resp_json["accessToken"]
        VSC_INFO("vfs_plg", "[%s] Login successful, JWT obtained.", self.group_id)
        return

    # OTP handling
    if resp_json.get("enableOTPAuthentication"):
        VSC_INFO("vfs_plg", "[%s] Login requires OTP.", self.group_id)
        otp = self._read_otp_email()
        self._submit_login_otp(None, otp)
        return
    raise BizLogicError(message="Login failed: No access token or OTP flow.")
def query(self) -> VSQueryResult:
    """Query the earliest bookable slot for one randomly chosen appointment type.

    Returns a VSQueryResult whose status is NoneAvailable, Waitlist, or
    Available; for Available the earliest date is recorded on the result
    and appended to ``availability``.

    Raises:
        NotFoundError: if ``free_config`` lists no appointment types.
    """
    result = VSQueryResult()
    candidates = self.free_config.get("appointment_types", [])
    if not candidates:
        raise NotFoundError(message="No matching appointment configuration found.")

    apt_config = random.choice(candidates)
    self._fetch_configurations(apt_config)
    earliest_date = self._query_earliest_slot(apt_config)

    # Start from "nothing available" and copy the routing fields across.
    result.success = False
    result.availability_status = AvailabilityStatus.NoneAvailable
    for field_name in ("visa_type", "city", "country", "routing_key"):
        setattr(result, field_name, apt_config.get(field_name, ""))

    if not earliest_date:
        return result

    result.success = True
    if "WaitList" in earliest_date:
        result.availability_status = AvailabilityStatus.Waitlist
        VSC_INFO("vfs_plg", "[%s] Found WaitList.", self.group_id)
        return result

    # A concrete date came back.
    result.availability_status = AvailabilityStatus.Available
    result.earliest_date = earliest_date
    VSC_INFO("vfs_plg", "[%s] Found Slot: %s", self.group_id, earliest_date)

    day_entry = VSQueryResult.DateAvailability()
    day_entry.date = earliest_date
    result.availability.append(day_entry)
    return result
def book(self, slot_info: VSQueryResult, user_inputs) -> VSBookResult:
    """Run the complete booking flow for one applicant.

    Flow: (optional) upload documents -> add applicant -> (optional)
    applicant OTP -> waitlist confirmation OR pick date/time -> schedule
    -> (optional) payment link.

    Args:
        slot_info: result of a previous query(); its ``routing_key`` selects
            the appointment type and ``earliest_date`` seeds the month scan.
        user_inputs: mutable mapping of applicant data. This method adds
            ``alias_email`` and, when OCR is enabled, ``applicant_image``,
            ``applicant_image_data`` and ``guid``.

    Returns:
        VSBookResult; ``success`` is False when no slot could be selected or
        the API reports the appointment as not booked.

    Raises:
        NotFoundError: no appointment type matches ``slot_info.routing_key``.
        BizLogicError: adding the applicant, OTP send/verify, or waitlist
            confirmation failed.
    """
    user_email = user_inputs.get('email')
    user_inputs['alias_email'] = get_alias_email(user_email, new_domain="gmail-app.com")

    res = VSBookResult()
    slot_routing_key = slot_info.routing_key

    from_date = slot_info.earliest_date if slot_info.earliest_date else datetime.now().strftime("%Y-%m-%d")

    apt_config = None
    appt_types = self.free_config.get("appointment_types", [])
    for apt in appt_types:
        if apt.get("routing_key") == slot_routing_key:
            apt_config = apt
            break

    if not apt_config:
        raise NotFoundError(message="Book: Config missing.")
    self._fetch_configurations(apt_config)
    sub_cc = apt_config.get("subcategory_code")
    sub_conf = self.subcategory_conf.get(sub_cc, {})
    # OCR recognition / document upload
    ocr_enabled = sub_conf.get("isOCREnable", False)

    if ocr_enabled:
        VSC_INFO("vfs_plg", "[%s] OCR Enabled, uploading documents...", self.group_id)
        # The helper takes (apt_config, user_inputs) only; the previous call
        # also passed the not-yet-assigned result variable and always raised.
        upload_res = self._upload_applicant_documents(apt_config, user_inputs)
        user_inputs["applicant_image"] = upload_res.get("passportImageFilename")
        user_inputs["applicant_image_data"] = upload_res.get("passportImageFileBytes")  # Base64
        user_inputs["guid"] = upload_res.get("uploadDocumentGUID")
    # Whether a reference number (cover letter) must be supplied
    enable_reference_number = sub_conf.get("enableReferenceNumber", False)
    # Add the applicant (core step 1)
    final_urn = None
    is_waitlist = (slot_info.availability_status == AvailabilityStatus.Waitlist)

    add_primary_retry = 0
    MAX_RETRY = 6

    while add_primary_retry < MAX_RETRY:
        try:
            final_urn = self._add_primary_applicant(apt_config, user_inputs, is_waitlist, ocr_enabled, enable_reference_number)
            if not final_urn:
                raise NotFoundError(message="URN not found")
            break
        except Exception as e:
            # Best-effort retry of every failure, but record why it failed.
            VSC_WARN("vfs_plg", "[%s] Add Applicant failed: %s", self.group_id, str(e))
            VSC_WARN("vfs_plg", "[%s] Add Applicant retry %d...", self.group_id, add_primary_retry)
            time.sleep(10)
            add_primary_retry += 1

    if not final_urn:
        raise BizLogicError(message="Failed to add primary applicant (Slot likely taken)")
    VSC_INFO("vfs_plg", "[%s] Applicant Added. URN: %s", self.group_id, final_urn)
    # Applicant OTP verification (core step 2)
    otp_enabled = sub_conf.get("isApplicantOTPEnabled", False)

    if otp_enabled:
        VSC_INFO("vfs_plg", "[%s] Applicant OTP Required.", self.group_id)
        if not self._applicant_otp_send(apt_config, final_urn):
            raise BizLogicError(message='applicant otp send failed')

        # Reuse the same e-mail reading logic as the login OTP
        otp_code = self._read_otp_email()
        if not self._applicant_otp_verify(apt_config, final_urn, otp_code):
            raise BizLogicError(message='applicant otp verify failed')
    # In waitlist mode, confirm and return immediately
    if is_waitlist:
        if self._confirm_waitlist(apt_config, final_urn):
            res.success = True
            res.urn = final_urn
            return res
        raise BizLogicError(message='confirm waitlist failed')
    # Rule engine and date filtering (core step 3)
    expected_start = user_inputs.get("expected_start_date", "")
    expected_end = user_inputs.get("expected_end_date", "")

    # Months to scan; when expected_start/end are empty the scan starts at from_date's month
    months = self._get_filtered_covered_months(expected_start, expected_end, from_date)
    VSC_INFO("vfs_plg", "[%s] Scanning months: %s (From: %s)", self.group_id, months, from_date)

    selected_slot_id = ""
    selected_slot_date = ""
    selected_slot_time_range = ""

    # Every date seen so far, and dates already tried (never retried)
    all_ads = set()
    forbidden_dates = set()

    found_slot = False

    # Walk the months looking for a slot
    for m_str in months:
        ads = self._query_slot_calendar(apt_config, final_urn, m_str)

        # Merge in dates not seen before
        new_ads = [d for d in ads if d not in all_ads]
        all_ads.update(new_ads)

        # Up to 3 selection attempts per month
        for _ in range(3):
            # Exclude dates that were already tried
            avail_candidates = [d for d in list(all_ads) if d not in forbidden_dates]

            # Apply the user's date rules
            sel_dates = self._filter_dates(avail_candidates, expected_start, expected_end)
            VSC_DEBUG("vfs_plg", "[%s] avail_candidates=%s, sel_dates=%s",
                      self.group_id, avail_candidates, sel_dates)
            if not sel_dates:
                break

            tmp_date = sel_dates[0]
            forbidden_dates.add(tmp_date)

            # Audit log call
            if not self._saveuseractionaudit(apt_config, final_urn, tmp_date):
                time.sleep(3)
                continue

            # Query the concrete time slots of that day
            ats = self._query_slot_time(apt_config, final_urn, tmp_date)
            if not ats:
                time.sleep(3)
                continue

            # Pick one time slot at random
            sel_tm = random.choice(ats)

            selected_slot_id = sel_tm.get("allocationId")
            selected_slot_date = tmp_date
            selected_slot_time_range = sel_tm.get("slot")

            found_slot = True
            break

        if found_slot:
            break

    if not found_slot:
        VSC_INFO("vfs_plg", "[%s] No valid slots found.", self.group_id)
        res.success = False
        return res
    VSC_INFO("vfs_plg", "[%s] Slot Selected: %s %s (ID: %s)",
             self.group_id, selected_slot_date, selected_slot_time_range, selected_slot_id)
    # Services, fee, final scheduling (core step 4)
    self._submit_no_addition_service(final_urn)
    amount, currency = self._query_fee(apt_config, final_urn)

    schedule_res = self._schedule(apt_config, final_urn, amount, currency, selected_slot_id)

    if not schedule_res.get("IsAppointmentBooked"):
        VSC_INFO("vfs_plg", "[%s] IsAppointmentBooked is false", self.group_id)
        res.success = False
        return res

    # Build the result
    res.success = True
    res.account = self.config.account.username
    res.book_date = selected_slot_date
    res.book_time = selected_slot_time_range
    res.urn = final_urn
    # Round before truncating: e.g. 19.99 * 100 is 1998.999... in binary floats.
    res.fee_amount = int(round(amount * 100))
    res.fee_currency = currency

    # Payment link handling
    if schedule_res.get("IsPaymentRequired", False):
        payload = schedule_res.get("payLoad", "")
        payment_url = self._pay_request(payload)
        if payment_url:
            res.payment_link = payment_url
            # Persist the HTTP session
            saved_session = self._save_http_session(payment_url)
            if saved_session:
                res.session_id = saved_session['session_id']
    return res
-
- def _get_proxy_url(self):
- # 构造代理
- proxy_url = ""
- if self.config.proxy.ip:
- s = self.config.proxy
- if s.username:
- proxy_url = f"{s.scheme}://{s.username}:{s.password}@{s.ip}:{s.port}"
- else:
- proxy_url = f"{s.scheme}://{s.ip}:{s.port}"
- return proxy_url
-
- def _get_filtered_covered_months(self, start_date, end_date, from_date) -> List[str]:
- """
- 计算需要查询的月份列表,格式 YYYY-MM-DD (每月1号)
- """
- fmt = "%Y-%m-%d"
- # 默认值处理
- try:
- dt_start = datetime.strptime(start_date, fmt) if start_date else datetime.now()
- dt_end = datetime.strptime(end_date, fmt) if end_date else datetime.now().replace(year=datetime.now().year + 1)
-
- try:
- dt_from = datetime.strptime(from_date, fmt)
- except:
- dt_from = datetime.now()
- except:
- return []
- # 归一化到月初
- dt_start = dt_start.replace(day=1)
- dt_end = dt_end.replace(day=1)
- dt_from = dt_from.replace(day=1)
-
- # 起始点取 max(start, from)
- curr = max(dt_start, dt_from)
-
- months = []
- while curr <= dt_end:
- months.append(curr.strftime(fmt))
- # 下个月
- if curr.month == 12:
- curr = curr.replace(year=curr.year + 1, month=1)
- else:
- curr = curr.replace(month=curr.month + 1)
- return months
-
def _get_realnetwork_ip(self):
    """Ask api.ipify.org for the outward-facing IP of the session and return it."""
    browser_headers = {
        'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
    }
    reply = self._perform_request(
        'GET',
        "https://api.ipify.org/?format=json",
        headers=browser_headers,
    )
    ip_info = reply.json()
    return ip_info['ip']
-
def _confirm_waitlist(self, apt_config: Dict[str, Any], urn: str) -> bool:
    """Confirm joining the waitlist for *urn* (counterpart of C++ VFSApi::confirm_waitlist).

    Returns the ``isConfirmed`` value of the API response (``None`` when
    the field is absent).
    """
    request_headers = self._get_common_headers(with_auth=True)
    request_headers["content-type"] = "application/json;charset=UTF-8"

    settings = self.free_config
    body = {
        "missionCode": settings.get("mission_code"),
        "countryCode": settings.get("country_code"),
        "centerCode": apt_config.get("vac_code"),
        "loginUser": self.config.account.username,
        "urn": urn,
        "notificationType": "none",
        "CanVFSReachoutToApplicant": True
    }
    reply = self._perform_request(
        "POST",
        "https://lift-api.vfsglobal.com/appointment/ConfirmWaitlist",
        headers=request_headers,
        json_data=body,
    )
    outcome = reply.json()
    return outcome.get("isConfirmed")
-
def _upload_applicant_documents(self, apt_config, user_inputs) -> Dict:
    """Download the passport image and upload it to the VFS API.

    Returns the API's JSON response, extended with
    ``passportImageFilename`` and ``passportImageFileBytes`` (Base64 text).

    Raises:
        NotFoundError: ``user_inputs`` has no ``passport_image_url``.
        BizLogicError: the image download did not return HTTP 200.
    """
    image_location = user_inputs.get("passport_image_url")
    if not image_location:
        raise NotFoundError(message="Missing passport_image_url")

    # Fetch the image with a plain one-off request (not the logged-in session).
    download = requests.get(image_location, timeout=30)
    if download.status_code != 200:
        raise BizLogicError(message="Failed to download passport image")
    encoded_image = base64.b64encode(download.content).decode('utf-8')

    request_headers = self._get_common_headers(with_auth=True)
    request_headers["content-type"] = "application/json;charset=UTF-8"

    body = {
        "missioncode": self.free_config.get("mission_code"),
        "countryCode": self.free_config.get("country_code"),
        "centerCode": apt_config.get("vac_code"),
        "loginUser": self.config.account.username,
        "languageCode": "en-US",
        "visaCategoryCode": apt_config.get("subcategory_code"),
        "fileBytes": encoded_image,
        "selfiImageFileBytes": ""
    }

    reply = self._perform_request(
        "POST",
        "https://lift-api.vfsglobal.com/appointment/UploadApplicantDocument",
        headers=request_headers,
        json_data=body,
    )
    upload_info = reply.json()
    # Hand the image back so the caller can attach it to the applicant payload.
    upload_info.update({
        "passportImageFilename": "passport_img.jpg",
        "passportImageFileBytes": encoded_image,
    })
    return upload_info
-
def _add_primary_applicant(self, apt_config: Dict[str, Any], user_inputs: Dict[str, Any],
                           is_waitlist: bool, ocr_enabled: bool, enable_ref: bool) -> str:
    """Build the applicant JSON payload, submit it, and return the new URN.

    Args:
        apt_config: appointment-type config (``vac_code``, ``subcategory_code``).
        user_inputs: applicant data (names, passport, dates, phone, ...).
        is_waitlist: sent as ``isWaitlist``.
        ocr_enabled: when true, the uploaded image fields are attached.
        enable_ref: when true, ``cover_letter`` is sent as the reference number.

    Returns:
        The ``urn`` field of the API response.

    Raises:
        KeyError: the response JSON has no ``urn`` field.
    """
    url = "https://lift-api.vfsglobal.com/appointment/applicants"
    headers = self._get_common_headers(with_auth=True)
    headers["content-type"] = "application/json;charset=UTF-8"
    # "male" (any case) -> 1, anything else -> 2
    gender_str = str(user_inputs.get("gender", "")).lower()
    gender_code = 1 if gender_str == "male" else 2
    raw_dial = user_inputs.get("phone_country_code", "86")
    dial_code = str(raw_dial)

    # YYYY-MM-DD -> DD/MM/YYYY
    def _to_ddmmyyyy(d_str):
        try:
            # Input is expected to be YYYY-MM-DD
            return datetime.strptime(d_str, "%Y-%m-%d").strftime("%d/%m/%Y")
        except ValueError:
            # Not in the expected format: pass the text through unchanged.
            return d_str

    dob = _to_ddmmyyyy(str(user_inputs.get("birthday", "")))
    ppt_exp = _to_ddmmyyyy(str(user_inputs.get("passport_expiry_date", "")))
    # --- Build the single applicant object ---
    applicant = {
        "urn": "",
        "arn": "",
        "loginUser": self.config.account.username,

        # Basic information (all upper-cased)
        "firstName": str(user_inputs.get("first_name", "")).upper(),
        "middleName": "",
        "lastName": str(user_inputs.get("last_name", "")).upper(),
        "employerFirstName": "",
        "employerLastName": "",
        "salutation": "",
        "gender": gender_code,

        # Contact information
        "contactNumber": str(user_inputs.get("phone", "")),
        "dialCode": dial_code,
        "employerContactNumber": "",
        "employerDialCode": "",
        "emailId": str(user_inputs.get("alias_email", "")).upper(),
        "employerEmailId": "",

        # Travel document
        "passportNumber": str(user_inputs.get("passport_no", "")).upper(),
        "confirmPassportNumber": "",
        "passportExpirtyDate": ppt_exp,
        "dateOfBirth": dob,
        "nationalId": None,

        # Nationality (via the module-level helper get_country_iso3)
        "nationalityCode": get_country_iso3(str(user_inputs.get("nationality", ""))),

        # Address and remaining fields (mostly empty)
        "state": None,
        "city": None,
        "addressline1": None,
        "addressline2": None,
        "pincode": None,
        "isEndorsedChild": False,
        "applicantType": 0,
        "vlnNumber": None,
        "applicantGroupId": 0,
        "parentPassportNumber": "",
        "parentPassportExpiry": "",
        "dateOfDeparture": None,
        "entryType": "",
        "eoiVisaType": "",
        "passportType": "",
        "vfsReferenceNumber": "",
        "familyReunificationCerificateNumber": "",
        "PVRequestRefNumber": "",
        "PVStatus": "",
        "PVStatusDescription": "",
        "PVCanAllowRetry": True,
        "PVisVerified": False,
        "eefRegistrationNumber": "",
        "isAutoRefresh": True,
        "helloVerifyNumber": "",
        "OfflineCClink": "",
        "idenfystatuscheck": False,
        "vafStatus": None,
        "SpecialAssistance": "",
        "AdditionalRefNo": None,
        "juridictionCode": "",
        "canInitiateVAF": False,
        "canEditVAF": False,
        "canDeleteVAF": False,
        "canDownloadVAF": False,
        "Retryleft": "",

        # Outward-facing IP recorded by create_session()
        "ipAddress": self.real_ip
    }
    # --- Reference number (cover letter) ---
    if enable_ref:
        applicant["referenceNumber"] = str(user_inputs.get("cover_letter", ""))
    else:
        applicant["referenceNumber"] = None
    # --- OCR data ---
    if ocr_enabled:
        applicant["applicantImage"] = str(user_inputs.get("applicant_image", ""))
        applicant["applicantImageData"] = str(user_inputs.get("applicant_image_data", ""))
        applicant["GUID"] = str(user_inputs.get("guid", ""))
    # --- Outer payload ---
    payload = {
        "countryCode": self.free_config.get("country_code"),
        "missionCode": self.free_config.get("mission_code"),
        "centerCode": apt_config.get("vac_code"),
        "loginUser": self.config.account.username,
        "visaCategoryCode": apt_config.get("subcategory_code"),
        "applicantList": [applicant],  # the API expects an array

        "languageCode": "en-US",
        "isWaitlist": is_waitlist,
        "isEdit": False,
        "feeEntryTypeCode": None,
        "feeExemptionTypeCode": None,
        "feeExemptionDetailsCode": None,
        "juridictionCode": None,
        "regionCode": None
    }
    # --- Send the request ---
    resp = self._perform_request("POST", url, headers=headers, json_data=payload)
    # --- Extract the URN ---
    return resp.json()["urn"]
-
def _applicant_otp_send(self, apt_config, urn) -> bool:
    """Ask the API to generate an applicant OTP for *urn*.

    Returns the ``isOTPGenerated`` value of the response (``None`` when absent).
    """
    request_headers = self._get_common_headers(with_auth=True)
    request_headers["content-type"] = "application/json;charset=UTF-8"

    settings = self.free_config
    body = {
        "urn": urn,
        "loginUser": self.config.account.username,
        "missionCode": settings.get("mission_code"),
        "countryCode": settings.get("country_code"),
        "centerCode": apt_config.get("vac_code"),
        "OTP": "",
        "otpAction": "GENERATE",
        "languageCode": "en-US"
    }

    reply = self._perform_request(
        "POST",
        "https://lift-api.vfsglobal.com/appointment/applicantotp",
        headers=request_headers,
        json_data=body,
    )
    outcome = reply.json()
    return outcome.get("isOTPGenerated")
def _applicant_otp_verify(self, apt_config, urn, otp) -> bool:
    """Submit *otp* for validation against *urn*.

    Returns the ``isOTPValidated`` value of the response (``None`` when absent).
    """
    request_headers = self._get_common_headers(with_auth=True)
    # This call additionally carries a fixed "datacenter" header.
    request_headers["datacenter"] = "GERMANY"
    request_headers["content-type"] = "application/json;charset=UTF-8"

    settings = self.free_config
    body = {
        "urn": urn,
        "loginUser": self.config.account.username,
        "missionCode": settings.get("mission_code"),
        "countryCode": settings.get("country_code"),
        "centerCode": apt_config.get("vac_code"),
        "OTP": otp,
        "otpAction": "VALIDATE",
        "languageCode": "en-US"
    }

    reply = self._perform_request(
        "POST",
        "https://lift-api.vfsglobal.com/appointment/applicantotp",
        headers=request_headers,
        json_data=body,
    )
    outcome = reply.json()
    return outcome.get("isOTPValidated")
-
def _query_slot_calendar(self, apt_config, urn, from_date) -> List:
    """Fetch the calendar starting at *from_date* and return its dates.

    Args:
        from_date: ``YYYY-MM-DD``; sent to the API as ``DD/MM/YYYY``.

    Returns:
        The ``date`` of every entry in the response's ``calendars`` list,
        converted from ``MM/DD/YYYY`` to ``YYYY-MM-DD``; ``[]`` when the
        list is missing or empty.
    """
    request_headers = self._get_common_headers(with_auth=True)
    request_headers["content-type"] = "application/json;charset=UTF-8"

    api_from_date = datetime.strptime(from_date, "%Y-%m-%d").strftime("%d/%m/%Y")
    body = {
        "missionCode": self.free_config.get("mission_code"),
        "countryCode": self.free_config.get("country_code"),
        "centerCode": apt_config.get("vac_code"),
        "loginUser": self.config.account.username,
        "visaCategoryCode": apt_config.get("subcategory_code"),
        "fromDate": api_from_date,
        "urn": urn,
        "payCode": ""
    }

    reply = self._perform_request(
        "POST",
        "https://lift-api.vfsglobal.com/appointment/calendar",
        headers=request_headers,
        json_data=body,
    )
    entries = reply.json().get("calendars")
    if not entries:
        return []
    # "MM/DD/YYYY" -> "YYYY-MM-DD"
    return [to_yyyymmdd(entry.get("date"), "%m/%d/%Y") for entry in entries]
-
def _query_slot_time(self, apt_config, urn, slot_date) -> List:
    """Fetch the time slots offered on *slot_date* (``YYYY-MM-DD``).

    Returns the ``slots`` value of the response (``None`` when absent).
    """
    request_headers = self._get_common_headers(with_auth=True)
    request_headers["content-type"] = "application/json;charset=UTF-8"

    # The API expects DD/MM/YYYY.
    api_slot_date = datetime.strptime(slot_date, "%Y-%m-%d").strftime("%d/%m/%Y")
    body = {
        "missionCode": self.free_config.get("mission_code"),
        "countryCode": self.free_config.get("country_code"),
        "centerCode": apt_config.get("vac_code"),
        "loginUser": self.config.account.username,
        "visaCategoryCode": apt_config.get("subcategory_code"),
        "slotDate": api_slot_date,
        "urn": urn
    }
    reply = self._perform_request(
        "POST",
        "https://lift-api.vfsglobal.com/appointment/timeslot",
        headers=request_headers,
        json_data=body,
    )
    slot_info = reply.json()
    return slot_info.get("slots")
def _saveuseractionaudit(self, apt_config, urn, earliest_date) -> bool:
    """Report the chosen date (``YYYY-MM-DD``) to the user-action audit endpoint.

    Returns the ``isSavedSuccess`` value of the response, ``False`` when absent.
    """
    request_headers = self._get_common_headers(with_auth=True)
    request_headers["content-type"] = "application/json;charset=UTF-8"

    # The same day is sent twice: once as DD/MM/YYYY, once as an ISO-style timestamp.
    chosen_day = datetime.strptime(earliest_date, "%Y-%m-%d")
    day_slash = chosen_day.strftime("%d/%m/%Y")
    day_iso = chosen_day.strftime("%Y-%m-%dT%H:%M:%S")

    body = {
        "missionCode": self.free_config.get("mission_code"),
        "countryCode": self.free_config.get("country_code"),
        "centerCode": apt_config.get("vac_code"),
        "loginUser": self.config.account.username,
        "urn": urn,
        "firstEarliestSlotDate": day_slash,
        "action": "schedule",
        "ipAddress": self.real_ip,
        "eadAppointmentDetail": day_iso
    }
    reply = self._perform_request(
        "POST",
        "https://lift-api.vfsglobal.com/appointment/saveuseractionaudit",
        headers=request_headers,
        json_data=body,
    )
    outcome = reply.json()
    return outcome.get("isSavedSuccess", False)
def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None):
    """Send one HTTP request through the session, mirroring the C++ client.

    1. Send an OPTIONS request to the same URL (best effort).
    2. Send the actual request.

    Returns:
        The response object when the status code is 200.

    Raises:
        SessionExpiredOrInvalidError: HTTP 401 (also clears ``is_healthy``).
        PermissionDeniedError: HTTP 403.
        RateLimiteddError: HTTP 429 (also clears ``is_healthy``).
        BizLogicError: any other non-200 status.
    """
    # Request bodies are deliberately not logged: they carry the encrypted
    # password and captcha token.
    VSC_DEBUG("vfs_plg", "[perform request] %s %s params=%s", method, url, params)
    # --- 1. OPTIONS request ---
    try:
        # Same URL and headers as the real request; its response is not
        # inspected, only a transport failure is noted.
        opt_headers = headers.copy() if headers else {}
        self.session.request("OPTIONS", url, headers=opt_headers, timeout=10)
    except Exception as e:
        # Non-fatal: an OPTIONS failure alone must not abort the real request.
        VSC_DEBUG("vfs_plg", "OPTIONS request failed (non-fatal): %s", str(e))

    # --- 2. Actual request ---
    resp = self.session.request(method, url, headers=headers, data=data, json=json_data, params=params, timeout=30)
    # Pass the body as an argument, never as the format string: response
    # text may contain '%'.
    VSC_INFO('vfs_plg', "%s", resp.text)
    if resp.status_code == 200:
        return resp
    elif resp.status_code == 401:
        self.is_healthy = False
        raise SessionExpiredOrInvalidError()
    elif resp.status_code == 403:
        raise PermissionDeniedError()
    elif resp.status_code == 429:
        self.is_healthy = False
        raise RateLimiteddError()
    else:
        raise BizLogicError(message=f"HTTP Error {resp.status_code}: {resp.text[:100]}")
def _encrypt_password(self, password: str) -> str:
    """Encrypt *password* with the plugin's public key and return Base64 text.

    Uses OAEP padding with SHA-256 for both the hash and the MGF1 mask.
    """
    plaintext = password.encode()
    oaep_sha256 = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA256()),
        algorithm=hashes.SHA256(),
        label=None,
    )
    encrypted = self.public_key.encrypt(plaintext, oaep_sha256)
    return base64.b64encode(encrypted).decode()
- def _get_orange_source(self, email: str) -> str:
- timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
- payload = f"{email};{timestamp}"
- return self._encrypt_password(payload)
- def _get_client_source(self) -> str:
- timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
- payload = f"GA;{timestamp}Z"
- return self._encrypt_password(payload)
def _handle_cloudflare_challenge(self) -> str:
    """Solve the Cloudflare Turnstile challenge through the cloud solver API.

    Submits a solver task for the login page, polls for up to 120 seconds,
    and on success applies the solver's User-Agent and cookies to
    ``self.session``.

    Returns:
        The Turnstile token.

    Raises:
        NotFoundError: ``mission_code`` or ``country_code`` is missing.
        BizLogicError: task submission failed, the task id or result is
            invalid, the solver reported failure, or polling timed out.
    """
    mission = self.free_config.get("mission_code", "")
    country = self.free_config.get("country_code", "")
    if not mission or not country:
        raise NotFoundError(message="Missing mission_code or country_code in free_config")
    website_url = f"https://visa.vfsglobal.com/{country}/en/{mission}/login"

    # 1. Proxy string handed to the solver (format: scheme://user:pass@ip:port)
    proxy_str = self._get_proxy_url()

    # 2. Submit the task
    VSC_INFO("vfs_plg", "[%s] Submitting Turnstile task for %s...", self.group_id, website_url)
    task_out = VSCloudApi.Instance().submit_anti_turnstile_task(proxy_str, website_url)
    if not task_out:
        raise BizLogicError(message="Failed to submit captcha task to Cloud API")

    # Validate BEFORE stringifying: str(None) is the truthy text "None".
    raw_task_id = task_out.get("id")
    if raw_task_id is None or raw_task_id == "":
        raise BizLogicError(message="Cloud API returned invalid task ID")
    task_id = str(raw_task_id)

    # 3. Poll for the result (120 second budget)
    timeout = 120
    start_time = time.time()

    while time.time() - start_time < timeout:
        result_out = VSCloudApi.Instance().get_anti_turnstile_result(task_id)
        if not result_out:
            time.sleep(3)
            continue

        # status: 0=Pending, 1=Processing, 2=Success, 3=Failed
        status = result_out.get("status", 0)

        if status == 2:
            raw_result = result_out.get("result", "")

            if isinstance(raw_result, str):
                try:
                    data = json.loads(raw_result)
                except ValueError as exc:
                    raise BizLogicError(message=f"Captcha result is not valid JSON: {exc}") from exc
            else:
                data = raw_result
            token = data.get("token")
            ua = data.get("userAgent")
            cookies_list = data.get("cookies", [])
            if not token:
                raise BizLogicError(message="Captcha solved but token is empty")

            # A. Adopt the solver's User-Agent
            if ua:
                self.user_agent = ua
                self.session.headers["User-Agent"] = ua
            # B. Adopt the solver's cookies
            if cookies_list:
                VSC_DEBUG("vfs_plg", "[%s] Syncing %d cookies from Captcha solver...", self.group_id, len(cookies_list))
                for cookie in cookies_list:
                    # Tolerate cookie entries with missing optional fields
                    c_name = cookie.get("name")
                    c_value = cookie.get("value")
                    c_domain = cookie.get("domain", "")
                    c_path = cookie.get("path", "/")

                    if c_name and c_value:
                        self.session.cookies.set(
                            name=c_name,
                            value=c_value,
                            domain=c_domain,
                            path=c_path
                        )

            VSC_INFO("vfs_plg", "[%s] Cloudflare challenge passed.", self.group_id)
            return token
        elif status == 3:  # Failed
            err_msg = result_out.get("result", "Unknown error")
            raise BizLogicError(message=f"Captcha task failed: {err_msg}")

        else:
            # Pending / Processing
            time.sleep(3)

    raise BizLogicError(message="Captcha task timeout (120s)")
- def _get_common_headers(self, with_auth=True) -> Dict[str, str]:
- mission = self.free_config.get("mission_code", "")
- country = self.free_config.get("country_code", "")
- lang = self.free_config.get("language", "en")
- route = f"{country}/{lang}/{mission}"
-
- h = {
- "accept": "application/json, text/plain, */*",
- "accept-language": "en-US,en;q=0.9",
- "origin": "https://visa.vfsglobal.com",
- "referer": "https://visa.vfsglobal.com/",
- "route": route
- }
-
- client_src = self._get_client_source()
- h["clientsource"] = client_src
-
- if with_auth and self.jwt_token:
- h["authorize"] = self.jwt_token
-
- return h
- def _query_earliest_slot(self, apt_config) -> Optional[str]:
- """
- 查询最早 Slot(403 自动绕盾 + 重试)
- """
- url = "https://lift-api.vfsglobal.com/appointment/CheckIsSlotAvailable"
- max_retries = self.free_config.get("slot_query_max_retries", 2)
- data = {
- "missioncode": self.free_config.get("mission_code"),
- "countrycode": self.free_config.get("country_code"),
- "vacCode": apt_config.get("vac_code"),
- "visaCategoryCode": apt_config.get("subcategory_code"),
- "roleName": "Individual",
- "loginUser": self.config.account.username,
- "payCode": ""
- }
- headers = self._get_common_headers(with_auth=True)
- headers["content-type"] = "application/json;charset=UTF-8"
- for attempt in range(1, max_retries + 1):
- try:
- resp = self._perform_request(
- "POST",
- url,
- headers=headers,
- json_data=data
- )
- break # ✅ 请求成功,跳出重试循环
- except PermissionDeniedError:
- VSC_WARN(
- "vfs_plg",
- "[VFS] Earliest slot blocked (403), attempt %d/%d",
- attempt, max_retries
- )
- # 最后一次就不再绕盾了
- if attempt >= max_retries:
- raise PermissionDeniedError()
- self._handle_cloudflare_challenge()
- VSC_INFO("vfs_plg", "[VFS] Cloudflare bypass success, retrying...")
- continue
- # ====== 正常解析响应 ======
- if "WaitList" in resp.text:
- return "WaitList"
- j = resp.json()
- if j.get("earliestSlotLists"):
- raw_date = j["earliestSlotLists"][0]["date"]
- return to_yyyymmdd(raw_date, "%m/%d/%Y %H:%M:%S")
- return ""
- def _fetch_configurations(self, apt_config: Dict[str, Any]):
- # 1. 获取所有中心配置 (query_center)
- if not self.center_conf:
- self.center_conf = self._query_center()
- # 2. 获取 Visa Category 配置
- vac_code = apt_config.get("vac_code")
- category_code = apt_config.get("category_code")
-
- # 检查目标 category_code 是否已在缓存中
- if category_code not in self.category_conf:
- visa_categories = []
- visa_categories = self._query_visa_category(vac_code)
-
- found = False
- for vc in visa_categories:
- if vc.get("code") == category_code:
- self.category_conf[category_code] = vc
- found = True
- break
-
- # 如果没找到,可能配置错误,但 C++ 没报错,只继续
- if not found:
- raise NotFoundError(message=f"{self.group_id} Category code {category_code} not found in VAC {vac_code}")
- # 3. 获取 Visa SubCategory 配置
- sub_category_code = apt_config.get("subcategory_code")
- if sub_category_code not in self.subcategory_conf:
- visa_subcategories = self._query_visa_sub_category(vac_code, category_code)
-
- found = False
- for svc in visa_subcategories:
- if svc.get("code") == sub_category_code:
- self.subcategory_conf[sub_category_code] = svc
- found = True
- break
-
- if not found:
- raise NotFoundError(message=f"{self.group_id} SubCategory code {sub_category_code} not found")
- def _query_center(self) -> List:
- mission = self.free_config.get("mission_code")
- country = self.free_config.get("country_code")
- url = f"https://lift-api.vfsglobal.com/master/center/{mission}/{country}/en-US"
- headers = self._get_common_headers(with_auth=False)
- resp = self._perform_request("GET", url, headers=headers)
- return resp.json()
-
- def _query_visa_category(self, center_code: str) -> List:
- mission = self.free_config.get("mission_code")
- country = self.free_config.get("country_code")
- enc_center = urllib.parse.quote(center_code)
- url = f"https://lift-api.vfsglobal.com/master/visacategory/{mission}/{country}/{enc_center}/en-US"
- headers = self._get_common_headers(with_auth=False)
- resp = self._perform_request("GET", url, headers=headers)
- return resp.json()
-
- def _query_visa_sub_category(self, center_code: str, category_code: str) -> List:
- mission = self.free_config.get("mission_code")
- country = self.free_config.get("country_code")
- enc_center = urllib.parse.quote(center_code)
- enc_cat = urllib.parse.quote(category_code)
- url = f"https://lift-api.vfsglobal.com/master/subvisacategory/{mission}/{country}/{enc_center}/{enc_cat}/en-US"
- headers = self._get_common_headers(with_auth=False)
- resp = self._perform_request("GET", url, headers=headers)
- return resp.json()
def _read_otp_email(self) -> str:
    """Poll the mail service for the login OTP email and extract the code.

    The mailbox is queried up to 12 times, 5 seconds apart, for a message
    addressed to the account username that matches the sender, subject and
    body keywords and was sent after the moment this method started (UTC).

    Returns:
        The first standalone 6-digit number found in the mail content.

    Raises:
        NotFoundError: When no matching mail with a 6-digit code is found
            within the polling window.
    """
    # Local import: only the datetime class is known to be in scope here.
    from datetime import timezone

    master_email = "visafly666@gmail.com"
    recipient = self.config.account.username
    sender = "donotreply at vfshelpline.com"
    subject_keywords = "One Time Password"
    body_keywords = "OTP"

    # datetime.utcnow() is deprecated; an aware UTC "now" formats identically.
    formatted_utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    VSC_INFO("vfs_plg", "[%s] Waiting for OTP email sent after %s...", self.group_id, formatted_utc_time)

    max_polls = 12
    for poll_index in range(max_polls):
        content_out = VSCloudApi.Instance().fetch_mail_content(
            master_email,
            sender,
            recipient,
            subject_keywords,
            body_keywords,
            formatted_utc_time,
            300  # NOTE(review): meaning of this argument is not visible here — confirm
        )
        if content_out:
            match = re.search(r'\b\d{6}\b', content_out)
            if match:
                otp = match.group(0)
                VSC_INFO("vfs_plg", "[%s] OTP code found: %s", self.group_id, otp)
                return otp

        # Wait between polls only; sleeping after the last one just delays the error.
        if poll_index < max_polls - 1:
            time.sleep(5)
    raise NotFoundError(message="OTP email not found (timeout)")
def _submit_login_otp(self, cf_token: str, otp: str):
    """Submit the OTP together with the credentials to complete the login.

    On success the returned ``accessToken`` is stored in ``self.jwt_token``.

    Args:
        cf_token: Cloudflare captcha token; when empty a new one is obtained
            via ``self._handle_cloudflare_challenge()``.
        otp: One-time password to send in the ``otp`` form field.

    Raises:
        PermissionDeniedError: When the response carries no usable
            ``accessToken``; the message is the raw response text.
    """
    VSC_INFO("vfs_plg", "[%s] Submitting Login OTP...", self.group_id)
    # 1. Base data.
    email = self.config.account.username
    password = self.config.account.password
    enc_password = self._encrypt_password(password)

    mission_code = self.free_config.get("mission_code", "")
    country_code = self.free_config.get("country_code", "")

    # 2. Regenerate the source values for this request (the original note
    #    says they embed a timestamp that differs per request).
    client_src = self._get_client_source()
    orange_src = self._get_orange_source(email)

    # 3. Build the request.
    url = "https://lift-api.vfsglobal.com/user/login"
    headers = self._get_common_headers(with_auth=False)
    headers.update({
        "clientsource": client_src,
        "orangex": orange_src,
        "content-type": "application/x-www-form-urlencoded"
    })

    # For robustness, obtain a fresh captcha token if none was passed in.
    if not cf_token:
        VSC_DEBUG("vfs_plg", "[%s] CF Token is empty, regenerating for OTP...", self.group_id)
        cf_token = self._handle_cloudflare_challenge()
    data = {
        "username": email,
        "password": enc_password,
        "missioncode": mission_code,
        "countrycode": country_code,
        "languageCode": "en-US",
        "captcha_version": "cloudflare-v1",
        "captcha_api_key": cf_token,
        "otp": otp  # key field: the OTP code
    }
    # 4. Send the request (POST, form-urlencoded).
    resp = self._perform_request("POST", url, headers=headers, data=data)
    resp_json = resp.json()

    # Use .get(): a response without the key must end in PermissionDeniedError
    # below rather than an unrelated KeyError.
    access_token = resp_json.get("accessToken") if isinstance(resp_json, dict) else None
    if access_token:
        self.jwt_token = access_token
        VSC_INFO("vfs_plg", "[%s] OTP Login successful, JWT obtained.", self.group_id)
        return
    raise PermissionDeniedError(message=resp.text)
-
- def _submit_no_addition_service(self, urn):
- url = "https://lift-api.vfsglobal.com/vas/mapvas"
- headers = self._get_common_headers(with_auth=True)
- headers["content-type"] = "application/json;charset=UTF-8"
-
- data = {
- "loginUser": self.config.account.username,
- "missionCode": self.free_config.get("mission_code"),
- "countryCode": self.free_config.get("country_code"),
- "urn": urn,
- "applicants": []
- }
- # C++ 只请求不检查结果,或者只要200就行
- self._perform_request("POST", url, headers=headers, json_data=data)
- def _query_fee(self, apt_config, urn) -> Tuple[float, str]:
- url = "https://lift-api.vfsglobal.com/appointment/fees"
- headers = self._get_common_headers(with_auth=True)
- headers["content-type"] = "application/json;charset=UTF-8"
-
- data = {
- "missionCode": self.free_config.get("mission_code"),
- "countryCode": self.free_config.get("country_code"),
- "centerCode": apt_config.get("vac_code"),
- "loginUser": self.config.account.username,
- "urn": urn,
- "languageCode": "en-US"
- }
-
- resp = self._perform_request("POST", url, headers=headers, json_data=data)
- j = resp.json()
- return j.get("totalamount"), j["feeDetails"][0].get("currency")
- def _schedule(self, apt_config, urn, amount, currency, slot_id) -> Dict:
- url = "https://lift-api.vfsglobal.com/appointment/schedule"
- headers = self._get_common_headers(with_auth=True)
- headers["content-type"] = "application/json;charset=UTF-8"
-
- data = {
- "missionCode": self.free_config.get("mission_code"),
- "countryCode": self.free_config.get("country_code"),
- "centerCode": apt_config.get("vac_code"),
- "loginUser": self.config.account.username,
- "urn": urn,
- "notificationType": "none",
- "paymentdetails": {
- "paymentmode": "Online",
- "RequestRefNo": "",
- "clientId": "",
- "merchantId": "",
- "amount": amount,
- "currency": currency
- },
- "allocationId": str(slot_id),
- "CanVFSReachoutToApplicant": True
- }
-
- resp = self._perform_request("POST", url, headers=headers, json_data=data)
- return resp.json()
- def _pay_request(self, payload) -> str:
- # C++: 检查 301/302 Redirect Location
- url = f"https://online.vfsglobal.com/PG-Component/Payment/PayRequest?payLoad={payload}"
- headers = {
- "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
- "referer": "https://visa.vfsglobal.com/",
- "user-agent": self.user_agent
- }
-
- # allow_redirects=False 以捕获 Location header
- resp = self.session.get(url, headers=headers, allow_redirects=False)
- if resp.status_code in [301, 302]:
- loc = resp.headers.get("Location", "")
- if loc.startswith("http"):
- return loc
- else:
- return "https://online.vfsglobal.com" + loc if loc.startswith("/") else "https://online.vfsglobal.com/" + loc
- else:
- raise NotFoundError(message='payment link not found')
- def _filter_dates(self, dates: List[str], start_str: str, end_str: str) -> List[str]:
- """
- 根据用户的期望范围筛选可用日期
-
- :param dates: API 返回的可用日期列表 (YYYY-MM-DD)
- :param start_str: 用户期望开始日期 (YYYY-MM-DD)
- :param end_str: 用户期望结束日期 (YYYY-MM-DD)
- :return: 符合要求的日期列表
- """
- # 如果没有设置范围,则不过滤,返回所有日期
- if not start_str or not end_str:
- return dates
-
- valid_dates = []
- # 截取前10位以防带有时分秒
- s_date = datetime.strptime(start_str[:10], "%Y-%m-%d")
- e_date = datetime.strptime(end_str[:10], "%Y-%m-%d")
-
- for date_str in dates:
- curr_date = datetime.strptime(date_str, "%Y-%m-%d")
- # 比较范围 (闭区间)
- if s_date <= curr_date <= e_date:
- valid_dates.append(date_str)
- random.shuffle(valid_dates)
- return valid_dates
def _save_http_session(self, page_url):
    """Register the current HTTP session state with ``VSCloudApi``.

    Collects the session cookies as a JSON object, derives a session id as
    the SHA-256 hex digest of cookies + user agent + page URL, builds the
    proxy URL from ``self.config.proxy`` and passes everything to
    ``VSCloudApi.Instance().create_http_session``.

    Args:
        page_url: Page URL included in the session id and forwarded as-is.

    Returns:
        Whatever ``create_http_session`` returns.
    """
    jar = self.session.cookies
    if hasattr(jar, "get_dict"):
        # Cookie containers that expose get_dict() give the mapping directly.
        cookie_map = jar.get_dict()
    else:
        # Otherwise iterate the jar (standard CookieJar-style objects).
        cookie_map = {}
        for cookie in jar:
            cookie_map[cookie.name] = cookie.value
    cookies_json = json.dumps(cookie_map)

    # Session id: hash of the cookies, user agent and page URL.
    user_agent = self.user_agent or "unknown_ua"
    digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
    digest.update((cookies_json + user_agent + page_url).encode())
    session_key = digest.finalize().hex()

    proxy = self.config.proxy
    proxy_url = ""
    if proxy.ip:
        proxy_url = f"{proxy.scheme}://"
        if proxy.username:
            proxy_url += f"{proxy.username}:{proxy.password}@"
        proxy_url += f"{proxy.ip}:{proxy.port}"

    # NOTE(review): the third argument is always sent empty — presumably the
    # local-storage payload mentioned in the original docstring; confirm.
    return VSCloudApi.Instance().create_http_session(
        session_key, cookies_json, "", user_agent, proxy_url, page_url
    )
|