| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158 |
- import time
- import json
- import random
- import re
- import os
- import uuid
- import shutil
- import queue
- import threading
- from datetime import datetime
- from typing import List, Dict, Optional, Any, Callable
- from urllib.parse import urljoin, urlparse, urlencode, parse_qs
- from camoufox import NewBrowser
- from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError, Page, BrowserContext
- from vs_plg import IVSPlg
- from vs_types import VSPlgConfig, AppointmentType, VSQueryResult, VSBookResult, AvailabilityStatus, TimeSlot, DateAvailability, NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError
- from utils.cloudflare_bypass_for_scraping2 import CloudflareBypasser
def _camoufox_headless_from_env():
    """Translate the ``CAMOUFOX_HEADLESS`` environment variable into a headless option.

    The value is trimmed and compared case-insensitively:

    - unset / ``0`` / ``false`` / anything unrecognised -> ``False`` (headed; needs a
      real DISPLAY, or a self-started Xvfb with ``DISPLAY`` exported).
    - ``1`` / ``true`` / ``yes`` / ``headless`` -> ``True`` (true headless, no X needed).
    - ``virtual`` / ``xvfb`` / ``vdisplay`` -> ``"virtual"`` (Camoufox starts an Xvfb
      virtual screen; Xvfb must be installed).

    Returns:
        ``True``, ``False`` or the string ``"virtual"``.
    """
    raw_value = os.environ.get("CAMOUFOX_HEADLESS")
    mode = (raw_value if raw_value else "").strip().lower()
    recognised = {
        "1": True,
        "true": True,
        "yes": True,
        "headless": True,
        "virtual": "virtual",
        "xvfb": "virtual",
        "vdisplay": "virtual",
    }
    return recognised.get(mode, False)
class BrowserResponse:
    """Minimal ``requests.Response`` look-alike built from an in-page fetch result.

    The constructor accepts the dict produced by the injected fetch script
    (keys ``status``, ``body``, ``headers``, ``url``); ``None`` or a missing key
    falls back to an empty/zero default.
    """

    def __init__(self, result_dict):
        result_dict = result_dict or {}
        self.status_code = result_dict.get('status', 0)
        self.text = result_dict.get('body', '')
        self.headers = result_dict.get('headers', {})
        self.url = result_dict.get('url', '')
        # Parsed-body cache, filled by the first successful or failed json() parse.
        self._json = None

    def json(self):
        """Return the body parsed as JSON, or ``{}`` when it is empty or not valid JSON."""
        if self._json is None:
            if not self.text:
                return {}
            try:
                self._json = json.loads(self.text)
            except (TypeError, ValueError, RecursionError):
                # Only parse failures are swallowed; KeyboardInterrupt/SystemExit
                # must still propagate (the original bare ``except`` hid them).
                self._json = {}
        return self._json
- class TlsPlugin(IVSPlg):
- """
- TLSContact 签证预约插件 (Camoufox 版)
- """
def __init__(self, group_id: str):
    """Initialise plugin state and create this instance's private workspace directory.

    Args:
        group_id: Identifier returned by ``get_group_id`` and used in log prefixes
            and in the workspace directory name.
    """
    self.group_id = group_id
    self.config: Optional[VSPlgConfig] = None
    self.free_config: Dict[str, Any] = {}
    self.is_healthy = True
    self.logger = None

    # Browser handles; populated when a session is created.
    self.page: Optional[Page] = None
    self.browser_ctx: Optional[BrowserContext] = None
    self.playwright = None
    self.travel_group: Optional[Dict] = None

    # Each instance gets its own user-data dir: data/temp_browser_data/<group>.<id>/user_data
    self.instance_id = uuid.uuid4().hex[:8]
    self.root_workspace = os.path.abspath(
        os.path.join("data/temp_browser_data", f"{self.group_id}.{self.instance_id}")
    )
    self.user_data_path = os.path.join(self.root_workspace, "user_data")

    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(self.root_workspace, exist_ok=True)

    self.session_create_time: float = 0
    # A Playwright/Camoufox Page may only be used from the thread that created it,
    # while sessions are created in a thread pool and queried from a monitor thread.
    # All browser operations are therefore serialised through one worker thread to
    # avoid cross-thread hangs and silent health_check failures.
    self._pw_cmd_queue: "queue.Queue[Optional[Callable[[], None]]]" = queue.Queue()
    self._pw_thread: Optional[threading.Thread] = None
    self._pw_worker: Optional[threading.Thread] = None
    self._pw_thread_ready = threading.Event()
    self._pw_thread_lock = threading.Lock()
def get_group_id(self) -> str:
    """Return the group id this instance was constructed with."""
    group_id = self.group_id
    return group_id
-
def set_log(self, logger: Callable[[str], None]):
    """Install the callable that ``_log`` sends formatted messages to.

    Args:
        logger: Callable taking one message string; a falsy value makes
            ``_log`` fall back to ``print``.
    """
    self.logger = logger
-
def _log(self, message):
    """Emit ``message`` prefixed with the plugin tag and group id.

    The line goes to the installed logger when there is one, otherwise to stdout.
    """
    line = f'[TlsPlugin] [{self.group_id}] {message}'
    sink = self.logger if self.logger else print
    sink(line)
def set_config(self, config: VSPlgConfig):
    """Store the plugin configuration and cache its ``free_config`` mapping.

    A falsy ``config.free_config`` is replaced by an empty dict.
    """
    self.config = config
    extras = config.free_config
    self.free_config = extras if extras else {}
def _ensure_pw_thread(self):
    """Start the browser worker thread unless one is already alive.

    Runs under ``_pw_thread_lock``. A new daemon thread running ``_pw_loop`` is
    started and this call blocks until it signals ``_pw_thread_ready``.

    Raises:
        BizLogicError: the worker did not signal readiness within 60 seconds.
    """
    with self._pw_thread_lock:
        already_running = bool(self._pw_thread) and self._pw_thread.is_alive()
        if already_running:
            return
        self._pw_thread_ready.clear()
        worker = threading.Thread(
            target=self._pw_loop,
            name=f"camoufox-tls-{self.instance_id}",
            daemon=True,
        )
        self._pw_thread = worker
        worker.start()
        came_up = self._pw_thread_ready.wait(timeout=60)
        if not came_up:
            raise BizLogicError("Camoufox worker thread failed to start")
def _pw_loop(self):
    """Worker-thread body: run queued callables in order until ``None`` is dequeued.

    Records the current thread as ``_pw_worker`` and sets ``_pw_thread_ready``
    before it starts draining ``_pw_cmd_queue``.
    """
    self._pw_worker = threading.current_thread()
    self._pw_thread_ready.set()
    # iter(callable, sentinel) keeps calling get() until the None stop marker shows up.
    for job in iter(self._pw_cmd_queue.get, None):
        job()
def _run_on_pw_thread(self, fn, *args, **kwargs):
    """Run ``fn(*args, **kwargs)`` on the browser worker thread and return its result.

    When already on the worker thread the call is made directly. Otherwise the
    call is queued (starting the worker first if needed) and this thread waits
    for it to finish.

    Raises:
        BizLogicError: the queued call did not finish within 600 seconds.
        BaseException: whatever ``fn`` raised is re-raised in the calling thread.
    """
    if self._pw_worker is not None and threading.current_thread() is self._pw_worker:
        return fn(*args, **kwargs)

    worker_missing = (
        self._pw_worker is None
        or not self._pw_thread
        or not self._pw_thread.is_alive()
    )
    if worker_missing:
        self._ensure_pw_thread()

    outcome: Dict[str, Any] = {"error": None, "value": None}
    finished = threading.Event()

    def job():
        try:
            outcome["value"] = fn(*args, **kwargs)
        except BaseException as exc:
            # Carried across threads and re-raised by the waiting caller.
            outcome["error"] = exc
        finally:
            finished.set()

    self._pw_cmd_queue.put(job)
    if not finished.wait(timeout=600):
        self._log("Browser thread operation timed out (600s).")
        raise BizLogicError("Browser thread operation timeout")
    if outcome["error"] is not None:
        raise outcome["error"]
    return outcome["value"]
def _stop_pw_thread(self):
    """Ask the worker thread to exit, wait up to 20 seconds, then forget it.

    If no worker is alive the thread references are simply cleared.
    """
    with self._pw_thread_lock:
        worker = self._pw_thread
        still_alive = bool(worker) and worker.is_alive()
        if not still_alive:
            self._pw_thread = None
            self._pw_worker = None
            return
        # None is the stop marker understood by the worker loop.
        self._pw_cmd_queue.put(None)
    # Join outside the lock so the lock is not held while waiting.
    worker.join(timeout=20)
    with self._pw_thread_lock:
        self._pw_thread = None
        self._pw_worker = None
-
def keep_alive(self):
    """Best-effort session ping: re-fetch the current page on the worker thread.

    Does nothing when no page exists. A ``SessionExpiredOrInvalidError`` during
    the ping marks the instance unhealthy; every other error is ignored.
    """
    if self.page is None:
        return

    def _ping():
        try:
            resp = self._perform_request("GET", self.page.url, retry_count=1)
            self._check_page_is_session_expired_or_invalid('Book your appointment', html = resp.text)
        except SessionExpiredOrInvalidError:
            self.is_healthy = False
        except Exception:
            # Deliberately silent: a failed ping must not break the caller.
            pass

    try:
        self._run_on_pw_thread(_ping)
    except Exception:
        pass
def _health_check_impl(self) -> bool:
    """Return True when the browser page responds and the session is within its lifetime.

    Checks, in order: the ``is_healthy`` flag, that a page exists, that the page
    evaluates the expression ``1`` to ``1``, and, when
    ``config.session_max_life`` is positive, that the session is not older than that.
    """
    if not self.is_healthy or self.page is None:
        return False
    try:
        if self.page.evaluate("1") != 1:
            return False
    except Exception:
        # Any evaluation failure means the page is unusable. Unlike the original
        # bare ``except``, KeyboardInterrupt/SystemExit are not swallowed.
        return False
    max_life = self.config.session_max_life
    if max_life > 0 and time.time() - self.session_create_time > max_life:
        self._log("Session expired.")
        return False
    return True
def health_check(self) -> bool:
    """Report session health, running the real check on the browser worker thread.

    Returns False immediately when the instance is flagged unhealthy or has no
    page, and also when the check itself raises.
    """
    if not (self.is_healthy and self.page is not None):
        return False
    try:
        on_worker = (
            self._pw_worker is not None
            and threading.current_thread() is self._pw_worker
        )
        if on_worker:
            verdict = self._health_check_impl()
        else:
            verdict = self._run_on_pw_thread(self._health_check_impl)
    except Exception:
        return False
    return verdict
-
def _save_screenshot(self, name_prefix):
    """Save a full-page screenshot to ``data/<instance>_<prefix>_<unix-time>.jpg``.

    Never raises: any failure is logged instead.
    """
    try:
        stamp = int(time.time())
        save_path = os.path.join("data", f"{self.instance_id}_{name_prefix}_{stamp}.jpg")
        os.makedirs("data", exist_ok=True)
        self.page.screenshot(path=save_path, full_page=True)
        self._log(f"Screenshot saved to {save_path}")
    except Exception as exc:
        self._log(f"Failed to save screenshot: {exc}")
def create_session(self):
    """Create the browser session on the worker thread.

    Starts the worker if needed and runs ``_create_session_inner`` on it. If
    that raises an ``Exception``, the worker thread is stopped and the error is
    re-raised.
    """
    self._ensure_pw_thread()
    try:
        self._run_on_pw_thread(self._create_session_inner)
    except Exception:
        # A failed session must not leave an idle worker thread behind.
        self._stop_pw_thread()
        raise
def _create_session_inner(self):
    """Create a logged-in browser session and route it to the appointment-booking page.

    Flow: launch Camoufox -> pass Cloudflare -> fill and submit the login form ->
    walk intermediate pages with a small state machine -> stop once the
    'Book your appointment' button is visible.

    Must run on the browser worker thread (Playwright is not thread-safe).

    Raises:
        BizLogicError: ``tls_url`` missing, Cloudflare bypass failed, no applicant
            on the account, login rejected, or the target page was not reached
            within the step budget.
        NotFoundError: no travel group matches ``free_config['location']``.

    On any exception the error is logged, a screenshot is taken when
    ``config.debug`` is set, ``_cleanup_failed_session`` runs, and the exception
    is re-raised.
    """
    self._log(f"Initializing Session (ID: {self.instance_id})...")
    proxy_cfg = None
    if self.config.proxy and self.config.proxy.ip:
        p = self.config.proxy
        proxy_cfg = {"server": f"{p.proto}://{p.ip}:{p.port}"}
        # Credentials are only attached when both parts are present.
        if p.username and p.password:
            proxy_cfg["username"] = p.username
            proxy_cfg["password"] = p.password
    else:
        self._log("[WARN] No proxy configured!")

    book_btn_selector = "button:has-text('Book your appointment')"
    email_label_selector = "label:has-text('Email')"
    login_link_selector = "a[href*='login']"
    recaptcha_iframe_selector = "//iframe[contains(@src, 'recaptcha')]"

    try:
        self.playwright = sync_playwright().start()
        headless_opt = _camoufox_headless_from_env()
        self._log(f"Camoufox headless={headless_opt!r} (env CAMOUFOX_HEADLESS)")
        self.browser_ctx = NewBrowser(
            self.playwright,
            persistent_context=True,
            headless=headless_opt,
            user_data_dir=self.user_data_path,
            proxy=proxy_cfg,
            window=(1920, 1080),
        )
        self.page = self.browser_ctx.pages[0] if self.browser_ctx.pages else self.browser_ctx.new_page()

        # --- Initial visit and Cloudflare bypass ---
        tls_url = self.free_config.get('tls_url', '')
        if not tls_url:
            # Fail with a clear message instead of an opaque navigation error.
            raise BizLogicError("free_config['tls_url'] is not configured")
        self._log(f"Navigating: {tls_url}")
        self.page.goto(tls_url, wait_until="domcontentloaded")
        time.sleep(5)

        cf_bypasser = CloudflareBypasser(self.page, log=True)
        if not cf_bypasser.bypass(max_retry=15):
            raise BizLogicError("Cloudflare bypass timeout")
        time.sleep(3)
        cf_bypasser.handle_waiting_room()

        # --- State-machine navigation loop ---
        max_steps = 20
        session_created = False
        has_submitted_login = False

        for step in range(max_steps):
            current_url = self.page.url
            self._log(f"--- [Router Step {step+1}] Current URL: {current_url} ---")

            # State 1: reached the target page (success exit).
            if "appointment-booking" in current_url or self.page.locator(book_btn_selector).first.count():
                if self._is_selector_visible(book_btn_selector, timeout=10000):
                    self.session_create_time = time.time()
                    self._log("✅ Login & Navigation Success! Reached appointment-booking.")
                    session_created = True
                    break

            # State 2: account has no applicant (fatal exit).
            page_content = self.page.content()
            no_applicant_indicators = [
                "Add a new applicant" in page_content,
                "You have not yet added an applicant" in page_content,
                "applicants-information" in current_url,
            ]
            if any(no_applicant_indicators):
                raise BizLogicError(message="No applicant added. Cannot proceed to booking.")

            # State 3: landing/portal page -> click through to the login form.
            if self.page.locator(login_link_selector).first.count() and not self.page.locator(email_label_selector).first.count():
                self._log("State: Login Portal. Clicking login link...")
                try:
                    self.page.locator(login_link_selector).first.click(timeout=5000)
                    time.sleep(3)
                    continue
                except Exception:
                    # Click failed; fall through and let the later states try.
                    pass

            # State 4: the actual login form (submitted at most once).
            if self.page.locator(email_label_selector).first.count() and not has_submitted_login:
                self._log("State: Login Form. Processing credentials and Captcha...")

                recaptchav2_token = ""
                if self.page.locator(".g-recaptcha").first.count() or self.page.locator(recaptcha_iframe_selector).first.count():
                    try:
                        rec_iframe = self.page.locator(recaptcha_iframe_selector).first
                        rec_iframe_src = rec_iframe.get_attribute('src') or ""
                        rec_params = parse_qs(urlparse(rec_iframe_src).query)
                        rec_sitekey = rec_params.get("k", [None])[0]
                        rec_size = rec_params.get("size", [None])[0]

                        # Only the visible ("normal") widget is sent to the solver.
                        if 'normal' == rec_size and rec_sitekey:
                            self._log(f"Solving ReCaptcha sitekey={rec_sitekey}...")
                            rc_params = {
                                "type": "ReCaptchaV2TaskProxyLess",
                                "page": current_url,
                                "siteKey": rec_sitekey,
                                "apiToken": self.free_config.get("capsolver_key", ""),
                            }
                            recaptchav2_token = self._solve_recaptcha(rc_params)
                    except Exception as e:
                        self._log(f"ReCaptcha extraction failed: {e}")

                username = self.config.account.username
                password = self.config.account.password

                self._type_into_first_visible(
                    selectors=[
                        "input[name='email']",
                        "input[type='email']",
                        "input#email",
                        "input[autocomplete='username']",
                        "label:has-text('Email') + input",
                    ],
                    text=username,
                    field_name="Email",
                )
                time.sleep(random.uniform(0.5, 1.2))

                self._type_into_first_visible(
                    selectors=[
                        "input[name='password']",
                        "input[type='password']",
                        "input#password",
                        "input[autocomplete='current-password']",
                        "label:has-text('Password') + input",
                    ],
                    text=password,
                    field_name="Password",
                )

                # Inject the solved token. It is passed as an argument rather than
                # spliced into the script, so its content cannot break the JS.
                if recaptchav2_token:
                    try:
                        self.page.evaluate(
                            "(token) => { const g = document.getElementById('g-recaptcha-response');"
                            " if (g) { g.value = token; } }",
                            recaptchav2_token,
                        )
                        self._log("ReCaptcha token injected")
                    except Exception as e:
                        self._log(f"ReCaptcha token injection failed: {e}")
                    time.sleep(random.uniform(0.5, 1.0))

                self._log("Submitting Login...")
                time.sleep(random.uniform(0.3, 0.8))
                self.page.locator("button:has-text('Login')").first.click(timeout=10000)
                has_submitted_login = True
                time.sleep(3)
                continue

            # State 5: travel-groups page -> pick the group for the configured location.
            if "travel-groups" in current_url:
                self._log("State: Travel Groups. Selecting targeted group...")
                groups = self._parse_travel_groups(self.page.content())
                location = self.free_config.get('location')
                # An unset location matches nothing and ends in NotFoundError
                # below, rather than raising TypeError from ``None in str``.
                self.travel_group = next(
                    (g for g in groups if location and location in (g.get('location') or '')),
                    None,
                )

                if not self.travel_group:
                    self._save_screenshot("group_not_found")
                    raise NotFoundError(f"Group not found for {location}")

                formgroup_id = self.travel_group.get('group_number')
                select_buttons = self.page.locator(f'button[name="formGroupId"][value="{formgroup_id}"]')
                if select_buttons.count():
                    # Take the last visible button.
                    select_btn = None
                    for i in range(select_buttons.count() - 1, -1, -1):
                        btn = select_buttons.nth(i)
                        try:
                            if btn.is_visible(timeout=1000):
                                select_btn = btn
                                break
                        except Exception:
                            continue

                    if select_btn:
                        time.sleep(random.uniform(0.5, 1.2))
                        select_btn.click(timeout=10000)
                        self._log(f"Clicked select button for group {formgroup_id}")
                        time.sleep(3)
                        continue
                    else:
                        self._log("[WARN] Select button found but not visible.")
                else:
                    self._log(f"[WARN] Wait timeout for group button {formgroup_id}")

            # State 6: intermediate dashboard -> click "Book Appointment" to go on.
            if self.page.locator('#book-appointment-btn').first.count():
                self._log("State: Intermediate Dashboard. Clicking Book Appointment button...")
                try:
                    self.page.locator('#book-appointment-btn').first.click(timeout=10000)
                    time.sleep(3)
                    continue
                except Exception:
                    pass

            # State 7: still on login-actions after submitting -> check for rejection.
            if "login-actions" in current_url and has_submitted_login:
                self._log("Waiting on login-actions... (Might be authenticating or invalid credentials)")
                time.sleep(2)
                try:
                    login_rejected = bool(
                        self.page.locator("text='Invalid username or password'").first.count()
                    )
                except Exception:
                    login_rejected = False
                # Raised outside the try so the failure actually propagates. It was
                # previously swallowed by ``except Exception: pass``.
                if login_rejected:
                    raise BizLogicError(message="Login Failed! Invalid credentials or Captcha rejected.")
                continue

            # Fallback: no state matched; wait for rendering or a redirect.
            self._log("State: Transitioning or Unknown. Waiting 2 seconds...")
            time.sleep(2)

        # Step budget exhausted without reaching the target page.
        if not session_created:
            raise BizLogicError(f"Failed to reach appointment-booking after {max_steps} navigation steps. Stuck at: {self.page.url}")
    except Exception as e:
        self._log(f"Session Create Error: {e}")
        if self.config.debug:
            self._save_screenshot("create_session_except")
        self._cleanup_failed_session()
        raise
def query(self, apt_type: AppointmentType) -> VSQueryResult:
    """Query slot availability by running ``_query_impl`` on the browser worker thread."""
    result = self._run_on_pw_thread(self._query_impl, apt_type)
    return result
def _day_block_locator_candidates(self):
    """Yield candidate locators for calendar day blocks, strictest selector first.

    A day block is a ``div`` that holds a ``p`` and at least one button whose
    ``data-testid`` contains ``slot``. Locators are created lazily, one per
    iteration step.
    """
    xpath_candidates = (
        # Direct <p> child, slot button under a direct child <div>.
        "xpath=//div[./p and ./div//button[contains(@data-testid, 'slot')]]",
        # Direct <p> child, slot button anywhere below. The original matched
        # 'slot ' with a trailing space, so this candidate could never hit.
        "xpath=//div[./p and .//button[contains(@data-testid, 'slot')]]",
        # Both <p> and slot button anywhere below.
        "xpath=//div[.//p and .//button[contains(@data-testid, 'slot')]]",
    )
    for selector in xpath_candidates:
        yield self.page.locator(selector)
    # Same idea expressed with Playwright's native has= filter.
    yield self.page.locator("div").filter(
        has=self.page.locator("p")
    ).filter(
        has=self.page.locator("button[data-testid*='slot']")
    )
def _extract_slots_from_calendar_dom(
    self, target_year: int, target_month_num: int
) -> List[Dict[str, Any]]:
    """Read available slots from the calendar DOM, trying several day-block selectors.

    The first candidate locator that matches anything is used. When none match,
    or the matched blocks yield no slot, the result of
    ``_extract_slots_from_available_buttons_only`` is returned instead.

    Returns:
        A list of ``{"date": "YYYY-MM-DD", "time": "HH:MM", "label": ...}`` dicts.
        ``label`` is ``"ptaw"`` when the button test id contains both ``prime``
        and ``weekend``, ``"pta"`` when it contains only ``prime``, else ``""``.
    """
    chosen = None
    for candidate in self._day_block_locator_candidates():
        try:
            matched = candidate.count()
        except Exception:
            continue
        if matched > 0:
            chosen = candidate
            self._log(f"使用日历块选择器,匹配到 {matched} 个 day_blocks")
            break

    if chosen is None:
        # No day-block wrapper found: scan the available buttons directly and
        # look the date up from their ancestors.
        return self._extract_slots_from_available_buttons_only(
            target_year, target_month_num
        )

    found: List[Dict[str, Any]] = []
    for block_idx in range(chosen.count()):
        day_block = chosen.nth(block_idx)
        heading = day_block.locator("p").first
        if not heading.count():
            continue
        day_digits = re.search(r"\d+", heading.inner_text())
        if not day_digits:
            continue
        try:
            full_date = f"{target_year}-{target_month_num:02d}-{int(day_digits.group()):02d}"
        except ValueError:
            continue

        open_buttons = day_block.locator("button[data-testid^='btn-available-slot']")
        for btn_idx in range(open_buttons.count()):
            button = open_buttons.nth(btn_idx)
            clock = re.search(r"\d{2}:\d{2}", button.inner_html())
            if not clock:
                continue
            test_id = button.get_attribute("data-testid") or ""
            if "prime" not in test_id:
                label = ""
            elif "weekend" in test_id:
                label = "ptaw"
            else:
                label = "pta"
            found.append({"date": full_date, "time": clock.group(), "label": label})

    if found:
        return found
    return self._extract_slots_from_available_buttons_only(
        target_year, target_month_num
    )
def _extract_slots_from_available_buttons_only(
    self, target_year: int, target_month_num: int
) -> List[Dict[str, Any]]:
    """Fallback extraction: start from each available-slot button and look up its day.

    For every ``btn-available-slot*`` button, the day number is read from the
    first ``p`` inside its nearest ancestor ``div`` that contains a ``p``.

    Returns:
        A list of ``{"date": "YYYY-MM-DD", "time": "HH:MM", "label": ...}`` dicts;
        empty when the page has no such button.
    """
    buttons = self.page.locator("button[data-testid^='btn-available-slot']")
    total = buttons.count()
    if total == 0:
        return []
    self._log(f"按可用按钮回查日期,共 {total} 个 btn-available-slot")

    collected: List[Dict[str, Any]] = []
    for idx in range(total):
        button = buttons.nth(idx)
        heading = button.locator("xpath=./ancestor::div[.//p][1]").locator("p").first
        if not heading.count():
            continue
        day_digits = re.search(r"\d+", heading.inner_text())
        if not day_digits:
            continue
        try:
            full_date = f"{target_year}-{target_month_num:02d}-{int(day_digits.group()):02d}"
        except ValueError:
            continue
        clock = re.search(r"\d{2}:\d{2}", button.inner_html())
        if not clock:
            continue
        test_id = button.get_attribute("data-testid") or ""
        if "prime" not in test_id:
            label = ""
        elif "weekend" in test_id:
            label = "ptaw"
        else:
            label = "pta"
        collected.append({"date": full_date, "time": clock.group(), "label": label})
    return collected
def _query_impl(self, apt_type: AppointmentType) -> VSQueryResult:
    """Collect the available slots for the configured month (worker thread only).

    The target month comes from ``free_config['interest_month']`` (``MM-YYYY``),
    defaulting to the current month. If the calendar already shows that month
    the page is re-fetched in the background and parsed. Otherwise the calendar
    is advanced with the "next month" button (at most 12 clicks) and slots are
    read from the DOM.

    Args:
        apt_type: Not used by this implementation.

    Returns:
        A ``VSQueryResult``. With slots: ``success`` is True, the status is
        ``Available``, ``earliest_date`` is the minimum slot date and
        ``availability`` has one entry per date. Without slots: ``success`` is
        False and the status is ``NoneAvailable``.

    Raises:
        ValueError: ``interest_month`` does not match ``%m-%Y``.
    """
    res = VSQueryResult()
    res.success = False
    interest_month = self.free_config.get("interest_month", time.strftime("%m-%Y"))

    target_date_obj = datetime.strptime(interest_month, "%m-%Y")
    target_month_text = target_date_obj.strftime("%B %Y")
    target_year = target_date_obj.year
    target_month_num = target_date_obj.month
    wanted_month = target_month_text.lower()

    current_month_selector = '[data-testid="btn-current-month-available"]'
    next_month_selector = '[data-testid="btn-next-month-available"]'

    current_selected_ele = self.page.locator(current_month_selector).first
    current_month_text = current_selected_ele.inner_text().strip() if current_selected_ele.count() else ""

    if current_month_text.lower() != wanted_month:
        self._log(f"Current is '{current_month_text}', navigating to '{target_month_text}'...")
        reached_target = False
        for _ in range(12):
            current_ele = self.page.locator(current_month_selector).first
            if current_ele.count() and current_ele.inner_text().strip().lower() == wanted_month:
                self._log(f"✅ Successfully navigated to target month: '{target_month_text}'!")
                reached_target = True
                break

            next_btn = self.page.locator(next_month_selector).first
            if not next_btn.count():
                self._log("⚠️ Reached the end of the calendar or 'Next Month' is disabled.")
                break
            next_btn.click(timeout=5000)
            time.sleep(2.5)  # give the calendar time to re-render

        if not reached_target:
            self._log(f"❌ Could not navigate to target month: {target_month_text}. Stop parsing.")
            res.success = False
            res.availability_status = AvailabilityStatus.NoneAvailable
            return res
        self._log("Extracting slots from DOM using robust data-testid features...")
        slots = self._extract_slots_from_calendar_dom(target_year, target_month_num)
    else:
        self._log(f"Already on '{target_month_text}'. Executing silent JS fetch...")
        resp = self._perform_request("GET", self.page.url, retry_count=1)
        self._check_page_is_session_expired_or_invalid('Book your appointment', resp.text)
        slots = self._parse_appointment_slots(resp.text)

    # Group slots by date. DOM-derived date strings can be invalid (e.g. a stray
    # number read as day 31 in a 30-day month); those are skipped, not fatal.
    date_map = {}
    for slot in slots or []:
        try:
            slot_day = datetime.strptime(slot["date"], "%Y-%m-%d")
        except ValueError:
            self._log(f"Skipping slot with invalid date: {slot}")
            continue
        date_map.setdefault(slot_day, []).append(
            TimeSlot(time=slot["time"], label=str(slot.get("label", "")))
        )

    if date_map:
        res.success = True
        res.availability_status = AvailabilityStatus.Available
        # min() does not depend on the parser returning slots in date order.
        res.earliest_date = min(date_map)
        res.availability = [
            DateAvailability(date=day, times=day_slots)
            for day, day_slots in date_map.items()
        ]
        self._log(f"Slot Found! -> {slots}")
    else:
        self._log("No slots available.")
        res.success = False
        res.availability_status = AvailabilityStatus.NoneAvailable
    return res
def book(self, slot_info: VSQueryResult, user_inputs: Dict = None) -> VSBookResult:
    """Book a slot by running ``_book_impl`` on the browser worker thread."""
    outcome = self._run_on_pw_thread(self._book_impl, slot_info, user_inputs)
    return outcome
def _book_impl(self, slot_info: VSQueryResult, user_inputs: Dict = None) -> VSBookResult:
    """Pick one acceptable slot at random and submit the booking form (worker thread only).

    Args:
        slot_info: Query result whose ``availability`` lists dates and times.
        user_inputs: Optional constraints: ``expected_start_date``,
            ``expected_end_date`` (both passed to ``_filter_dates``) and
            ``support_pta`` (default True; also accept slots labelled ``'pta'``).

    Returns:
        A ``VSBookResult``. ``success`` is True, with ``label``, ``book_date`` and
        ``book_time`` filled in, only when a confirmation indicator shows up
        within the polling window.

    Raises:
        NotFoundError: no date or slot satisfies the constraints.
        BizLogicError: the form could not be filled in or submitted.
    """
    if user_inputs is None:
        user_inputs = {}
    res = VSBookResult()
    res.success = False

    exp_start = user_inputs.get('expected_start_date', '')
    exp_end = user_inputs.get('expected_end_date', '')
    support_pta = user_inputs.get('support_pta', True)
    target_labels = ['']
    if support_pta:
        target_labels.append('pta')

    available_dates_str = [
        da.date.strftime("%Y-%m-%d")
        for da in slot_info.availability if da.date
    ]
    valid_dates_list = self._filter_dates(available_dates_str, exp_start, exp_end)
    if not valid_dates_list:
        raise NotFoundError(message="No dates match user constraints")

    all_possible_slots = []
    for da in slot_info.availability:
        if not da.date:
            continue
        date_str = da.date.strftime("%Y-%m-%d")
        if date_str not in valid_dates_list:
            continue
        for t in da.times:
            if t.label in target_labels:
                all_possible_slots.append({"date": date_str, "time_obj": t, "label": t.label})

    if not all_possible_slots:
        raise NotFoundError(message="No suitable slot found (after label filtering)")
    selected_slot = random.choice(all_possible_slots)
    selected_date = selected_slot["date"]
    selected_time = selected_slot["time_obj"]
    selected_label = selected_slot["label"]
    self._log(f"Found {len(all_possible_slots)} valid slots. selected slot: {selected_date} {selected_time.time} {selected_label}")

    # One script serves both modes. The slot values arrive as an argument rather
    # than being interpolated into the source, so they cannot alter the script.
    fill_form_js = """
    (args) => {
        try {
            const buttons = Array.from(document.querySelectorAll('button[type="submit"]'));
            const submitBtn = buttons.find(btn => {
                return btn.textContent.trim().toLowerCase().includes('book your appointment');
            });
            if (!submitBtn) return 'Submit button not found';
            const form = submitBtn.closest('form');
            if (!form) return 'Correct form not found';
            function setReactValue(input, value) {
                if (!input) return;
                input.value = value;
                input.dispatchEvent(new Event('input', { bubbles: true }));
                input.dispatchEvent(new Event('change', { bubbles: true }));
            }
            setReactValue(form.querySelector('input[name="date"]'), args.date);
            setReactValue(form.querySelector('input[name="time"]'), args.time);
            setReactValue(form.querySelector('input[name="appointmentLabel"]'), args.label);
            submitBtn.removeAttribute('disabled');
            if (args.click) {
                submitBtn.click();
                return 'clicked';
            }
            submitBtn.classList.remove('opacity-50', 'cursor-not-allowed');
            return 'form_updated';
        } catch (e) {
            return e.toString();
        }
    }
    """.strip()
    form_values = {
        "date": selected_date,
        "time": str(selected_time.time),
        "label": str(selected_label),
    }

    # Mode 1: mouse move + JS form update + real click.
    # Mode 2: JS form update and JS click in one step.
    book_mode = random.choice([1, 2])
    self._log(f"Using booking mode: {book_mode}")

    if book_mode == 1:
        rand_x = random.randint(300, 800)
        rand_y = random.randint(400, 700)
        self._log(f"Mode 1: Moving mouse to ({rand_x}, {rand_y}) and clicking")
        try:
            # Actually move the pointer. Clicking a non-existent locator, as the
            # original did, only waited out its timeout and moved nothing.
            self.page.mouse.move(rand_x, rand_y, steps=10)
        except Exception as e:
            self._log(f"Mode 1: Mouse move failed (ignored): {e}")

        update_res = self.page.evaluate(fill_form_js, dict(form_values, click=False))
        self._log(f"Mode 1: Form update triggered: {update_res}")
        if update_res != 'form_updated':
            raise BizLogicError(message=f"Failed to update form in Mode 1: {update_res}")

        submit_btn = self.page.locator("button:has-text('Book your appointment')").first
        if not submit_btn.count():
            raise BizLogicError(message="Submit button not found for Mode 1")

        self._log("Mode 1: Moving mouse to submit button and clicking")
        time.sleep(random.uniform(0.2, 0.5))
        submit_btn.click(timeout=10000)
    else:
        # Faster, but more likely to be detected.
        inject_res = self.page.evaluate(fill_form_js, dict(form_values, click=True))
        self._log(f"Mode 2: Form submission triggered: {inject_res}")
        if inject_res != 'clicked':
            raise BizLogicError(message="Failed to inject form or click the submit button")

    self._log("Waiting for Next.js to process the form submission...")
    for _ in range(10):
        try:
            current_page_url = self.page.url
            current_page_html = self.page.content()
            appointment_confirmation_indicators = [
                "order-summary" in current_page_url,
                "partner-services" in current_page_url,
                "appointment-confirmation" in current_page_url,
                "Change my appointment" in current_page_html,
                "Book a new appointment" in current_page_html,
            ]
            if any(appointment_confirmation_indicators):
                self._log(f"✅ BOOKING SUCCESS! Redirected to: {current_page_url}")
                res.success = True
                res.label = selected_label
                res.book_date = selected_date
                res.book_time = selected_time.time
                self._save_screenshot("book_slot_success")
                break

            toast_ele = self.page.locator('[role="alert"]').first
            if toast_ele.count():
                error_msg = toast_ele.inner_text()
                self._log(f"❌ BOOKING FAILED! Detected popup: {error_msg}")
                break
        except Exception as e:
            # The page may be mid-navigation; log it and poll again.
            self._log(f"Booking confirmation probe failed (will retry): {e}")
        # The pause also runs after a failed probe, so the loop never spins.
        time.sleep(0.5)
    return res
-
def _get_proxy_url(self):
    """Build the proxy URL from ``config.proxy``.

    Returns:
        ``proto://user:pass@ip:port`` when a username is set, ``proto://ip:port``
        without one, and ``""`` when no config, no proxy or no proxy ip exists.
    """
    # Guard against a missing proxy object. The original dereferenced
    # config.proxy.ip directly and raised AttributeError in that case.
    proxy = getattr(self.config, "proxy", None)
    if not proxy or not proxy.ip:
        return ""
    if proxy.username:
        return f"{proxy.proto}://{proxy.username}:{proxy.password}@{proxy.ip}:{proxy.port}"
    return f"{proxy.proto}://{proxy.ip}:{proxy.port}"
def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None, retry_count=0):
    """Run an HTTP request as a ``fetch`` inside the browser page and wrap the result.

    Args:
        method: HTTP method name; upper-cased before use.
        url: Target URL. ``params`` are URL-encoded and appended to it.
        headers: Optional request headers; the caller's dict is not modified.
        data: Form body. A dict is URL-encoded; anything else is sent as is.
        json_data: JSON body; takes precedence over ``data`` when truthy.
        params: Optional query parameters.
        retry_count: Number of 403-triggered retries already spent (max 2).

    Returns:
        A ``BrowserResponse`` when the status is 200.

    Raises:
        BizLogicError: no page, network failure (status 0) or another HTTP status.
        SessionExpiredOrInvalidError: HTTP 401 (also marks the instance unhealthy).
        PermissionDeniedError: HTTP 403 that a firewall refresh could not clear.
        RateLimiteddError: HTTP 429 (also marks the instance unhealthy).
    """
    from urllib.parse import urlencode

    if not self.page:
        raise BizLogicError("Browser not initialized")
    if params:
        separator = '&' if '?' in url else '?'
        url += separator + urlencode(params)

    fetch_options = {
        "method": method.upper(),
        # Copy so the Content-Type added below never leaks into the caller's dict.
        "headers": dict(headers or {}),
        "credentials": "include",
    }
    if json_data:
        fetch_options['body'] = json.dumps(json_data)
        fetch_options['headers']['Content-Type'] = 'application/json'
    elif data:
        if isinstance(data, dict):
            fetch_options['body'] = urlencode(data)
            fetch_options['headers']['Content-Type'] = 'application/x-www-form-urlencoded'
        else:
            fetch_options['body'] = data

    # URL and options are passed as an argument instead of being pasted into the
    # script, so quotes or backslashes in the URL cannot break the JS.
    fetch_js = """
    async ({ url, options }) => {
        try {
            const response = await fetch(url, options);
            const text = await response.text();
            const headers = {};
            response.headers.forEach((value, key) => { headers[key] = value; });
            return { status: response.status, body: text, headers: headers, url: response.url };
        } catch (error) {
            return { status: 0, body: error.toString(), headers: {}, url: url };
        }
    }
    """.strip()
    res_dict = self.page.evaluate(fetch_js, {"url": url, "options": fetch_options})
    resp = BrowserResponse(res_dict)

    status = resp.status_code
    if status == 200:
        return resp
    if status == 401:
        self.is_healthy = False
        raise SessionExpiredOrInvalidError()
    if status == 403:
        if retry_count < 2:
            self._log(f"HTTP 403 Detected. Cloudflare session expired? Attempting refresh (Try {retry_count+1}/2)...")
            if self._refresh_firewall_session():
                self._log("Firewall session refreshed. Retrying request...")
                # ``url`` already carries the encoded params, so pass None here;
                # the original appended the query string again on every retry.
                return self._perform_request(method, url, headers, data, json_data, None, retry_count + 1)
            self._log("Failed to refresh firewall session.")
        raise PermissionDeniedError(f"HTTP 403: {resp.text[:100]}")
    if status == 429:
        self.is_healthy = False
        raise RateLimiteddError()
    if status == 0:
        raise BizLogicError(f"Network Error: {resp.text}")
    raise BizLogicError(message=f"HTTP Error {status}: {resp.text[:100]}")
-
def _refresh_firewall_session(self) -> bool:
    """Reload the page to provoke the Cloudflare challenge and try to pass it.

    Returns:
        True when the bypasser reports success and the resulting page title
        does not contain "access denied"; False otherwise, including when
        any exception is raised along the way (the error is logged).
    """
    try:
        # Reloading the current page forces a fresh HTTP request, which is
        # what brings up the Cloudflare interstitial.
        self._log("Refreshing page to trigger Cloudflare...")
        self.page.reload(wait_until="domcontentloaded")

        bypasser = CloudflareBypasser(self.page, log=self.config.debug)
        # A generous retry budget, since the network may be unstable here.
        if not bypasser.bypass(max_retry=10):
            return False

        # Make sure we did not simply land on a 403 page.
        if "access denied" in self.page.title().lower():
            return False

        # Give the DOM a moment to settle.
        time.sleep(2)
        return True
    except Exception as exc:
        self._log(f"Error during firewall refresh: {exc}")
        return False
def _solve_recaptcha(self, params) -> str:
    """Solve a reCAPTCHA through the CapSolver HTTP API and return its token.

    Args:
        params: Mapping read with ``.get``; uses ``apiToken`` (required),
            ``type``, ``page``, ``siteKey`` and the optional ``action``.

    Returns:
        The ``gRecaptchaResponse`` value from the finished task.

    Raises:
        NotFoundError: ``apiToken`` is missing or empty.
        BizLogicError: the task could not be created, the service reported
            a failure, or no result was ready after 20 polls.
    """
    import requests as req  # local import under an alias to avoid name clashes

    key = params.get("apiToken")
    if not key:
        raise NotFoundError("Api-token required")

    task = {
        "type": params.get("type"),
        "websiteURL": params.get("page"),
        "websiteKey": params.get("siteKey"),
    }
    action = params.get("action")
    if action:
        task["pageAction"] = action

    # No proxy fields are added to the task: the solver is used in
    # proxy-less mode even when the browser itself runs behind a proxy.
    r = req.post(
        "https://api.capsolver.com/createTask",
        json={"clientKey": key, "task": task},
        timeout=20,
    )
    if r.status_code != 200:
        raise BizLogicError(message="Failed to submit capsolver task")

    created = r.json()
    task_id = created.get("taskId")
    if not task_id:
        # Without a task id every poll below would be meaningless.
        detail = created.get("errorDescription") or created.get("errorCode")
        raise BizLogicError(message=f"Failed to submit capsolver task: {detail}")

    for _ in range(20):
        r = req.post(
            "https://api.capsolver.com/getTaskResult",
            json={"clientKey": key, "taskId": task_id},
            timeout=20,
        )
        if r.status_code == 200:
            d = r.json()
            status = d.get("status")
            if status == "ready":
                return d["solution"]["gRecaptchaResponse"]
            if status == "failed" or d.get("errorId"):
                # Fail fast instead of polling a dead task until the timeout.
                detail = d.get("errorDescription") or d.get("errorCode")
                raise BizLogicError(message=f"Capsolver task failed: {detail}")
        time.sleep(3)
    raise BizLogicError(message="Capsolver task timeout")
- def _parse_travel_groups(self, html_content) -> List[Dict]:
- groups = []
- js_pattern = r'\\"travelGroups\\":\s*(\[.*?\]),\\"availableCountriesToCreateGroups'
- js_match = re.search(js_pattern, html_content, re.DOTALL)
- if js_match:
- json_str = js_match.group(1).replace(r'\"', '"')
- data = json.loads(json_str)
- for g in data:
- groups.append({
- 'group_name': g.get('groupName'),
- 'group_number': g.get('formGroupId'),
- 'location': g.get('vacName')
- })
- else:
- self._log('Parsed travel group page, but not found travelGroups')
- return groups
- def _parse_appointment_slots(self, html_content) -> List[Dict]:
- slots = []
- pattern = r'"availableAppointments\\":\s*(\[.*\]),\\"showFlexiAppointment'
- match = re.search(pattern, html_content, re.DOTALL)
-
- if match:
- json_str = match.group(1).replace(r'\"', '"')
- data = json.loads(json_str)
- for day in data:
- d_str = day.get('day')
- for s in day.get('slots', []):
- labels = s.get('labels', [])
- lbl = ""
- # 简化逻辑:TLS label 列表
- if 'pta' in labels: lbl = 'pta'
- elif 'ptaw' in labels: lbl = 'ptaw'
- elif '' in labels or not labels: lbl = ''
-
- slots.append({
- 'date': d_str,
- 'time': s.get('time'),
- 'label': lbl
- })
- return slots
-
- def _check_page_is_session_expired_or_invalid(self, keyword, html: str) -> bool:
- if not html:
- self.is_healthy = False
- raise SessionExpiredOrInvalidError()
-
- html_lower = html.lower()
- if keyword.lower() not in html_lower:
- session_expire_or_invalid_indicators = [
- 'redirected automatically' in html_lower,
- 'login' in html_lower and 'password' in html_lower,
- 'session expired' in html_lower
- ]
- if any(session_expire_or_invalid_indicators):
- self.is_healthy = False
- raise SessionExpiredOrInvalidError()
-
- def _filter_dates(self, dates: List[str], start_str: str, end_str: str) -> List[str]:
- if not start_str or not end_str:
- return dates
- valid_dates = []
- s_date = datetime.strptime(start_str[:10], "%Y-%m-%d")
- e_date = datetime.strptime(end_str[:10], "%Y-%m-%d")
- for date_str in dates:
- curr_date = datetime.strptime(date_str, "%Y-%m-%d")
- if s_date <= curr_date <= e_date:
- valid_dates.append(date_str)
- random.shuffle(valid_dates)
- return valid_dates
- def _is_selector_visible(self, selector: str, timeout: int = 10000) -> bool:
- try:
- self.page.wait_for_selector(selector, state="visible", timeout=timeout)
- return True
- except PlaywrightTimeoutError:
- return False
- def _human_type(self, text: str):
- for ch in text:
- self.page.keyboard.type(ch)
- time.sleep(random.uniform(0.03, 0.12))
def _type_into_first_visible(self, selectors: List[str], text: str, field_name: str):
    """Type ``text`` into the first selector that yields a usable input.

    Each selector is tried in order: wait for its first match to become
    visible, click it, pause briefly, clear it, then type ``text`` via
    ``_human_type``. Any exception moves on to the next selector.

    Args:
        selectors: Candidate selectors, tried in order.
        text: Text to type.
        field_name: Name used in the error message.

    Raises:
        BizLogicError: no selector worked; the message carries the last
            error seen.
    """
    failure = None
    for candidate in selectors:
        try:
            field = self.page.locator(candidate).first
            field.wait_for(state="visible", timeout=3000)
            field.click(timeout=3000)
            time.sleep(random.uniform(0.2, 0.6))
            field.fill("")
            self._human_type(text)
        except Exception as exc:
            failure = exc
        else:
            return
    raise BizLogicError(message=f"Can't find visible {field_name} input. Last error: {failure}")
-
- def _close_playwright(self):
- if self.page:
- try:
- self.page.close()
- except Exception:
- pass
- self.page = None
- if self.browser_ctx:
- try:
- self.browser_ctx.close()
- except Exception:
- pass
- self.browser_ctx = None
- if self.playwright:
- try:
- self.playwright.stop()
- except Exception:
- pass
- self.playwright = None
- def _rmtree_workspace(self):
- if os.path.exists(self.root_workspace):
- for _ in range(3):
- try:
- time.sleep(0.2)
- shutil.rmtree(self.root_workspace, ignore_errors=True)
- break
- except Exception as e:
- self._log(f"Cleanup retry: {e}")
- time.sleep(0.5)
- if os.path.exists(self.root_workspace):
- self._log(f"[WARN] Failed to fully remove workspace: {self.root_workspace}")
- def _cleanup_failed_session(self):
- """create_session 在工作线程内失败时调用;外层会 _stop_pw_thread。"""
- self._close_playwright()
- self._rmtree_workspace()
# --- Core resource cleanup methods ---
def cleanup(self):
    """Destroy the browser and remove the temporary workspace.

    When a ``_pw_worker`` is set, the caller is a different thread, and
    ``_pw_thread`` is alive, the Playwright teardown is dispatched through
    ``_run_on_pw_thread`` (falling back to a direct call if that raises);
    the workspace is then removed and the worker thread stopped. In every
    other case, including when called on the worker itself, the teardown
    and workspace removal run directly on the calling thread.
    """
    worker = getattr(self, "_pw_worker", None)
    called_from_outside = worker is not None and threading.current_thread() is not worker

    if called_from_outside and self._pw_thread and self._pw_thread.is_alive():
        try:
            self._run_on_pw_thread(self._close_playwright)
        except Exception:
            # Dispatch failed; close directly from this thread instead.
            self._close_playwright()
        self._rmtree_workspace()
        self._stop_pw_thread()
        return

    self._close_playwright()
    self._rmtree_workspace()
- def __del__(self):
- """
- 析构函数:当对象被垃圾回收时自动调用
- """
- self.cleanup()
class TlsPlugin2(TlsPlugin):
    """Compatibility subclass for factories that load ``TlsPlugin2`` by module name."""
|