# Usage example
# -------------
# from DrissionPage import ChromiumPage
# # Save the code of this module as human_scroll.py, then import it:
# # from human_scroll import HumanScroll, ScrollPosition
#
# page = ChromiumPage()
# page.get('https://example.com')  # open a page with a very long scrollbar
#
# # Initialise the human-like scroll engine
# scroll_engine = HumanScroll(page)
#
# # 1. Human-like scroll to the bottom (split automatically into several
# #    flicks with pauses in between, sometimes overshooting and bouncing back)
# print("Starting human-like scroll to the bottom...")
# scroll_engine.to_bottom(humanize=True)
#
# # 2. Human-like scroll up by 800 pixels
# print("Scrolling up 800 pixels...")
# scroll_engine.by(ScrollPosition.UP, 800, humanize=True)
#
# # 3. Without humanisation it can also be used as a plain smooth-scroll helper
# print("Jumping back to the top...")
# scroll_engine.to_top(smooth=True, humanize=False)

import time
import random
from enum import Enum
from dataclasses import dataclass
from typing import Optional, Tuple

from DrissionPage import ChromiumPage

from math_utils import CubicBezier


# ==========================================
# 1. Enums and configuration classes
# ==========================================
class ScrollPosition(Enum):
    """Direction in which a scroll is performed."""

    UP = "UP"
    DOWN = "DOWN"
    LEFT = "LEFT"
    RIGHT = "RIGHT"


@dataclass(frozen=True)
class ScrollTimingConfig:
    """Immutable tuning parameters for the simulated scroll physics.

    Durations and intervals are compared against ``time.perf_counter()``
    deltas and passed to ``time.sleep()``, i.e. they are in seconds.
    """

    # --- overall motion ---------------------------------------------------
    # Bounds of the randomly drawn base duration of one scroll gesture.
    min_duration: float = 0.5
    max_duration: float = 1.5
    # Unpacked into ``CubicBezier(*bezier_points)`` to build the easing curve
    # (presumably CSS-style x1, y1, x2, y2 control points; confirm against
    # math_utils).
    bezier_points: Tuple[float, float, float, float] = (0.645, 0.045, 0.355, 1.0)
    # Nominal sleep between two consecutive wheel events.
    frame_interval: float = 0.012
    # Per-frame wheel delta is perturbed by randint(-delta_jitter, +delta_jitter).
    delta_jitter: int = 3

    # --- micro pauses -----------------------------------------------------
    # Per-frame chance of inserting an extra short pause, and its bounds.
    micro_pause_probability: float = 0.05
    micro_pause_min: float = 0.02
    micro_pause_max: float = 0.05

    # --- overshoot --------------------------------------------------------
    # Chance that a gesture scrolls past its target, and the range of the
    # multiplier applied to the target distance when it does.
    overshoot_probability: float = 0.15
    overshoot_factor_min: float = 1.02
    overshoot_factor_max: float = 1.08


# ==========================================
# 3.
# DrissionPage human-like scrolling core class
# ==========================================
class HumanScroll:
    """Human-like page scrolling API built on a DrissionPage page object.

    Plain scrolling is done with ``window.scrollBy`` / ``window.scrollTo``.
    Humanised scrolling sends a stream of CDP ``mouseWheel`` events whose
    sizes follow an easing curve, with jitter, occasional micro pauses and
    occasional overshoot followed by a corrective scroll back.
    """

    def __init__(self, page: ChromiumPage, timing: Optional[ScrollTimingConfig] = None):
        """Store the page and the timing configuration.

        :param page: DrissionPage page object used to run JS and CDP commands
        :param timing: scroll tuning parameters; defaults to ``ScrollTimingConfig()``
        """
        self._page = page
        self._timing = timing if timing is not None else ScrollTimingConfig()

    def by(self, position: ScrollPosition, distance: int | float,
           smooth: bool = True, humanize: bool = False):
        """Scroll by ``distance`` pixels in the direction given by ``position``.

        :param position: direction to scroll in
        :param distance: distance to scroll
        :param smooth: use ``behavior: 'smooth'`` for the non-humanised path
        :param humanize: send simulated wheel events instead of ``window.scrollBy``
        """
        if humanize:
            self._scroll_humanized(position, distance)
            return

        axis, scroll_distance = self._get_axis_and_distance(position, distance)
        behavior = 'smooth' if smooth else 'auto'
        script = f"window.scrollBy({{ {axis}: {scroll_distance}, behavior: '{behavior}' }});"
        self._page.run_js(script)

    def scroll_to_element(self, ele, humanize: bool = True):
        """Scroll so that ``ele`` sits at about 40% of the viewport height.

        Nothing happens when the element is already within 50 px of that
        position, or when its position cannot be read.

        :param ele: element object exposing ``run_js`` with ``this`` bound to it
        :param humanize: forwarded to :meth:`by`
        """
        viewport_height = float(self._page.run_js("return window.innerHeight || 600;"))

        try:
            # Return the scalar directly: DOMRect fields are prototype
            # accessors, so serialising the whole rect across the JS bridge
            # may yield an object without a 'top' key.
            ele_y = float(ele.run_js("return this.getBoundingClientRect().top;"))
        except Exception:
            return  # element not found / position unavailable

        # Target: place the element at 40% of the viewport height.
        target_y = viewport_height * 0.4
        distance = ele_y - target_y

        # Already close enough to the comfortable reading zone.
        if abs(distance) < 50:
            return

        if distance > 0:
            self.by(ScrollPosition.DOWN, distance, humanize=humanize)
        else:
            self.by(ScrollPosition.UP, abs(distance), humanize=humanize)

    def to_top(self, smooth: bool = True, humanize: bool = False):
        """Scroll to the top of the page.

        :param smooth: use ``behavior: 'smooth'`` for the non-humanised path
        :param humanize: scroll with repeated simulated flicks instead
        """
        if humanize:
            self._scroll_to_end_humanized(ScrollPosition.UP)
            return

        behavior = 'smooth' if smooth else 'auto'
        script = f"window.scrollTo({{ top: 0, behavior: '{behavior}' }});"
        self._page.run_js(script)

    def to_bottom(self, smooth: bool = True, humanize: bool = False):
        """Scroll to the bottom of the page.

        :param smooth: use ``behavior: 'smooth'`` for the non-humanised path
        :param humanize: scroll with repeated simulated flicks instead
        """
        if humanize:
            self._scroll_to_end_humanized(ScrollPosition.DOWN)
            return

        behavior = 'smooth' if smooth else 'auto'
        # Use the larger of body/documentElement height so this agrees with
        # _get_remaining_scroll_to_bottom, which measures documentElement.
        script = (
            "window.scrollTo({ top: Math.max(document.body.scrollHeight, "
            f"document.documentElement.scrollHeight), behavior: '{behavior}' }});"
        )
        self._page.run_js(script)

    def _scroll_to_end_humanized(self, position: ScrollPosition):
        """Flick repeatedly, with short pauses, until the top or bottom is reached.

        The loop stops when 30 px or less remain, or when the remaining
        distance has changed by less than 5 px for 10 consecutive checks.
        While the remaining distance keeps changing (e.g. content keeps
        loading) the loop continues.
        """
        min_remaining_threshold = 30
        min_stuck_threshold = 5
        max_stuck_attempts = 10

        last_remaining = float('inf')
        stuck_counter = 0

        while True:
            if position == ScrollPosition.DOWN:
                remaining = self._get_remaining_scroll_to_bottom()
            else:
                remaining = self._get_current_scroll_y()

            if remaining <= min_remaining_threshold:
                break

            # Stuck detection: give up when the position stops changing.
            if abs(remaining - last_remaining) >= min_stuck_threshold:
                stuck_counter = 0
            else:
                stuck_counter += 1
                if stuck_counter >= max_stuck_attempts:
                    break
            last_remaining = remaining

            # Each flick covers a fresh random distance, capped by what is left.
            flick_distance = min(remaining, random.uniform(600, 1200))
            self._scroll_humanized(position, flick_distance)

            # Brief "reading" pause after every flick.
            time.sleep(random.uniform(0.05, 0.15))

    def _scroll_humanized(self, position: ScrollPosition, target_distance: float):
        """Perform one simulated scroll gesture of ``target_distance`` pixels.

        A non-positive distance is a no-op. When the gesture overshoots the
        target, a short pause is followed by a corrective scroll back.
        """
        if target_distance <= 0:
            return

        is_vertical = position in {ScrollPosition.UP, ScrollPosition.DOWN}
        direction = -1 if position in {ScrollPosition.UP, ScrollPosition.LEFT} else 1

        effective_distance = self._calculate_effective_distance(target_distance)
        duration = self._calculate_duration(effective_distance)

        scrolled_so_far = self._perform_scroll_loop(
            effective_distance, duration, is_vertical, direction
        )

        # Simulated slip of the hand: scrolled too far, so nudge back.
        if effective_distance > target_distance and scrolled_so_far > target_distance:
            time.sleep(random.uniform(0.1, 0.2))
            self._scroll_correction(
                is_vertical=is_vertical,
                direction=-direction,
                distance=scrolled_so_far - target_distance,
            )

    def _perform_scroll_loop(
        self, effective_distance: float, duration: float, is_vertical: bool, direction: int
    ) -> float:
        """Main scroll loop: emit wheel events that follow the easing curve.

        :param effective_distance: total distance to cover (overshoot included)
        :param duration: duration of the motion in seconds, pauses excluded
        :param is_vertical: True for the Y axis, False for the X axis
        :param direction: -1 or 1, sign applied to every wheel delta
        :return: number of pixels actually dispatched
        """
        timing = self._timing
        bezier = CubicBezier(*timing.bezier_points)

        start_time = time.perf_counter()
        # Whole pixels really sent. Progress is tracked in integer pixels so
        # the per-frame truncation never accumulates into a shortfall.
        dispatched = 0

        while True:
            elapsed = time.perf_counter() - start_time
            if elapsed >= duration:
                break

            eased_progress = bezier.solve(elapsed / duration)
            target_pos = effective_distance * eased_progress

            # Distance still owed at this point in time, plus hand jitter.
            delta = target_pos - dispatched
            delta += random.randint(-timing.delta_jitter, timing.delta_jitter)

            step = int(delta)
            if step >= 1:
                self._dispatch_scroll_event(
                    delta_x=0 if is_vertical else step * direction,
                    delta_y=step * direction if is_vertical else 0,
                )
                dispatched += step

            # Frame delay with a small random error; never negative.
            frame_delay = timing.frame_interval + random.uniform(-0.002, 0.002)
            time.sleep(max(0.0, frame_delay))

            # Occasional micro pause (the hand hesitates while browsing).
            if random.random() < timing.micro_pause_probability:
                pause_duration = random.uniform(timing.micro_pause_min, timing.micro_pause_max)
                time.sleep(pause_duration)
                # Exclude the pause from the motion time.
                start_time += pause_duration

        # Time is up: send whatever is still missing so the gesture really
        # covers the requested distance.
        residual = int(round(effective_distance)) - dispatched
        if residual >= 1:
            self._dispatch_scroll_event(
                delta_x=0 if is_vertical else residual * direction,
                delta_y=residual * direction if is_vertical else 0,
            )
            dispatched += residual

        return float(dispatched)

    def _calculate_effective_distance(self, target_distance: float) -> float:
        """Return the target distance, randomly enlarged by an overshoot factor."""
        timing = self._timing
        if random.random() < timing.overshoot_probability:
            overshoot_factor = random.uniform(
                timing.overshoot_factor_min, timing.overshoot_factor_max
            )
        else:
            overshoot_factor = 1.0
        return target_distance * overshoot_factor

    def _calculate_duration(self, distance: float) -> float:
        """Return a random duration that grows with distance, capped at 3 seconds."""
        timing = self._timing
        base_duration = random.uniform(timing.min_duration, timing.max_duration)
        duration = base_duration * (1 + 0.2 * (distance / 1000))
        return min(duration, 3.0)

    def _scroll_correction(self, is_vertical: bool, direction: int, distance: float):
        """Scroll back by ``distance`` pixels with a decelerating motion.

        Exactly ``round(distance)`` pixels are dispatched. Every frame
        advances by at least one pixel, so the loop always terminates.
        """
        timing = self._timing
        total = int(round(distance))
        if total <= 0:
            return

        min_correction_velocity = (distance * 0.15) / timing.frame_interval
        correction_velocity = random.uniform(
            max(200, min_correction_velocity),
            max(400, min_correction_velocity * 1.5),
        )

        position = 0.0  # ideal, fractional progress
        sent = 0        # whole pixels actually dispatched

        while sent < total:
            frame_delta = max(correction_velocity * timing.frame_interval, 1.0)
            position = min(position + frame_delta, float(total))

            step = int(position) - sent
            if step > 0:
                self._dispatch_scroll_event(
                    delta_x=0 if is_vertical else step * direction,
                    delta_y=step * direction if is_vertical else 0,
                )
                sent += step

            correction_velocity *= 0.85  # damping
            time.sleep(timing.frame_interval)

    def _dispatch_scroll_event(self, delta_x: int, delta_y: int):
        """Send one mouse-wheel event through the Chrome DevTools Protocol.

        The event is issued with ``Input.dispatchMouseEvent`` at the viewport
        centre. It is intended to produce wheel input rather than a
        script-driven ``window.scrollBy`` call.
        """
        viewport = self._get_viewport_center()

        # Parameter names must match CDP's Input.dispatchMouseEvent.
        self._page.driver.run(
            'Input.dispatchMouseEvent',
            type='mouseWheel',
            x=viewport[0],
            y=viewport[1],
            deltaX=delta_x,
            deltaY=delta_y
        )

    def _get_viewport_center(self) -> Tuple[int, int]:
        """Return the viewport centre, falling back to (400, 300) on any error."""
        script = "return[window.innerWidth / 2 || 400, window.innerHeight / 2 || 300];"
        try:
            res = self._page.run_js(script)
            return int(res[0]), int(res[1])
        except Exception:
            # Best effort: a usable default is better than aborting the scroll.
            return 400, 300

    def _get_current_scroll_y(self) -> float:
        """Return the current vertical scroll offset."""
        return float(self._page.run_js("return window.scrollY || window.pageYOffset || 0;"))

    def _get_remaining_scroll_to_bottom(self) -> float:
        """Return the distance left to the bottom of the document (never negative)."""
        script = """
            return Math.max(0,
                document.documentElement.scrollHeight - window.scrollY - window.innerHeight
            );
        """
        return float(self._page.run_js(script))

    @staticmethod
    def _get_axis_and_distance(position: ScrollPosition, distance: int | float) -> Tuple[str, float]:
        """Map a direction and distance to a ``scrollBy`` option key and signed offset.

        UP and LEFT yield a negated distance. The offset is always a float.
        """
        if position in {ScrollPosition.UP, ScrollPosition.DOWN}:
            return 'top', -float(distance) if position == ScrollPosition.UP else float(distance)
        return 'left', -float(distance) if position == ScrollPosition.LEFT else float(distance)