# import time # import json # from DrissionPage import ChromiumPage # from network_interceptor import DrissionFetchInterceptor # 导入刚才封装的工具类 # # 初始化浏览器 # page = ChromiumPage() # # 实例化拦截器 # interceptor = DrissionFetchInterceptor(page) # # ========================================== # # 场景 1:动态修改 Request Body # # ========================================== # def custom_request_handler(url, original_post_data): # print(f"原始请求体: {original_post_data}") # # 你可以对 original_post_data 解析并修改,或者直接返回全新的数据 # # 假设接口需要 JSON,你可以直接返回一个 Python 字典,工具类会自动将其转为 JSON 和 Base64 # new_request_data = { # "username": "admin", # "password": "modified_password_by_tool", # "timestamp": time.time() # } # return new_request_data # # 告诉拦截器,凡是包含 "api/submit" 的请求,发出去前先经过 custom_request_handler 处理 # interceptor.intercept_request("api/submit", custom_request_handler) # # ========================================== # # 场景 2:动态修改 / 伪造 Response Body # # ========================================== # def custom_response_handler(url, original_body): # print(f"收到服务器原始响应: {original_body[:100]}...") # 打印前100个字符 # # 我们基于原有的响应数据,注入一些自己想要伪造的数据 # try: # data = json.loads(original_body) # data['message'] = "这是被DrissionPage拦截并篡改的Mock数据!" # data['vip_status'] = True # return data # 直接返回字典即可 # except json.JSONDecodeError: # # 如果不是JSON,直接返回一个新字符串 # return '{"code": 200, "message": "全量替换的Mock数据"}' # # 告诉拦截器,凡是包含 "api/data" 的响应,接收前先经过 custom_response_handler 处理 # interceptor.intercept_response("api/data", custom_response_handler) # # ========================================== # # 启动拦截并开始业务测试 # # ========================================== # interceptor.start() # print("拦截器已生效,开始访问页面...") # page.get('https://example.com') # 替换为测试的目标网址 # # 模拟业务等待 # time.sleep(100) # # 如果不需要拦截了,可以调用 stop (可选) # # interceptor.stop() import base64 import json from typing import Callable, Union, Dict, Any class DrissionFetchInterceptor: """ DrissionPage Fetch 请求/响应拦截修改器 基于 Chrome DevTools Protocol (CDP) 的 Fetch 域 """ def __init__(self, page): self.page = page self.patterns = [] self._request_handlers = {} self._response_handlers = {} self._is_running = False def intercept_request(self, url_keyword: str, handler: Callable): """ 添加拦截 Request Body 的规则 :param url_keyword: URL中包含的关键字 (例如 'api/submit') :param handler: 回调函数,接收 (url, original_post_data),需返回新的 post_data 字符串或字典 """ self.patterns.append({ 'urlPattern': f'*{url_keyword}*', 'requestStage': 'Request' }) self._request_handlers[url_keyword] = handler def intercept_response(self, url_keyword: str, handler: Callable): """ 添加拦截 Response Body 的规则 :param url_keyword: URL中包含的关键字 (例如 'api/data') :param handler: 回调函数,接收 (url, original_body),需返回新的 body 字符串或字典 """ self.patterns.append({ 'urlPattern': f'*{url_keyword}*', 'requestStage': 'Response' }) self._response_handlers[url_keyword] = handler def start(self): """启动拦截器""" if not self.patterns: print("[Interceptor] 没有配置任何拦截规则。") return # 开启 Fetch 域并应用规则 self.page.run_cdp('Fetch.enable', patterns=self.patterns) # 绑定核心回调 self.page.driver.set_callback('Fetch.requestPaused', self._on_request_paused) self._is_running = True print(f"[Interceptor] 已启动,共 {len(self.patterns)} 条规则生效。") def stop(self): """停止拦截器""" if self._is_running: self.page.run_cdp('Fetch.disable') self.page.driver.set_callback('Fetch.requestPaused', None) self._is_running = False print("[Interceptor] 已停止。") def _on_request_paused(self, **kwargs): """底层的 CDP 暂停事件路由器""" request_id = kwargs.get('requestId') request = kwargs.get('request', {}) url = request.get('url') response_status = kwargs.get('responseStatusCode') # === 阶段 1: 拦截并修改 Response (服务器返回后) === if response_status: for keyword, handler in self._response_handlers.items(): if keyword in url: self._handle_response_modification(kwargs, handler) return # 没匹配上,放行 self.page.run_cdp('Fetch.continueRequest', requestId=request_id) return # === 阶段 2: 拦截并修改 Request (发往服务器前) === for keyword, handler in self._request_handlers.items(): if keyword in url: self._handle_request_modification(kwargs, handler) return # 没匹配上,放行 self.page.run_cdp('Fetch.continueRequest', requestId=request_id) def _handle_request_modification(self, kwargs: Dict, handler: Callable): """处理 Request 的修改逻辑""" request_id = kwargs.get('requestId') request = kwargs.get('request', {}) url = request.get('url') original_post_data = request.get('postData', '') try: # 调用用户自定义的处理函数 new_data = handler(url, original_post_data) # 如果返回的是字典,自动转成 JSON 字符串 if isinstance(new_data, dict): new_data = json.dumps(new_data) if new_data is not None: encoded_body = base64.b64encode(str(new_data).encode('utf-8')).decode('utf-8') self.page.run_cdp('Fetch.continueRequest', requestId=request_id, postData=encoded_body) print(f"[Interceptor] 成功修改 Request Body -> {url}") return except Exception as e: print(f"[Interceptor] 修改 Request 时发生错误: {e}") # 如果发生异常或未返回新数据,兜底原样放行,防止浏览器卡死 self.page.run_cdp('Fetch.continueRequest', requestId=request_id) def _handle_response_modification(self, kwargs: Dict, handler: Callable): """处理 Response 的修改逻辑""" request_id = kwargs.get('requestId') url = kwargs.get('request', {}).get('url') response_status = kwargs.get('responseStatusCode', 200) # CDP 中 headers 的格式是一个字典列表 [{'name': 'Content-Type', 'value': '...'}, ...] headers = kwargs.get('responseHeaders', []) original_body = "" try: # 尝试获取原始 Response Body res = self.page.run_cdp('Fetch.getResponseBody', requestId=request_id) original_body = res.get('body', '') if res.get('base64Encoded'): original_body = base64.b64decode(original_body).decode('utf-8') except Exception: pass # 有些请求(如图片/流)可能获取不到body try: # 调用用户自定义的处理函数 new_data = handler(url, original_body) # 如果返回的是字典,自动转成 JSON 字符串 if isinstance(new_data, dict): new_data = json.dumps(new_data) if new_data is not None: encoded_body = base64.b64encode(str(new_data).encode('utf-8')).decode('utf-8') # 修改响应体必须使用 Fetch.fulfillRequest self.page.run_cdp('Fetch.fulfillRequest', requestId=request_id, responseCode=response_status, responseHeaders=headers, body=encoded_body) print(f"[Interceptor] 成功修改 Response Body -> {url}") return except Exception as e: print(f"[Interceptor] 修改 Response 时发生错误: {e}") # 兜底:如果报错,原样放行请求 self.page.run_cdp('Fetch.continueRequest', requestId=request_id)