import time
import json
import random
import re
import os
from datetime import datetime
from typing import List, Dict, Optional, Any, Callable
from urllib.parse import urljoin, urlparse, urlencode

# DrissionPage core
from DrissionPage import ChromiumPage, ChromiumOptions

from vs_plg import IVSPlg
from vs_types import VSPlgConfig, VSQueryResult, VSBookResult, AvailabilityStatus, TimeSlot, DateAvailability, NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError
from utils.cloudflare_bypass_for_scraping import CloudflareBypasser
from toolkit.vs_cloud_api import VSCloudApi


# --- Helper: build the proxy-auth Chrome extension ---
def create_proxy_auth_extension(ip, port, username, password, plugin_path="./chrome_proxy_auth_plugin"):
    """Write a Chrome extension that configures an authenticated HTTP proxy.

    The extension consists of a ``manifest.json`` and a ``background.js`` that
    sets a fixed proxy server and answers proxy auth challenges with the
    given credentials.

    Args:
        ip: Proxy host.
        port: Proxy port (int or numeric string).
        username: Proxy username.
        password: Proxy password.
        plugin_path: Directory to write the extension into (created if missing).

    Returns:
        Absolute path of the extension directory.
    """
    os.makedirs(plugin_path, exist_ok=True)

    # "<all_urls>" is required both as a host permission and as the
    # onAuthRequired filter; an empty match pattern is rejected by Chrome.
    manifest_json = """
{
    "version": "1.0.0",
    "manifest_version": 2,
    "name": "Chrome Proxy Auth Extension",
    "permissions": ["proxy", "tabs", "unlimitedStorage", "storage", "<all_urls>", "webRequest", "webRequestBlocking"],
    "background": {"scripts": ["background.js"]},
    "minimum_chrome_version": "22.0.0"
}
"""

    # JSON-encode every interpolated value so quotes/backslashes in the
    # credentials cannot break (or inject into) the generated script.
    host_js = json.dumps(str(ip))
    port_js = json.dumps(str(port))
    username_js = json.dumps(str(username))
    password_js = json.dumps(str(password))

    background_js = f"""
var config = {{
    mode: "fixed_servers",
    rules: {{
        singleProxy: {{scheme: "http", host: {host_js}, port: parseInt({port_js})}},
        bypassList: ["localhost"]
    }}
}};
chrome.proxy.settings.set({{value: config, scope: "regular"}}, function() {{}});
function callbackFn(details) {{
    return {{authCredentials: {{username: {username_js}, password: {password_js}}}}};
}}
chrome.webRequest.onAuthRequired.addListener(
    callbackFn,
    {{urls: ["<all_urls>"]}},
    ['blocking']
);
"""

    with open(os.path.join(plugin_path, "manifest.json"), "w", encoding="utf-8") as f:
        f.write(manifest_json)
    with open(os.path.join(plugin_path, "background.js"), "w", encoding="utf-8") as f:
        f.write(background_js)
    return os.path.abspath(plugin_path)


class BrowserResponse:
    """Minimal stand-in for ``requests.Response`` built from a JS fetch result.

    Expects a dict with optional keys ``status``, ``body``, ``headers`` and
    ``url``; missing keys (or a ``None`` dict) fall back to empty defaults.
    """

    def __init__(self, result_dict):
        result_dict = result_dict or {}
        self.status_code = result_dict.get('status', 0)
        self.text = result_dict.get('body', '')
        self.headers = result_dict.get('headers', {})
        self.url = result_dict.get('url', '')
        self._json = None

    def json(self):
        """Return the body parsed as JSON, or ``{}`` if empty or unparsable."""
        if self._json is None:
            if not self.text:
                return {}
            try:
                self._json = json.loads(self.text)
            except (TypeError, ValueError):
                # json.JSONDecodeError is a ValueError subclass.
                self._json = {}
        return self._json


class TlsPlugin2(IVSPlg):
    """TLSContact visa appointment plugin (DrissionPage version)."""

    def __init__(self, group_id: str):
        self.group_id = group_id
        self.config: Optional[VSPlgConfig] = None
        self.free_config: Dict[str, Any] = {}
        self.is_healthy = True
        self.logger = None
        # Browser instance
        self.page: Optional[ChromiumPage] = None
        self.travel_group: Optional[Dict] = None
        self.session_create_time: float = 0
        self.real_ip: str = "0.0.0.0"

    def get_group_id(self) -> str:
        """Return the group id this plugin instance was created with."""
        return self.group_id

    def set_log(self, logger: Callable[[str], None]) -> None:
        """Install a logging callback; without one, messages go to stdout."""
        self.logger = logger

    def _log(self, message):
        """Emit *message* prefixed with the plugin tag and group id."""
        line = f'[TlsPlugin] [{self.group_id}] {message}'
        if self.logger:
            self.logger(line)
        else:
            print(line)

    def set_config(self, config: VSPlgConfig):
        """Store the plugin config and its free-form sub-config."""
        self.config = config
        self.free_config = config.free_config or {}

    def health_check(self) -> bool:
        """Return True if the browser session is alive and not past its max life."""
        if not self.is_healthy:
            return False
        if self.page is None:
            return False
        try:
            # A trivial script round-trip proves the browser is still responsive.
            if not self.page.run_js("return 1;"):
                return False
        except Exception:
            return False

        if self.config is not None and self.config.session_max_life > 0:
            elapsed_time = time.time() - self.session_create_time
            # session_max_life is compared in minutes.
            if elapsed_time > self.config.session_max_life * 60:
                self._log(f"Session expired.")
                return False
        return True

    def create_session(self):
        """Create a full-browser session: pass Cloudflare, log in via JS, follow the redirect.

        On success ``self.page``, ``self.travel_group``,
        ``self.session_create_time`` and ``self.real_ip`` are populated.

        Raises:
            NotFoundError: ``center`` config is missing, or no travel group
                matches the configured city.
            BizLogicError: Cloudflare bypass, login form loading or login failed.
        """
        self._log("Initializing Browser Session (Full Browser Mode)...")
        # Drop any group left over from a previous session so that a failed
        # lookup below cannot silently reuse stale data.
        self.travel_group = None

        co = ChromiumOptions()
        co.auto_port()
        if self.config.proxy and self.config.proxy.ip:
            p = self.config.proxy
            if p.username and p.password:
                self._log(f"Proxy: {p.ip}:{p.port} (Auth)")
                co.add_extension(create_proxy_auth_extension(p.ip, p.port, p.username, p.password))
            else:
                co.set_proxy(f"{p.scheme}://{p.ip}:{p.port}")
        co.headless(False)
        co.set_argument('--no-sandbox')
        co.set_argument('--disable-gpu')
        co.set_argument('--disable-blink-features=AutomationControlled')

        try:
            self.page = ChromiumPage(co)
            embassy = self.free_config.get('center', {})
            if not embassy:
                raise NotFoundError("center config missing")

            login_url = "https://visas-fr.tlscontact.com/en-us/login"
            params = {
                "issuerId": embassy["code"],
                "country": embassy["country"],
                "vac": embassy["code"],
                "redirect": f"/en-us/country/{embassy['country']}/vac/{embassy['code']}"
            }
            full_login_url = f"{login_url}?{urlencode(params)}"
            self._log(f"Navigating: {full_login_url}")
            self.page.get(full_login_url)

            # --- Cloudflare bypass ---
            cf = CloudflareBypasser(self.page, log=self.config.debug)
            if not cf.bypass(max_retry=15):
                raise BizLogicError("Cloudflare bypass timeout")

            # --- Login page check ---
            if not self.page.ele('#email-input-field'):
                self._log("Reloading Login Page...")
                self.page.get(full_login_url)
                if not self.page.wait.ele_displayed('#email-input-field', timeout=15):
                    raise BizLogicError("Login form not loaded")

            # --- JS-injected login ---
            g_token = ""
            if self.page.ele('.g-recaptcha') or self.page.ele('xpath://iframe[contains(@src, "recaptcha")]'):
                self._log("Solving ReCaptcha...")
                rc_params = {
                    "type": "ReCaptchaV2TaskProxyLess",
                    "page": self.page.url,
                    "siteKey": "6LcDpXcfAAAAAM7wOEsF_38DNsL20tTvPTKxpyn0",
                    "apiToken": self.free_config.get("capsolver_key", "")
                }
                g_token = self._solve_recaptcha(rc_params)

            # JSON-encode the values: a quote or backslash in the password
            # would otherwise terminate the JS string literal.
            username_js = json.dumps(self.config.account.username)
            password_js = json.dumps(self.config.account.password)
            g_token_js = json.dumps(g_token)

            # Drive the DOM directly and click, so the browser handles the 302.
            js_login = f"""
            var u = document.getElementById('email-input-field');
            if(u) {{ u.value = {username_js}; u.dispatchEvent(new Event('input', {{bubbles:true}})); }}
            var p = document.getElementById('password-input-field');
            if(p) {{ p.value = {password_js}; p.dispatchEvent(new Event('input', {{bubbles:true}})); }}
            var g = document.getElementById('g-recaptcha-response');
            if(g) {{ g.value = {g_token_js}; }}
            var btn = document.getElementById('btn-login');
            if(btn) {{ btn.click(); return true; }} else {{ return false; }}
            """
            self._log("Submitting Login via JS...")
            if not self.page.run_js(js_login):
                raise BizLogicError("Login button missing")

            # --- Wait for redirect ---
            self._log("Waiting for redirect...")
            self.page.wait.url_change('login-actions', exclude=True, timeout=45)

            # Still on an auth URL after the wait means the login was rejected.
            if "login-actions" in self.page.url or "auth" in self.page.url:
                err = "Unknown Login Error"
                if "Invalid username" in self.page.html:
                    err = "Invalid Credentials"
                raise BizLogicError(f"Login Failed: {err}")

            # --- Extract dashboard info ---
            self._log("Waiting for dashboard...")
            self.page.wait.load_start()
            time.sleep(5)
            html = self.page.html
            self._check_page_is_session_expired_or_invalid("My travel group", html)

            groups = self._parse_travel_groups(html)
            target_city = embassy['city'].lower()
            for g in groups:
                # 'location' comes from an optional field and may be None.
                if (g.get('location') or '').lower() == target_city:
                    self.travel_group = g
                    break
            if not self.travel_group:
                raise NotFoundError(f"Group not found for {target_city}")

            self.session_create_time = time.time()
            self.real_ip = self._get_realnetwork_ip()
            self._log(f"Session Ready. Group: {self.travel_group['group_number']}")
        except Exception as e:
            self._log(f"Session Create Error: {e}")
            if self.page:
                try:
                    self.page.quit()
                except Exception as quit_err:
                    # Do not let cleanup failure mask the original error.
                    self._log(f"Browser quit failed: {quit_err}")
                self.page = None
            raise

    def query(self) -> VSQueryResult:
        """Fetch the appointment-booking page and report slots matching ``target_labels``.

        Returns:
            A VSQueryResult; ``success`` is True only when at least one
            matching slot was found.

        Raises:
            SessionExpiredOrInvalidError: The response looks like an expired
                or invalid session.
            Any exception raised by ``_perform_request`` is logged and re-raised.
        """
        res = VSQueryResult()
        res.success = False

        embassy = self.free_config.get('center', {})
        group_num = self.travel_group['group_number']
        interest_month = self.free_config.get("interest_month", time.strftime("%m-%Y"))
        url = f'https://visas-fr.tlscontact.com/en-us/{group_num}/workflow/appointment-booking'
        params = {
            'location': embassy["code"],
            'month': interest_month,
        }

        # The request is issued from inside the browser, so Cloudflare cookies
        # apply; retry_count=1 leaves room for a single 403 refresh-and-retry.
        try:
            resp = self._perform_request("GET", url, params=params, retry_count=1)
        except Exception as e:
            self._log(f"Query request failed: {e}")
            raise

        self._check_page_is_session_expired_or_invalid('Book your appointment', resp.text)

        # Parse slots, then filter by configured labels.
        all_slots = self._parse_appointment_slots(resp.text)
        target_labels = self.free_config.get("target_labels", ["", "pta"])
        available = [s for s in all_slots if s.get("label") in target_labels]

        res.city = self.free_config.get('city', '')
        res.country = self.free_config.get('country', '')
        res.visa_type = self.free_config.get('visa_type', '')
        res.routing_key = self.free_config.get('routing_key', '')

        if available:
            res.success = True
            res.availability_status = AvailabilityStatus.Available
            # NOTE(review): takes the first parsed slot as earliest; this
            # assumes the page lists days in ascending order - confirm.
            res.earliest_date = available[0]["date"]
            date_map: dict[str, list[TimeSlot]] = {}
            for s in available:
                d = s["date"]
                date_map.setdefault(d, []).append(
                    TimeSlot(time=s["time"], label=str(s.get("label", "")))
                )
            res.availability = [DateAvailability(date=d, times=slots) for d, slots in date_map.items()]
        else:
            res.success = False
            res.availability_status = AvailabilityStatus.NoneAvailable
        return res

    def book(self, slot_info: VSQueryResult, user_inputs: Optional[Dict] = None) -> VSBookResult:
        """Pick a slot from *slot_info* that satisfies *user_inputs* and submit the booking.

        Args:
            slot_info: Result of a previous ``query`` call.
            user_inputs: Optional constraints; reads ``expected_start_date``,
                ``expected_end_date`` and ``support_pta`` (default True).
                ``None`` is treated as no constraints.

        Returns:
            A VSBookResult; ``success`` is True when the server answered with
            a 303 or a 200 whose final URL contains ``appointment-confirmation``.

        Raises:
            NotFoundError: No date or slot matches the constraints.
            BizLogicError: The captcha could not be solved.
        """
        res = VSBookResult()
        res.success = False

        # The signature defaults user_inputs to None; treat that as "no constraints".
        user_inputs = user_inputs or {}

        embassy = self.free_config.get('center', {})
        group_num = self.travel_group['group_number']

        # 1. Select a slot.
        available_dates = [da.date for da in slot_info.availability]
        exp_start = user_inputs.get('expected_start_date', '')
        exp_end = user_inputs.get('expected_end_date', '')
        support_pta = user_inputs.get('support_pta', True)
        target_labels = ['']
        if support_pta:
            target_labels.append('pta')

        valid_dates = self._filter_dates(available_dates, exp_start, exp_end)
        if not valid_dates:
            raise NotFoundError(message="No dates match user constraints")

        selected_date = None
        selected_time = None
        selected_label = None
        for d in valid_dates:
            for da in slot_info.availability:
                if da.date == d:
                    for t in da.times:
                        if t.label in target_labels:
                            selected_date = d
                            selected_time = t
                            selected_label = t.label
                            break
            if selected_date:
                break
        if not selected_date:
            raise NotFoundError(message="No suitable slot found")

        # 2. Solve ReCaptcha V3 (action: book).
        page_url = f'https://visas-fr.tlscontact.com/en-us/{group_num}/workflow/appointment-booking?location={embassy["code"]}&month={selected_date[:7]}'
        api_token = self.free_config.get("capsolver_key", "")
        rc_params = {
            "type": "ReCaptchaV3Task",
            "page": page_url,
            "action": "book",
            "siteKey": "6LcTpXcfAAAAAM3VojNhyV-F1z92ADJIvcSZ39Y9",
            "apiToken": api_token,
        }
        g_token = self._solve_recaptcha(rc_params)

        # 3. Build the Next.js payload.
        # Building FormData in JS is simpler and less error-prone than
        # hand-assembling multipart in Python.
        ACTION_ID = "60d0616946df1fc4e7c094ca6a7a04f134d0be3d53"
        url = f'https://visas-fr.tlscontact.com/en-us/{group_num}/workflow/appointment-booking'
        # URL-encoded router state tree.
        router_state = '%5B%22%22%2C%7B%22children%22%3A%5B%5B%22lang%22%2C%22en-us%22%2C%22d%22%5D%2C%7B%22children%22%3A%5B%5B%22groupId%22%2C%22'+str(group_num)+'%22%2C%22d%22%5D%2C%7B%22children%22%3A%5B%22workflow%22%2C%7B%22children%22%3A%5B%22appointment-booking%22%2C%7B%22children%22%3A%5B%22__PAGE__%22%2C%7B%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%2Ctrue%5D%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%2Ctrue%5D%7D%2Cnull%2Cnull%5D'

        # Every interpolated value is JSON-encoded so it is a safe JS string literal.
        url_js = json.dumps(url)
        group_js = json.dumps(str(group_num))
        location_js = json.dumps(str(embassy["code"]))
        date_js = json.dumps(str(selected_date))
        time_js = json.dumps(str(selected_time.time))
        label_js = json.dumps(str(selected_label))
        token_js = json.dumps(str(g_token))
        action_js = json.dumps(ACTION_ID)
        router_state_js = json.dumps(router_state)

        js_script = f"""
        const url = {url_js};
        const formData = new FormData();
        formData.append('1_formGroupId', {group_js});
        formData.append('1_lang', 'en-us');
        formData.append('1_process', 'APPOINTMENT');
        formData.append('1_location', {location_js});
        formData.append('1_date', {date_js});
        formData.append('1_time', {time_js});
        formData.append('1_appointmentLabel', {label_js});
        formData.append('1_captcha_token', {token_js});
        formData.append('0', '[{{"status":"IDLE"}},"$K1"]');
        const headers = {{
            'Next-Action': {action_js},
            'Next-Router-State-Tree': decodeURIComponent({router_state_js}),
            'Accept': 'text/x-component'
        }};
        return fetch(url, {{
            method: 'POST',
            headers: headers,
            body: formData
        }}).then(async response => {{
            const text = await response.text();
            const headers = {{}};
            response.headers.forEach((value, key) => headers[key] = value);
            return {{ status: response.status, body: text, headers: headers, url: response.url }};
        }}).catch(err => {{
            return {{ status: 0, body: err.toString(), headers: {{}}, url: url }};
        }});
        """
        self._log("Submitting booking request via JS Fetch...")
        res_dict = self.page.run_js(js_script)
        resp = BrowserResponse(res_dict)

        # 4. Evaluate the result.
        # A Next.js server-action redirect is usually a 303, but fetch may
        # follow it automatically: if followed, the URL changes; if not
        # (redirect mode manual), the status is 303.
        if resp.status_code == 303 or (resp.status_code == 200 and "appointment-confirmation" in resp.url):
            self._log(f"Booking Success! URL: {resp.url}")
            res.success = True
            res.book_date = selected_date
            # NOTE(review): this stores the slot object, not its time string -
            # confirm which one VSBookResult.book_time expects.
            res.book_time = selected_time
            return res

        if resp.status_code == 200:
            if "APPOINTMENT_LIMIT_REACHED" in resp.text:
                self._log("Failed: Appointment Limit Reached")
            elif "Invalid captcha" in resp.text:
                self._log("Failed: Invalid Captcha")
            else:
                self._log(f"Booking Failed (Unknown 200): {resp.text[:200]}")
        else:
            self._log(f"Booking Failed. Status: {resp.status_code}")
        return res

    # --- Helper methods ---
    def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None, retry_count=0):
        """Run a ``fetch`` inside the browser context and wrap the result.

        Args:
            method: HTTP method name.
            url: Target URL; *params* are appended to its query string.
            headers: Optional request headers (not mutated).
            data: Optional body; a dict is form-urlencoded, anything else is sent as-is.
            json_data: Optional JSON body; takes precedence over *data*.
            params: Optional query parameters.
            retry_count: Number of 403 retries already consumed (max 2).

        Returns:
            BrowserResponse for a 200 response.

        Raises:
            BizLogicError: Browser not initialised, network error, or any
                other non-200 status.
            SessionExpiredOrInvalidError: HTTP 401.
            PermissionDeniedError: HTTP 403 that could not be recovered.
            RateLimiteddError: HTTP 429.
        """
        if not self.page:
            raise BizLogicError("Browser not initialized")

        if params:
            separator = '&' if '?' in url else '?'
            url += separator + urlencode(params)

        fetch_options = {
            "method": method.upper(),
            # Copy so that adding Content-Type never mutates the caller's dict.
            "headers": dict(headers or {}),
            "credentials": "include"
        }

        # Body handling
        if json_data:
            fetch_options['body'] = json.dumps(json_data)
            fetch_options['headers']['Content-Type'] = 'application/json'
        elif data:
            if isinstance(data, dict):
                fetch_options['body'] = urlencode(data)
                fetch_options['headers']['Content-Type'] = 'application/x-www-form-urlencoded'
            else:
                fetch_options['body'] = data

        js_script = f"""
        const url = {json.dumps(url)};
        const options = {json.dumps(fetch_options)};
        return fetch(url, options)
            .then(async response => {{
                const text = await response.text();
                const headers = {{}};
                response.headers.forEach((value, key) => headers[key] = value);
                return {{ status: response.status, body: text, headers: headers, url: response.url }};
            }})
            .catch(error => {{
                return {{ status: 0, body: error.toString(), headers: {{}}, url: url }};
            }});
        """
        res_dict = self.page.run_js(js_script, timeout=30)
        resp = BrowserResponse(res_dict)

        if resp.status_code == 200:
            return resp
        elif resp.status_code == 401:
            self.is_healthy = False
            raise SessionExpiredOrInvalidError()
        elif resp.status_code == 403:
            # On 403 Forbidden, try to re-pass the firewall and retry (max 2 tries).
            if retry_count < 2:
                self._log(f"HTTP 403 Detected. Cloudflare session expired? Attempting refresh (Try {retry_count+1}/2)...")
                if self._refresh_firewall_session():
                    self._log("Firewall session refreshed. Retrying request...")
                    # `url` already carries the query string, so params must
                    # not be passed again or they would be appended twice.
                    return self._perform_request(method, url, headers, data, json_data, None, retry_count+1)
                else:
                    self._log("Failed to refresh firewall session.")
            # Retry exhausted or refresh failed.
            raise PermissionDeniedError(f"HTTP 403: {resp.text[:100]}")
        elif resp.status_code == 429:
            self.is_healthy = False
            raise RateLimiteddError()
        else:
            # Status 0 is what the JS catch handler reports for a fetch failure.
            if resp.status_code == 0:
                raise BizLogicError(f"Network Error: {resp.text}")
            # TLS business error
            raise BizLogicError(message=f"HTTP Error {resp.status_code}: {resp.text[:100]}")

    def _refresh_firewall_session(self) -> bool:
        """Refresh the page to trigger the Cloudflare challenge and try to pass it.

        Returns:
            True if the bypass succeeded and the page title does not contain
            "access denied"; False otherwise (including on any exception).
        """
        try:
            # 1. Refresh the current page (usually the dashboard). This forces
            #    a new HTTP request and so triggers the Cloudflare interstitial.
            self._log("Refreshing page to trigger Cloudflare...")
            self.page.refresh()

            # 2. Run the bypasser with a generous retry budget, since the
            #    network may be unstable at this point.
            cf = CloudflareBypasser(self.page, log=self.config.debug)
            if not cf.bypass(max_retry=10):
                return False

            # 3. Confirm the page loaded normally (not a 403 page).
            title = self.page.title.lower()
            if "access denied" in title:
                return False
            # Give the DOM a moment to settle.
            time.sleep(2)
            return True
        except Exception as e:
            self._log(f"Error during firewall refresh: {e}")
            return False

    def _get_realnetwork_ip(self):
        """Return the public IP seen by api.ipify.org, or "0.0.0.0" on failure.

        The lookup is done in a new tab to avoid CORS restrictions; the tab is
        always closed afterwards.
        """
        tab = None
        try:
            tab = self.page.new_tab("https://api.ipify.org/?format=json")
            pre = tab.ele('tag:pre')
            json_text = pre.text if pre else tab.ele('tag:body').text
            return json.loads(json_text)['ip']
        except Exception:
            # Best effort: the IP is informational only.
            return "0.0.0.0"
        finally:
            if tab is not None:
                try:
                    tab.close()
                except Exception as close_err:
                    self._log(f"Failed to close IP lookup tab: {close_err}")

    def _solve_recaptcha(self, params) -> str:
        """Solve a ReCaptcha through the Capsolver HTTP API.

        Args:
            params: Dict with ``apiToken``, ``type``, ``page``, ``siteKey``
                and optionally ``action``.

        Returns:
            The ``gRecaptchaResponse`` token.

        Raises:
            NotFoundError: ``apiToken`` is missing.
            BizLogicError: Task submission failed, the task failed, or it did
                not become ready within the polling budget.
        """
        key = params.get("apiToken")
        if not key:
            raise NotFoundError("Api-token required")

        submit_url = "https://api.capsolver.com/createTask"
        task = {
            "type": params.get("type"),
            "websiteURL": params.get("page"),
            "websiteKey": params.get("siteKey"),
        }
        if params.get("action"):
            task["pageAction"] = params.get("action")
        # With DrissionPage this normally runs proxyless; no proxy is forwarded.
        payload = {"clientKey": key, "task": task}

        import requests as req  # local import to avoid confusion with browser requests

        r = req.post(submit_url, json=payload, timeout=20)
        if r.status_code != 200:
            raise BizLogicError(message="Failed to submit capsolver task")
        created = r.json()
        task_id = created.get("taskId")
        # A 200 can still carry an API-level error and no task id; polling
        # with taskId=None would only burn the whole timeout.
        if created.get("errorId") or not task_id:
            raise BizLogicError(message="Failed to submit capsolver task")

        for _ in range(20):
            r = req.post(
                "https://api.capsolver.com/getTaskResult",
                json={"clientKey": key, "taskId": task_id},
                timeout=20,
            )
            if r.status_code == 200:
                d = r.json()
                status = d.get("status")
                if status == "ready":
                    return d["solution"]["gRecaptchaResponse"]
                if status == "failed" or d.get("errorId"):
                    # Terminal state: stop polling immediately.
                    raise BizLogicError(message=f"Capsolver task failed: {d.get('errorDescription', '')}")
            time.sleep(3)
        raise BizLogicError(message="Capsolver task timeout")

    def _parse_travel_groups(self, html: str) -> List[Dict]:
        """Extract travel groups from the escaped JSON embedded in the dashboard HTML.

        Returns:
            A list of dicts with ``group_name``, ``group_number`` and
            ``location``; empty if the ``travelGroups`` payload is not found.
        """
        groups = []
        js_pattern = r'\\"travelGroups\\":\s*(\[.*?\]),\\"availableCountriesToCreateGroups'
        js_match = re.search(js_pattern, html, re.DOTALL)
        if js_match:
            # The payload is JSON inside a JS string, so quotes are backslash-escaped.
            json_str = js_match.group(1).replace(r'\"', '"')
            data = json.loads(json_str)
            for g in data:
                groups.append({
                    'group_name': g.get('groupName'),
                    'group_number': g.get('formGroupId'),
                    'location': g.get('vacName')
                })
        else:
            self._log('Parsed travel group page, but not found travelGroups')
        return groups

    def _parse_appointment_slots(self, html: str) -> List[Dict]:
        """Extract appointment slots from the escaped JSON embedded in the booking HTML.

        Returns:
            A list of dicts with ``date``, ``time`` and ``label`` (one of
            ``'pta'``, ``'ptaw'`` or ``''``); empty if nothing is found.
        """
        slots = []
        pattern = r'"availableAppointments\\":\s*(\[.*\]),\\"showFlexiAppointment'
        match = re.search(pattern, html, re.DOTALL)
        if match:
            json_str = match.group(1).replace(r'\"', '"')
            data = json.loads(json_str)
            for day in data:
                d_str = day.get('day')
                for s in day.get('slots', []):
                    # `labels` may be present but null; normalise to a list.
                    labels = s.get('labels') or []
                    # Simplified mapping of the TLS label list.
                    # NOTE(review): any label other than pta/ptaw collapses to
                    # '' and is therefore treated as a standard slot - confirm
                    # this is intended.
                    if 'pta' in labels:
                        lbl = 'pta'
                    elif 'ptaw' in labels:
                        lbl = 'ptaw'
                    else:
                        lbl = ''
                    slots.append({
                        'date': d_str,
                        'time': s.get('time'),
                        'label': lbl
                    })
        return slots

    def _check_page_is_session_expired_or_invalid(self, keyword, html: str) -> bool:
        """Raise if *html* looks like an expired or invalid session page.

        The check is case-insensitive. When *keyword* is present the page is
        considered valid. When it is absent, known expiry markers are looked
        for and, if found, the plugin is marked unhealthy.

        Returns:
            True if *keyword* was found, False if it was absent but no expiry
            marker matched.

        Raises:
            SessionExpiredOrInvalidError: *html* is empty or an expiry marker matched.
        """
        if not html:
            self.is_healthy = False
            raise SessionExpiredOrInvalidError()

        html_lower = html.lower()
        if keyword.lower() in html_lower:
            return True

        if 'redirected automatically' in html_lower:
            self.is_healthy = False
            raise SessionExpiredOrInvalidError("Redirected automatically")
        if 'login' in html_lower and 'password' in html_lower:
            self.is_healthy = False
            raise SessionExpiredOrInvalidError("Redirected to login")
        if 'session expired' in html_lower:
            self.is_healthy = False
            raise SessionExpiredOrInvalidError("Session expired")
        return False

    def _filter_dates(self, dates: List[str], start_str: str, end_str: str) -> List[str]:
        """Return the dates within [start_str, end_str], in random order.

        If either bound is empty, *dates* is returned unchanged (and unshuffled).
        Bounds are parsed from their first 10 characters as ``%Y-%m-%d``.
        """
        if not start_str or not end_str:
            return dates

        s_date = datetime.strptime(start_str[:10], "%Y-%m-%d")
        e_date = datetime.strptime(end_str[:10], "%Y-%m-%d")
        valid_dates = [
            date_str
            for date_str in dates
            if s_date <= datetime.strptime(date_str, "%Y-%m-%d") <= e_date
        ]
        random.shuffle(valid_dates)
        return valid_dates