import time
import json
import random
import re
import os
from datetime import datetime
from typing import List, Dict, Optional, Any, Callable
from urllib.parse import urljoin, urlparse

from requests_toolbelt import MultipartEncoder
from curl_cffi import requests, const
from bs4 import BeautifulSoup

from vs_plg import IVSPlg
from vs_types import (
    VSPlgConfig,
    AppointmentType,
    VSQueryResult,
    VSBookResult,
    AvailabilityStatus,
    TimeSlot,
    DateAvailability,
    NotFoundError,
    PermissionDeniedError,
    RateLimiteddError,
    SessionExpiredOrInvalidError,
    BizLogicError,
)
from toolkit.vs_cloud_api import VSCloudApi


class TlsPlugin(IVSPlg):
    """TLSContact visa appointment plugin, adapted to the France (FR) flow.

    Lifecycle as implemented below: ``set_config`` -> ``create_session``
    (Cloudflare bypass, login, travel-group lookup) -> ``query`` -> ``book``.
    ``is_healthy`` is cleared whenever a response indicates the session can
    no longer be used (HTTP 401/429 or an expired/blocked page).
    """

    def __init__(self, group_id: str):
        """Store the group id and initialise all per-session state to empty."""
        self.group_id = group_id
        self.config: Optional[VSPlgConfig] = None
        self.free_config: Dict[str, Any] = {}
        self.is_healthy = True
        self.logger = None
        self.session: Optional[requests.Session] = None
        self.travel_group: Optional[Dict] = None
        self.user_agent: str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
        self.session_create_time: float = 0

    def get_group_id(self) -> str:
        """Return the group id given at construction time."""
        return self.group_id

    def set_log(self, logger: Callable[[str], None]) -> None:
        """Install the callable that receives every log line of this plugin."""
        self.logger = logger

    def set_config(self, config: VSPlgConfig):
        """Store *config* and cache its ``free_config`` dict (``{}`` if unset)."""
        self.config = config
        self.free_config = config.free_config or {}

    def health_check(self) -> bool:
        """Return True while the current session may still be used.

        The session is unhealthy when it was flagged as such, when it was
        never created, or when ``config.session_max_life`` (minutes, only
        enforced when > 0) has elapsed since ``create_session`` finished.
        """
        if not self.is_healthy or self.session is None:
            return False

        max_life_seconds = self.config.session_max_life * 60
        if self.config.session_max_life > 0:
            elapsed_time = time.time() - self.session_create_time
            if elapsed_time > max_life_seconds:
                self._log(f"Session Life ({int(elapsed_time)}s) out of max life limit ({self.config.session_max_life * 60}s), mark as unhealth session")
                return False
        return True

    def create_session(self):
        """Create a logged-in session and select the travel group.

        Steps: build the curl_cffi session, solve the Cloudflare challenge,
        fetch the login page, solve the login ReCaptcha, submit credentials,
        then pick the travel group whose location equals
        ``apt_config['city']`` (case-insensitive).

        Raises:
            NotFoundError: ``apt_config`` / ``capsolver_key`` missing, login
                form (or its action) not found, or no travel group matches.
            SessionExpiredOrInvalidError, PermissionDeniedError,
            RateLimiteddError, BizLogicError: propagated from the helpers.
        """
        # 1. Initialise the session.
        curlopt = {
            const.CurlOpt.MAXAGE_CONN: 1800,
            const.CurlOpt.MAXLIFETIME_CONN: 1800,
            const.CurlOpt.VERBOSE: self.config.debug,
        }
        self.session = requests.Session(
            proxy=self._get_proxy_url(),
            impersonate="chrome124",
            curl_options=curlopt,
            use_thread_local_curl=False,
            http_version=const.CurlHttpVersion.V2TLS
        )

        apt_config = self.free_config.get('apt_config', {})
        if not apt_config:
            raise NotFoundError(message="apt_config not found in free config")

        # 2. Solve the Cloudflare "5 seconds" challenge.
        self._solve_cloudflare5S_challenge()

        # 3. Fetch the login page (OIDC parameters).
        login_page = "https://visas-fr.tlscontact.com/en-us/login"
        params = {
            "issuerId": apt_config["code"],
            "country": apt_config["country"],
            "vac": apt_config["code"],
            "redirect": f"/en-us/country/{apt_config['country']}/vac/{apt_config['code']}"
        }
        headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Referer': f'https://visas-fr.tlscontact.com/en-us/country/{apt_config["country"]}/vac/{apt_config["code"]}',
            'User-Agent': self.user_agent,
        }
        resp = self._perform_request("GET", login_page, headers=headers, params=params)
        if self.config.debug:
            self._save_debug_html(resp.text, prefix='Tls_Login_Page')

        # Extract the login form target (Keycloak authenticate URL).
        soup = BeautifulSoup(resp.text, 'html.parser')
        form = soup.find('form')
        if not form:
            raise NotFoundError(message="Login form not found")
        action = form.get('action')
        if not action:
            # Without this guard a form lacking ``action`` crashed on
            # ``None.startswith``.
            raise NotFoundError(message="Login form action not found")
        authenticate_url = action if action.startswith('http') else urljoin(resp.url, action)

        # 4. Solve ReCaptcha V2 (login captcha).
        api_token = self.free_config.get("capsolver_key", "")
        if not api_token:
            raise NotFoundError(message="Missing 'capsolver_key' in free_config, captcha might fail.")
        rc_params = {
            "type": "ReCaptchaV2TaskProxyLess",
            "page": authenticate_url,
            "siteKey": "6LcDpXcfAAAAAM7wOEsF_38DNsL20tTvPTKxpyn0",
            "apiToken": api_token,
            "proxy": self._get_proxy_url()
        }
        g_token = self._solve_recaptcha(rc_params)

        # 5. Submit the login form.
        payload = {
            'username': self.config.account.username,
            'password': self.config.account.password,
            'g-recaptcha-response': g_token
        }
        headers['Content-Type'] = 'application/x-www-form-urlencoded'
        resp = self._perform_request("POST", authenticate_url, headers=headers, data=payload)
        if self.config.debug:
            self._save_debug_html(resp.text, prefix='Tls_Travel_Groups_Page')

        # 6. Parse the travel groups and pick the one for the target city.
        self._check_page_is_session_expired_or_invalid("My travel group", resp.text)
        groups = self._parse_travel_groups(resp.text)

        target_city = apt_config['city'].lower()
        # Reset first so a group from a previous login cannot survive a
        # re-login that finds no match.
        self.travel_group = next(
            (g for g in groups if (g.get('location') or '').lower() == target_city),
            None,
        )
        if not self.travel_group:
            raise NotFoundError(message=f"No matched group found for city {target_city}")

        self.session_create_time = time.time()
        self._log(f"Session created successfully. Group: {self.travel_group['group_number']}")

    def query(self, apt_type: AppointmentType) -> VSQueryResult:
        """Fetch the appointment-booking page and report bookable slots.

        The month queried is ``free_config['interest_month']`` (default: the
        current month as ``MM-YYYY``). On HTTP 403 the Cloudflare challenge
        is solved again and the request retried, up to
        ``free_config['max_retries']`` attempts (default 2, minimum 1).
        Only slots whose label is in ``free_config['target_labels']``
        (default ``["", "pta"]``) are reported.

        Args:
            apt_type: Not used by this implementation.

        Returns:
            A ``VSQueryResult`` with ``success``/``availability_status`` set
            and, when slots exist, ``earliest_date`` and ``availability``.

        Raises:
            PermissionDeniedError: still blocked after the last attempt.
            SessionExpiredOrInvalidError: the page shows a login/expired
                marker instead of the booking page.
        """
        res = VSQueryResult()
        res.success = False

        apt_config = self.free_config.get('apt_config', {})
        group_num = self.travel_group['group_number']
        interest_month = self.free_config.get("interest_month", time.strftime("%m-%Y"))
        # At least one attempt, otherwise ``resp`` would never be assigned.
        max_retries = max(1, int(self.free_config.get("max_retries", 2) or 1))

        url = f'https://visas-fr.tlscontact.com/en-us/{group_num}/workflow/appointment-booking'
        params = {
            'location': apt_config["code"],
            'month': interest_month,
        }
        headers = {
            'accept': '*/*',
            'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'referer': f'{url}?location={apt_config["code"]}',
            'user-agent': self.user_agent,
        }

        resp = None
        for attempt in range(1, max_retries + 1):
            try:
                resp = self._perform_request("GET", url, headers=headers, params=params)
            except PermissionDeniedError:
                self._log(f"Query Appointment-booking blocked (403), attempt {attempt}/{max_retries}")
                # No point bypassing Cloudflare again after the final attempt.
                if attempt >= max_retries:
                    raise
                self._solve_cloudflare5S_challenge()
                # The solver may have replaced self.user_agent; the retry must
                # send the same UA the new clearance cookies were issued for.
                headers['user-agent'] = self.user_agent
                self._log("Cloudflare bypass success, retrying...")
            else:
                break

        if self.config.debug:
            self._save_debug_html(resp.text, prefix='Tls_Query_Slot_Page')

        self._check_page_is_session_expired_or_invalid('Book your appointment', resp.text)

        # Parse the slots and keep only the accepted labels.
        all_slots = self._parse_appointment_slots(resp.text)
        target_labels = self.free_config.get("target_labels", ["", "pta"])
        available = [s for s in all_slots if s.get("label") in target_labels]

        if not available:
            res.success = False
            res.availability_status = AvailabilityStatus.NoneAvailable
            return res

        # Group the slots per day, preserving first-seen order of the days.
        date_map: dict[datetime, list[TimeSlot]] = {}
        for s in available:
            dt = datetime.strptime(s["date"], "%Y-%m-%d")
            date_map.setdefault(dt, []).append(
                TimeSlot(time=s["time"], label=str(s.get("label", "")))
            )

        res.success = True
        res.availability_status = AvailabilityStatus.Available
        # Use the real minimum instead of trusting the server's ordering.
        res.earliest_date = min(date_map)
        res.availability = [DateAvailability(date=d, times=slots) for d, slots in date_map.items()]
        return res

    def book(self, slot_info: VSQueryResult, user_inputs: Dict = None) -> VSBookResult:
        """Book the first acceptable slot from a previous ``query`` result.

        Args:
            slot_info: Query result; ``availability`` entries must expose a
                ``date`` supporting ``strftime`` and a ``times`` list of
                objects with ``time`` and ``label`` attributes.
            user_inputs: Optional dict with ``expected_start_date`` /
                ``expected_end_date`` (``YYYY-MM-DD``; both needed to filter)
                and ``support_pta`` (default True: also accept ``pta`` slots).

        Returns:
            A ``VSBookResult``; ``success`` is True only when the server
            answers with HTTP 303, in which case ``book_date`` (string
            ``YYYY-MM-DD``) and ``book_time`` are filled in.

        Raises:
            NotFoundError: no date is inside the requested range, or no slot
                on the remaining dates carries an accepted label.
        """
        res = VSBookResult()
        res.success = False

        # 1. Gather the basic inputs.
        user_inputs = user_inputs or {}
        apt_config = self.free_config.get('apt_config', {})
        group_num = self.travel_group['group_number']

        exp_start = user_inputs.get('expected_start_date', '')
        exp_end = user_inputs.get('expected_end_date', '')
        support_pta = user_inputs.get('support_pta', True)

        target_labels = ['']
        if support_pta:
            target_labels.append('pta')

        # _filter_dates and the URL/payload below work on 'YYYY-MM-DD'
        # strings, so index the offered slots by that string form.
        times_by_date: Dict[str, list] = {}
        for da in slot_info.availability:
            times_by_date.setdefault(da.date.strftime("%Y-%m-%d"), []).extend(da.times)

        valid_dates = self._filter_dates(list(times_by_date), exp_start, exp_end)
        if not valid_dates:
            raise NotFoundError(message="No dates match user constraints")

        # Take the first (date, slot) pair with an accepted label, in the
        # order produced by _filter_dates.
        selection = next(
            (
                (d, slot)
                for d in valid_dates
                for slot in times_by_date[d]
                if slot.label in target_labels
            ),
            None,
        )
        if selection is None:
            raise NotFoundError(message="No time slot matches the accepted labels")
        selected_date, selected_slot = selection
        selected_time = selected_slot.time
        selected_label = selected_slot.label

        # 2. Solve ReCaptcha V3; the action must be "book".
        # NOTE(review): this sends month as 'YYYY-MM' while query() sends
        # 'MM-YYYY' -- confirm which form the site expects here.
        page_url = f'https://visas-fr.tlscontact.com/en-us/{group_num}/workflow/appointment-booking?location={apt_config["code"]}&month={selected_date[:7]}'
        api_token = self.free_config.get("capsolver_key", "")
        rc_params = {
            "type": "ReCaptchaV3Task",
            "page": page_url,
            "action": "book",
            "siteKey": "6LcTpXcfAAAAAM3VojNhyV-F1z92ADJIvcSZ39Y9",
            "apiToken": api_token,
            "proxy": self._get_proxy_url()
        }
        g_token = self._solve_recaptcha(rc_params)

        # 3. Build the multipart payload (field names mirror a captured
        #    browser request). Hard-coded Next.js Server Action id.
        ACTION_ID = "60d0616946df1fc4e7c094ca6a7a04f134d0be3d53"
        fields = {
            '1_formGroupId': str(group_num),
            '1_lang': 'en-us',
            '1_process': 'APPOINTMENT',
            '1_location': apt_config["code"],  # e.g. gbLON2fr
            '1_date': selected_date,
            '1_time': selected_time,
            # NOTE(review): standard slots are sent with an empty label here;
            # an earlier comment mentioned "regular" -- confirm.
            '1_appointmentLabel': selected_label,
            '1_captcha_token': g_token,
            '0': '[{"status":"IDLE"},"$K1"]'  # Next.js action state slot
        }
        m = MultipartEncoder(fields=fields)

        # 4. Send the request.
        url = f'https://visas-fr.tlscontact.com/en-us/{group_num}/workflow/appointment-booking'
        headers = {
            'Next-Action': ACTION_ID,
            'Referer': page_url,
            'Origin': 'https://visas-fr.tlscontact.com',
            'Accept': 'text/x-component',
            'User-Agent': self.user_agent,  # keep in line with the impersonated browser
            'Content-Type': m.content_type,
            # Router state tree copied from a captured request, with the
            # group id substituted in.
            'Next-Router-State-Tree': '%5B%22%22%2C%7B%22children%22%3A%5B%5B%22lang%22%2C%22en-us%22%2C%22d%22%5D%2C%7B%22children%22%3A%5B%5B%22groupId%22%2C%22'+str(group_num)+'%22%2C%22d%22%5D%2C%7B%22children%22%3A%5B%22workflow%22%2C%7B%22children%22%3A%5B%22appointment-booking%22%2C%7B%22children%22%3A%5B%22__PAGE__%22%2C%7B%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%2Ctrue%5D%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%2Ctrue%5D%7D%2Cnull%2Cnull%5D'
        }
        # Sent through the session directly (not _perform_request) because a
        # 303 redirect is the success signal and must not be followed.
        resp = self.session.post(url, data=m.to_string(), headers=headers, allow_redirects=False)
        if self.config.debug:
            self._save_debug_html(resp.text, prefix='Tls_Book_Result')

        # 5. Interpret the result.
        if resp.status_code == 303:
            location = resp.headers.get('Location', '')
            self._log(f"Booking Success! Redirecting to: {location}")
            res.success = True
            # NOTE(review): stored as plain strings -- confirm VSBookResult
            # expects 'YYYY-MM-DD' / time strings.
            res.book_date = selected_date
            res.book_time = selected_time
            return res

        if resp.status_code == 200:
            # Next.js may report business errors inside a 200 response.
            if "APPOINTMENT_LIMIT_REACHED" in resp.text:
                self._log("Failed: 限制/无号")
            elif "Invalid captcha" in resp.text:
                self._log("Failed: 验证码错误")
            else:
                self._log(f"Booking Failed (200 OK but error content): {resp.text[:200]}")
        else:
            self._log(f'Booking Failed. Status: {resp.status_code}')
        return res

    def _log(self, message):
        """Forward *message*, prefixed with plugin and group id, to the logger (if any)."""
        if self.logger:
            self.logger(f'[TlsPlugin] [{self.group_id}] {message}')

    def _save_debug_html(self, content: str, prefix: str = "debug"):
        """Write *content* to ``debug_pages/<prefix>_<YYYYmmdd_HHMMSS>.html``."""
        save_dir = "debug_pages"
        # exist_ok avoids the check-then-create race of exists()+makedirs().
        os.makedirs(save_dir, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{save_dir}/{prefix}_{timestamp}.html"
        with open(filename, "w", encoding="utf-8") as f:
            f.write(content)
        self._log(f"HTML saved to: {filename}")

    def _get_proxy_url(self):
        """Return the configured proxy as a URL, or ``""`` when no proxy ip is set."""
        proxy = self.config.proxy
        if not proxy.ip:
            return ""
        if proxy.username:
            return f"{proxy.scheme}://{proxy.username}:{proxy.password}@{proxy.ip}:{proxy.port}"
        return f"{proxy.scheme}://{proxy.ip}:{proxy.port}"

    def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None):
        """Send one HTTP request through the session and map the status code.

        Returns the response on HTTP 200. Only the request itself is sent
        (no separate OPTIONS pre-flight), with a 30 second timeout.

        Raises:
            SessionExpiredOrInvalidError: HTTP 401 (session marked unhealthy).
            PermissionDeniedError: HTTP 403.
            RateLimiteddError: HTTP 429 (session marked unhealthy).
            BizLogicError: any other non-200 status.
        """
        resp = self.session.request(method, url, headers=headers, data=data, json=json_data, params=params, timeout=30)
        if self.config.debug:
            # NOTE(review): in debug mode this logs request bodies verbatim,
            # which includes the login password sent by create_session.
            self._log(f'[perform request] Response={resp.text}\nMethod={method}, Url={url}, Data={data}, JsonData={json_data}, Params={params}')

        status = resp.status_code
        if status == 200:
            return resp
        if status == 401:
            self.is_healthy = False
            raise SessionExpiredOrInvalidError()
        if status == 403:
            raise PermissionDeniedError()
        if status == 429:
            self.is_healthy = False
            raise RateLimiteddError()
        raise BizLogicError(message=f"HTTP Error {status}: {resp.text[:100]}")

    def _solve_cloudflare5S_challenge(self):
        """Solve the Cloudflare challenge via the cloud API and adopt its result.

        Submits an ``AntiCloudflareTask`` for the country landing page, then
        copies the ``__cf_bm`` / ``cf_clearance`` cookies into the session
        and, when the result carries a ``userAgent``, adopts it for
        subsequent requests.
        """
        self._log("Solving Cloudflare 5s...")
        apt_config = self.free_config.get('apt_config', {})
        website_url = f'https://visas-fr.tlscontact.com/en-us/country/{apt_config["country"]}'

        # 1. Format the proxy as host:port[:user:pass]
        #    (presumably the format the task API expects -- confirm).
        p = self.config.proxy
        if p.username:
            proxy_str = f"{p.ip}:{p.port}:{p.username}:{p.password}"
        else:
            proxy_str = f"{p.ip}:{p.port}"

        # 2. Submit the task and wait for its result.
        task_id = VSCloudApi.Instance().create_task(
            command="AntiCloudflareTask",
            args={
                "proxy": proxy_str,
                "websiteUrl": website_url
            }
        )
        result_data = VSCloudApi.Instance().get_task_result(task_id, timeout=60)
        task_result = result_data.get("result", {})

        for cookie in task_result.get('cookies', []):
            if cookie['name'] in ['__cf_bm', 'cf_clearance']:
                self.session.cookies.set(
                    cookie['name'],
                    cookie['value'],
                    domain=cookie['domain'],
                    path='/'
                )

        ua = task_result.get('userAgent')
        if ua:
            self.user_agent = ua
            self.session.headers['User-Agent'] = ua
        self._log("Cloudflare 5s challenge solved.")

    def _solve_recaptcha(self, params) -> str:
        """Solve a ReCaptcha through Capsolver and return the response token.

        Args:
            params: dict with ``apiToken``, ``type``, ``page``, ``siteKey``
                and optionally ``action`` and ``proxy`` (a proxy URL).

        Raises:
            NotFoundError: ``apiToken`` missing.
            BizLogicError: task submission failed, the task was rejected or
                reported as failed, or no result arrived within 20 polls
                (3 seconds apart).
        """
        key = params.get("apiToken")
        if not key:
            raise NotFoundError(message="Api-token is required for recaptcha solver")

        submit_url = "https://api.capsolver.com/createTask"
        task = {
            "type": params.get("type"),
            "websiteURL": params.get("page"),
            "websiteKey": params.get("siteKey"),
        }
        if params.get("action"):
            task["pageAction"] = params.get("action")
        if params.get("proxy"):
            p = urlparse(params.get("proxy"))
            task["proxyType"] = p.scheme
            task["proxyAddress"] = p.hostname
            task["proxyPort"] = p.port
            if p.username:
                task["proxyLogin"] = p.username
                task["proxyPassword"] = p.password

        payload = {"clientKey": key, "task": task}
        r = requests.post(submit_url, json=payload, timeout=20)
        if r.status_code != 200:
            raise BizLogicError(message="Failed to submit capsolver task")

        created = r.json()
        task_id = created.get("taskId")
        if not task_id:
            # Without a task id every poll below would be pointless.
            raise BizLogicError(message=f"Capsolver rejected task: {created.get('errorDescription')}")

        for _ in range(20):
            r = requests.post("https://api.capsolver.com/getTaskResult", json={"clientKey": key, "taskId": task_id}, timeout=20)
            if r.status_code == 200:
                d = r.json()
                if d.get("status") == "ready":
                    return d["solution"]["gRecaptchaResponse"]
                if d.get("status") == "failed":
                    # Fail fast instead of polling until the timeout.
                    raise BizLogicError(message=f"Capsolver task failed: {d.get('errorDescription')}")
            time.sleep(3)
        raise BizLogicError(message="Capsolver task timeout")

    def _parse_travel_groups(self, html: str) -> List[Dict]:
        """Extract the travel groups embedded as escaped JSON in *html*.

        Returns a list of dicts with ``group_name``, ``group_number`` and
        ``location``; an empty list (plus a log line) when the
        ``travelGroups`` payload is not found.
        """
        js_pattern = r'\\"travelGroups\\":\s*(\[.*?\]),\\"availableCountriesToCreateGroups'
        js_match = re.search(js_pattern, html, re.DOTALL)
        if not js_match:
            self._log('Parsed travel group page, but not found travelGroups')
            return []

        # The JSON sits inside a JS string literal, so quotes are escaped.
        json_str = js_match.group(1).replace(r'\"', '"')
        return [
            {
                'group_name': g.get('groupName'),
                'group_number': g.get('formGroupId'),
                'location': g.get('vacName'),
            }
            for g in json.loads(json_str)
        ]

    def _parse_appointment_slots(self, html: str) -> List[Dict]:
        """Extract appointment slots embedded as escaped JSON in *html*.

        Each slot becomes a dict with ``date``, ``time``, ``label``
        (``'pta'``, ``'ptaw'`` or ``''``), ``type`` and ``cost`` (always
        ``""``). Slots whose labels contain none of the known values are
        skipped; slots without labels are kept with an empty label and type.
        Returns an empty list (plus a log line) when the
        ``availableAppointments`` payload is not found.
        """
        slots = []
        pattern = r'"availableAppointments\\":\s*(\[.*\]),\\"showFlexiAppointment'
        match = re.search(pattern, html, re.DOTALL)
        if not match:
            self._log('Parsed appointment slot page, but not found availableAppointments')
            return slots

        # The JSON sits inside a JS string literal, so quotes are escaped.
        json_str = match.group(1).replace(r'\"', '"')
        for day in json.loads(json_str):
            d_str = day.get('day')
            for s in day.get('slots', []):
                labels = s.get('labels') or []
                if 'pta' in labels:
                    lbl, stype = 'pta', "Prime"
                elif 'ptaw' in labels:
                    lbl, stype = 'ptaw', "Prime Weekend"
                elif '' in labels:
                    # Standard slots are labelled with an empty string; they
                    # used to be classified here and then dropped again.
                    lbl, stype = '', "Standard"
                elif not labels:
                    lbl, stype = '', ""
                else:
                    # Only unknown labels: not a slot type we handle.
                    continue
                slots.append({
                    'date': d_str,
                    'time': s.get('time'),
                    'label': lbl,
                    'type': stype,
                    'cost': ""
                })
        return slots

    def _check_page_is_session_expired_or_invalid(self, keyword, html: str) -> bool:
        """Raise when *html* looks like an expired, blocked or login page.

        The page is accepted when it contains *keyword*. Otherwise it is
        scanned (case-insensitively) for redirect, login, session-expired
        and temporarily-blocked markers; any hit marks the session unhealthy.

        Returns:
            False when no problem was detected.

        Raises:
            SessionExpiredOrInvalidError: *html* is empty or a marker matched.
        """
        if not html:
            self.is_healthy = False
            raise SessionExpiredOrInvalidError()
        if keyword in html:
            return False

        lowered = html.lower()
        # Every literal must be lower-case because it is matched against
        # ``lowered``; the "temporarily blocked" texts previously were not
        # and therefore never matched.
        marker_groups = (
            ('redirected automatically',),
            ('login', 'password'),
            (
                'session expired!',
                'for security reasons, your session has expired. please log in again to continue.',
                'you will be redirected automatically in 10 seconds.',
            ),
            (
                'temporarily blocked!',
                'your session has been temporarily suspended due to the high number of your access to this page.',
                'you can try to access your account again in 2 hours.',
            ),
        )
        if any(all(marker in lowered for marker in group) for group in marker_groups):
            self.is_healthy = False
            raise SessionExpiredOrInvalidError()
        return False

    def _filter_dates(self, dates: List[str], start_str: str, end_str: str) -> List[str]:
        """Keep the dates inside the user's expected range.

        :param dates: available dates as ``YYYY-MM-DD`` strings
        :param start_str: expected start date (``YYYY-MM-DD``, may carry a time suffix)
        :param end_str: expected end date (``YYYY-MM-DD``, may carry a time suffix)
        :return: *dates* unchanged when either bound is empty; otherwise the
            dates within the closed range, in shuffled order
        """
        # No range configured: nothing to filter.
        if not start_str or not end_str:
            return dates

        # Only the first 10 characters are used, in case a time part follows.
        s_date = datetime.strptime(start_str[:10], "%Y-%m-%d")
        e_date = datetime.strptime(end_str[:10], "%Y-%m-%d")

        valid_dates = [
            date_str
            for date_str in dates
            if s_date <= datetime.strptime(date_str, "%Y-%m-%d") <= e_date
        ]
        random.shuffle(valid_dates)
        return valid_dates