| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245 |
- import random
- import time
- from typing import Any
class CloudflareBypasser:
    """Best-effort helper that tries to get a browser page past a Cloudflare challenge.

    The wrapped ``driver`` is used through ``title()``, ``content()``, ``url``,
    ``frames``, ``mouse`` and ``reload()``; frames are used through ``name``,
    ``url``, ``locator()``, ``evaluate()`` and ``frame_element()``.
    NOTE(review): this looks like the Playwright sync API -- confirm against the caller.
    """

    # Substrings that mark a frame (by name or URL) as challenge-related.
    _FRAME_MARKERS = (
        "turnstile",
        "challenges.cloudflare.com",
        "challenge",
        "cf-chl",
    )

    # Substrings in the title or HTML that mean the challenge page is still shown.
    _BLOCKED_MARKERS = (
        "just a moment",
        "请稍候",
        "checking your browser",
        "cf-challenge",
        "please complete the captcha",
    )

    # Single source of truth for checkbox selectors; shared by the locator pass
    # and the in-frame JavaScript pass so the two can never drift apart.
    _CHECKBOX_SELECTORS = (
        "input[type='checkbox']",
        "[role='checkbox']",
        "label.ctp-checkbox-label",
        "div.ctp-checkbox-label",
        "label[for*='cf']",
    )

    # Walks the document and every reachable *open* shadow root, clicking the
    # first element that matches one of the selectors passed as the argument.
    _SHADOW_CLICK_JS = """
        (selectors) => {
            const seen = new WeakSet();
            const stack = [document];
            while (stack.length) {
                const root = stack.pop();
                if (!root || seen.has(root)) continue;
                seen.add(root);
                for (const sel of selectors) {
                    const hit = root.querySelector(sel);
                    if (hit) {
                        hit.click();
                        return true;
                    }
                }
                const nodes = root.querySelectorAll ? root.querySelectorAll("*") : [];
                for (const node of nodes) {
                    if (node.shadowRoot) stack.push(node.shadowRoot);
                }
            }
            return false;
        }
    """

    def __init__(self, driver: Any, log=True):
        """Store the page (or page adapter) and the logging switch.

        Args:
            driver: A page object, or an adapter exposing the page as ``_page``.
            log: When truthy, ``log_message`` prints its messages.
        """
        self.driver = driver
        self.log = log

    def log_message(self, message):
        """Print ``message`` if logging is enabled."""
        if self.log:
            print(message)

    def _normalize_page(self):
        """Return the underlying page, unwrapping an adapter's ``_page`` if present."""
        # Compatibility with the CamoufoxPageAdapter used in TlsPlugin.
        return getattr(self.driver, "_page", self.driver)

    def _is_challenge_frame(self, frame) -> bool:
        """Return True if the frame's name or URL contains a challenge marker."""
        frame_name = (frame.name or "").lower()
        frame_url = (frame.url or "").lower()
        return any(m in frame_name or m in frame_url for m in self._FRAME_MARKERS)

    def _count_challenge_frames(self, page) -> int:
        """Count the frames of ``page`` that look challenge-related."""
        return sum(1 for frame in page.frames if self._is_challenge_frame(frame))

    def _determine_challenge_type(self) -> str:
        """Classify the current page from its title and HTML.

        Returns:
            ``"turnstile"``, ``"interstitial"``, ``"none"``, or ``"unknown"``
            when the page could not be inspected.
        """
        try:
            page = self._normalize_page()
            title = (page.title() or "").lower()
            html = (page.content() or "").lower()
        except Exception as e:
            self.log_message(f"Error determining challenge type: {e}")
            return "unknown"
        if "please complete the captcha" in html or "turnstile" in html:
            return "turnstile"
        if "just a moment" in title or "checking your browser" in html:
            return "interstitial"
        return "none"

    def _click_checkbox_in_frame(self, frame) -> bool:
        """Try to click a verification checkbox inside ``frame``.

        First tries each selector through ``frame.locator``; if none can be
        clicked, falls back to a JavaScript search that also descends into open
        shadow roots.

        Returns:
            True if a click was issued, False otherwise. Never raises.
        """
        for selector in self._CHECKBOX_SELECTORS:
            try:
                loc = frame.locator(selector)
                if loc.count() <= 0:
                    continue
                loc.first.click(timeout=2000)
                return True
            except Exception as e:
                # Best effort: report the failure and move on to the next selector.
                self.log_message(f"Checkbox selector {selector!r} failed: {e}")
                continue
        # Recursive open-shadow lookup inside the frame (closed shadow roots
        # cannot be reached directly from script).
        try:
            clicked = frame.evaluate(
                self._SHADOW_CLICK_JS, list(self._CHECKBOX_SELECTORS)
            )
            return bool(clicked)
        except Exception as e:
            self.log_message(f"Checkbox shadow-DOM lookup failed: {e}")
            return False

    def _click_challenge_iframe_center(self) -> bool:
        """Click near the center of the first visible challenge iframe.

        Returns:
            True once a click was performed, False if no challenge frame had a
            usable (non-empty) bounding box or every attempt failed.
        """
        page = self._normalize_page()
        for frame in page.frames:
            if not self._is_challenge_frame(frame):
                continue
            try:
                box = frame.frame_element().bounding_box()
                # A missing or zero-area box means the iframe is not rendered;
                # clicking its corner would hit unrelated page content.
                if not box or box["width"] <= 0 or box["height"] <= 0:
                    continue
                # Slightly human-like: random offset around the center instead
                # of a fixed coordinate.
                cx = box["x"] + box["width"] * (0.5 + random.uniform(-0.08, 0.08))
                cy = box["y"] + box["height"] * (0.5 + random.uniform(-0.08, 0.08))
                page.mouse.move(cx, cy, steps=10)
                time.sleep(random.uniform(0.15, 0.45))
                page.mouse.click(cx, cy, delay=random.randint(50, 180))
                return True
            except Exception as e:
                self.log_message(f"Iframe center click failed: {e}")
                continue
        return False

    def click_verification_button(self, _is_dfs=False):
        """Try every click strategy once; log which one (if any) succeeded.

        ``_is_dfs`` is unused and kept only for call compatibility.
        """
        try:
            page = self._normalize_page()
            for frame in page.frames:
                if not self._is_challenge_frame(frame):
                    continue
                if self._click_checkbox_in_frame(frame):
                    self.log_message("Challenge interaction succeeded by frame selector/evaluate.")
                    time.sleep(1)
                    return
            if self._click_challenge_iframe_center():
                self.log_message("Challenge interaction succeeded by iframe center click.")
                time.sleep(1)
                return
            self.log_message("Challenge click strategies exhausted.")
        except Exception as e:
            self.log_message(f"Error clicking verification button: {e}")

    def is_bypassed(self):
        """Return True if neither the title nor the HTML contains a blocked marker.

        Returns False when the page cannot be inspected.
        """
        try:
            page = self._normalize_page()
            title = (page.title() or "").lower()
            html = (page.content() or "").lower()
            return not any(m in title or m in html for m in self._BLOCKED_MARKERS)
        except Exception as e:
            self.log_message(f"Error checking page title: {e}")
            return False

    def _collect_page_state(self) -> str:
        """Sample the current page state as a log-friendly string.

        Helps to locate the challenge round in which the flow got stuck.
        """
        try:
            page = self._normalize_page()
            title = page.title()
            url = page.url
            challenge_type = self._determine_challenge_type()
            challenge_frames = self._count_challenge_frames(page)
            return (
                f"title={title!r}, url={url!r}, challenge_type={challenge_type}, "
                f"challenge_frames={challenge_frames}"
            )
        except Exception as e:
            return f"state_collect_error={e}"

    def _collect_state_signature(self):
        """Return a comparable tuple describing the current challenge state.

        Returns:
            ``(challenge_type, challenge_frames, title[:80], url[:120])`` with
            title and URL lower-cased, or ``("unknown", -1, "", "")`` on error.
        """
        try:
            page = self._normalize_page()
            title = (page.title() or "").lower()
            url = (page.url or "").lower()
            challenge_type = self._determine_challenge_type()
            challenge_frames = self._count_challenge_frames(page)
            return (challenge_type, challenge_frames, title[:80], url[:120])
        except Exception:
            return ("unknown", -1, "", "")

    def _reload_and_recheck(self) -> bool:
        """Reload the page once and report whether the challenge is gone afterwards."""
        try:
            page = self._normalize_page()
            page.reload(wait_until="domcontentloaded")
            self.log_message("Page reloaded to retrigger challenge rendering.")
            time.sleep(1.5)
            if self.is_bypassed():
                self.log_message("Bypass success after reload.")
                return True
        except Exception as reload_err:
            self.log_message(f"Reload failed: {reload_err}")
        return False

    def bypass(self, max_retry=5):
        """Attempt to pass the challenge, retrying up to ``max_retry`` times.

        Each round clicks the verification control, re-checks the page, reloads
        once if nothing changed, and then backs off before the next round.

        Returns:
            True as soon as the page no longer looks blocked, otherwise the
            result of a final ``is_bypassed()`` check.
        """
        for i in range(max_retry):
            if self.is_bypassed():
                return True
            sig_before = self._collect_state_signature()
            state_before = self._collect_page_state()
            self.log_message(
                f"Verification page detected. try={i + 1}/{max_retry}, before_click: {state_before}"
            )
            self.click_verification_button(False)
            # Short pause after the click, then check again.
            time.sleep(1.2)
            if self.is_bypassed():
                self.log_message("Bypass success after click.")
                return True
            if sig_before == self._collect_state_signature():
                self.log_message("No challenge state transition detected after click.")
                # Page state is completely unchanged: do one light refresh to
                # make the challenge render again.
                if self._reload_and_recheck():
                    return True
            # Incremental back-off to reduce the risk-control exposure of
            # sustained high-frequency clicking.
            wait_seconds = min(2 + i, 6)
            state_after = self._collect_page_state()
            self.log_message(
                f"Bypass not yet complete, sleeping {wait_seconds}s, after_click: {state_after}"
            )
            time.sleep(wait_seconds)
        final_ok = self.is_bypassed()
        if not final_ok:
            self.log_message(f"Bypass failed after retries. final_state: {self._collect_page_state()}")
        return final_ok
-
|