"""TLSContact visa-appointment plugin driven by a Camoufox (Playwright) browser."""

import json
import os
import queue
import random
import re
import shutil
import threading
import time
import uuid
from datetime import datetime
from typing import List, Dict, Optional, Any, Callable
from urllib.parse import urljoin, urlparse, urlencode

from camoufox import NewBrowser
from playwright.sync_api import (
    sync_playwright,
    TimeoutError as PlaywrightTimeoutError,
    Page,
    BrowserContext,
)

from vs_plg import IVSPlg
from vs_types import (
    VSPlgConfig,
    AppointmentType,
    VSQueryResult,
    VSBookResult,
    AvailabilityStatus,
    TimeSlot,
    DateAvailability,
    NotFoundError,
    PermissionDeniedError,
    RateLimiteddError,
    SessionExpiredOrInvalidError,
    BizLogicError,
)
from utils.cloudflare_bypass_for_scraping2 import CloudflareBypasser


# In-page fetch helper. The URL and options are passed as evaluate() arguments
# instead of being interpolated into the script, so quotes/backslashes in the
# URL can neither break nor inject JavaScript.
_FETCH_JS = """
async ([url, options]) => {
    try {
        const response = await fetch(url, options);
        const text = await response.text();
        const headers = {};
        response.headers.forEach((value, key) => { headers[key] = value; });
        return {
            status: response.status,
            body: text,
            headers: headers,
            url: response.url
        };
    } catch (error) {
        return { status: 0, body: error.toString(), headers: {}, url: url };
    }
}
""".strip()

# Fills the hidden booking form fields and clicks submit. Values arrive as an
# evaluate() argument rather than being spliced into the source.
_BOOK_FORM_JS = """
({ date, time, label }) => {
    try {
        const form = document.querySelector('form');
        if (!form) return 'Form not found';

        function setReactValue(input, value) {
            if (!input) return;
            input.value = value;
            input.dispatchEvent(new Event('input', { bubbles: true }));
            input.dispatchEvent(new Event('change', { bubbles: true }));
        }

        setReactValue(form.querySelector('input[name="date"]'), date);
        setReactValue(form.querySelector('input[name="time"]'), time);
        setReactValue(form.querySelector('input[name="appointmentLabel"]'), label);

        const submitBtn = form.querySelector('button[type="submit"]');
        if (submitBtn) {
            submitBtn.removeAttribute('disabled');
            submitBtn.classList.remove('opacity-50', 'cursor-not-allowed');
            submitBtn.click();
            return 'clicked';
        } else {
            return 'Submit button not found';
        }
    } catch (e) {
        return e.toString();
    }
}
""".strip()


def _camoufox_headless_from_env():
    """Choose the Camoufox display mode from the ``CAMOUFOX_HEADLESS`` env var.

    Intended for Ubuntu / display-less hosts. The returned value is passed
    straight to ``NewBrowser(headless=...)``:

    - unset / ``0`` / ``false`` (anything unrecognised): ``False`` -> headed;
      needs a real DISPLAY or a self-managed Xvfb (``export DISPLAY=:99``).
    - ``1`` / ``true`` / ``yes`` / ``headless``: ``True`` -> true headless.
    - ``virtual`` / ``xvfb`` / ``vdisplay``: ``"virtual"`` -> Camoufox starts
      its own Xvfb virtual screen (Xvfb must be installed).
    """
    value = (os.environ.get("CAMOUFOX_HEADLESS") or "").strip().lower()
    if value in ("1", "true", "yes", "headless"):
        return True
    if value in ("virtual", "xvfb", "vdisplay"):
        return "virtual"
    return False


class BrowserResponse:
    """Minimal stand-in for ``requests.Response`` built from an in-page fetch result."""

    def __init__(self, result_dict):
        result_dict = result_dict or {}
        self.status_code = result_dict.get('status', 0)
        self.text = result_dict.get('body', '')
        self.headers = result_dict.get('headers', {})
        self.url = result_dict.get('url', '')
        self._json = None

    def json(self):
        """Return the body parsed as JSON, or ``{}`` when empty or not valid JSON."""
        if self._json is None:
            if not self.text:
                return {}
            try:
                self._json = json.loads(self.text)
            except (TypeError, ValueError):
                # json.JSONDecodeError is a ValueError; a non-str body is a TypeError.
                self._json = {}
        return self._json


class TlsPlugin(IVSPlg):
    """TLSContact visa appointment plugin (Camoufox edition).

    All browser work is funnelled through one dedicated worker thread, because
    a Playwright/Camoufox ``Page`` may only be used from the thread that
    created it.
    """

    def __init__(self, group_id: str):
        self.group_id = group_id
        self.config: Optional[VSPlgConfig] = None
        self.free_config: Dict[str, Any] = {}
        self.is_healthy = True
        self.logger = None

        self.page: Optional[Page] = None
        self.browser_ctx: Optional[BrowserContext] = None
        self.playwright = None
        self.travel_group: Optional[Dict] = None

        self.instance_id = uuid.uuid4().hex[:8]
        self.root_workspace = os.path.abspath(
            os.path.join("data/temp_browser_data", f"{self.group_id}.{self.instance_id}")
        )
        self.user_data_path = os.path.join(self.root_workspace, "user_data")
        # exist_ok avoids the check-then-create race of the exists()/makedirs() pair.
        os.makedirs(self.root_workspace, exist_ok=True)

        self.session_create_time: float = 0

        # A Playwright/Camoufox Page can only be used on the thread that created
        # it; the sentinel creates sessions in a thread pool and queries from a
        # monitor thread. A single worker thread serialises every browser
        # operation to avoid cross-thread hangs or silent health_check failures.
        self._pw_cmd_queue: "queue.Queue[Optional[Callable[[], None]]]" = queue.Queue()
        self._pw_thread: Optional[threading.Thread] = None
        self._pw_worker: Optional[threading.Thread] = None
        self._pw_thread_ready = threading.Event()
        self._pw_thread_lock = threading.Lock()

    # ------------------------------------------------------------------
    # Basic accessors / configuration
    # ------------------------------------------------------------------
    def get_group_id(self) -> str:
        return self.group_id

    def set_log(self, logger: Callable[[str], None]):
        self.logger = logger

    def _log(self, message):
        """Send ``message`` to the injected logger, or stdout when none is set."""
        line = f'[TlsPlugin] [{self.group_id}] {message}'
        if self.logger:
            self.logger(line)
        else:
            print(line)

    def set_config(self, config: VSPlgConfig):
        self.config = config
        self.free_config = config.free_config or {}

    # ------------------------------------------------------------------
    # Worker-thread plumbing
    # ------------------------------------------------------------------
    def _ensure_pw_thread(self):
        """Start the browser worker thread if it is not already running.

        Raises:
            BizLogicError: the thread did not signal readiness within 60 s.
        """
        with self._pw_thread_lock:
            if self._pw_thread and self._pw_thread.is_alive():
                return
            self._pw_thread_ready.clear()
            t = threading.Thread(
                target=self._pw_loop,
                name=f"camoufox-tls-{self.instance_id}",
                daemon=True,
            )
            self._pw_thread = t
            t.start()
            if not self._pw_thread_ready.wait(timeout=60):
                raise BizLogicError("Camoufox worker thread failed to start")

    def _pw_loop(self):
        """Worker loop: run queued callables until a ``None`` sentinel arrives."""
        self._pw_worker = threading.current_thread()
        self._pw_thread_ready.set()
        while True:
            work = self._pw_cmd_queue.get()
            if work is None:
                break
            work()

    def _run_on_pw_thread(self, fn, *args, **kwargs):
        """Run ``fn(*args, **kwargs)`` on the worker thread and return its result.

        Runs inline when already on the worker thread. Any exception raised
        by ``fn`` is re-raised in the calling thread.

        Raises:
            BizLogicError: the operation did not finish within 600 s.
        """
        if self._pw_worker is not None and threading.current_thread() is self._pw_worker:
            return fn(*args, **kwargs)
        if self._pw_worker is None or not self._pw_thread or not self._pw_thread.is_alive():
            self._ensure_pw_thread()

        # out[0] holds a raised exception, out[1] the return value.
        out: List[Any] = [None, None]
        done = threading.Event()

        def work():
            try:
                out[1] = fn(*args, **kwargs)
            except BaseException as e:
                # Captured so the worker loop survives and the caller sees the error.
                out[0] = e
            finally:
                done.set()

        self._pw_cmd_queue.put(work)
        if not done.wait(timeout=600):
            self._log("Browser thread operation timed out (600s).")
            raise BizLogicError("Browser thread operation timeout")
        if out[0] is not None:
            raise out[0]
        return out[1]

    def _stop_pw_thread(self):
        """Ask the worker thread to exit and wait up to 20 s for it."""
        with self._pw_thread_lock:
            t = self._pw_thread
            if not t or not t.is_alive():
                self._pw_thread = None
                self._pw_worker = None
                return
            self._pw_cmd_queue.put(None)
        # join() on the current thread raises RuntimeError, so skip it there.
        if t is not threading.current_thread():
            t.join(timeout=20)
        with self._pw_thread_lock:
            self._pw_thread = None
            self._pw_worker = None

    # ------------------------------------------------------------------
    # Session health
    # ------------------------------------------------------------------
    def keep_alive(self):
        """Best-effort session ping: re-fetch the current page and check it.

        Never raises. Marks the plugin unhealthy when the page looks like an
        expired/invalid session.
        """
        if self.page is None:
            return

        def _work():
            try:
                resp = self._perform_request("GET", self.page.url, retry_count=1)
                self._check_page_is_session_expired_or_invalid(
                    'Book your appointment', html=resp.text
                )
            except SessionExpiredOrInvalidError:
                self.is_healthy = False
            except Exception as e:
                # Deliberately non-fatal; logged so failures are not invisible.
                self._log(f"keep_alive request failed: {e}")

        try:
            self._run_on_pw_thread(_work)
        except Exception as e:
            self._log(f"keep_alive could not run on browser thread: {e}")

    def _health_check_impl(self) -> bool:
        """Worker-thread health probe: page responds and session is not too old."""
        if not self.is_healthy:
            return False
        if self.page is None:
            return False
        try:
            if self.page.evaluate("1") != 1:
                return False
        except Exception:
            return False

        if self.config.session_max_life > 0:
            elapsed_time = time.time() - self.session_create_time
            if elapsed_time > self.config.session_max_life:
                self._log("Session expired.")
                return False
        return True

    def health_check(self) -> bool:
        """Return True when the browser session is usable; never raises."""
        if not self.is_healthy or self.page is None:
            return False
        try:
            # _run_on_pw_thread already runs inline when on the worker thread.
            return self._run_on_pw_thread(self._health_check_impl)
        except Exception:
            return False

    def _save_screenshot(self, name_prefix):
        """Save a viewport screenshot under ``data/``; failures are only logged."""
        try:
            timestamp = int(time.time())
            filename = f"{self.instance_id}_{name_prefix}_{timestamp}.jpg"
            save_path = os.path.join("data", filename)
            os.makedirs("data", exist_ok=True)
            self.page.screenshot(path=save_path, full_page=False)
            self._log(f"Screenshot saved to {save_path}")
        except Exception as e:
            self._log(f"Failed to save screenshot: {e}")

    # ------------------------------------------------------------------
    # Session creation
    # ------------------------------------------------------------------
    def create_session(self):
        """Create the browser session on the worker thread.

        On failure the worker thread is stopped and the error re-raised.
        """
        self._ensure_pw_thread()
        try:
            self._run_on_pw_thread(self._create_session_inner)
        except Exception:
            self._stop_pw_thread()
            raise

    def _create_session_inner(self):
        """Full in-browser session setup: pass Cloudflare -> log in -> navigate.

        Must run on the single Camoufox/Playwright worker thread (Playwright
        is not thread-safe).

        Raises:
            BizLogicError: Cloudflare bypass, login, or navigation failed.
        """
        self._log(f"Initializing Session (ID: {self.instance_id})...")

        proxy_cfg = None
        if self.config.proxy and self.config.proxy.ip:
            p = self.config.proxy
            proxy_cfg = {"server": f"{p.proto}://{p.ip}:{p.port}"}
            if p.username and p.password:
                proxy_cfg["username"] = p.username
                proxy_cfg["password"] = p.password
        else:
            self._log("[WARN] No proxy configured!")

        try:
            self.playwright = sync_playwright().start()
            headless_opt = _camoufox_headless_from_env()
            self._log(f"Camoufox headless={headless_opt!r} (env CAMOUFOX_HEADLESS)")
            self.browser_ctx = NewBrowser(
                self.playwright,
                persistent_context=True,
                headless=headless_opt,
                user_data_dir=self.user_data_path,
                proxy=proxy_cfg,
                window=(1920, 1080),
            )
            if self.browser_ctx.pages:
                self.page = self.browser_ctx.pages[0]
            else:
                self.page = self.browser_ctx.new_page()

            tls_url = self.free_config.get('tls_url', '')
            if not tls_url:
                # Fail with a clear message instead of an opaque goto('') error.
                raise BizLogicError("tls_url is not configured")
            self._log(f"Navigating: {tls_url}")
            self.page.goto(tls_url, wait_until="domcontentloaded")
            time.sleep(5)

            cf_bypasser = CloudflareBypasser(self.page, log=True)
            if not cf_bypasser.bypass(max_retry=15):
                raise BizLogicError("Cloudflare bypass timeout")
            time.sleep(3)

            btn_selector = "button:has-text('Login')"
            if not self._is_selector_visible(btn_selector, timeout=3000):
                # Landing page without a login form: follow the login link first.
                self.page.locator("a[href*='login']").first.click(timeout=5000)
                time.sleep(3)
            if not self._is_selector_visible(btn_selector, timeout=10000):
                raise BizLogicError(message=f"Can't find selector={btn_selector}")
            time.sleep(random.uniform(0.5, 1))

            # NOTE: reCAPTCHA v2 solving (see _solve_recaptcha) and JS token
            # injection from the earlier Drission-based version are disabled here.

            username = self.config.account.username
            password = self.config.account.password

            self._type_into_first_visible(
                selectors=[
                    "input[name='email']",
                    "input[type='email']",
                    "input#email",
                    "input[autocomplete='username']",
                    "label:has-text('Email') + input",
                ],
                text=username,
                field_name="Email",
            )
            time.sleep(random.uniform(0.5, 1.2))
            self._type_into_first_visible(
                selectors=[
                    "input[name='password']",
                    "input[type='password']",
                    "input#password",
                    "input[autocomplete='current-password']",
                    "label:has-text('Password') + input",
                ],
                text=password,
                field_name="Password",
            )

            self._log("Submitting Login...")
            time.sleep(random.uniform(0.3, 0.8))
            self.page.locator("button:has-text('Login')").first.click(timeout=10000)

            self._log("Waiting for redirect...")
            self.page.wait_for_function(
                "() => !window.location.href.includes('login-actions')",
                timeout=45000,
            )
            time.sleep(3)

            if "login-actions" in self.page.url or "auth" in self.page.url:
                raise BizLogicError(
                    message="Login Failed! Invalid credentials or Captcha rejected."
                )

            self.page.wait_for_load_state("domcontentloaded", timeout=15000)
            time.sleep(5)

            # NOTE: the travel-group selection step (_parse_travel_groups plus a
            # click on the matching formGroupId button) and the "no applicant
            # added" check from the earlier Drission-based version are disabled;
            # the flow goes straight to the booking button.

            btn_selector = '#book-appointment-btn'
            self._log(f"Waiting for selector={btn_selector} to render...")
            if not self._is_selector_visible(btn_selector, timeout=15000):
                raise BizLogicError(message=f"Waiting for selector={btn_selector} timeout")
            self.page.locator(btn_selector).first.click(timeout=10000)
            time.sleep(3)

            btn_selector = "button:has-text('Book your appointment')"
            if not self._is_selector_visible(btn_selector, timeout=10000):
                raise BizLogicError(message=f"Waiting for selector={btn_selector} timeout")

            self.session_create_time = time.time()
            self._log("✅ Login & Navigation Success!")

        except Exception as e:
            self._log(f"Session Create Error: {e}")
            if self.config.debug:
                self._save_screenshot("create_session_except")
            self._cleanup_failed_session()
            raise  # bare raise keeps the original traceback

    # ------------------------------------------------------------------
    # Query
    # ------------------------------------------------------------------
    def query(self, apt_type: AppointmentType) -> VSQueryResult:
        """Query availability for the configured month (runs on the worker thread)."""
        return self._run_on_pw_thread(self._query_impl, apt_type)

    def _day_block_locator_candidates(self):
        """Yield day-block locators from strictest to loosest structure."""
        # Mirrors the Drission version's
        # `//div[p and div//button[contains(@data-testid, "slot")]]`
        # (slot button under a child div).
        yield self.page.locator(
            "xpath=//div[./p and ./div//button[contains(@data-testid, 'slot')]]"
        )
        # Slightly changed structure: any descendant button carrying "slot".
        yield self.page.locator(
            "xpath=//div[./p and .//button[contains(@data-testid, 'slot')]]"
        )
        # Only require some <p> and some slot-like button below the div.
        yield self.page.locator(
            "xpath=//div[.//p and .//button[contains(@data-testid, 'slot')]]"
        )
        # Playwright-native :has filtering.
        yield self.page.locator("div").filter(
            has=self.page.locator("p")
        ).filter(
            has=self.page.locator("button[data-testid*='slot']")
        )

    @staticmethod
    def _label_from_test_id(test_id: str) -> str:
        """Map a slot button's data-testid to a label: 'ptaw', 'pta' or ''."""
        if "prime" in test_id:
            return "ptaw" if "weekend" in test_id else "pta"
        return ""

    @staticmethod
    def _date_from_day_text(day_text: str, year: int, month: int) -> Optional[str]:
        """Build 'YYYY-MM-DD' from the first number in ``day_text``; None if absent."""
        day_match = re.search(r"\d+", day_text)
        if not day_match:
            return None
        try:
            return f"{year}-{month:02d}-{int(day_match.group()):02d}"
        except ValueError:
            return None

    def _slot_from_button(self, btn, full_date: str) -> Optional[Dict[str, Any]]:
        """Build a slot dict from an available-slot button; None if no HH:MM found."""
        time_match = re.search(r"\d{2}:\d{2}", btn.inner_html())
        if not time_match:
            return None
        test_id = btn.get_attribute("data-testid") or ""
        return {
            "date": full_date,
            "time": time_match.group(),
            "label": self._label_from_test_id(test_id),
        }

    def _extract_slots_from_calendar_dom(
        self, target_year: int, target_month_num: int
    ) -> List[Dict[str, Any]]:
        """Locate "day block + clickable slot button" using several strategies.

        Falls back to scanning available-slot buttons directly when no day
        block matches or the matched blocks yield nothing.
        """
        day_blocks = None
        for loc in self._day_block_locator_candidates():
            try:
                n = loc.count()
            except Exception:
                continue
            if n > 0:
                day_blocks = loc
                self._log(f"使用日历块选择器,匹配到 {n} 个 day_blocks")
                break

        if day_blocks is None:
            # No usable day-block wrapper: scan the buttons and look up the
            # date from their ancestors instead.
            return self._extract_slots_from_available_buttons_only(
                target_year, target_month_num
            )

        all_slots: List[Dict[str, Any]] = []
        for i in range(day_blocks.count()):
            block = day_blocks.nth(i)
            p_ele = block.locator("p").first
            if not p_ele.count():
                continue
            full_date = self._date_from_day_text(
                p_ele.inner_text(), target_year, target_month_num
            )
            if full_date is None:
                continue

            available_btns = block.locator("button[data-testid^='btn-available-slot']")
            for j in range(available_btns.count()):
                slot = self._slot_from_button(available_btns.nth(j), full_date)
                if slot is not None:
                    all_slots.append(slot)

        if all_slots:
            return all_slots
        return self._extract_slots_from_available_buttons_only(
            target_year, target_month_num
        )

    def _extract_slots_from_available_buttons_only(
        self, target_year: int, target_month_num: int
    ) -> List[Dict[str, Any]]:
        """Fallback: start from available-slot buttons and look up each one's day row."""
        btns = self.page.locator("button[data-testid^='btn-available-slot']")
        n = btns.count()
        if n == 0:
            return []
        self._log(f"按可用按钮回查日期,共 {n} 个 btn-available-slot")

        all_slots: List[Dict[str, Any]] = []
        for j in range(n):
            btn = btns.nth(j)
            # Nearest ancestor div that contains a <p> (the day label).
            row = btn.locator("xpath=./ancestor::div[.//p][1]")
            p_ele = row.locator("p").first
            if not p_ele.count():
                continue
            full_date = self._date_from_day_text(
                p_ele.inner_text(), target_year, target_month_num
            )
            if full_date is None:
                continue
            slot = self._slot_from_button(btn, full_date)
            if slot is not None:
                all_slots.append(slot)
        return all_slots

    def _query_impl(self, apt_type: AppointmentType) -> VSQueryResult:
        """Collect slots for ``free_config['interest_month']`` (format MM-YYYY).

        If the calendar is not on the target month, navigates there and reads
        the DOM (falling back to the HTML-embedded JSON). If it already is,
        re-fetches the page silently and parses the embedded JSON.

        Raises:
            SessionExpiredOrInvalidError: the fetched page looks logged out.
        """
        res = VSQueryResult()
        res.success = False

        interest_month = self.free_config.get("interest_month", time.strftime("%m-%Y"))
        target_date_obj = datetime.strptime(interest_month, "%m-%Y")
        target_month_text = target_date_obj.strftime("%B %Y")
        target_year = target_date_obj.year
        target_month_num = target_date_obj.month

        all_slots: List[Dict[str, Any]] = []

        current_selected_ele = self.page.locator(
            '[data-testid="btn-current-month-available"]'
        ).first
        current_month_text = (
            current_selected_ele.inner_text().strip() if current_selected_ele.count() else ""
        )
        is_on_target_month = current_month_text.lower() == target_month_text.lower()

        if not is_on_target_month:
            self._log(
                f"Current is '{current_month_text}', navigating to '{target_month_text}'..."
            )
            for _ in range(12):
                target_btn = self.page.locator(f"a[href*='month={interest_month}']").first
                if target_btn.count():
                    target_btn.click(timeout=5000)
                    time.sleep(3)
                    break

                next_btn = self.page.locator('[data-testid="btn-next-month-available"]').first
                if next_btn.count():
                    next_btn.click(timeout=5000)
                    time.sleep(2)
                else:
                    self._log("Warning: Cannot find target month or 'Next Month' button.")
                    break

            try:
                self.page.wait_for_load_state("networkidle", timeout=20000)
            except PlaywrightTimeoutError:
                try:
                    self.page.wait_for_load_state("domcontentloaded", timeout=10000)
                except PlaywrightTimeoutError:
                    pass
            time.sleep(0.8)

            self._log("Extracting slots from DOM using robust data-testid features...")
            all_slots = self._extract_slots_from_calendar_dom(target_year, target_month_num)
            if not all_slots:
                n_slot_btns = self.page.locator("[data-testid*='slot']").count()
                n_avail = self.page.locator("button[data-testid^='btn-available-slot']").count()
                self._log(
                    f"DOM 日历未解析到槽位: [data-testid*=\"slot\"]={n_slot_btns}, "
                    f"btn-available-slot={n_avail},回退为页面 HTML 内嵌 JSON 解析"
                )
                try:
                    resp = self._perform_request("GET", self.page.url, retry_count=1)
                    self._check_page_is_session_expired_or_invalid(
                        "Book your appointment", resp.text
                    )
                    all_slots = self._parse_appointment_slots(resp.text)
                except SessionExpiredOrInvalidError:
                    # Same as the on-target-month branch: a dead session must
                    # not be reported as "no slots available".
                    raise
                except Exception as ex:
                    self._log(f"HTML 回退解析失败: {ex}")
        else:
            self._log(f"Already on '{target_month_text}'. Executing silent JS fetch...")
            resp = self._perform_request("GET", self.page.url, retry_count=1)
            self._check_page_is_session_expired_or_invalid('Book your appointment', resp.text)
            all_slots = self._parse_appointment_slots(resp.text)

        target_labels = self.free_config.get("target_labels", ["", "pta"])
        slots = [s for s in all_slots if s.get("label") in target_labels]

        if slots:
            res.success = True
            # ISO dates sort lexicographically; do not assume input order.
            earliest_date = min(s["date"] for s in slots)
            res.availability_status = AvailabilityStatus.Available
            res.earliest_date = datetime.strptime(earliest_date, "%Y-%m-%d")

            date_map: Dict[datetime, List[TimeSlot]] = {}
            for s in slots:
                dt = datetime.strptime(s["date"], "%Y-%m-%d")
                date_map.setdefault(dt, []).append(
                    TimeSlot(time=s["time"], label=str(s.get("label", "")))
                )

            res.availability = [
                DateAvailability(date=day, times=day_slots)
                for day, day_slots in date_map.items()
            ]
            self._log(f"Slot Found! -> {slots}")
        else:
            self._log("No slots available.")
            res.success = False
            res.availability_status = AvailabilityStatus.NoneAvailable

        return res

    # ------------------------------------------------------------------
    # Booking
    # ------------------------------------------------------------------
    def book(self, slot_info: VSQueryResult, user_inputs: Optional[Dict] = None) -> VSBookResult:
        """Book one of the slots in ``slot_info`` (runs on the worker thread)."""
        return self._run_on_pw_thread(self._book_impl, slot_info, user_inputs)

    def _book_impl(
        self, slot_info: VSQueryResult, user_inputs: Optional[Dict] = None
    ) -> VSBookResult:
        """Pick a random matching slot, submit the booking form, and poll the outcome.

        ``user_inputs`` keys read: ``expected_start_date``, ``expected_end_date``
        (both 'YYYY-MM-DD...'), ``support_pta`` (default True).

        Raises:
            NotFoundError: no slot satisfies the date/label constraints.
            BizLogicError: the form could not be filled or submitted.
        """
        if user_inputs is None:
            user_inputs = {}

        res = VSBookResult()
        res.success = False

        exp_start = user_inputs.get('expected_start_date', '')
        exp_end = user_inputs.get('expected_end_date', '')
        support_pta = user_inputs.get('support_pta', True)

        target_labels = ['']
        if support_pta:
            target_labels.append('pta')

        availability = slot_info.availability or []
        available_dates_str = [
            da.date.strftime("%Y-%m-%d") for da in availability if da.date
        ]

        valid_dates_list = self._filter_dates(available_dates_str, exp_start, exp_end)
        if not valid_dates_list:
            raise NotFoundError(message="No dates match user constraints")
        valid_dates = set(valid_dates_list)

        all_possible_slots = []
        for da in availability:
            if not da.date:
                continue
            date_str = da.date.strftime("%Y-%m-%d")
            if date_str not in valid_dates:
                continue
            for t in da.times:
                if t.label in target_labels:
                    all_possible_slots.append(
                        {"date": date_str, "time_obj": t, "label": t.label}
                    )

        if not all_possible_slots:
            raise NotFoundError(message="No suitable slot found (after label filtering)")

        selected_slot = random.choice(all_possible_slots)
        selected_date = selected_slot["date"]
        selected_time = selected_slot["time_obj"]
        selected_label = selected_slot["label"]

        self._log(
            f"Found {len(all_possible_slots)} valid slots. "
            f"selected slot: {selected_date} {selected_time.time} {selected_label}"
        )

        # str() keeps the same text the previous string interpolation produced.
        inject_res = self.page.evaluate(
            _BOOK_FORM_JS,
            {
                "date": str(selected_date),
                "time": str(selected_time.time),
                "label": str(selected_label),
            },
        )
        self._log(f"Form submission triggered: {inject_res}")

        if inject_res != 'clicked':
            raise BizLogicError(message="Failed to inject form or click the submit button")

        self._log("Waiting for Next.js to process the form submission...")

        for _ in range(10):
            try:
                current_page_url = self.page.url
                current_page_html = self.page.content()

                appointment_confirmation_indicators = [
                    "order-summary" in current_page_url,
                    "partner-services" in current_page_url,
                    "appointment-confirmation" in current_page_url,
                    "Change my appointment" in current_page_html,
                    "Book a new appointment" in current_page_html,
                ]
                if any(appointment_confirmation_indicators):
                    self._log(f"✅ BOOKING SUCCESS! Redirected to: {current_page_url}")
                    res.success = True
                    res.label = selected_label
                    res.book_date = selected_date
                    res.book_time = selected_time.time
                    self._save_screenshot("book_slot_success")
                    break

                toast_ele = self.page.locator('[role="alert"]').first
                if toast_ele.count():
                    error_msg = toast_ele.inner_text()
                    self._log(f"❌ BOOKING FAILED! Detected popup: {error_msg}")
                    break
            except Exception as e:
                # The page is often mid-navigation here; keep polling.
                self._log(f"Booking status poll failed, retrying: {e}")
            time.sleep(0.5)

        return res

    # ------------------------------------------------------------------
    # HTTP via in-page fetch
    # ------------------------------------------------------------------
    def _get_proxy_url(self):
        """Return the configured proxy as a URL string, or '' when none is set."""
        proxy = self.config.proxy
        if not proxy or not proxy.ip:
            return ""
        if proxy.username:
            return f"{proxy.proto}://{proxy.username}:{proxy.password}@{proxy.ip}:{proxy.port}"
        return f"{proxy.proto}://{proxy.ip}:{proxy.port}"

    def _perform_request(self, method, url, headers=None, data=None, json_data=None,
                         params=None, retry_count=0):
        """Run a ``fetch`` inside the page so cookies and fingerprint are reused.

        Args:
            method: HTTP verb.
            url: target URL; ``params`` (if any) are appended as a query string.
            headers: optional request headers (not mutated).
            data: form body; a dict is urlencoded, anything else is sent as-is.
            json_data: JSON body; takes precedence over ``data``.
            params: optional query parameters.
            retry_count: 403 refresh attempts already consumed (max 2).

        Returns:
            BrowserResponse for HTTP 200.

        Raises:
            BizLogicError: browser not initialised, network error, or other status.
            SessionExpiredOrInvalidError: HTTP 401.
            PermissionDeniedError: HTTP 403 that a firewall refresh did not fix.
            RateLimiteddError: HTTP 429.
        """
        if not self.page:
            raise BizLogicError("Browser not initialized")

        if params:
            separator = '&' if '?' in url else '?'
            url += separator + urlencode(params)

        fetch_options = {
            "method": method.upper(),
            # Copy so adding Content-Type never mutates the caller's dict.
            "headers": dict(headers or {}),
            "credentials": "include",
        }

        if json_data:
            fetch_options['body'] = json.dumps(json_data)
            fetch_options['headers']['Content-Type'] = 'application/json'
        elif data:
            if isinstance(data, dict):
                fetch_options['body'] = urlencode(data)
                fetch_options['headers']['Content-Type'] = 'application/x-www-form-urlencoded'
            else:
                fetch_options['body'] = data

        res_dict = self.page.evaluate(_FETCH_JS, [url, fetch_options])
        resp = BrowserResponse(res_dict)
        status = resp.status_code

        if status == 200:
            return resp
        if status == 401:
            self.is_healthy = False
            raise SessionExpiredOrInvalidError()
        if status == 403:
            if retry_count < 2:
                self._log(
                    "HTTP 403 Detected. Cloudflare session expired? "
                    f"Attempting refresh (Try {retry_count+1}/2)..."
                )
                if self._refresh_firewall_session():
                    self._log("Firewall session refreshed. Retrying request...")
                    # `url` already carries the query string: pass params=None
                    # so the retry does not append it a second time.
                    return self._perform_request(
                        method, url, headers, data, json_data, None, retry_count + 1
                    )
                self._log("Failed to refresh firewall session.")
            raise PermissionDeniedError(f"HTTP 403: {resp.text[:100]}")
        if status == 429:
            self.is_healthy = False
            raise RateLimiteddError()
        if status == 0:
            raise BizLogicError(f"Network Error: {resp.text}")
        raise BizLogicError(message=f"HTTP Error {status}: {resp.text[:100]}")

    def _refresh_firewall_session(self) -> bool:
        """Reload the page to trigger the Cloudflare challenge and try to pass it.

        Returns True when the bypass succeeded and the page is not an
        "access denied" page; False otherwise (including on any exception).
        """
        try:
            # 1. Reload the current page: this forces a real HTTP navigation,
            #    which surfaces the Cloudflare interstitial.
            self._log("Refreshing page to trigger Cloudflare...")
            self.page.reload(wait_until="domcontentloaded")

            # 2. Run the bypasser (a few more retries; the network may be flaky).
            cf = CloudflareBypasser(self.page, log=self.config.debug)
            if not cf.bypass(max_retry=10):
                return False

            # 3. Confirm we did not land on a 403 page.
            if "access denied" in self.page.title().lower():
                return False

            time.sleep(2)  # let the DOM settle
            return True
        except Exception as e:
            self._log(f"Error during firewall refresh: {e}")
            return False

    def _solve_recaptcha(self, params) -> str:
        """Solve a reCAPTCHA through the Capsolver API and return the token.

        ``params`` keys read: ``apiToken`` (required), ``type``, ``page``,
        ``siteKey``, optional ``action``.

        Raises:
            NotFoundError: ``apiToken`` missing.
            BizLogicError: task submission failed or timed out.
        """
        key = params.get("apiToken")
        if not key:
            raise NotFoundError("Api-token required")

        submit_url = "https://api.capsolver.com/createTask"
        task = {
            "type": params.get("type"),
            "websiteURL": params.get("page"),
            "websiteKey": params.get("siteKey"),
        }
        if params.get("action"):
            task["pageAction"] = params.get("action")

        # NOTE: proxy forwarding to Capsolver is intentionally not wired in;
        # with Camoufox the ProxyLess task types are normally used.
        payload = {"clientKey": key, "task": task}

        import requests as req  # local import to keep it apart from the in-page fetch path

        r = req.post(submit_url, json=payload, timeout=20)
        if r.status_code != 200:
            raise BizLogicError(message="Failed to submit capsolver task")

        task_id = r.json().get("taskId")
        if not task_id:
            # Without a task id every poll would fail; fail fast instead.
            raise BizLogicError(message="Failed to submit capsolver task")

        for _ in range(20):
            r = req.post(
                "https://api.capsolver.com/getTaskResult",
                json={"clientKey": key, "taskId": task_id},
                timeout=20,
            )
            if r.status_code == 200:
                d = r.json()
                if d.get("status") == "ready":
                    return d["solution"]["gRecaptchaResponse"]
            time.sleep(3)
        raise BizLogicError(message="Capsolver task timeout")

    # ------------------------------------------------------------------
    # HTML parsing helpers
    # ------------------------------------------------------------------
    def _parse_travel_groups(self, html_content) -> List[Dict]:
        """Extract travel groups from the escaped JSON embedded in the page HTML."""
        groups = []
        js_pattern = r'\\"travelGroups\\":\s*(\[.*?\]),\\"availableCountriesToCreateGroups'
        js_match = re.search(js_pattern, html_content, re.DOTALL)
        if js_match:
            # The payload is JSON inside a JS string: unescape \" before parsing.
            json_str = js_match.group(1).replace(r'\"', '"')
            for g in json.loads(json_str):
                groups.append({
                    'group_name': g.get('groupName'),
                    'group_number': g.get('formGroupId'),
                    'location': g.get('vacName'),
                })
        else:
            self._log('Parsed travel group page, but not found travelGroups')
        return groups

    def _parse_appointment_slots(self, html_content) -> List[Dict]:
        """Extract slots from the escaped ``availableAppointments`` JSON in the HTML.

        Each result is ``{'date', 'time', 'label'}`` with label 'pta', 'ptaw' or ''.
        """
        slots = []
        pattern = r'"availableAppointments\\":\s*(\[.*\]),\\"showFlexiAppointment'
        match = re.search(pattern, html_content, re.DOTALL)
        if match:
            json_str = match.group(1).replace(r'\"', '"')
            for day in json.loads(json_str):
                d_str = day.get('day')
                for s in day.get('slots', []):
                    # `labels` may be absent or null.
                    labels = s.get('labels') or []
                    if 'pta' in labels:
                        lbl = 'pta'
                    elif 'ptaw' in labels:
                        lbl = 'ptaw'
                    else:
                        lbl = ''
                    slots.append({'date': d_str, 'time': s.get('time'), 'label': lbl})
        return slots

    def _check_page_is_session_expired_or_invalid(self, keyword, html: str) -> bool:
        """Raise if ``html`` looks like an expired/invalid session page.

        The page is considered fine when it contains ``keyword``
        (case-insensitive). Otherwise known logout markers are checked.

        Returns:
            False when no expiry was detected.

        Raises:
            SessionExpiredOrInvalidError: empty HTML or a logout marker found
                (``is_healthy`` is set to False first).
        """
        if not html:
            self.is_healthy = False
            raise SessionExpiredOrInvalidError()

        html_lower = html.lower()
        if keyword.lower() not in html_lower:
            session_expire_or_invalid_indicators = [
                'redirected automatically' in html_lower,
                'login' in html_lower and 'password' in html_lower,
                'session expired' in html_lower,
            ]
            if any(session_expire_or_invalid_indicators):
                self.is_healthy = False
                raise SessionExpiredOrInvalidError()
        return False

    def _filter_dates(self, dates: List[str], start_str: str, end_str: str) -> List[str]:
        """Keep dates within [start, end] (inclusive), in shuffled order.

        Returns ``dates`` unchanged when either bound is empty.
        """
        if not start_str or not end_str:
            return dates

        s_date = datetime.strptime(start_str[:10], "%Y-%m-%d")
        e_date = datetime.strptime(end_str[:10], "%Y-%m-%d")
        valid_dates = [
            date_str
            for date_str in dates
            if s_date <= datetime.strptime(date_str, "%Y-%m-%d") <= e_date
        ]
        random.shuffle(valid_dates)
        return valid_dates

    # ------------------------------------------------------------------
    # Page interaction helpers
    # ------------------------------------------------------------------
    def _is_selector_visible(self, selector: str, timeout: int = 10000) -> bool:
        """Wait up to ``timeout`` ms for ``selector`` to become visible."""
        try:
            self.page.wait_for_selector(selector, state="visible", timeout=timeout)
            return True
        except PlaywrightTimeoutError:
            return False

    def _human_type(self, text: str):
        """Type ``text`` one character at a time with small random pauses."""
        for ch in text:
            self.page.keyboard.type(ch)
            time.sleep(random.uniform(0.03, 0.12))

    def _type_into_first_visible(self, selectors: List[str], text: str, field_name: str):
        """Type ``text`` into the first selector that becomes visible.

        Raises:
            BizLogicError: none of ``selectors`` could be used.
        """
        last_err = None
        for selector in selectors:
            try:
                locator = self.page.locator(selector).first
                locator.wait_for(state="visible", timeout=3000)
                locator.click(timeout=3000)
                time.sleep(random.uniform(0.2, 0.6))
                locator.fill("")
                self._human_type(text)
                return
            except Exception as e:
                last_err = e
        raise BizLogicError(
            message=f"Can't find visible {field_name} input. Last error: {last_err}"
        )

    # ------------------------------------------------------------------
    # Resource cleanup
    # ------------------------------------------------------------------
    def _close_playwright(self):
        """Close page, context and Playwright, ignoring errors from each step."""
        if self.page:
            try:
                self.page.close()
            except Exception:
                pass
            self.page = None
        if self.browser_ctx:
            try:
                self.browser_ctx.close()
            except Exception:
                pass
            self.browser_ctx = None
        if self.playwright:
            try:
                self.playwright.stop()
            except Exception:
                pass
            self.playwright = None

    def _rmtree_workspace(self):
        """Delete the temporary browser workspace, retrying up to three times."""
        if not os.path.exists(self.root_workspace):
            return
        for _ in range(3):
            # Short pause so the browser can release file handles.
            time.sleep(0.2)
            try:
                # No ignore_errors here: a failure must raise for the retry to
                # happen at all.
                shutil.rmtree(self.root_workspace)
                break
            except FileNotFoundError:
                break
            except OSError as e:
                self._log(f"Cleanup retry: {e}")
                time.sleep(0.5)
        if os.path.exists(self.root_workspace):
            self._log(f"[WARN] Failed to fully remove workspace: {self.root_workspace}")

    def _cleanup_failed_session(self):
        """Called on the worker thread when create_session fails.

        The outer ``create_session`` stops the worker thread afterwards.
        """
        self._close_playwright()
        self._rmtree_workspace()

    def cleanup(self):
        """Destroy the browser and remove the temporary workspace."""
        w = getattr(self, "_pw_worker", None)
        on_worker = w is not None and threading.current_thread() is w
        if on_worker:
            self._close_playwright()
            self._rmtree_workspace()
            return
        if w is not None and self._pw_thread and self._pw_thread.is_alive():
            try:
                self._run_on_pw_thread(self._close_playwright)
            except Exception:
                self._close_playwright()
            self._rmtree_workspace()
            self._stop_pw_thread()
        else:
            self._close_playwright()
            self._rmtree_workspace()

    def __del__(self):
        """Best-effort cleanup on garbage collection."""
        try:
            self.cleanup()
        except Exception:
            # __init__ may have failed part-way, or the interpreter may be
            # shutting down; a finalizer must never raise.
            pass


class TlsPlugin2(TlsPlugin):
    """Alias kept for factories that load the class by the name `TlsPlugin2`."""
    pass