scroll.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
# Usage example
# -------------
# from DrissionPage import ChromiumPage
# # Save this module (e.g. as human_scroll.py), then import from it:
# # from human_scroll import HumanScroll, ScrollPosition
#
# page = ChromiumPage()
# page.get('https://example.com')  # open a page with a long scrollbar
#
# # Create the human-like scroll engine
# scroll_engine = HumanScroll(page)
#
# # 1. Human-like scroll to the bottom (split into several flicks with pauses
# #    in between; it occasionally overshoots and then corrects back)
# print("Scrolling to the bottom like a human...")
# scroll_engine.to_bottom(humanize=True)
#
# # 2. Human-like scroll up by 800 pixels
# print("Scrolling up 800 pixels...")
# scroll_engine.by(ScrollPosition.UP, 800, humanize=True)
#
# # 3. Without humanization it also works as a plain smooth-scroll helper
# print("Smooth-scrolling back to the top...")
# scroll_engine.to_top(smooth=True, humanize=False)
  17. import time
  18. import random
  19. from enum import Enum
  20. from dataclasses import dataclass
  21. from typing import Optional, Tuple
  22. from DrissionPage import ChromiumPage
  23. from math_utils import CubicBezier
# ==========================================
# 1. Enums and configuration classes
# ==========================================
  27. class ScrollPosition(Enum):
  28. UP = "UP"
  29. DOWN = "DOWN"
  30. LEFT = "LEFT"
  31. RIGHT = "RIGHT"
  32. @dataclass(frozen=True)
  33. class ScrollTimingConfig:
  34. """真实滚动物理效果配置参数"""
  35. min_duration: float = 0.5
  36. max_duration: float = 1.5
  37. bezier_points: Tuple[float, float, float, float] = (0.645, 0.045, 0.355, 1.0)
  38. frame_interval: float = 0.012
  39. delta_jitter: int = 3
  40. micro_pause_probability: float = 0.05
  41. micro_pause_min: float = 0.02
  42. micro_pause_max: float = 0.05
  43. overshoot_probability: float = 0.15
  44. overshoot_factor_min: float = 1.02
  45. overshoot_factor_max: float = 1.08
# ==========================================
# 2. Core human-like scrolling class for DrissionPage
# ==========================================
  49. class HumanScroll:
  50. """
  51. 基于 DrissionPage 的网页拟人化滚动 API。
  52. """
  53. def __init__(self, page: ChromiumPage, timing: Optional[ScrollTimingConfig] = None):
  54. """
  55. 初始化
  56. :param page: DrissionPage 的 page 对象
  57. :param timing: 滚动配置参数
  58. """
  59. self._page = page
  60. self._timing = timing or ScrollTimingConfig()
  61. def by(self, position: ScrollPosition, distance: int | float, smooth: bool = True, humanize: bool = False):
  62. """按指定方向和距离滚动"""
  63. if humanize:
  64. self._scroll_humanized(position, distance)
  65. return
  66. axis, scroll_distance = self._get_axis_and_distance(position, distance)
  67. behavior = 'smooth' if smooth else 'auto'
  68. script = f"window.scrollBy({{ {axis}: {scroll_distance}, behavior: '{behavior}' }});"
  69. self._page.run_js(script)
  70. def scroll_to_element(self, ele, humanize: bool = True):
  71. """将元素拟人化滚动到视口中央偏上 (符合人类阅读习惯)"""
  72. # 获取视口高度
  73. viewport_height = float(self._page.run_js("return window.innerHeight || 600;"))
  74. try:
  75. # 获取元素当前相对于屏幕顶部的坐标 (getBoundingClientRect)
  76. rect = ele.run_js("return this.getBoundingClientRect();")
  77. ele_y = rect.get('top', 0)
  78. except Exception:
  79. return # 找不到元素
  80. # 计算目标:让元素处于屏幕 40% 的位置
  81. target_y = viewport_height * 0.4
  82. distance = ele_y - target_y
  83. # 如果距离小于 50 像素(已经在视野舒适区了),就不再滚动
  84. if abs(distance) < 50:
  85. return
  86. # 根据正负距离决定向上还是向下滚动
  87. if distance > 0:
  88. self.by(ScrollPosition.DOWN, distance, humanize=humanize)
  89. else:
  90. self.by(ScrollPosition.UP, abs(distance), humanize=humanize)
  91. def to_top(self, smooth: bool = True, humanize: bool = False):
  92. """滚动到顶部"""
  93. if humanize:
  94. self._scroll_to_end_humanized(ScrollPosition.UP)
  95. return
  96. behavior = 'smooth' if smooth else 'auto'
  97. script = f"window.scrollTo({{ top: 0, behavior: '{behavior}' }});"
  98. self._page.run_js(script)
  99. def to_bottom(self, smooth: bool = True, humanize: bool = False):
  100. """滚动到底部"""
  101. if humanize:
  102. self._scroll_to_end_humanized(ScrollPosition.DOWN)
  103. return
  104. behavior = 'smooth' if smooth else 'auto'
  105. script = f"window.scrollTo({{ top: document.body.scrollHeight, behavior: '{behavior}' }});"
  106. self._page.run_js(script)
  107. def _scroll_to_end_humanized(self, position: ScrollPosition):
  108. """使用多次人类停顿习惯,一直滚动到底部或顶部"""
  109. max_flick_distance = random.uniform(600, 1200)
  110. min_remaining_threshold = 30
  111. min_stuck_threshold = 5
  112. min_flick_distance = 100
  113. last_remaining = float('inf')
  114. stuck_counter = 0
  115. max_stuck_attempts = 10
  116. while True:
  117. if position == ScrollPosition.DOWN:
  118. remaining = self._get_remaining_scroll_to_bottom()
  119. else:
  120. remaining = self._get_current_scroll_y()
  121. if remaining <= min_remaining_threshold:
  122. break
  123. # 卡死检测(防止无限滚动页面或者高度计算异常)
  124. has_progressed = abs(remaining - last_remaining) >= min_stuck_threshold
  125. if has_progressed:
  126. stuck_counter = 0
  127. else:
  128. stuck_counter += 1
  129. if stuck_counter >= max_stuck_attempts:
  130. break
  131. last_remaining = remaining
  132. flick_distance = min(remaining, max_flick_distance)
  133. if flick_distance < min_flick_distance and remaining > min_flick_distance:
  134. flick_distance = min_flick_distance
  135. self._scroll_humanized(position, flick_distance)
  136. # 每次大幅度滑动后,稍微停顿阅读
  137. pause = random.uniform(0.05, 0.15)
  138. time.sleep(pause)
  139. max_flick_distance = random.uniform(600, 1200)
  140. def _scroll_humanized(self, position: ScrollPosition, target_distance: float):
  141. """执行单次带有真实物理引擎效果的滚动"""
  142. is_vertical = position in {ScrollPosition.UP, ScrollPosition.DOWN}
  143. direction = -1 if position in {ScrollPosition.UP, ScrollPosition.LEFT} else 1
  144. effective_distance = self._calculate_effective_distance(target_distance)
  145. duration = self._calculate_duration(effective_distance)
  146. scrolled_so_far = self._perform_scroll_loop(
  147. effective_distance, duration, is_vertical, direction
  148. )
  149. # 模拟人类手滑导致滚动过头(Overshoot),再往回矫正一点
  150. if effective_distance > target_distance and scrolled_so_far > target_distance:
  151. correction_distance = scrolled_so_far - target_distance
  152. correction_direction = -direction
  153. time.sleep(random.uniform(0.1, 0.2))
  154. self._scroll_correction(
  155. is_vertical=is_vertical,
  156. direction=correction_direction,
  157. distance=correction_distance,
  158. )
  159. def _perform_scroll_loop(
  160. self, effective_distance: float, duration: float, is_vertical: bool, direction: int
  161. ) -> float:
  162. """主滚动循环,通过底层 CDP 下发 MouseWheel 事件"""
  163. timing = self._timing
  164. bezier = CubicBezier(*timing.bezier_points)
  165. start_time = time.perf_counter()
  166. current_time = 0.0
  167. scrolled_so_far = 0.0
  168. while current_time < duration:
  169. now = time.perf_counter()
  170. current_time = now - start_time
  171. if current_time >= duration:
  172. break
  173. progress = current_time / duration
  174. eased_progress = bezier.solve(progress)
  175. target_pos = effective_distance * eased_progress
  176. delta = target_pos - scrolled_so_far
  177. # 加上手部微抖动带来的像素误差
  178. jitter = random.randint(-timing.delta_jitter, timing.delta_jitter)
  179. delta += jitter
  180. delta = max(delta, 0)
  181. if delta >= 1:
  182. self._dispatch_scroll_event(
  183. delta_x=0 if is_vertical else int(delta * direction),
  184. delta_y=int(delta * direction) if is_vertical else 0,
  185. )
  186. scrolled_so_far += delta
  187. # 帧延迟 + 极小的误差
  188. frame_delay = timing.frame_interval + random.uniform(-0.002, 0.002)
  189. time.sleep(frame_delay)
  190. # 小概率触发手部微停顿 (浏览时卡住了)
  191. if random.random() < timing.micro_pause_probability:
  192. pause_duration = random.uniform(timing.micro_pause_min, timing.micro_pause_max)
  193. time.sleep(pause_duration)
  194. # 扣除停顿时间,保证运动总时间不变
  195. start_time += pause_duration
  196. return scrolled_so_far
  197. def _calculate_effective_distance(self, target_distance: float) -> float:
  198. timing = self._timing
  199. should_overshoot = random.random() < timing.overshoot_probability
  200. overshoot_factor = (
  201. random.uniform(timing.overshoot_factor_min, timing.overshoot_factor_max)
  202. if should_overshoot else 1.0
  203. )
  204. return target_distance * overshoot_factor
  205. def _calculate_duration(self, distance: float) -> float:
  206. timing = self._timing
  207. base_duration = random.uniform(timing.min_duration, timing.max_duration)
  208. duration = base_duration * (1 + 0.2 * (distance / 1000))
  209. return min(duration, 3.0)
  210. def _scroll_correction(self, is_vertical: bool, direction: int, distance: float):
  211. """修正滚动 (过度回弹)"""
  212. timing = self._timing
  213. scrolled = 0.0
  214. min_correction_velocity = (distance * 0.15) / timing.frame_interval
  215. correction_velocity = random.uniform(
  216. max(200, min_correction_velocity), max(400, min_correction_velocity * 1.5)
  217. )
  218. while scrolled < distance:
  219. frame_delta = correction_velocity * timing.frame_interval
  220. frame_delta = min(frame_delta, distance - scrolled)
  221. self._dispatch_scroll_event(
  222. delta_x=0 if is_vertical else int(frame_delta * direction),
  223. delta_y=int(frame_delta * direction) if is_vertical else 0,
  224. )
  225. scrolled += frame_delta
  226. correction_velocity *= 0.85 # 阻尼减速
  227. time.sleep(timing.frame_interval)
  228. def _dispatch_scroll_event(self, delta_x: int, delta_y: int):
  229. """
  230. [核心反爬突破点]
  231. 调用 DrissionPage 的 CDP (Chrome DevTools Protocol) 发送原生的鼠标滚轮事件。
  232. 这会触发网页真实的 `wheel` 事件,完全避开了 window.scrollBy 被检测的风险。
  233. """
  234. viewport = self._get_viewport_center()
  235. # 参数必须与 CDP 协议中的 Input.dispatchMouseEvent 一致
  236. self._page.driver.run(
  237. 'Input.dispatchMouseEvent',
  238. type='mouseWheel',
  239. x=viewport[0],
  240. y=viewport[1],
  241. deltaX=delta_x,
  242. deltaY=delta_y
  243. )
  244. def _get_viewport_center(self) -> Tuple[int, int]:
  245. """获取浏览器视口中心坐标"""
  246. script = "return[window.innerWidth / 2 || 400, window.innerHeight / 2 || 300];"
  247. try:
  248. res = self._page.run_js(script)
  249. return int(res[0]), int(res[1])
  250. except Exception:
  251. return 400, 300
  252. def _get_current_scroll_y(self) -> float:
  253. """获取当前 Y 轴滚动距离"""
  254. return float(self._page.run_js("return window.scrollY || window.pageYOffset || 0;"))
  255. def _get_remaining_scroll_to_bottom(self) -> float:
  256. """获取距离底部的剩余距离"""
  257. script = """
  258. return Math.max(0,
  259. document.documentElement.scrollHeight -
  260. window.scrollY -
  261. window.innerHeight
  262. );
  263. """
  264. return float(self._page.run_js(script))
  265. @staticmethod
  266. def _get_axis_and_distance(position: ScrollPosition, distance: int | float) -> Tuple[str, float]:
  267. if position in {ScrollPosition.UP, ScrollPosition.DOWN}:
  268. return 'top', -distance if position == ScrollPosition.UP else float(distance)
  269. return 'left', -distance if position == ScrollPosition.LEFT else float(distance)