import time
import json
import random
import re
import os
from datetime import datetime
from typing import List, Dict, Optional, Any
from urllib.parse import urljoin, urlparse

# Third-party libraries
try:
    from curl_cffi import requests, const
    from bs4 import BeautifulSoup
except ImportError:
    raise ImportError("Missing dependencies. Run: pip install curl-cffi beautifulsoup4")

# Framework dependencies
from vs_plg import IVSPlg, VSError  # type: ignore
from vs_types import VSPlgConfig, VSQueryResult, VSBookResult, AvailabilityStatus, QueryWaitMode  # type: ignore
from vs_log_macros import VSC_INFO, VSC_ERROR, VSC_DEBUG, VSC_WARN  # type: ignore
from toolkit.vs_cloud_api import VSCloudApi  # type: ignore


class TlsPlugin(IVSPlg):
    """TLS visa appointment plugin, adapted to the French visa (FR) flow."""

    def __init__(self, group_id: str):
        """Create an unconfigured plugin instance bound to ``group_id``."""
        self.group_id = group_id
        self.config: Optional[VSPlgConfig] = None
        self.free_config: Dict[str, Any] = {}

        # Session state
        self.session: Optional[requests.Session] = None
        self.travel_group: Optional[Dict] = None
        self.user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36"

        # Status
        self.last_error = VSError(0, "OK")
        self.is_healthy = True

    def _save_debug_html(self, content: str, prefix: str = "debug"):
        """Best-effort dump of a page's HTML into the local ``debug_pages`` directory.

        The file is named ``<prefix>_<group_id>_<timestamp>.html``. Any failure
        is logged as a warning and otherwise ignored.
        """
        try:
            save_dir = "debug_pages"
            # exist_ok avoids the check-then-create race between concurrent plugins.
            os.makedirs(save_dir, exist_ok=True)

            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{save_dir}/{prefix}_{self.group_id}_{timestamp}.html"

            with open(filename, "w", encoding="utf-8") as f:
                f.write(content)

            VSC_INFO("tls_plg", "[%s] HTML saved to: %s", self.group_id, filename)
        except Exception as e:
            VSC_WARN("tls_plg", "[%s] Failed to save debug HTML: %s", self.group_id, str(e))

    def get_group_id(self) -> str:
        """Return the group id this plugin was created with."""
        return self.group_id

    def set_config(self, config: VSPlgConfig):
        """Store ``config`` and decode its ``free_config`` JSON string.

        ``self.free_config`` falls back to an empty dict when the JSON is
        missing, malformed, or does not decode to an object.
        """
        self.config = config
        try:
            parsed = json.loads(config.free_config) if config.free_config else {}
        except (AttributeError, TypeError, ValueError):
            # json.JSONDecodeError is a ValueError subclass.
            parsed = {}
        # Every consumer calls .get() on free_config, so only a dict is usable.
        self.free_config = parsed if isinstance(parsed, dict) else {}

    def health_check(self) -> bool:
        """Return the current health flag."""
        return self.is_healthy

    def get_last_error(self) -> VSError:
        """Return the most recently recorded error."""
        return self.last_error

    def _set_error(self, code: int, message: str):
        """Record and log an error; codes 2003/2000/2001 also mark the plugin unhealthy."""
        self.last_error = VSError(code, message)
        VSC_ERROR("tls_plg", "[%s] Error %d: %s", self.group_id, code, message)
        if code in [2003, 2000, 2001]:  # session invalid or login failed
            self.is_healthy = False

    def _build_proxy_url(self) -> str:
        """Return the configured proxy as ``scheme://[user:pass@]ip:port``, or ``""`` if none."""
        proxy = self.config.proxy if self.config else None
        if not proxy or not proxy.ip:
            return ""
        if proxy.username:
            return f"{proxy.scheme}://{proxy.username}:{proxy.password}@{proxy.ip}:{proxy.port}"
        return f"{proxy.scheme}://{proxy.ip}:{proxy.port}"

    # ---------------------------------------------------------
    # Core interface implementation
    # ---------------------------------------------------------

    def create_session(self) -> bool:
        """Create a session: Cloudflare challenge -> login -> pick the travel group.

        Returns:
            True when login succeeded and a travel group matching the
            configured city was selected; False otherwise (the reason is
            recorded via ``_set_error`` on most failure paths).
        """
        VSC_INFO("tls_plg", "[%s] Creating session...", self.group_id)
        self.is_healthy = True
        # Drop any group left over from a previous session so that a failed
        # re-login cannot pass the "group found" check with stale data.
        self.travel_group = None

        # 1. Initialise the HTTP session
        curlopt = {
            const.CurlOpt.MAXAGE_CONN: 1800,
            const.CurlOpt.MAXLIFETIME_CONN: 1800,
            const.CurlOpt.VERBOSE: False,  # recommended off in production
        }

        proxy_url = self._build_proxy_url()

        self.session = requests.Session(
            proxy=proxy_url,
            impersonate="chrome124",
            curl_options=curlopt,
            use_thread_local_curl=False,
            http_version=const.CurlHttpVersion.V2TLS
        )

        embassy = self._get_embassy_config()
        if not embassy:
            VSC_ERROR("tls_plg", "[%s] Embassy config missing in free_config", self.group_id)
            return False

        # 2. Solve the Cloudflare 5s challenge
        if not self._solve_cloudflare5S_challenge(embassy, proxy_url):
            self._set_error(1001, "Cloudflare challenge failed")
            return False

        # 3. Fetch the login page parameters (OIDC)
        login_page = "https://visas-fr.tlscontact.com/en-us/login"
        params = {
            "issuerId": embassy["code"],
            "country": embassy["country"],
            "vac": embassy["code"],
            "redirect": f"/en-us/country/{embassy['country']}/vac/{embassy['code']}"
        }

        headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Referer': f'https://visas-fr.tlscontact.com/en-us/country/{embassy["country"]}/vac/{embassy["code"]}',
            'User-Agent': self.user_agent,
        }

        try:
            resp = self.session.get(login_page, headers=headers, params=params)
            if resp.status_code != 200:
                self._set_error(resp.status_code, f"Get Login Page Failed: {resp.status_code}")
                return False

            # Extract the login form's submit URL
            soup = BeautifulSoup(resp.text, 'html.parser')
            form = soup.find('form')
            if not form:
                self._set_error(2005, "Login form not found")
                return False

            action = form.get('action')
            authenticate_url = action if action.startswith('http') else urljoin(resp.url, action)

        except Exception as e:
            self._set_error(1099, f"Network error during login init: {e}")
            return False

        # 4. Solve ReCaptcha V2 (login captcha); the API token comes from free_config
        api_token = self.free_config.get("capsolver_key", "")
        if not api_token:
            VSC_WARN("tls_plg", "Missing 'capsolver_key' in free_config, captcha might fail.")

        rc_params = {
            "type": "ReCaptchaV2TaskProxyLess",  # or ReCaptchaV2Task together with a proxy
            "page": resp.url,
            "siteKey": "6LcDpXcfAAAAAM7wOEsF_38DNsL20tTvPTKxpyn0",
            "apiToken": api_token,
            "proxy": proxy_url
        }

        g_token = self._solve_recaptcha(rc_params)
        if not g_token:
            self._set_error(1001, "Failed to solve Login Recaptcha")
            return False

        # 5. Submit the login form
        payload = {
            'username': self.config.account.username,
            'password': self.config.account.password,
            'g-recaptcha-response': g_token
        }
        headers['Content-Type'] = 'application/x-www-form-urlencoded'

        try:
            resp = self.session.post(authenticate_url, headers=headers, data=payload)
            if resp.status_code != 200:
                self._set_error(resp.status_code, f"Login Submit Failed: {resp.status_code}")
                return False

            # 6. Parse the travel groups from the post-login page
            groups = self._parse_travel_groups(resp.text)
            if not groups:
                # Distinguish bad credentials from a page we could not parse
                if "Invalid username or password" in resp.text:
                    self._set_error(2000, "Invalid username or password")
                else:
                    self._set_error(2005, "No Travel Groups found after login")
                return False

            # Pick the group whose location matches the configured city
            target_city = embassy['city'].lower()
            for g in groups:
                # A group without a location simply does not match.
                if (g.get('location') or '').lower() == target_city:
                    self.travel_group = g
                    break

            if not self.travel_group:
                self._set_error(2005, f"No group found for city {target_city}")
                return False

            VSC_INFO("tls_plg", "[%s] Session created. Group: %s", self.group_id, self.travel_group['group_number'])
            return True

        except Exception as e:
            self._set_error(1099, f"Login exception: {e}")
            return False

    def query(self) -> VSQueryResult:
        """Query appointment availability for the configured month.

        Returns:
            A ``VSQueryResult``. ``success`` is set to True only when the
            booking page was fetched and parsed; in that case
            ``availability`` holds the slots whose label is in
            ``free_config["target_labels"]`` (default ``["", "pta"]``),
            grouped by date.
        """
        res = VSQueryResult()
        if not self.session or not self.travel_group:
            self._set_error(2003, "Session invalid, please login first")
            return res

        embassy = self._get_embassy_config()
        group_num = self.travel_group['group_number']
        interest_month = self.free_config.get("interest_month", time.strftime("%m-%Y"))

        url = f'https://visas-fr.tlscontact.com/en-us/{group_num}/workflow/appointment-booking'
        params = {
            'location': embassy["code"],
            'month': interest_month,
        }
        headers = {
            'accept': '*/*',
            'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'referer': f'{url}?location={embassy["code"]}',
            'user-agent': self.user_agent,
        }

        try:
            resp = self.session.get(url, params=params, headers=headers)

            # 1. Cloudflare 403 (hard block): re-solve the challenge and retry once
            if resp.status_code == 403:
                VSC_WARN("tls_plg", "[%s] Query 403 Forbidden. Solving Cloudflare...", self.group_id)
                if self._solve_cloudflare5S_challenge(embassy):
                    resp = self.session.get(url, params=params, headers=headers)
                else:
                    self._set_error(2006, "Cloudflare re-challenge failed")
                    return res

            # 2. Session-expiry check. A response that carries appointment data
            #    is accepted even with a 401 status (works around WAF false positives).
            is_valid_content = "availableAppointments" in resp.text

            if not is_valid_content:
                if resp.status_code == 401 or self._is_session_expired_page(resp.text):
                    VSC_WARN("tls_plg", "[%s] Session expired. URL: %s", self.group_id, resp.url)
                    self._save_debug_html(resp.text, "query_session_expired")
                    self._set_error(2003, "Session expired")
                    self.is_healthy = False
                    return res

                # Any other non-200 response without usable content
                if resp.status_code != 200:
                    self._set_error(resp.status_code, f"Query failed status: {resp.status_code}")
                    return res

            # 3. Parse slots and keep only the wanted labels
            all_slots = self._parse_appointment_slots(resp.text)
            target_labels = self.free_config.get("target_labels", ["", "pta"])
            available = [slot for slot in all_slots if slot.get('label') in target_labels]

            res.success = True
            res.city = embassy['city']
            res.visa_type = "Tourist"
            res.availability_status = AvailabilityStatus.NoneAvailable

            if available:
                res.availability_status = AvailabilityStatus.Available
                res.earliest_date = available[0]['date']

                # Group time slots by date, preserving first-seen date order
                date_map = {}
                for s in available:
                    ts = VSQueryResult.DateAvailability.TimeSlot()
                    ts.time = s['time']
                    ts.label = f"{s['type']}"
                    date_map.setdefault(s['date'], []).append(ts)

                for d, slots in date_map.items():
                    da = VSQueryResult.DateAvailability()
                    da.date = d
                    da.times = slots
                    res.availability.append(da)

                VSC_INFO("tls_plg", "[%s] Found %d slots", self.group_id, len(available))
            else:
                VSC_DEBUG("tls_plg", "[%s] Query OK, but no matching slots.", self.group_id)

        except Exception as e:
            self._set_error(1099, f"Query exception: {e}")

        return res

    def book(self, slot_info: VSQueryResult) -> VSBookResult:
        """Book an appointment by submitting a multipart form.

        ``slot_info`` is a result of ``query()``; the first time of the first
        date (``slot_info.availability[0].times[0]``) is the slot that gets
        booked.

        Returns:
            A ``VSBookResult`` with ``success`` set to True when the server
            answers the submission with a 303 redirect.
        """
        res = VSBookResult()
        if not self.session or not self.travel_group:
            self._set_error(2003, "Session invalid")
            return res

        # Simple strategy: take the first available time
        if not slot_info.availability or not slot_info.availability[0].times:
            self._set_error(3002, "No slots in slot_info to book")
            return res

        target_date = slot_info.availability[0].date
        target_time = slot_info.availability[0].times[0].time
        # The original slot label is not carried in the query result, so the
        # empty (Standard) label is assumed here.
        target_label = ""

        embassy = self._get_embassy_config()
        group_num = self.travel_group['group_number']
        interest_month = self.free_config.get("interest_month", time.strftime("%m-%Y"))

        # 1. Solve ReCaptcha V3
        page_url = f'https://visas-fr.tlscontact.com/en-us/{group_num}/workflow/appointment-booking?location={embassy["code"]}&month={interest_month}'
        # Rebuild the proxy URL from config rather than reading it back from the
        # session: the session was created with ``proxy=...``, so a lookup under
        # the "http" key is not expected to find it.
        proxy_url = self._build_proxy_url()
        api_token = self.free_config.get("capsolver_key", "")

        rc_params = {
            "type": "ReCaptchaV3Task",
            "page": page_url,
            "action": "book",
            "siteKey": "6LcTpXcfAAAAAM3VojNhyV-F1z92ADJIvcSZ39Y9",
            "apiToken": api_token,
            "proxy": proxy_url
        }

        g_token = self._solve_recaptcha(rc_params)
        if not g_token:
            self._set_error(1001, "Failed to solve Booking Recaptcha")
            return res

        # 2. Build the request
        url = f'https://visas-fr.tlscontact.com/en-us/{group_num}/workflow/appointment-booking'

        next_action = '601f284bf7ee33b6578ad0fad426fae18c232707f2'  # this value may change; keep an eye on it
        next_state = '%5B%22%22%2C%7B%22children%22%3A%5B%5B%22lang%22%2C%22en-us%22%2C%22d%22%5D%2C%7B%22children%22%3A%5B%5B%22groupId%22%2C%22$GROUPID$%22%2C%22d%22%5D%2C%7B%22children%22%3A%5B%22workflow%22%2C%7B%22children%22%3A%5B%22appointment-booking%22%2C%7B%22children%22%3A%5B%22__PAGE__%22%2C%7B%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%2Ctrue%5D%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%2Ctrue%5D%7D%2Cnull%2Cnull%5D'

        headers = {
            'Next-Action': next_action,
            'Referer': page_url,
            # str() because the parsed group id is not guaranteed to be a string
            'Next-Router-State-Tree': next_state.replace("$GROUPID$", str(group_num)),
            'Accept': 'text/x-component',
            'User-Agent': self.user_agent,
        }

        params = {
            'location': embassy["code"],
            'month': interest_month,
        }

        # 3. Build the multipart form data by hand
        boundary = "----WebKitFormBoundary" + "".join(
            random.choices("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789", k=16)
        )
        headers["Content-Type"] = f"multipart/form-data; boundary={boundary}"

        form_fields = {
            '1_formGroupId': str(group_num),
            '1_lang': 'en-us',
            '1_process': 'APPOINTMENT',
            '1_location': embassy["code"],
            '1_date': target_date,
            '1_time': target_time,
            '1_appointmentLabel': target_label,
            '1_captcha_token': g_token,
            '0': '[{"status":"IDLE"},"$K1"]'
        }

        body_parts = []
        for name, value in form_fields.items():
            body_parts.append(f"--{boundary}\r\n")
            body_parts.append(f'Content-Disposition: form-data; name="{name}"\r\n')
            body_parts.append("\r\n")
            body_parts.append(f"{value}\r\n")
        body_parts.append(f"--{boundary}--\r\n")
        body = "".join(body_parts).encode("utf-8")

        try:
            # Redirects must not be followed: success is detected from the 303
            # status itself, which would be hidden if the client followed it.
            resp = self.session.post(url, params=params, headers=headers, data=body,
                                     allow_redirects=False)

            if resp.status_code == 303:  # TLS normally redirects on success
                res.success = True
                res.order_id = f"TLS-{int(time.time())}"
                res.book_date = target_date
                res.book_time = target_time
                VSC_INFO("tls_plg", "[%s] Book Success (303 Redirect)!", self.group_id)
                return res
            else:
                self._set_error(resp.status_code, f"Book Failed: {resp.status_code} {resp.text[:100]}")

        except Exception as e:
            self._set_error(1099, f"Book exception: {e}")

        return res

    # ---------------------------------------------------------
    # Helpers
    # ---------------------------------------------------------

    def _get_embassy_config(self) -> Dict:
        """Build the embassy descriptor from the flat keys in ``free_config``.

        Reads ``embassy_code``, ``country_code``, ``mission_code`` (default
        ``"fr"``) and ``city``. Returns an empty dict when ``embassy_code`` is
        absent.
        """
        if "embassy_code" in self.free_config:
            return {
                "code": self.free_config.get("embassy_code"),
                "country": self.free_config.get("country_code"),
                "mission": self.free_config.get("mission_code", "fr"),
                "city": self.free_config.get("city")
            }
        return {}  # not configured

    def _solve_cloudflare5S_challenge(self, embassy, proxy_url: Optional[str] = None) -> bool:
        """Solve the Cloudflare 5s challenge through ``VSCloudApi``.

        On success the ``__cf_bm`` / ``cf_clearance`` cookies from the result
        are installed on ``self.session`` and, when the result carries a
        ``userAgent``, the plugin's User-Agent is replaced with it.

        Args:
            embassy: Embassy descriptor; only ``embassy["country"]`` is read.
            proxy_url: Unused. The proxy is always taken from
                ``self.config.proxy``; the parameter is optional so that
                callers passing only ``embassy`` keep working.

        Returns:
            True when the challenge was solved and the result parsed.
        """
        VSC_INFO("tls_plg", "[%s] Solving Cloudflare 5s...", self.group_id)

        website_url = f'https://visas-fr.tlscontact.com/en-us/country/{embassy["country"]}'

        # 1. Format the proxy as host:port[:user:pass] for the task API
        p = self.config.proxy
        if not p.ip:
            VSC_ERROR("tls_plg", "Proxy is required for Cloudflare challenge")
            return False

        if p.username:
            proxy_str = f"{p.ip}:{p.port}:{p.username}:{p.password}"
        else:
            proxy_str = f"{p.ip}:{p.port}"

        # 2. Submit the task
        task = VSCloudApi.Instance().submit_anticloudflare_task(proxy_str, website_url)
        if not task or not task.get('id'):
            VSC_ERROR("tls_plg", "[%s] Failed to submit AntiCloudflareTask", self.group_id)
            return False

        # 3. Fetch the result (per the original note, polling happens inside the API helper)
        task_id = str(task['id'])
        result = VSCloudApi.Instance().get_anticloudflare_result(task_id)

        if result:
            try:
                # 4. result['result'] is a JSON string holding cookies and userAgent
                parsed_result = json.loads(result.get('result', '{}'))

                cookies_list = parsed_result.get('cookies', [])
                name_list = ['__cf_bm', 'cf_clearance']

                for cookie in cookies_list:
                    if cookie['name'] in name_list:
                        self.session.cookies.set(
                            cookie['name'],
                            cookie['value'],
                            domain=cookie['domain'],
                            path='/'
                        )

                ua = parsed_result.get('userAgent')
                if ua:
                    self.user_agent = ua
                    self.session.headers['User-Agent'] = ua

                VSC_INFO("tls_plg", "[%s] Cloudflare 5s challenge solved.", self.group_id)
                return True
            except Exception as e:
                VSC_ERROR("tls_plg", f"Failed to parse Cloudflare result: {e}")

        return False

    def _solve_recaptcha(self, params) -> Optional[str]:
        """Solve a reCAPTCHA through the Capsolver HTTP API.

        Args:
            params: Dict with ``apiToken``, ``type``, ``page``, ``siteKey`` and
                optionally ``action`` and ``proxy`` (a proxy URL).

        Returns:
            The ``gRecaptchaResponse`` token, or None when the key is missing,
            the task could not be created, the task failed, or it was not
            ready after 20 polls.
        """
        try:
            key = params.get("apiToken")
            if not key:
                return None

            submit_url = "https://api.capsolver.com/createTask"
            task = {
                "type": params.get("type"),
                "websiteURL": params.get("page"),
                "websiteKey": params.get("siteKey"),
            }
            if params.get("action"):
                task["pageAction"] = params.get("action")

            if params.get("proxy"):
                p = urlparse(params.get("proxy"))
                task["proxyType"] = p.scheme
                task["proxyAddress"] = p.hostname
                task["proxyPort"] = p.port
                if p.username:
                    task["proxyLogin"] = p.username
                    task["proxyPassword"] = p.password

            payload = {"clientKey": key, "task": task}
            r = requests.post(submit_url, json=payload, timeout=20)
            if r.status_code != 200:
                return None

            task_id = r.json().get("taskId")
            if not task_id:
                return None

            # Poll for the result
            for _ in range(20):
                r = requests.post("https://api.capsolver.com/getTaskResult",
                                  json={"clientKey": key, "taskId": task_id}, timeout=20)
                if r.status_code == 200:
                    d = r.json()
                    if d.get("status") == "ready":
                        return d["solution"]["gRecaptchaResponse"]
                    # A failed task never becomes ready; stop instead of
                    # sleeping through the remaining polls.
                    if d.get("status") == "failed" or d.get("errorId"):
                        VSC_ERROR("tls_plg", f"Capsolver task failed: {d}")
                        return None
                time.sleep(3)
        except Exception as e:
            VSC_ERROR("tls_plg", f"Capsolver error: {e}")
        return None

    def _parse_travel_groups(self, html: str) -> List[Dict]:
        """Extract travel groups from the escaped JSON embedded in the post-login page.

        Returns:
            A list of dicts with ``group_name``, ``group_number`` and
            ``location``; empty when nothing matched. Parse errors are logged
            and whatever was collected so far is returned.
        """
        groups = []
        try:
            js_pattern = r'\\"travelGroups\\":\s*(\[.*?\]),\\"availableCountriesToCreateGroups'
            js_match = re.search(js_pattern, html, re.DOTALL)
            if js_match:
                # Un-escape \" so the captured fragment is valid JSON
                json_str = js_match.group(1).replace(r'\"', '"')
                data = json.loads(json_str)
                for g in data:
                    groups.append({
                        'group_name': g.get('groupName'),
                        'group_number': g.get('formGroupId'),
                        'location': g.get('vacName')
                    })
        except Exception as e:
            VSC_DEBUG("tls_plg", f"Travel group parse error: {e}")
        return groups

    def _parse_appointment_slots(self, html: str) -> List[Dict]:
        """Extract appointment slots from the booking page.

        Each returned dict has ``date``, ``time``, ``label``, ``type`` and
        ``cost`` (always ``""``). A slot is kept when its labels contain
        ``"pta"`` (Prime), ``"ptaw"`` (Prime Weekend) or ``""`` (Standard), or
        when it has no labels at all; slots carrying only other labels are
        skipped. Parse errors are logged and the slots collected so far are
        returned.
        """
        slots = []
        try:
            # Matches both "availableAppointments": and \"availableAppointments\":
            # and tolerates either an escaped or unescaped terminator. DOTALL
            # lets the array span several lines.
            pattern = r'availableAppointments\\?":\s*(\[.*?\])(?:,\\?"|\},)'
            match = re.search(pattern, html, re.DOTALL)
            if match:
                # Un-escape \" so the captured fragment is valid JSON
                json_str = match.group(1).replace(r'\"', '"')
                data = json.loads(json_str)

                for day in data:
                    d_str = day.get('day')
                    for s in day.get('slots', []):
                        labels = s.get('labels', [])
                        lbl = ""
                        stype = ""
                        cost = ""
                        # Track the match explicitly: the Standard label is the
                        # empty string, so testing `lbl` for truthiness would
                        # wrongly discard Standard slots.
                        recognised = True

                        if 'pta' in labels:
                            lbl = 'pta'
                            stype = "Prime"
                        elif 'ptaw' in labels:
                            lbl = 'ptaw'
                            stype = "Prime Weekend"
                        elif '' in labels:
                            lbl = ''
                            stype = "Standard"
                        else:
                            recognised = False

                        if recognised or not labels:
                            slots.append({
                                'date': d_str,
                                'time': s.get('time'),
                                'label': lbl,
                                'type': stype,
                                'cost': cost
                            })
        except Exception as e:
            VSC_DEBUG("tls_plg", f"Slot parse error: {e}")
        return slots

    def _is_session_expired_page(self, html: str) -> bool:
        """Heuristically decide whether ``html`` is a session-expired page.

        Empty content is not treated as expired. Any non-empty page lacking
        the ``availableAppointments`` marker is treated as expired, as is a
        page containing "redirected automatically".
        """
        if not html:
            return False
        if 'availableAppointments' not in html:
            return True
        # NOTE(review): query() only calls this when the marker is absent, so
        # the check below is effectively unreachable from there — confirm
        # whether the marker test above is intentionally this broad.
        if 'redirected automatically' in html.lower():
            return True
        return False