| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687 |
- # plugins/vfs_global_plugin.py
- import time
- import json
- import random
- import base64
- import re
- import urllib.parse
- from datetime import datetime
- from typing import Dict, Any, Optional, List, Tuple
- # 使用 curl_cffi 模拟浏览器 TLS 指纹,这是 VFS 必须的
- try:
- from curl_cffi import requests
- except ImportError:
- raise ImportError("Please install curl-cffi: pip install curl-cffi")
- # 加密库
- from cryptography.hazmat.primitives import serialization, hashes
- from cryptography.hazmat.primitives.asymmetric import padding
- from cryptography.hazmat.backends import default_backend
- from vs_plg import IVSPlg, VSError # type: ignore
- from vs_types import VSPlgConfig, VSQueryResult, VSBookResult, AvailabilityStatus, QueryWaitMode # type: ignore
- from vs_log_macros import VSC_INFO, VSC_ERROR, VSC_DEBUG, VSC_WARN # type: ignore
- from toolkit.vs_cloud_api import VSCloudApi # type: ignore
- from toolkit.rule_engine import RuleEngine # type: ignore
- # ----------------- 静态常量与辅助数据 -----------------
# PEM-encoded public key. VfsPlugin.__init__ parses it with
# serialization.load_pem_public_key and stores the result as self.public_key.
# NOTE(review): presumably the key the login password is encrypted with
# (see _encrypt_password) — confirm against that helper.
VFS_PUBLIC_KEY_PEM = """-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAuupFgB+lYIOtSxrRoHzc
LmCZKJ6+oSbgqgOPzFMM0TasOeLw0NXEn1XfIzXdx75+tegNKwyIZumoh0yhubKs
t59GV321kN0iquYRHrdh3ygfDDHlS9rROQeBqRga0ncSADtbLMrBPqXJjPCoV76y
t92towriKoH75BhiazY0mghm4LjmAWrV0u/GNpV3tk9bxbtHEXGaFmxCJqjg+7x6
1e5wXLfvpj9w1QsiSWOSJxLOyICz/9ByxXycQQFdNmjnnnwco9Gt/Mi33NYH71j0
5oXIjklFC4lvJqaqSY5lS7Vwb9oCt9zX9J0Yz4z4e/3V+0jgRnWOFGofyks4FKe2
GQIDAQAB
-----END PUBLIC KEY-----"""
# Lower-cased English country name -> ISO 3166-1 alpha-3 code.
COUNTRY_MAP = {
    "china": "CHN", "france": "FRA", "germany": "DEU", "italy": "ITA",
    "united kingdom": "GBR", "united states": "USA", "india": "IND",
    "russia": "RUS", "turkey": "TUR", "vietnam": "VNM"
}


def get_country_iso3(name: str) -> str:
    """Return the ISO 3166-1 alpha-3 code for a country name.

    Matching ignores case and surrounding whitespace. A value that already is
    one of the known alpha-3 codes (e.g. ``"fra"``) is returned upper-cased.

    Args:
        name: English country name or known alpha-3 code.

    Returns:
        The alpha-3 code, or ``"CHN"`` when *name* is not a string or is not
        present in ``COUNTRY_MAP``.
    """
    if not isinstance(name, str):
        # None / numbers used to crash on .lower(); keep the documented default.
        return "CHN"
    key = name.strip().lower()
    code = key.upper()
    if code in COUNTRY_MAP.values():
        return code
    return COUNTRY_MAP.get(key, "CHN")
def get_alias_email(email: str, new_domain: str = "gmail-app.com") -> str:
    """Return *email* with its domain replaced by *new_domain*.

    Surrounding whitespace is ignored. Only the part after the last ``@`` is
    treated as the domain.

    Args:
        email: Address whose local part is kept.
        new_domain: Domain to append (defaults to ``gmail-app.com``).

    Returns:
        ``"<local part>@<new_domain>"``.

    Raises:
        ValueError: if *email* is not a string, contains no ``@`` or has an
            empty local part.
    """
    if not isinstance(email, str):
        raise ValueError(f"Invalid email: {email}")
    local_part, separator, _ = email.strip().rpartition("@")
    # rpartition yields an empty separator when there is no "@" at all.
    if not separator or not local_part:
        raise ValueError(f"Invalid email: {email}")
    return f"{local_part}@{new_domain}"
- # ----------------- VfsPlugin 类 -----------------
- class VfsPlugin(IVSPlg):
def __init__(self, group_id: str):
    """Create the plugin for *group_id* with empty configuration and a fresh HTTP session."""
    # Identity and configuration; set_config() fills the latter two in.
    self.group_id = group_id
    self.config: Optional[VSPlgConfig] = None
    self.free_config: Dict[str, Any] = {}

    # HTTP session that impersonates Chrome 124.
    http_session = requests.Session()
    self.session = http_session
    http_session.impersonate = "chrome124"

    # Authentication / diagnostics state.
    self.jwt_token = ""
    self.user_agent = ""
    self.last_error = VSError(0, "OK")
    self.real_ip = ""

    # Cached remote configuration.
    self.center_conf = None
    self.category_conf = {}
    self.subcategory_conf = {}

    # Parse the embedded PEM public key once.
    pem_bytes = VFS_PUBLIC_KEY_PEM.encode()
    self.public_key = serialization.load_pem_public_key(
        pem_bytes,
        backend=default_backend(),
    )
- # --- IVSPlg 接口实现 ---
def get_group_id(self) -> str:
    """Return the group id this plugin instance was created with."""
    group_id = self.group_id
    return group_id
def set_config(self, config: "VSPlgConfig"):
    """Store *config*, decode its free-form JSON section and configure the proxy.

    Args:
        config: Plugin configuration. ``free_config`` is optional JSON text;
            ``proxy`` supplies ``scheme``, ``ip``, ``port`` and optional
            ``username`` / ``password``.

    Side effects:
        Sets ``self.config`` and ``self.free_config`` (always a dict), and
        ``self.session.proxies`` when a proxy IP is configured.
    """
    self.config = config

    # Missing, malformed or non-object JSON all degrade to an empty dict,
    # because the rest of the plugin calls .get() on free_config.
    raw_free_config = getattr(config, "free_config", None)
    parsed = {}
    if raw_free_config:
        try:
            parsed = json.loads(raw_free_config)
        except (TypeError, ValueError):
            parsed = {}
    self.free_config = parsed if isinstance(parsed, dict) else {}

    # Proxy setup.
    proxy = config.proxy
    if not proxy.ip:
        return

    credentials = ""
    if proxy.username:
        # Percent-encode so characters such as "@", ":" or "/" inside the
        # credentials cannot be mistaken for URL delimiters.
        user = urllib.parse.quote(str(proxy.username), safe="")
        secret = urllib.parse.quote(str(proxy.password or ""), safe="")
        credentials = f"{user}:{secret}@"

    proxy_str = f"{proxy.scheme}://{credentials}{proxy.ip}:{proxy.port}"
    self.session.proxies = {"http": proxy_str, "https": proxy_str}
    VSC_DEBUG("vfs_plg", "[%s] Proxy set: %s", self.group_id, config.proxy.ip)
def health_check(self) -> bool:
    """Report the plugin as healthy; no actual probing is performed."""
    healthy = True
    return healthy
def get_last_error(self) -> VSError:
    """Return the error object currently stored in ``self.last_error``."""
    current_error = self.last_error
    return current_error
def create_session(self) -> bool:
    """Log in to VFS and keep the returned JWT.

    Flow: obtain a Cloudflare Turnstile token, POST the encrypted
    credentials to ``/user/login``, then either take the ``accessToken``
    from the reply or continue with the e-mail OTP flow.

    Returns:
        True when a token was obtained (or the OTP submission succeeded),
        False otherwise. Login-level failures are recorded via ``_set_error``.
    """
    VSC_INFO("vfs_plg", "[%s] Starting login...", self.group_id)

    # Step 1: Cloudflare Turnstile. Without a token there is nothing to send.
    turnstile_token = self._handle_cloudflare_challenge()
    if not turnstile_token:
        return False

    # Step 2: collect the request parameters.
    login_email = self.config.account.username
    plain_password = self.config.account.password
    encrypted_password = self._encrypt_password(plain_password)

    mission = self.free_config.get("missionCode", "")
    country = self.free_config.get("countryCode", "")

    client_source = self._get_client_source()
    orange_source = self._get_orange_source(login_email)

    login_url = "https://lift-api.vfsglobal.com/user/login"
    request_headers = self._get_common_headers(with_auth=False)
    request_headers.update({
        "clientsource": client_source,
        "orangex": orange_source,
        "content-type": "application/x-www-form-urlencoded"
    })

    form = {
        "username": login_email,
        "password": encrypted_password,
        "missioncode": mission,
        "countrycode": country,
        "languageCode": "en-US",
        "captcha_version": "cloudflare-v1",
        "captcha_api_key": turnstile_token
    }

    # Step 3: send the login request (the request helper also issues OPTIONS).
    if not self._perform_request("POST", login_url, headers=request_headers, data=form):
        return False

    try:
        reply = self.session.last_response.json()

        # Direct success: the reply already carries the token.
        if "accessToken" in reply and reply["accessToken"]:
            self.jwt_token = reply["accessToken"]
            VSC_INFO("vfs_plg", "[%s] Login successful, JWT obtained.", self.group_id)
            return True

        # Neither a token nor an OTP request: plain failure.
        if not reply.get("enableOTPAuthentication"):
            self._set_error(1001, "Login failed: No access token or OTP flow.")
            return False

        # OTP flow: read the code from the mailbox and submit it.
        VSC_INFO("vfs_plg", "[%s] Login requires OTP.", self.group_id)
        otp_code = self._read_otp_email()
        if otp_code:
            return self._submit_login_otp(None, otp_code)
        self._set_error(3001, "Failed to read Login OTP")
        return False

    except Exception as exc:
        self._set_error(9001, f"Login parse error: {str(exc)}")
        return False
def query(self) -> VSQueryResult:
    """Look up the earliest bookable slot for this plugin's group.

    The appointment configuration is the ``appointmentType`` entry whose
    ``tag`` equals the group id, or the only entry when exactly one exists.

    Returns:
        A VSQueryResult. ``success`` is set only when an earliest date (or a
        waitlist marker) was obtained; otherwise the freshly constructed
        result is returned unchanged.
    """
    outcome = VSQueryResult()

    wanted_tag = self.group_id
    candidates = self.free_config.get("appointmentType", [])
    chosen = next(
        (entry for entry in candidates
         if entry.get("tag") == wanted_tag or len(candidates) == 1),
        None,
    )
    if not chosen:
        self._set_error(2001, "No matching appointment configuration found.")
        return outcome

    if not self._fetch_configurations(chosen):
        return outcome

    # The helper reports its result through this out-list.
    found_dates = []
    if not self._query_earliest_slot(chosen, found_dates) or not found_dates:
        return outcome
    first_date = found_dates[0]

    outcome.success = True
    outcome.visa_type = chosen.get("subcategoryCode", "")
    outcome.city = chosen.get("vacCode", "")
    outcome.country = self.free_config.get("countryCode", "")

    if "WaitList" not in first_date:
        outcome.availability_status = AvailabilityStatus.Available
        outcome.earliest_date = first_date
        VSC_INFO("vfs_plg", "[%s] Found Slot: %s", self.group_id, first_date)

        # NOTE(review): the source this was recovered from had lost its
        # indentation; the availability entry is assumed to belong to the
        # real-date branch only — confirm against the original file.
        day_entry = VSQueryResult.DateAvailability()
        day_entry.date = first_date
        outcome.availability.append(day_entry)
    else:
        outcome.availability_status = AvailabilityStatus.Waitlist
        outcome.earliest_date = "WaitList"
        VSC_INFO("vfs_plg", "[%s] Found WaitList.", self.group_id)
    return outcome
-
def _get_filtered_covered_months(self, start_date, end_date, from_date) -> List[str]:
    """List the months to scan, each as the first day in ``YYYY-MM-DD`` form.

    Args:
        start_date: Range start as ``YYYY-MM-DD``; empty means the current month.
        end_date: Range end as ``YYYY-MM-DD``; empty means one year after the
            current month.
        from_date: Earliest known slot date as ``DD/MM/YYYY``; anything that
            does not parse (e.g. a waitlist marker) means the current month.

    Returns:
        Month starts from ``max(start, from)`` through the month of
        *end_date*, inclusive. Empty when *start_date* or *end_date* is
        malformed or the range is empty.
    """
    fmt = "%Y-%m-%d"
    # Midnight on the 1st of the current month. Using this instead of a raw
    # now() keeps time-of-day out of the month comparison (which used to drop
    # the final month) and makes the one-year shift safe on Feb 29.
    this_month = datetime.now().replace(day=1, hour=0, minute=0, second=0, microsecond=0)

    try:
        range_start = datetime.strptime(start_date, fmt) if start_date else this_month
        if end_date:
            range_end = datetime.strptime(end_date, fmt)
        else:
            range_end = this_month.replace(year=this_month.year + 1)
    except (TypeError, ValueError):
        return []

    try:
        scan_from = datetime.strptime(from_date, "%d/%m/%Y")
    except (TypeError, ValueError):
        scan_from = this_month

    # Normalise everything to the first of the month.
    cursor = max(range_start.replace(day=1), scan_from.replace(day=1))
    last_month = range_end.replace(day=1)

    months = []
    while cursor <= last_month:
        months.append(cursor.strftime(fmt))
        if cursor.month == 12:
            cursor = cursor.replace(year=cursor.year + 1, month=1)
        else:
            cursor = cursor.replace(month=cursor.month + 1)
    return months
def book(self, slot_info: VSQueryResult, user_inputs) -> VSBookResult:
    """
    Run the complete booking flow (counterpart of C++ VFSApi::book).

    Stages: upload documents -> add applicant -> applicant OTP -> pick a
    date and time -> schedule -> payment.

    Args:
        slot_info: Query result for the slot being booked; ``routing_key``,
            ``earliest_date`` and ``availability_status`` are read.
        user_inputs: Dict-like applicant data. It is mutated: ``alias_email``
            is always added, and ``applicant_image``,
            ``applicant_image_data`` and ``guid`` are added when OCR upload
            is enabled.

    Returns:
        A VSBookResult. ``success`` is set to True only after the waitlist
        was confirmed or the appointment was scheduled; on every failure
        path the result is returned as constructed.

    Raises:
        ValueError: propagated from get_alias_email for an e-mail without "@".
    """
    user_email = user_inputs.get('email', 'get_visa_666@example.com')
    user_inputs['alias_email'] = get_alias_email(user_email, new_domain="gmail-app.com")

    res = VSBookResult()
    # Locate the appointment config.
    slot_routing_key = slot_info.routing_key

    # In C++ from_date is a parameter; here it comes from slot_info.earliest_date
    # (today's date when that is empty).
    from_date = slot_info.earliest_date if slot_info.earliest_date else datetime.now().strftime("%d/%m/%Y")

    apt_config = None
    appt_types = self.free_config.get("appointmentType", [])
    for apt in appt_types:
        # A single configured appointment type matches regardless of routing key.
        if apt.get("routingKey") == slot_routing_key or len(appt_types) == 1:
            apt_config = apt
            break

    if not apt_config:
        self._set_error(3001, "Book: Config missing.")
        return res
    # Refresh the sub-configuration (provides the OCR / OTP switches).
    if not self._fetch_configurations(apt_config):
        return res
    sub_cc = apt_config.get("subcategoryCode")
    sub_conf = self.subcategory_conf.get(sub_cc, {})
    # ---------------- OCR / document upload ----------------
    # C++: bool ocr_enabled = ...
    ocr_enabled = sub_conf.get("isOCREnable", False)

    if ocr_enabled:
        VSC_INFO("vfs_plg", "[%s] OCR Enabled, uploading documents...", self.group_id)
        upload_res = {}
        if not self._upload_applicant_documents(apt_config, user_inputs, upload_res):
            return res
        # Copy the upload result into user_inputs for _add_primary_applicant.
        user_inputs["applicant_image"] = upload_res.get("passportImageFilename")
        user_inputs["applicant_image_data"] = upload_res.get("passportImageFileBytes")  # Base64
        user_inputs["guid"] = upload_res.get("uploadDocumentGUID")
    # ---------------- Reference number (cover letter) required? ----------------
    enable_reference_number = sub_conf.get("enableReferenceNumber", False)
    # ---------------- Add applicant (core step 1) ----------------
    urn = []
    is_waitlist = (slot_info.availability_status == AvailabilityStatus.Waitlist)

    # C++: retry loop for 422 Invalid Request
    add_primary_retry = 0
    MAX_RETRY = 3
    success_add = False

    while add_primary_retry < MAX_RETRY:
        urn = []  # reset the out-list for this attempt
        if self._add_primary_applicant(apt_config, user_inputs, is_waitlist, ocr_enabled, enable_reference_number, urn):
            success_add = True
            break

        # Was it a 422 error?
        err = self.get_last_error()
        # Relies on _perform_request / _add_primary_applicant recording
        # error_code 422; a message containing "Invalid request" also counts.
        if err.error_code == 422 or "Invalid request" in err.error_message:
            VSC_WARN("vfs_plg", "[%s] Add Applicant 422 error, retrying in 10s...", self.group_id)
            time.sleep(10)
            add_primary_retry += 1
        else:
            # Any other error aborts immediately.
            return res

    if not success_add:
        self._set_error(3002, "Failed to add primary applicant after retries")
        return res
    final_urn = urn[0]
    VSC_INFO("vfs_plg", "[%s] Applicant Added. URN: %s", self.group_id, final_urn)
    # ---------------- Applicant OTP verification (core step 2) ----------------
    otp_enabled = sub_conf.get("isApplicantOTPEnabled", False)

    if otp_enabled:
        VSC_INFO("vfs_plg", "[%s] Applicant OTP Required.", self.group_id)
        if not self._applicant_otp_send(apt_config, final_urn):
            return res

        otp_code = self._read_otp_email()  # same mailbox reader as the login OTP
        if not otp_code:
            self._set_error(3003, "Failed to read Applicant OTP from email")
            return res

        if not self._applicant_otp_verify(apt_config, final_urn, otp_code):
            return res
    # ---------------- Waitlist mode: confirm and return ----------------
    if is_waitlist:
        if self._confirm_waitlist(apt_config, final_urn):
            res.success = True
            res.urn = final_urn
            res.order_id = final_urn
            res.message = "Joined Waitlist"
            return res
        return res
    # ---------------- Rule engine and date filtering (core step 3) ----------------
    # C++: RuleEngine rule_engine(rules);
    rules_str = user_inputs.get("rules", "")
    rule_engine = RuleEngine(rules_str)

    expected_start = user_inputs.get("expected_start_date", "")
    expected_end = user_inputs.get("expected_end_date", "")
    rule_engine.set_date_range_start(expected_start)
    rule_engine.set_date_range_end(expected_end)

    # Work out which months to scan.
    # NOTE(review): the original comment said an empty expected_start/end
    # defaults to the month of from_date; what the helper actually does with
    # empty values should be checked against _get_filtered_covered_months.
    months = self._get_filtered_covered_months(expected_start, expected_end, from_date)
    VSC_INFO("vfs_plg", "[%s] Scanning months: %s (From: %s)", self.group_id, months, from_date)

    selected_slot_id = ""
    selected_slot_date = ""
    selected_slot_time_range = ""

    all_ads = set()  # every available date seen so far (de-duplicated)
    forbidden_dates = set()  # dates already attempted

    found_slot = False

    # Walk the months looking for a slot.
    for m_str in months:
        # m_str format: YYYY-MM-DD (first of the month)
        # The calendar request is sent DD/MM/YYYY.
        try:
            dt_m = datetime.strptime(m_str, "%Y-%m-%d")
            converted_date = dt_m.strftime("%d/%m/%Y")
        except:
            continue
        ads = []  # available date strings
        if not self._query_slot_calendar(apt_config, final_urn, converted_date, ads):
            time.sleep(3)
            continue

        if not ads:
            time.sleep(3)
            continue

        # Merge in dates not seen before.
        # NOTE(review): new_ads is only used to update all_ads.
        new_ads = [d for d in ads if d not in all_ads]
        all_ads.update(new_ads)

        # Up to 3 selection attempts per month.
        for _ in range(3):
            # Exclude dates already tried.
            avail_candidates = [d for d in list(all_ads) if d not in forbidden_dates]

            # Apply the rule engine.
            sel_dates = rule_engine.select_date(avail_candidates, "%d/%m/%Y")
            if not sel_dates:
                break  # nothing left that satisfies the rules

            tmp_date = sel_dates[0]  # first date accepted by the rules
            forbidden_dates.add(tmp_date)  # mark as attempted

            # Audit log (C++ saveuseractionaudit)
            if not self._saveuseractionaudit(apt_config, final_urn, tmp_date):
                time.sleep(3)
                continue

            # Fetch the concrete time slots (query_slot_time)
            ats = []
            if not self._query_slot_time(apt_config, final_urn, tmp_date, ats):
                time.sleep(3)
                continue

            if not ats:
                continue

            # Pick one time slot at random.
            sel_tm = random.choice(ats)

            selected_slot_id = sel_tm.get("allocationId")
            selected_slot_date = tmp_date
            selected_slot_time_range = sel_tm.get("slot")

            found_slot = True
            break

        if found_slot:
            break

    if not found_slot:
        self._set_error(3004, "No valid slots found after Rule Engine filtering.")
        return res
    VSC_INFO("vfs_plg", "[%s] Slot Selected: %s %s (ID: %s)",
             self.group_id, selected_slot_date, selected_slot_time_range, selected_slot_id)
    # ---------------- Services, fee, final scheduling (core step 4) ----------------

    # Skip additional services.
    if not self._submit_no_addition_service(final_urn):
        return res

    # Query the fee.
    amount = 0.0
    currency = ""
    amount, currency = self._query_fee(apt_config, final_urn)
    # Keep two decimal places.
    amount = round(amount, 2)

    # Final submission (Schedule)
    schedule_res = {}
    # C++: schedule_with_retry (max 3)
    schedule_success = False
    for _ in range(3):
        if self._schedule(apt_config, final_urn, amount, currency, selected_slot_id, schedule_res):
            schedule_success = True
            break
        # Blocked by the firewall?
        if self.get_last_error().error_code == 403:
            # Redo the Cloudflare challenge (its result is not inspected here).
            self._handle_cloudflare_challenge()
        else:
            break  # other errors are not retried

    if not schedule_success:
        return res

    # ---------------- Build the result ----------------
    res.success = True
    res.book_date = selected_slot_date
    res.book_time = selected_slot_time_range
    res.urn = final_urn
    res.order_id = final_urn
    res.fee_amount = int(amount * 100)
    res.fee_currency = currency

    # Payment link handling.
    # NOTE(review): indentation was lost in the recovered source; the else is
    # paired with `if is_pay_req` because of its "No payment required"
    # message — confirm against the original file.
    is_pay_req = schedule_res.get("IsPaymentRequired", False)
    if is_pay_req:
        payload = schedule_res.get("payLoad", "")
        payment_url = self._pay_request(payload)
        if payment_url:
            res.payment_link = payment_url
            # Persist the session (C++ save_http_session)
            saved_session = self._save_http_session(payment_url)
            if saved_session:
                res.session_id = saved_session['session_id']
    else:
        res.message = "Booking confirmed (No payment required)"
    return res
-
def _confirm_waitlist(self, apt_config: Dict[str, Any], urn: str) -> bool:
    """
    Confirm joining the waitlist for *urn* (counterpart of C++ VFSApi::confirm_waitlist).

    Returns True only when the reply has ``isConfirmed`` set to True. API
    errors, unconfirmed replies and parse failures are recorded through
    ``_set_error`` and yield False.
    """
    endpoint = "https://lift-api.vfsglobal.com/appointment/ConfirmWaitlist"
    request_headers = self._get_common_headers(with_auth=True)
    request_headers["content-type"] = "application/json;charset=UTF-8"
    body = {
        "missionCode": self.free_config.get("missionCode"),
        "countryCode": self.free_config.get("countryCode"),
        "centerCode": apt_config.get("vacCode"),
        "loginUser": self.config.account.username,
        "urn": urn,
        "notificationType": "none",
        "CanVFSReachoutToApplicant": True
    }
    if not self._perform_request("POST", endpoint, headers=request_headers, json_data=body):
        return False

    try:
        reply = self.session.last_response.json()

        # 1. A non-empty "error" given as text, object or array is a failure;
        #    any other truthy value falls through to the isConfirmed check.
        api_error = reply["error"] if "error" in reply else None
        if api_error and isinstance(api_error, (str, dict, list)):
            self._set_error(2006, f"Confirm Waitlist API Error: {api_error}")
            return False

        # 2. Success requires an explicit isConfirmed == True.
        if reply.get("isConfirmed") is True:
            VSC_INFO("vfs_plg", "[%s] Waitlist confirmed successfully for URN: %s", self.group_id, urn)
            return True

        self._set_error(2007, f"Confirm Waitlist failed, response: {str(reply)[:100]}")
    except Exception as exc:
        self._set_error(9001, f"Confirm Waitlist parse error: {str(exc)}")

    return False
-
def _upload_applicant_documents(self, apt_config, user_inputs, res_out: Dict) -> bool:
    """Download the passport image and upload it to VFS as Base64.

    Args:
        apt_config: Appointment configuration (``vacCode`` and
            ``subcategoryCode`` are read).
        user_inputs: Applicant data; ``passport_image_url`` is required.
        res_out: Out-parameter. On success it receives the API reply plus
            ``passportImageFilename`` and ``passportImageFileBytes``.

    Returns:
        True on success. Every failure path returns False; failures handled
        in this method are recorded through ``_set_error``.
    """
    url = "https://lift-api.vfsglobal.com/appointment/UploadApplicantDocument"
    passport_url = user_inputs.get("passport_image_url")
    if not passport_url:
        self._set_error(9007, "Missing passport_image_url")
        return False

    # Download the image and Base64-encode it (C++ download_img_encode_base64).
    try:
        # Plain download of a static resource: deliberately not sent through
        # the plugin session.
        img_resp = requests.get(passport_url, timeout=30)
        if img_resp.status_code != 200:
            self._set_error(9008, "Failed to download passport image")
            return False
        b64_str = base64.b64encode(img_resp.content).decode('utf-8')
    except Exception as e:
        self._set_error(9008, f"Image download exception: {e}")
        return False

    headers = self._get_common_headers(with_auth=True)
    headers["content-type"] = "application/json;charset=UTF-8"

    data = {
        "missioncode": self.free_config.get("missionCode"),
        "countryCode": self.free_config.get("countryCode"),
        "centerCode": apt_config.get("vacCode"),
        "loginUser": self.config.account.username,
        "languageCode": "en-US",
        "visaCategoryCode": apt_config.get("subcategoryCode"),
        "fileBytes": b64_str,
        "selfiImageFileBytes": ""  # selfie upload not supported yet
    }

    if not self._perform_request("POST", url, headers=headers, json_data=data):
        return False

    try:
        j = self.session.last_response.json()
        if "error" in j and j["error"]:
            self._set_error(2005, f"Upload error: {j}")
            return False

        res_out.update(j)
        # File name and bytes are filled in locally, as the C++ code does.
        res_out["passportImageFilename"] = "passport_img.jpg"
        res_out["passportImageFileBytes"] = b64_str
        return True
    except Exception as e:
        # Previously a bare except that failed without leaving any trace.
        self._set_error(9001, f"Upload parse error: {str(e)}")
        return False
-
def _add_primary_applicant(self, apt_config: Dict[str, Any], user_inputs: Dict[str, Any],
                           is_waitlist: bool, ocr_enabled: bool, enable_ref: bool,
                           urn_out_list: List[str]) -> bool:
    """
    Add the primary applicant (counterpart of C++ VFSApi::add_primary_applicant).

    Builds the applicant JSON payload from *user_inputs* and POSTs it.

    Args:
        apt_config: Appointment configuration (``vacCode``, ``subcategoryCode``).
        user_inputs: Applicant data (names, passport, phone, ``alias_email``,
            and the OCR upload fields when *ocr_enabled*).
        is_waitlist: Sent as ``isWaitlist``.
        ocr_enabled: When True the uploaded image metadata is attached.
        enable_ref: When True ``cover_letter`` is sent as ``referenceNumber``.
        urn_out_list: Out-parameter; the returned URN is appended on success.

    Returns:
        True when the reply contains a URN. Otherwise False with an error
        recorded for every failure handled here: code 422 for "Invalid
        request" rejections (callers retry on it), 3005 for other API
        failures, 9001 for unparsable replies.
    """
    url = "https://lift-api.vfsglobal.com/appointment/applicants"
    headers = self._get_common_headers(with_auth=True)
    headers["content-type"] = "application/json;charset=UTF-8"

    # Gender mapping (C++: male/Male -> 1, anything else -> 2).
    gender_str = str(user_inputs.get("gender", "")).lower()
    gender_code = 1 if gender_str == "male" else 2

    # Dial code may arrive as int or str; always send a string.
    raw_dial = user_inputs.get("phone_country_code", "86")
    dial_code = str(raw_dial)

    # Date formatting (C++ to_ddmmyyyy: YYYY-MM-DD -> DD/MM/YYYY).
    def _to_ddmmyyyy(d_str):
        try:
            return datetime.strptime(d_str, "%Y-%m-%d").strftime("%d/%m/%Y")
        except (TypeError, ValueError):
            return d_str  # not YYYY-MM-DD: pass through unchanged

    dob = _to_ddmmyyyy(str(user_inputs.get("birthday", "")))
    ppt_exp = _to_ddmmyyyy(str(user_inputs.get("passport_expiry_date", "")))

    # Single applicant object.
    applicant = {
        "urn": "",
        "arn": "",
        "loginUser": self.config.account.username,

        # Basic data (upper-cased)
        "firstName": str(user_inputs.get("first_name", "")).upper(),
        "middleName": "",
        "lastName": str(user_inputs.get("last_name", "")).upper(),
        "employerFirstName": "",
        "employerLastName": "",
        "salutation": "",
        "gender": gender_code,

        # Contact data
        "contactNumber": str(user_inputs.get("phone", "")),
        "dialCode": dial_code,
        "employerContactNumber": "",
        "employerDialCode": "",
        "emailId": str(user_inputs.get("alias_email", "")).upper(),
        "employerEmailId": "",

        # Passport data
        "passportNumber": str(user_inputs.get("passport_no", "")).upper(),
        "confirmPassportNumber": "",  # normally left empty
        "passportExpirtyDate": ppt_exp,  # "Expirty" is the API's own spelling
        "dateOfBirth": dob,
        "nationalId": None,

        # Nationality (via the module-level get_country_iso3 helper)
        "nationalityCode": get_country_iso3(str(user_inputs.get("nationality", ""))),

        # Address and remaining fields (mostly empty)
        "state": None,
        "city": None,
        "addressline1": None,
        "addressline2": None,
        "pincode": None,
        "isEndorsedChild": False,
        "applicantType": 0,
        "vlnNumber": None,
        "applicantGroupId": 0,
        "parentPassportNumber": "",
        "parentPassportExpiry": "",
        "dateOfDeparture": None,
        "entryType": "",
        "eoiVisaType": "",
        "passportType": "",
        "vfsReferenceNumber": "",
        "familyReunificationCerificateNumber": "",
        "PVRequestRefNumber": "",
        "PVStatus": "",
        "PVStatusDescription": "",
        "PVCanAllowRetry": True,
        "PVisVerified": False,
        "eefRegistrationNumber": "",
        "isAutoRefresh": True,
        "helloVerifyNumber": "",
        "OfflineCClink": "",
        "idenfystatuscheck": False,
        "vafStatus": None,
        "SpecialAssistance": "",
        "AdditionalRefNo": None,
        "juridictionCode": "",
        "canInitiateVAF": False,
        "canEditVAF": False,
        "canDeleteVAF": False,
        "canDownloadVAF": False,
        "Retryleft": "",

        # Client IP as seen from outside, when known
        "ipAddress": self.real_ip or "127.0.0.1"
    }

    # Reference number (cover letter)
    if enable_ref:
        applicant["referenceNumber"] = str(user_inputs.get("cover_letter", ""))
    else:
        applicant["referenceNumber"] = None

    # OCR data: metadata returned by the document upload
    if ocr_enabled:
        applicant["applicantImage"] = str(user_inputs.get("applicant_image", ""))
        applicant["applicantImageData"] = str(user_inputs.get("applicant_image_data", ""))
        applicant["GUID"] = str(user_inputs.get("guid", ""))

    # Outer payload
    payload = {
        "countryCode": self.free_config.get("countryCode"),
        "missionCode": self.free_config.get("missionCode"),
        "centerCode": apt_config.get("vacCode"),
        "loginUser": self.config.account.username,
        "visaCategoryCode": apt_config.get("subcategoryCode"),
        "applicantList": [applicant],  # the API expects an array

        "languageCode": "en-US",
        "isWaitlist": is_waitlist,
        "isEdit": False,
        "feeEntryTypeCode": None,
        "feeExemptionTypeCode": None,
        "feeExemptionDetailsCode": None,
        "juridictionCode": None,
        "regionCode": None
    }

    if not self._perform_request("POST", url, headers=headers, json_data=payload):
        return False

    try:
        j = self.session.last_response.json()

        # 1. Success: the reply carries a URN.
        if "urn" in j and j["urn"]:
            urn_out_list.append(j["urn"])
            return True

        # 2. Failure: classify it so the caller's retry logic can tell a
        #    422 "Invalid request" rejection from anything else.
        err_data = j.get("error")
        code = 0
        desc = ""
        if isinstance(err_data, dict):
            code = err_data.get("code", 0)
            desc = str(err_data.get("description", "") or err_data.get("message", "") or "")
        elif err_data:
            # The API may also return the error as plain text; keep it so
            # "Invalid request" is still recognised.
            desc = str(err_data)

        if str(code) == "422" or "Invalid request" in desc:
            self._set_error(422, f"Add Applicant 422: {desc}")
        elif err_data:
            self._set_error(3005, f"Add Applicant API Error: {desc}")
        else:
            # Never leave a stale last_error for the caller to misread.
            self._set_error(3005, "Add Applicant failed: response contained neither URN nor error")
        return False
    except Exception as e:
        self._set_error(9001, f"Add Applicant parse error: {str(e)}")
        return False
-
def _applicant_otp_send(self, apt_config, urn) -> bool:
    """Ask VFS to generate the applicant OTP for *urn*.

    Returns:
        True when the reply has a truthy ``isOTPGenerated``; False when the
        request failed, the flag is missing/false, or the reply could not be
        parsed (the latter is recorded through ``_set_error``).
    """
    url = "https://lift-api.vfsglobal.com/appointment/applicantotp"
    headers = self._get_common_headers(with_auth=True)
    headers["content-type"] = "application/json;charset=UTF-8"

    data = {
        "urn": urn,
        "loginUser": self.config.account.username,
        "missionCode": self.free_config.get("missionCode"),
        "countryCode": self.free_config.get("countryCode"),
        "centerCode": apt_config.get("vacCode"),
        "OTP": "",
        "otpAction": "GENERATE",
        "languageCode": "en-US"
    }

    if not self._perform_request("POST", url, headers=headers, json_data=data):
        return False

    try:
        generated = self.session.last_response.json().get("isOTPGenerated")
    except Exception as e:
        # Was a bare `except: pass`, which hid the cause entirely.
        self._set_error(9001, f"Applicant OTP send parse error: {str(e)}")
        return False
    return bool(generated)
def _applicant_otp_verify(self, apt_config, urn, otp) -> bool:
    """Submit the applicant OTP *otp* for *urn* for validation.

    Returns:
        True when the reply has a truthy ``isOTPValidated``; False when the
        request failed, the flag is missing/false, or the reply could not be
        parsed (the latter is recorded through ``_set_error``).
    """
    url = "https://lift-api.vfsglobal.com/appointment/applicantotp"
    headers = self._get_common_headers(with_auth=True)
    # Extra header carried over from the C++ implementation.
    headers["datacenter"] = "GERMANY"
    headers["content-type"] = "application/json;charset=UTF-8"

    data = {
        "urn": urn,
        "loginUser": self.config.account.username,
        "missionCode": self.free_config.get("missionCode"),
        "countryCode": self.free_config.get("countryCode"),
        "centerCode": apt_config.get("vacCode"),
        "OTP": otp,
        "otpAction": "VALIDATE",
        "languageCode": "en-US"
    }

    if not self._perform_request("POST", url, headers=headers, json_data=data):
        return False

    try:
        validated = self.session.last_response.json().get("isOTPValidated")
    except Exception as e:
        # Was a bare `except: pass`, which hid the cause entirely.
        self._set_error(9001, f"Applicant OTP verify parse error: {str(e)}")
        return False
    return bool(validated)
-
def _query_slot_calendar(self, apt_config, urn, from_date, ads_out: List) -> bool:
    """Fetch the available dates starting at *from_date*.

    Args:
        apt_config: Appointment configuration (``vacCode``, ``subcategoryCode``).
        urn: Applicant reference number.
        from_date: Start date, sent as-is in ``fromDate``.
        ads_out: Out-parameter. When the reply lists calendars it is cleared
            and refilled; ``MM/DD/YYYY`` values are converted to
            ``DD/MM/YYYY``, other strings are passed through unchanged and
            entries without a usable date are skipped.

    Returns:
        True when the reply contained a non-empty ``calendars`` list, False
        otherwise (parse failures are recorded through ``_set_error``).
    """
    url = "https://lift-api.vfsglobal.com/appointment/calendar"
    headers = self._get_common_headers(with_auth=True)
    headers["content-type"] = "application/json;charset=UTF-8"

    data = {
        "missionCode": self.free_config.get("missionCode"),
        "countryCode": self.free_config.get("countryCode"),
        "centerCode": apt_config.get("vacCode"),
        "loginUser": self.config.account.username,
        "visaCategoryCode": apt_config.get("subcategoryCode"),
        "fromDate": from_date,
        "urn": urn,
        "payCode": ""
    }

    if not self._perform_request("POST", url, headers=headers, json_data=data):
        return False

    try:
        calendars = self.session.last_response.json().get("calendars")
    except Exception as e:
        self._set_error(9001, f"Calendar parse error: {str(e)}")
        return False

    # NOTE(review): the recovered source had lost its indentation; an empty
    # calendar is assumed to count as "no result" (False) — confirm.
    if not calendars or not isinstance(calendars, (list, tuple)):
        return False

    ads_out.clear()
    for item in calendars:
        raw = item.get("date") if isinstance(item, dict) else None
        if not isinstance(raw, str) or not raw:
            # A missing date would only poison later date handling.
            continue
        try:
            # The API normally answers MM/DD/YYYY; normalise to DD/MM/YYYY.
            ads_out.append(datetime.strptime(raw, "%m/%d/%Y").strftime("%d/%m/%Y"))
        except ValueError:
            ads_out.append(raw)
    return True
def _query_slot_time(self, apt_config, urn, slot_date, ats_out: List) -> bool:
    """Fetch the time slots offered on *slot_date*.

    Args:
        apt_config: Appointment configuration (``vacCode``, ``subcategoryCode``).
        urn: Applicant reference number.
        slot_date: Date sent as-is in ``slotDate``.
        ats_out: Out-parameter; the reply's ``slots`` entries are appended.

    Returns:
        True when the reply contained a non-empty ``slots`` value, False
        otherwise (parse failures are recorded through ``_set_error``).
    """
    url = "https://lift-api.vfsglobal.com/appointment/timeslot"
    headers = self._get_common_headers(with_auth=True)
    headers["content-type"] = "application/json;charset=UTF-8"

    data = {
        "missionCode": self.free_config.get("missionCode"),
        "countryCode": self.free_config.get("countryCode"),
        "centerCode": apt_config.get("vacCode"),
        "loginUser": self.config.account.username,
        "visaCategoryCode": apt_config.get("subcategoryCode"),
        "slotDate": slot_date,
        "urn": urn
    }

    if not self._perform_request("POST", url, headers=headers, json_data=data):
        return False

    try:
        slots = self.session.last_response.json().get("slots")
        # NOTE(review): the recovered source had lost its indentation; an
        # empty slot list is assumed to count as failure — confirm.
        if slots:
            ats_out.extend(slots)
            return True
    except Exception as e:
        # Was a bare `except: pass`, which hid the cause entirely.
        self._set_error(9001, f"Time slot parse error: {str(e)}")
    return False
- def _saveuseractionaudit(self, apt_config, urn, earliest_date) -> bool:
- url = "https://lift-api.vfsglobal.com/appointment/saveuseractionaudit"
- headers = self._get_common_headers(with_auth=True)
- headers["content-type"] = "application/json;charset=UTF-8"
-
- # ISO format conversion
- try:
- dt = datetime.strptime(earliest_date, "%d/%m/%Y")
- iso_date = dt.strftime("%Y-%m-%dT%H:%M:%S")
- except:
- iso_date = earliest_date
- data = {
- "missionCode": self.free_config.get("missionCode"),
- "countryCode": self.free_config.get("countryCode"),
- "centerCode": apt_config.get("vacCode"),
- "loginUser": self.config.account.username,
- "urn": urn,
- "firstEarliestSlotDate": earliest_date,
- "action": "schedule",
- "ipAddress": self.real_ip or "127.0.0.1",
- "eadAppointmentDetail": iso_date
- }
-
- if not self._perform_request("POST", url, headers=headers, json_data=data):
- return False
- return self.session.last_response.json().get("isSavedSuccess", False)
- # ----------------- 内部功能函数 -----------------
- def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None) -> bool:
- """
- 统一 HTTP 请求封装,严格复刻 C++ 逻辑:
- 1. 发送 OPTIONS 请求
- 2. 发送实际请求
- """
- print(f'[perform request] {method} {url}')
- # --- 1. 发送 OPTIONS 请求 ---
- try:
- # OPTIONS 请求使用相同的 URL 和 headers (部分 header 如 content-length 会被自动处理)
- # 某些服务器反爬会检测 OPTIONS 请求
- opt_headers = headers.copy() if headers else {}
-
- # 发送 OPTIONS
- self.session.request("OPTIONS", url, headers=opt_headers, timeout=10)
- # C++ 代码中并不检查 OPTIONS 的返回值,只检查执行是否成功
- # 这里我们假设只要不抛出异常即可
-
- except Exception as e:
- # 记录警告但不中断流程,防止仅仅是 OPTIONS 失败导致误判
- VSC_DEBUG("vfs_plg", "OPTIONS request failed (non-fatal): %s", str(e))
- # --- 2. 发送实际请求 ---
- try:
- resp = self.session.request(method, url, headers=headers, data=data, json=json_data, params=params, timeout=30)
-
- self.session.last_response = resp
-
- if resp.status_code == 200:
- return True
- elif resp.status_code == 401:
- self._set_error(401, "Session Invalid (401)")
- elif resp.status_code == 403:
- self._set_error(403, "Blocked by Firewall (403)")
- elif resp.status_code == 429:
- self._set_error(429, "Rate Limited (429)")
- else:
- self._set_error(resp.status_code, f"HTTP Error {resp.status_code}: {resp.text[:100]}")
-
- return False
- except Exception as e:
- self._set_error(9000, f"Network Exception: {str(e)}")
- return False
def _encrypt_password(self, password: str) -> str:
    """Encrypt ``password`` with ``self.public_key`` and return it as Base64 text.

    The padding is OAEP with SHA-256 for both the MGF1 mask and the main
    hash, and no label.
    """
    oaep = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA256()),
        algorithm=hashes.SHA256(),
        label=None,
    )
    encrypted = self.public_key.encrypt(password.encode(), oaep)
    encoded = base64.b64encode(encrypted)
    return encoded.decode()
- def _get_orange_source(self, email: str) -> str:
- timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
- payload = f"{email};{timestamp}"
- return self._encrypt_password(payload)
- def _get_client_source(self) -> str:
- timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
- payload = f"GA;{timestamp}Z"
- return self._encrypt_password(payload)
def _handle_cloudflare_challenge(self) -> str:
    """Solve a Cloudflare Turnstile challenge through the cloud captcha service.

    Counterpart of the C++ ``VFSApi::handle_cloudflare_challenge``. Submits a
    task for the login page of the configured mission/country, polls for the
    result for up to 120 seconds, then copies the solver's User-Agent and
    cookies onto this session so follow-up requests use the environment that
    passed the challenge.

    Returns:
        The Turnstile token, or ``""`` on any failure (the reason is recorded
        through ``_set_error``).
    """
    # 1. Prepare parameters.
    mission = self.free_config.get("missionCode", "")
    country = self.free_config.get("countryCode", "")
    if not mission or not country:
        self._set_error(9001, "Missing missionCode or countryCode in free_config")
        return ""
    website_url = f"https://visa.vfsglobal.com/{country}/en/{mission}/login"

    # Proxy string handed to the solver, formatted scheme://user:pass@ip:port.
    proxy_str = ""
    if self.config.proxy.ip:
        proxy_str = f"{self.config.proxy.scheme}://"
        if self.config.proxy.username:
            proxy_str += f"{self.config.proxy.username}:{self.config.proxy.password}@"
        proxy_str += f"{self.config.proxy.ip}:{self.config.proxy.port}"

    # 2. Submit the task.
    VSC_INFO("vfs_plg", "[%s] Submitting Turnstile task for %s...", self.group_id, website_url)
    task_out = VSCloudApi.Instance().submit_anti_turnstile_task(proxy_str, website_url)
    if not task_out:
        self._set_error(9002, "Failed to submit captcha task to Cloud API")
        return ""

    # Validate the id BEFORE stringifying: str(None) is the truthy "None",
    # which would otherwise be polled as if it were a real task id.
    raw_task_id = task_out.get("id")
    if raw_task_id is None or raw_task_id == "":
        self._set_error(9002, "Cloud API returned invalid task ID")
        return ""
    task_id = str(raw_task_id)

    # 3. Poll for the result (120 second budget). A monotonic clock keeps the
    # budget correct even if the wall clock is adjusted while waiting.
    timeout = 120
    start_time = time.monotonic()

    while time.monotonic() - start_time < timeout:
        result_out = VSCloudApi.Instance().get_anti_turnstile_result(task_id)
        if not result_out:
            # The result lookup itself failed; retry shortly.
            time.sleep(3)
            continue

        # status: 0=Pending, 1=Processing, 2=Success, 3=Failed
        status = result_out.get("status", 0)

        if status == 3:
            err_msg = result_out.get("result", "Unknown error")
            self._set_error(9003, f"Captcha task failed: {err_msg}")
            return ""

        if status != 2:
            # Pending / Processing.
            time.sleep(3)
            continue

        raw_result = result_out.get("result", "")
        try:
            # 4. Parse the result. It is either a JSON string or an
            # already-decoded dict carrying token, userAgent and cookies.
            if isinstance(raw_result, str):
                data = json.loads(raw_result)
            else:
                data = raw_result
            token = data.get("token")
            ua = data.get("userAgent")
            cookies_list = data.get("cookies", [])
            if not token:
                self._set_error(9004, "Captcha solved but token is empty")
                return ""

            # 5. Sync the environment. Later requests must present the same
            # User-Agent and cookies that passed the challenge.
            if ua:
                self.user_agent = ua
                # NOTE(review): the original author warns that curl_cffi's
                # impersonate setting may override this header - confirm.
                self.session.headers["User-Agent"] = ua

            # Cookies produced during the Turnstile check (e.g. cf_clearance).
            if cookies_list:
                VSC_DEBUG("vfs_plg", "[%s] Syncing %d cookies from Captcha solver...", self.group_id, len(cookies_list))
                for cookie in cookies_list:
                    c_name = cookie.get("name")
                    c_value = cookie.get("value")
                    c_domain = cookie.get("domain", "")
                    c_path = cookie.get("path", "/")

                    if c_name and c_value:
                        self.session.cookies.set(
                            name=c_name,
                            value=c_value,
                            domain=c_domain,
                            path=c_path
                        )

            VSC_INFO("vfs_plg", "[%s] Cloudflare challenge passed.", self.group_id)
            return token
        except Exception as e:
            self._set_error(9005, f"Failed to parse captcha result JSON: {str(e)}")
            return ""

    self._set_error(9003, "Captcha task timeout (120s)")
    return ""
- def _get_common_headers(self, with_auth=True) -> Dict[str, str]:
- mission = self.free_config.get("missionCode", "")
- country = self.free_config.get("countryCode", "")
- lang = self.free_config.get("language", "en")
- route = f"{country}/{lang}/{mission}"
-
- h = {
- "accept": "application/json, text/plain, */*",
- "accept-language": "en-US,en;q=0.9",
- "origin": "https://visa.vfsglobal.com",
- "referer": "https://visa.vfsglobal.com/",
- "route": route
- }
-
- client_src = self._get_client_source()
- h["clientsource"] = client_src
-
- if with_auth and self.jwt_token:
- h["authorize"] = self.jwt_token
-
- return h
- def _query_earliest_slot(self, apt_config, earliest_date_out: List[str]) -> bool:
- """
- 查询最早 Slot (对应 C++ VFSApi::query_earliest_slot_with_retry)
- 增加了 403 被拦截时的 Cloudflare 自动绕过机制
- """
- url = "https://lift-api.vfsglobal.com/appointment/CheckIsSlotAvailable"
-
- data = {
- "missioncode": self.free_config.get("missionCode"),
- "countrycode": self.free_config.get("countryCode"),
- "vacCode": apt_config.get("vacCode"),
- "visaCategoryCode": apt_config.get("subcategoryCode"),
- "roleName": "Individual",
- "loginUser": self.config.account.username,
- "payCode": ""
- }
-
- max_retries = 3
- for attempt in range(max_retries):
- # 每次重试前重新获取 header,因为 handle_cloudflare_challenge 可能会更新 Token/UA
- headers = self._get_common_headers(with_auth=True)
- headers["content-type"] = "application/json;charset=UTF-8"
-
- # 发送请求
- if self._perform_request("POST", url, headers=headers, json_data=data):
- # --- 请求成功 (HTTP 200) ---
- resp_text = self.session.last_response.text
-
- # 1. 检查是否是 WaitList
- if "WaitList" in resp_text:
- earliest_date_out.append("WaitList")
- return True
-
- # 2. 解析日期
- try:
- j = self.session.last_response.json()
- if j.get("earliestSlotLists"):
- raw_date = j["earliestSlotLists"][0]["date"]
- # raw_date 示例: "09/10/2025 00:00:00" (表示 2025年9月10日)
-
- try:
- # 1. 按 MM/DD/YYYY HH:MM:SS 解析
- dt = datetime.strptime(raw_date, "%m/%d/%Y %H:%M:%S")
- # 2. 转为 DD/MM/YYYY
- std_date = dt.strftime("%d/%m/%Y")
- earliest_date_out.append(std_date)
- return True
- except ValueError:
- # 备用:万一格式变了 (比如变成了 YYYY-MM-DD),尝试其他解析或原样返回
- # 这里记录警告,防止 silently fail
- VSC_WARN("vfs_plg", "[%s] Date parse warning: '%s' not matching %%m/%%d/%%Y", self.group_id, raw_date)
- # 尝试直接分割,虽然可能格式不对,但总比崩溃好
- earliest_date_out.append(raw_date.split(" ")[0])
- return True
-
- except Exception as e:
- VSC_DEBUG("vfs_plg", f"Parse earliest slot error: {e}")
- pass
-
- # 虽然 HTTP 200 但没有 slot 数据
- self._set_error(2002, "No slots found in response")
- return False
- else:
- # --- 请求失败 (HTTP != 200) ---
- err = self.get_last_error()
-
- # 关键逻辑:如果被防火墙拦截 (403),尝试过盾
- if err.error_code == 403:
- VSC_WARN("vfs_plg", "[%s] Query Blocked (403) - Attempt %d/%d. Solving Cloudflare...",
- self.group_id, attempt + 1, max_retries)
-
- # 调用过盾逻辑
- cf_token = self._handle_cloudflare_challenge()
-
- if cf_token:
- # 过盾成功,随机冷却 1-3 秒后重试
- time.sleep(random.uniform(1, 3))
- continue
- else:
- # 过盾失败,无法继续
- self._set_error(403, "Cloudflare challenge failed during query retry")
- return False
-
- elif err.error_code == 401:
- # Session 失效,通常需要重新登录,这里不重试,直接返回失败让上层处理
- return False
-
- else:
- # 其他错误 (500, 404 等),不立即重试
- return False
-
- # 超过最大重试次数
- self._set_error(403, "Query blocked by firewall after max retries")
- return False
def _set_error(self, code, msg):
    """Store ``code``/``msg`` as ``self.last_error`` and log them at error level."""
    error = VSError(code, msg)
    self.last_error = error
    VSC_ERROR("vfs_plg", "[%s] Error %d: %s", self.group_id, code, msg)
- def _fmt_date(self, yyyy_mm_dd):
- try:
- return datetime.strptime(yyyy_mm_dd, "%Y-%m-%d").strftime("%d/%m/%Y")
- except:
- return yyyy_mm_dd
- def _fetch_configurations(self, apt_config: Dict[str, Any]) -> bool:
- """
- 获取并缓存签证中心、类别、子类别配置
- 对应 C++ VFSApi::fetch_configurations
- """
- # 1. 获取所有中心配置 (query_center)
- if not self.center_conf:
- centers = []
- if not self._query_center(centers):
- return False
- self.center_conf = centers
- # 2. 获取 Visa Category 配置
- vac_code = apt_config.get("vacCode")
- category_code = apt_config.get("categoryCode")
-
- # 检查是否已缓存该 VAC 的 category 配置
- # C++ 逻辑是: _category_configuration[category_code] = vc
- # 但这里逻辑似乎是按 category_code 索引。为了保险,我们按 C++ 逻辑实现。
- # 注意:C++ map key 是 vac_code 还是 category_code?
- # C++ 代码中: if (!_category_configuration.contains(vac_code)...)
- # 但存进去是用 category_code 作为 key: _category_configuration[category_code] = vc;
- # 这在 Python 中有点奇怪,我们这里使用字典:self.category_conf[category_code] = config
-
- # 为了避免重复查询,我们需要知道是否已经查询过这个 VAC。
- # 简单起见,检查目标 category_code 是否已在缓存中
- if category_code not in self.category_conf:
- visa_categories = []
- if not self._query_visa_category(vac_code, visa_categories):
- return False
-
- found = False
- for vc in visa_categories:
- if vc.get("code") == category_code:
- self.category_conf[category_code] = vc
- found = True
- break
-
- # 如果没找到,可能配置错误,但 C++ 没报错,只继续
- if not found:
- VSC_WARN("vfs_plg", "[%s] Category code '%s' not found in VAC '%s'",
- self.group_id, category_code, vac_code)
- # 3. 获取 Visa SubCategory 配置
- sub_category_code = apt_config.get("subcategoryCode")
-
- if sub_category_code not in self.subcategory_conf:
- visa_subcategories = []
- if not self._query_visa_sub_category(vac_code, category_code, visa_subcategories):
- return False
-
- found = False
- for svc in visa_subcategories:
- if svc.get("code") == sub_category_code:
- self.subcategory_conf[sub_category_code] = svc
- found = True
- break
-
- if not found:
- VSC_WARN("vfs_plg", "[%s] SubCategory code '%s' not found",
- self.group_id, sub_category_code)
- return True
- def _query_center(self, centers_out: List) -> bool:
- """对应 C++ VFSApi::query_center"""
- mission = self.free_config.get("missionCode")
- country = self.free_config.get("countryCode")
- url = f"https://lift-api.vfsglobal.com/master/center/{mission}/{country}/en-US"
-
- headers = self._get_common_headers(with_auth=False) # 通常 Master API 不需要 Auth 或者是独立的
- # 实际上根据 C++ 代码,这里使用的是 _get_client_source 生成的 headers,且不需要 authorize token
- # 但 C++ 代码中也没有明确加 authorize header,除非 _jwt_token 不为空
- # 保险起见,如果有了 Token 就带上,没有就不带 (get_common_headers 默认逻辑)
-
- if not self._perform_request("GET", url, headers=headers):
- return False
-
- try:
- j = self.session.last_response.json()
- if isinstance(j, list):
- centers_out.extend(j)
- return True
- else:
- self._set_error(2003, "query_center response is not a list")
- except Exception as e:
- self._set_error(9001, f"query_center parse error: {e}")
- return False
- def _query_visa_category(self, center_code: str, visa_category_out: List) -> bool:
- """对应 C++ VFSApi::query_visa_category"""
- mission = self.free_config.get("missionCode")
- country = self.free_config.get("countryCode")
- # URL Encode
- enc_center = urllib.parse.quote(center_code)
-
- url = f"https://lift-api.vfsglobal.com/master/visacategory/{mission}/{country}/{enc_center}/en-US"
- headers = self._get_common_headers(with_auth=False)
-
- if not self._perform_request("GET", url, headers=headers):
- return False
-
- try:
- j = self.session.last_response.json()
- # C++ 增加了错误检查
- if isinstance(j, list):
- if j and "error" in j[0] and j[0]["error"]:
- self._set_error(2004, f"API Error in query_visa_category: {j[0]}")
- return False
- visa_category_out.extend(j)
- return True
- else:
- self._set_error(2003, "query_visa_category response is not a list")
- except Exception as e:
- self._set_error(9001, f"query_visa_category parse error: {e}")
- return False
- def _query_visa_sub_category(self, center_code: str, category_code: str, visa_sub_category_out: List) -> bool:
- """对应 C++ VFSApi::query_visa_sub_category"""
- mission = self.free_config.get("missionCode")
- country = self.free_config.get("countryCode")
-
- enc_center = urllib.parse.quote(center_code)
- enc_cat = urllib.parse.quote(category_code)
-
- url = f"https://lift-api.vfsglobal.com/master/subvisacategory/{mission}/{country}/{enc_center}/{enc_cat}/en-US"
- headers = self._get_common_headers(with_auth=False)
-
- if not self._perform_request("GET", url, headers=headers):
- return False
-
- try:
- j = self.session.last_response.json()
- if isinstance(j, list):
- if j and "error" in j[0] and j[0]["error"]:
- self._set_error(2004, f"API Error in query_visa_sub_category: {j[0]}")
- return False
- visa_sub_category_out.extend(j)
- return True
- else:
- self._set_error(2003, "query_visa_sub_category response is not a list")
- except Exception as e:
- self._set_error(9001, f"query_visa_sub_category parse error: {e}")
- return False
def _read_otp_email(self) -> str:
    """Poll the cloud mail service for the OTP e-mail and return the code.

    Counterpart of the C++ ``VFSApi::read_otp_code``. Asks for mail addressed
    to the account's username that arrived after "now" (UTC), up to 12 times
    with 5 seconds between attempts, and extracts the first standalone
    6-digit number from the returned content.

    Returns:
        The 6-digit OTP, or ``""`` after recording error 4004 when no
        matching mail was found in time.
    """
    # Local import: only the ``datetime`` class is known to be imported at file level.
    from datetime import timezone

    # 1. Constants taken from the C++ code.
    master_email = "visafly666@gmail.com"
    recipient = self.config.account.username
    # The sender is deliberately spelled "donotreply at vfshelpline.com", as in the C++ code.
    sender = "donotreply at vfshelpline.com"
    subject_keywords = "One Time Password"
    body_keywords = "OTP"

    # 2. Current UTC time ("%Y-%m-%d %H:%M:%S", as in the C++ code); the
    # cloud service is asked only for mail received after this moment.
    # ``datetime.utcnow()`` is deprecated; an aware UTC "now" formats identically.
    formatted_utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    VSC_INFO("vfs_plg", "[%s] Waiting for OTP email sent after %s...", self.group_id, formatted_utc_time)

    # 3. Poll: 12 attempts, 5 seconds apart.
    max_attempts = 12
    for attempt in range(max_attempts):
        # The trailing 300 corresponds to the C++ "expiry = 5 * 60" seconds.
        content_out = VSCloudApi.Instance().fetch_mail_content(
            master_email,
            sender,
            recipient,
            subject_keywords,
            body_keywords,
            formatted_utc_time,
            300
        )
        if content_out:
            # 4. First standalone 6-digit number (C++ uses the same pattern).
            # NOTE(review): assumes fetch_mail_content returns text - confirm.
            match = re.search(r'\b\d{6}\b', content_out)
            if match:
                otp = match.group(0)
                VSC_INFO("vfs_plg", "[%s] OTP code found: %s", self.group_id, otp)
                return otp

        # Nothing more will be fetched after the final attempt, so do not
        # sleep before reporting the timeout.
        if attempt == max_attempts - 1:
            break

        time.sleep(5)
        if attempt % 2 == 0:
            VSC_DEBUG("vfs_plg", "[%s] OTP not found yet, retrying...", self.group_id)

    # 5. Timed out.
    self._set_error(4004, "OTP email not found (timeout)")
    return ""
def _submit_login_otp(self, cf_token: str, otp: str) -> bool:
    """Submit the OTP together with the credentials to complete the login.

    Counterpart of the C++ ``VFSApi::submit_otp_code``. On success the
    returned ``accessToken`` is stored in ``self.jwt_token``.

    Args:
        cf_token: Cloudflare Turnstile token. When empty, a new challenge is
            solved first.
        otp: One-time password taken from the e-mail.

    Returns:
        True when an access token was obtained; otherwise False with the
        reason recorded as the last error.
    """
    VSC_INFO("vfs_plg", "[%s] Submitting Login OTP...", self.group_id)

    # 1. Make sure a Cloudflare token is available. This runs BEFORE the
    # timestamped header values are generated: solving a challenge can take
    # up to two minutes, and the clientsource/orangex timestamps would
    # otherwise be stale by the time the request is sent.
    if not cf_token:
        VSC_DEBUG("vfs_plg", "[%s] CF Token is empty, regenerating for OTP...", self.group_id)
        cf_token = self._handle_cloudflare_challenge()
        if not cf_token:
            return False

    # 2. Base data.
    email = self.config.account.username
    password = self.config.account.password
    enc_password = self._encrypt_password(password)

    mission_code = self.free_config.get("missionCode", "")
    country_code = self.free_config.get("countryCode", "")

    # 3. Encrypted source values; they embed the current timestamp, so they
    # are generated anew for this request.
    client_src = self._get_client_source()
    orange_src = self._get_orange_source(email)

    # 4. Build the request.
    url = "https://lift-api.vfsglobal.com/user/login"
    headers = self._get_common_headers(with_auth=False)
    headers.update({
        "clientsource": client_src,
        "orangex": orange_src,
        "content-type": "application/x-www-form-urlencoded"
    })

    data = {
        "username": email,
        "password": enc_password,
        "missioncode": mission_code,
        "countrycode": country_code,
        "languageCode": "en-US",
        "captcha_version": "cloudflare-v1",
        "captcha_api_key": cf_token,
        "otp": otp  # the field that distinguishes this from the password login
    }

    # 5. Send as form-urlencoded POST.
    if not self._perform_request("POST", url, headers=headers, data=data):
        return False

    # 6. Handle the response.
    try:
        resp_json = self.session.last_response.json()

        # A. Errors. The error may be a dict (message / description) or a plain value.
        if "error" in resp_json and resp_json["error"]:
            err_obj = resp_json["error"]
            if isinstance(err_obj, dict):
                msg = err_obj.get("message") or err_obj.get("description") or "Unknown error"
            else:
                msg = str(err_obj)

            if "locked" in str(msg).lower():
                self._set_error(4005, f"Account Locked during OTP: {msg}")
            else:
                self._set_error(4006, f"OTP Login Error: {msg}")
            return False

        # B. Access token present: login succeeded.
        if "accessToken" in resp_json and resp_json["accessToken"]:
            self.jwt_token = resp_json["accessToken"]
            VSC_INFO("vfs_plg", "[%s] OTP Login successful, JWT obtained.", self.group_id)
            return True

        self._set_error(9006, f"OTP Login failed, unknown response: {str(resp_json)[:100]}")
        return False

    except Exception as e:
        self._set_error(9001, f"OTP Login parse error: {str(e)}")
        return False
-
- def _submit_no_addition_service(self, urn) -> bool:
- url = "https://lift-api.vfsglobal.com/vas/mapvas"
- headers = self._get_common_headers(with_auth=True)
- headers["content-type"] = "application/json;charset=UTF-8"
-
- data = {
- "loginUser": self.config.account.username,
- "missionCode": self.free_config.get("missionCode"),
- "countryCode": self.free_config.get("countryCode"),
- "urn": urn,
- "applicants": []
- }
- # C++ 只请求不检查结果,或者只要200就行
- return self._perform_request("POST", url, headers=headers, json_data=data)
- def _query_fee(self, apt_config, urn) -> Tuple[float, str]:
- url = "https://lift-api.vfsglobal.com/appointment/fees"
- headers = self._get_common_headers(with_auth=True)
- headers["content-type"] = "application/json;charset=UTF-8"
-
- data = {
- "missionCode": self.free_config.get("missionCode"),
- "countryCode": self.free_config.get("countryCode"),
- "centerCode": apt_config.get("vacCode"),
- "loginUser": self.config.account.username,
- "urn": urn,
- "languageCode": "en-US"
- }
-
- if self._perform_request("POST", url, headers=headers, json_data=data):
- try:
- j = self.session.last_response.json()
- amt = j.get("totalamount", 0.0)
- curr = ""
- if j.get("feeDetails"):
- curr = j["feeDetails"][0].get("currency", "")
- return float(amt), curr
- except:
- pass
- return 0.0, ""
- def _schedule(self, apt_config, urn, amount, currency, slot_id, res_out: Dict) -> bool:
- url = "https://lift-api.vfsglobal.com/appointment/schedule"
- headers = self._get_common_headers(with_auth=True)
- headers["content-type"] = "application/json;charset=UTF-8"
-
- data = {
- "missionCode": self.free_config.get("missionCode"),
- "countryCode": self.free_config.get("countryCode"),
- "centerCode": apt_config.get("vacCode"),
- "loginUser": self.config.account.username,
- "urn": urn,
- "notificationType": "none",
- "paymentdetails": {
- "paymentmode": "Online",
- "RequestRefNo": "",
- "clientId": "",
- "merchantId": "",
- "amount": amount,
- "currency": currency
- },
- "allocationId": slot_id,
- "CanVFSReachoutToApplicant": True
- }
-
- if not self._perform_request("POST", url, headers=headers, json_data=data):
- return False
-
- try:
- j = self.session.last_response.json()
- if j.get("IsAppointmentBooked"):
- res_out.update(j)
- return True
- except:
- pass
- return False
- def _pay_request(self, payload) -> str:
- # C++: 检查 301/302 Redirect Location
- url = f"https://online.vfsglobal.com/PG-Component/Payment/PayRequest?payLoad={payload}"
- headers = {
- "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
- "referer": "https://visa.vfsglobal.com/",
- "user-agent": self.user_agent
- }
-
- try:
- # allow_redirects=False 以捕获 Location header
- resp = self.session.get(url, headers=headers, allow_redirects=False)
- if resp.status_code in [301, 302]:
- loc = resp.headers.get("Location", "")
- if loc.startswith("http"):
- return loc
- else:
- return "https://online.vfsglobal.com" + loc if loc.startswith("/") else "https://online.vfsglobal.com/" + loc
- except:
- pass
- return ""
def _save_http_session(self, page_url):
    """Store the current HTTP session (cookies, User-Agent, proxy) in VSCloudApi.

    Cookies are read through ``get_dict()`` when the cookie jar offers it
    (curl_cffi has no ``requests.utils.dict_from_cookiejar``), otherwise by
    iterating the jar. The session id is the SHA-256 hex digest of the
    cookie JSON, the User-Agent and ``page_url``. The local-storage
    argument is always sent as an empty string.

    Returns:
        Whatever ``create_http_session`` returns, or None when saving raised.
        A failure here is only logged so that it cannot affect the booking
        result returned by the main flow.
    """
    jar = self.session.cookies
    cookies_dict = {}
    try:
        if hasattr(jar, "get_dict"):
            cookies_dict = jar.get_dict()
        else:
            # Fallback for a standard CookieJar-like object.
            for cookie in jar:
                cookies_dict[cookie.name] = cookie.value
    except Exception as exc:
        VSC_WARN("vfs_plg", "[%s] Failed to extract cookies: %s", self.group_id, str(exc))
    cookies_str = json.dumps(cookies_dict)

    ua_str = self.user_agent or "unknown_ua"
    fingerprint = cookies_str + ua_str + page_url

    try:
        # Simple session id: hash of everything that identifies the session.
        digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
        digest.update(fingerprint.encode())
        sid = digest.finalize().hex()

        proxy = self.config.proxy
        if proxy.ip:
            credentials = f"{proxy.username}:{proxy.password}@" if proxy.username else ""
            proxy_str = f"{proxy.scheme}://{credentials}{proxy.ip}:{proxy.port}"
        else:
            proxy_str = ""

        saved_session = VSCloudApi.Instance().create_http_session(
            sid, cookies_str, "", ua_str, proxy_str, page_url
        )
        if not saved_session:
            VSC_ERROR("vfs_plg", "[%s] Session save failed. ID: %s", self.group_id, sid)
        else:
            VSC_INFO("vfs_plg", "[%s] Session saved. ID: %s", self.group_id, sid)
        return saved_session
    except Exception as exc:
        VSC_WARN("vfs_plg", "[%s] Failed to save session to cloud: %s", self.group_id, str(exc))
        return None
|