# cloudflare_bypass_for_scraping2.py (9.0 KB)

  1. import random
  2. import time
  3. from typing import Any
  4. class CloudflareBypasser:
  5. def __init__(self, driver: Any, log=True):
  6. self.driver = driver
  7. self.log = log
  8. def log_message(self, message):
  9. if self.log:
  10. print(message)
  11. def _normalize_page(self):
  12. # 兼容 TlsPlugin 中的 CamoufoxPageAdapter
  13. return getattr(self.driver, "_page", self.driver)
  14. def _is_challenge_frame(self, frame) -> bool:
  15. frame_name = (frame.name or "").lower()
  16. frame_url = (frame.url or "").lower()
  17. markers = (
  18. "turnstile",
  19. "challenges.cloudflare.com",
  20. "challenge",
  21. "cf-chl",
  22. )
  23. return any(m in frame_name or m in frame_url for m in markers)
  24. def _determine_challenge_type(self) -> str:
  25. try:
  26. page = self._normalize_page()
  27. title = (page.title() or "").lower()
  28. html = (page.content() or "").lower()
  29. if "please complete the captcha" in html or "turnstile" in html:
  30. return "turnstile"
  31. if "just a moment" in title or "checking your browser" in html:
  32. return "interstitial"
  33. return "none"
  34. except Exception as e:
  35. self.log_message(f"Error determining challenge type: {e}")
  36. return "unknown"
  37. def _click_checkbox_in_frame(self, frame) -> bool:
  38. selectors = [
  39. "input[type='checkbox']",
  40. "[role='checkbox']",
  41. "label.ctp-checkbox-label",
  42. "div.ctp-checkbox-label",
  43. "label[for*='cf']",
  44. ]
  45. for selector in selectors:
  46. try:
  47. loc = frame.locator(selector)
  48. if loc.count() <= 0:
  49. continue
  50. target = loc.first
  51. target.click(timeout=2000)
  52. return True
  53. except Exception:
  54. continue
  55. # 在 frame 内做 open-shadow 递归查找(closed shadow 无法直接访问)
  56. try:
  57. clicked = frame.evaluate(
  58. """
  59. () => {
  60. const selectors = [
  61. "input[type='checkbox']",
  62. "[role='checkbox']",
  63. "label.ctp-checkbox-label",
  64. "div.ctp-checkbox-label",
  65. "label[for*='cf']"
  66. ];
  67. const seen = new WeakSet();
  68. const stack = [document];
  69. while (stack.length) {
  70. const root = stack.pop();
  71. if (!root || seen.has(root)) continue;
  72. seen.add(root);
  73. for (const sel of selectors) {
  74. const hit = root.querySelector(sel);
  75. if (hit) {
  76. hit.click();
  77. return true;
  78. }
  79. }
  80. const nodes = root.querySelectorAll ? root.querySelectorAll("*") : [];
  81. for (const node of nodes) {
  82. if (node.shadowRoot) stack.push(node.shadowRoot);
  83. }
  84. }
  85. return false;
  86. }
  87. """
  88. )
  89. return bool(clicked)
  90. except Exception:
  91. return False
  92. def _click_challenge_iframe_center(self) -> bool:
  93. page = self._normalize_page()
  94. for frame in page.frames:
  95. if not self._is_challenge_frame(frame):
  96. continue
  97. try:
  98. frame_el = frame.frame_element()
  99. box = frame_el.bounding_box()
  100. if not box:
  101. continue
  102. # 人类化一点:点击中心附近随机偏移,避免固定坐标
  103. cx = box["x"] + box["width"] * (0.5 + random.uniform(-0.08, 0.08))
  104. cy = box["y"] + box["height"] * (0.5 + random.uniform(-0.08, 0.08))
  105. page.mouse.move(cx, cy, steps=10)
  106. time.sleep(random.uniform(0.15, 0.45))
  107. page.mouse.click(cx, cy, delay=random.randint(50, 180))
  108. return True
  109. except Exception:
  110. continue
  111. return False
  112. def click_verification_button(self, _is_dfs=False):
  113. try:
  114. page = self._normalize_page()
  115. for frame in page.frames:
  116. if not self._is_challenge_frame(frame):
  117. continue
  118. if self._click_checkbox_in_frame(frame):
  119. self.log_message("Challenge interaction succeeded by frame selector/evaluate.")
  120. time.sleep(1)
  121. return
  122. if self._click_challenge_iframe_center():
  123. self.log_message("Challenge interaction succeeded by iframe center click.")
  124. time.sleep(1)
  125. return
  126. self.log_message("Challenge click strategies exhausted.")
  127. except Exception as e:
  128. self.log_message(f"Error clicking verification button: {e}")
  129. def is_bypassed(self):
  130. try:
  131. page = self._normalize_page()
  132. title = (page.title() or "").lower()
  133. html = (page.content() or "").lower()
  134. blocked_markers = (
  135. "just a moment",
  136. "请稍候",
  137. "checking your browser",
  138. "cf-challenge",
  139. "please complete the captcha",
  140. )
  141. return not any(m in title or m in html for m in blocked_markers)
  142. except Exception as e:
  143. self.log_message(f"Error checking page title: {e}")
  144. return False
  145. def _collect_page_state(self) -> str:
  146. """
  147. 采样当前页面状态,帮助定位卡在哪一轮挑战。
  148. """
  149. try:
  150. page = self._normalize_page()
  151. title = page.title()
  152. url = page.url
  153. challenge_type = self._determine_challenge_type()
  154. challenge_frames = 0
  155. for frame in page.frames:
  156. if self._is_challenge_frame(frame):
  157. challenge_frames += 1
  158. return (
  159. f"title={title!r}, url={url!r}, challenge_type={challenge_type}, "
  160. f"challenge_frames={challenge_frames}"
  161. )
  162. except Exception as e:
  163. return f"state_collect_error={e}"
  164. def _collect_state_signature(self):
  165. try:
  166. page = self._normalize_page()
  167. title = (page.title() or "").lower()
  168. url = (page.url or "").lower()
  169. challenge_type = self._determine_challenge_type()
  170. challenge_frames = 0
  171. for frame in page.frames:
  172. if self._is_challenge_frame(frame):
  173. challenge_frames += 1
  174. return (challenge_type, challenge_frames, title[:80], url[:120])
  175. except Exception:
  176. return ("unknown", -1, "", "")
  177. def bypass(self, max_retry=5):
  178. for i in range(max_retry):
  179. if self.is_bypassed():
  180. return True
  181. sig_before = self._collect_state_signature()
  182. state_before = self._collect_page_state()
  183. self.log_message(
  184. f"Verification page detected. try={i + 1}/{max_retry}, before_click: {state_before}"
  185. )
  186. self.click_verification_button(False)
  187. # 点击后短暂等待,再次检查是否通过
  188. time.sleep(1.2)
  189. if self.is_bypassed():
  190. self.log_message("Bypass success after click.")
  191. return True
  192. sig_after = self._collect_state_signature()
  193. no_transition = sig_before == sig_after
  194. if no_transition:
  195. self.log_message("No challenge state transition detected after click.")
  196. # 当页面状态完全不变时,做一次轻量刷新,触发 challenge 重新渲染
  197. try:
  198. page = self._normalize_page()
  199. page.reload(wait_until="domcontentloaded")
  200. self.log_message("Page reloaded to retrigger challenge rendering.")
  201. time.sleep(1.5)
  202. if self.is_bypassed():
  203. self.log_message("Bypass success after reload.")
  204. return True
  205. except Exception as reload_err:
  206. self.log_message(f"Reload failed: {reload_err}")
  207. # 递增退避,降低持续高频点击导致的风控风险
  208. wait_seconds = min(2 + i, 6)
  209. state_after = self._collect_page_state()
  210. self.log_message(
  211. f"Bypass not yet complete, sleeping {wait_seconds}s, after_click: {state_after}"
  212. )
  213. time.sleep(wait_seconds)
  214. final_ok = self.is_bypassed()
  215. if not final_ok:
  216. self.log_message(f"Bypass failed after retries. final_state: {self._collect_page_state()}")
  217. return final_ok