import random
import time
from typing import Any


class CloudflareBypasser:
    """Best-effort helper that tries to get a page past a Cloudflare challenge.

    ``driver`` is either a page object exposing ``title()``, ``content()``,
    ``url``, ``frames``, ``mouse`` and ``reload()``, or an adapter that keeps
    such a page in its ``_page`` attribute.  Every public method swallows
    driver errors and reports them through :meth:`log_message` instead of
    raising.
    """

    # Substrings that mark a frame (by name or URL) as a challenge frame.
    _CHALLENGE_FRAME_MARKERS = (
        "turnstile",
        "challenges.cloudflare.com",
        "challenge",
        "cf-chl",
    )

    # CSS selectors tried, in order, to find the verification checkbox.
    # Shared by the locator pass and the in-page shadow-DOM script so the
    # two lists cannot drift apart.
    _CHECKBOX_SELECTORS = (
        "input[type='checkbox']",
        "[role='checkbox']",
        "label.ctp-checkbox-label",
        "div.ctp-checkbox-label",
        "label[for*='cf']",
    )

    # Lower-cased substrings of title/HTML that mean "still blocked".
    _BLOCKED_MARKERS = (
        "just a moment",
        "请稍候",
        "checking your browser",
        "cf-challenge",
        "please complete the captcha",
    )

    # Walks the document and every reachable *open* shadow root, clicking the
    # first element that matches one of the selectors passed in as argument.
    _SHADOW_CLICK_SCRIPT = """
    (selectors) => {
      const seen = new WeakSet();
      const stack = [document];
      while (stack.length) {
        const root = stack.pop();
        if (!root || seen.has(root)) continue;
        seen.add(root);
        for (const sel of selectors) {
          const hit = root.querySelector(sel);
          if (hit) {
            hit.click();
            return true;
          }
        }
        const nodes = root.querySelectorAll ? root.querySelectorAll("*") : [];
        for (const node of nodes) {
          if (node.shadowRoot) stack.push(node.shadowRoot);
        }
      }
      return false;
    }
    """

    def __init__(self, driver: Any, log=True):
        """Store the driver and whether diagnostic messages are printed."""
        self.driver = driver
        self.log = log

    def log_message(self, message) -> None:
        """Print ``message`` when logging is enabled."""
        if self.log:
            print(message)

    def _normalize_page(self):
        """Return the underlying page object.

        Compatible with the CamoufoxPageAdapter used by TlsPlugin: when the
        driver has a ``_page`` attribute that is used, otherwise the driver
        itself is treated as the page.
        """
        return getattr(self.driver, "_page", self.driver)

    def _is_challenge_frame(self, frame) -> bool:
        """Return True when the frame's name or URL contains a challenge marker."""
        frame_name = (frame.name or "").lower()
        frame_url = (frame.url or "").lower()
        return any(
            marker in frame_name or marker in frame_url
            for marker in self._CHALLENGE_FRAME_MARKERS
        )

    def _count_challenge_frames(self) -> int:
        """Count the frames of the current page that look like challenge frames."""
        page = self._normalize_page()
        return sum(1 for frame in page.frames if self._is_challenge_frame(frame))

    def _determine_challenge_type(self) -> str:
        """Classify the current page from its title and HTML.

        Returns ``"turnstile"``, ``"interstitial"``, ``"none"``, or
        ``"unknown"`` when the page could not be inspected.
        """
        try:
            page = self._normalize_page()
            title = (page.title() or "").lower()
            html = (page.content() or "").lower()
            if "please complete the captcha" in html or "turnstile" in html:
                return "turnstile"
            if "just a moment" in title or "checking your browser" in html:
                return "interstitial"
            return "none"
        except Exception as e:
            self.log_message(f"Error determining challenge type: {e}")
            return "unknown"

    def _click_checkbox_in_frame(self, frame) -> bool:
        """Try to click the verification checkbox inside ``frame``.

        First tries each selector through ``frame.locator``; if none can be
        clicked, falls back to an in-page script that also searches open
        shadow roots.  Returns True when a click was issued.
        """
        for selector in self._CHECKBOX_SELECTORS:
            try:
                loc = frame.locator(selector)
                if loc.count() <= 0:
                    continue
                loc.first.click(timeout=2000)
                return True
            except Exception:
                # A miss or click timeout on one selector is expected; the
                # next selector (or the script fallback) gets its turn.
                continue

        # Recursive lookup through open shadow roots inside the frame
        # (closed shadow roots cannot be accessed directly).
        try:
            clicked = frame.evaluate(
                self._SHADOW_CLICK_SCRIPT, list(self._CHECKBOX_SELECTORS)
            )
            return bool(clicked)
        except Exception as e:
            self.log_message(f"Shadow DOM checkbox lookup failed: {e}")
            return False

    def _click_challenge_iframe_center(self) -> bool:
        """Click near the centre of the first challenge iframe with a bounding box.

        Returns True as soon as one click was issued, False when no challenge
        frame could be clicked.
        """
        page = self._normalize_page()
        for frame in page.frames:
            if not self._is_challenge_frame(frame):
                continue
            try:
                frame_el = frame.frame_element()
                box = frame_el.bounding_box()
                if not box:
                    continue
                # Slightly human-like: click with a random offset around the
                # centre instead of a fixed coordinate.
                cx = box["x"] + box["width"] * (0.5 + random.uniform(-0.08, 0.08))
                cy = box["y"] + box["height"] * (0.5 + random.uniform(-0.08, 0.08))
                page.mouse.move(cx, cy, steps=10)
                time.sleep(random.uniform(0.15, 0.45))
                page.mouse.click(cx, cy, delay=random.randint(50, 180))
                return True
            except Exception as e:
                self.log_message(f"Iframe center click failed: {e}")
                continue
        return False

    def click_verification_button(self, _is_dfs=False):
        """Run the click strategies once: checkbox first, iframe centre second.

        ``_is_dfs`` is accepted for backward compatibility and is not used.
        """
        try:
            page = self._normalize_page()
            for frame in page.frames:
                if not self._is_challenge_frame(frame):
                    continue
                if self._click_checkbox_in_frame(frame):
                    self.log_message(
                        "Challenge interaction succeeded by frame selector/evaluate."
                    )
                    time.sleep(1)
                    return

            if self._click_challenge_iframe_center():
                self.log_message(
                    "Challenge interaction succeeded by iframe center click."
                )
                time.sleep(1)
                return

            self.log_message("Challenge click strategies exhausted.")
        except Exception as e:
            self.log_message(f"Error clicking verification button: {e}")

    def is_bypassed(self) -> bool:
        """Return True when neither title nor HTML contains a blocked marker.

        Returns False when the page cannot be inspected.
        """
        try:
            page = self._normalize_page()
            title = (page.title() or "").lower()
            html = (page.content() or "").lower()
            return not any(
                marker in title or marker in html
                for marker in self._BLOCKED_MARKERS
            )
        except Exception as e:
            self.log_message(f"Error checking page title: {e}")
            return False

    def _collect_page_state(self) -> str:
        """Sample the current page state as a log-friendly string.

        Helps to see in which challenge round the flow got stuck.
        """
        try:
            page = self._normalize_page()
            title = page.title()
            url = page.url
            challenge_type = self._determine_challenge_type()
            challenge_frames = self._count_challenge_frames()
            return (
                f"title={title!r}, url={url!r}, challenge_type={challenge_type}, "
                f"challenge_frames={challenge_frames}"
            )
        except Exception as e:
            return f"state_collect_error={e}"

    def _collect_state_signature(self):
        """Return a comparable tuple describing the current challenge state.

        The tuple is ``(challenge_type, challenge_frames, title[:80],
        url[:120])`` with title and URL lower-cased, or
        ``("unknown", -1, "", "")`` when the page cannot be inspected.
        """
        try:
            page = self._normalize_page()
            title = (page.title() or "").lower()
            url = (page.url or "").lower()
            challenge_type = self._determine_challenge_type()
            challenge_frames = self._count_challenge_frames()
            return (challenge_type, challenge_frames, title[:80], url[:120])
        except Exception:
            return ("unknown", -1, "", "")

    def bypass(self, max_retry=5) -> bool:
        """Try up to ``max_retry`` times to get past the challenge.

        Each round clicks the verification control, reloads the page when the
        click changed nothing, and then backs off.  Returns True as soon as
        :meth:`is_bypassed` reports success, otherwise the result of a final
        check after all rounds.
        """
        for i in range(max_retry):
            if self.is_bypassed():
                return True

            sig_before = self._collect_state_signature()
            state_before = self._collect_page_state()
            self.log_message(
                f"Verification page detected. try={i + 1}/{max_retry}, "
                f"before_click: {state_before}"
            )
            self.click_verification_button(False)

            # Wait briefly after the click, then check again.
            time.sleep(1.2)
            if self.is_bypassed():
                self.log_message("Bypass success after click.")
                return True

            sig_after = self._collect_state_signature()
            if sig_before == sig_after:
                self.log_message("No challenge state transition detected after click.")
                # The page state did not change at all: do one light reload
                # to make the challenge render again.
                try:
                    page = self._normalize_page()
                    page.reload(wait_until="domcontentloaded")
                    self.log_message("Page reloaded to retrigger challenge rendering.")
                    time.sleep(1.5)
                    if self.is_bypassed():
                        self.log_message("Bypass success after reload.")
                        return True
                except Exception as reload_err:
                    self.log_message(f"Reload failed: {reload_err}")

            # Growing back-off lowers the risk of being flagged for clicking
            # at a sustained high frequency.
            wait_seconds = min(2 + i, 6)
            state_after = self._collect_page_state()
            self.log_message(
                f"Bypass not yet complete, sleeping {wait_seconds}s, "
                f"after_click: {state_after}"
            )
            time.sleep(wait_seconds)

        final_ok = self.is_bypassed()
        if not final_ok:
            self.log_message(
                f"Bypass failed after retries. final_state: {self._collect_page_state()}"
            )
        return final_ok