| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223 |
- # import time
- # import json
- # from DrissionPage import ChromiumPage
- # from network_interceptor import DrissionFetchInterceptor # 导入刚才封装的工具类
- # # 初始化浏览器
- # page = ChromiumPage()
- # # 实例化拦截器
- # interceptor = DrissionFetchInterceptor(page)
- # # ==========================================
- # # 场景 1:动态修改 Request Body
- # # ==========================================
- # def custom_request_handler(url, original_post_data):
- # print(f"原始请求体: {original_post_data}")
- # # 你可以对 original_post_data 解析并修改,或者直接返回全新的数据
-
- # # 假设接口需要 JSON,你可以直接返回一个 Python 字典,工具类会自动将其转为 JSON 和 Base64
- # new_request_data = {
- # "username": "admin",
- # "password": "modified_password_by_tool",
- # "timestamp": time.time()
- # }
- # return new_request_data
- # # 告诉拦截器,凡是包含 "api/submit" 的请求,发出去前先经过 custom_request_handler 处理
- # interceptor.intercept_request("api/submit", custom_request_handler)
- # # ==========================================
- # # 场景 2:动态修改 / 伪造 Response Body
- # # ==========================================
- # def custom_response_handler(url, original_body):
- # print(f"收到服务器原始响应: {original_body[:100]}...") # 打印前100个字符
-
- # # 我们基于原有的响应数据,注入一些自己想要伪造的数据
- # try:
- # data = json.loads(original_body)
- # data['message'] = "这是被DrissionPage拦截并篡改的Mock数据!"
- # data['vip_status'] = True
- # return data # 直接返回字典即可
- # except json.JSONDecodeError:
- # # 如果不是JSON,直接返回一个新字符串
- # return '{"code": 200, "message": "全量替换的Mock数据"}'
- # # 告诉拦截器,凡是包含 "api/data" 的响应,接收前先经过 custom_response_handler 处理
- # interceptor.intercept_response("api/data", custom_response_handler)
- # # ==========================================
- # # 启动拦截并开始业务测试
- # # ==========================================
- # interceptor.start()
- # print("拦截器已生效,开始访问页面...")
- # page.get('https://example.com') # 替换为测试的目标网址
- # # 模拟业务等待
- # time.sleep(100)
- # # 如果不需要拦截了,可以调用 stop (可选)
- # # interceptor.stop()
- import base64
- import json
- from typing import Callable, Union, Dict, Any
- class DrissionFetchInterceptor:
- """
- DrissionPage Fetch 请求/响应拦截修改器
- 基于 Chrome DevTools Protocol (CDP) 的 Fetch 域
- """
- def __init__(self, page):
- self.page = page
- self.patterns = []
- self._request_handlers = {}
- self._response_handlers = {}
- self._is_running = False
- def intercept_request(self, url_keyword: str, handler: Callable):
- """
- 添加拦截 Request Body 的规则
- :param url_keyword: URL中包含的关键字 (例如 'api/submit')
- :param handler: 回调函数,接收 (url, original_post_data),需返回新的 post_data 字符串或字典
- """
- self.patterns.append({
- 'urlPattern': f'*{url_keyword}*',
- 'requestStage': 'Request'
- })
- self._request_handlers[url_keyword] = handler
- def intercept_response(self, url_keyword: str, handler: Callable):
- """
- 添加拦截 Response Body 的规则
- :param url_keyword: URL中包含的关键字 (例如 'api/data')
- :param handler: 回调函数,接收 (url, original_body),需返回新的 body 字符串或字典
- """
- self.patterns.append({
- 'urlPattern': f'*{url_keyword}*',
- 'requestStage': 'Response'
- })
- self._response_handlers[url_keyword] = handler
- def start(self):
- """启动拦截器"""
- if not self.patterns:
- print("[Interceptor] 没有配置任何拦截规则。")
- return
- # 开启 Fetch 域并应用规则
- self.page.run_cdp('Fetch.enable', patterns=self.patterns)
- # 绑定核心回调
- self.page.driver.set_callback('Fetch.requestPaused', self._on_request_paused)
- self._is_running = True
- print(f"[Interceptor] 已启动,共 {len(self.patterns)} 条规则生效。")
- def stop(self):
- """停止拦截器"""
- if self._is_running:
- self.page.run_cdp('Fetch.disable')
- self.page.driver.set_callback('Fetch.requestPaused', None)
- self._is_running = False
- print("[Interceptor] 已停止。")
- def _on_request_paused(self, **kwargs):
- """底层的 CDP 暂停事件路由器"""
- request_id = kwargs.get('requestId')
- request = kwargs.get('request', {})
- url = request.get('url')
- response_status = kwargs.get('responseStatusCode')
- # === 阶段 1: 拦截并修改 Response (服务器返回后) ===
- if response_status:
- for keyword, handler in self._response_handlers.items():
- if keyword in url:
- self._handle_response_modification(kwargs, handler)
- return
-
- # 没匹配上,放行
- self.page.run_cdp('Fetch.continueRequest', requestId=request_id)
- return
- # === 阶段 2: 拦截并修改 Request (发往服务器前) ===
- for keyword, handler in self._request_handlers.items():
- if keyword in url:
- self._handle_request_modification(kwargs, handler)
- return
-
- # 没匹配上,放行
- self.page.run_cdp('Fetch.continueRequest', requestId=request_id)
- def _handle_request_modification(self, kwargs: Dict, handler: Callable):
- """处理 Request 的修改逻辑"""
- request_id = kwargs.get('requestId')
- request = kwargs.get('request', {})
- url = request.get('url')
- original_post_data = request.get('postData', '')
- try:
- # 调用用户自定义的处理函数
- new_data = handler(url, original_post_data)
-
- # 如果返回的是字典,自动转成 JSON 字符串
- if isinstance(new_data, dict):
- new_data = json.dumps(new_data)
- if new_data is not None:
- encoded_body = base64.b64encode(str(new_data).encode('utf-8')).decode('utf-8')
- self.page.run_cdp('Fetch.continueRequest', requestId=request_id, postData=encoded_body)
- print(f"[Interceptor] 成功修改 Request Body -> {url}")
- return
- except Exception as e:
- print(f"[Interceptor] 修改 Request 时发生错误: {e}")
-
- # 如果发生异常或未返回新数据,兜底原样放行,防止浏览器卡死
- self.page.run_cdp('Fetch.continueRequest', requestId=request_id)
- def _handle_response_modification(self, kwargs: Dict, handler: Callable):
- """处理 Response 的修改逻辑"""
- request_id = kwargs.get('requestId')
- url = kwargs.get('request', {}).get('url')
- response_status = kwargs.get('responseStatusCode', 200)
- # CDP 中 headers 的格式是一个字典列表 [{'name': 'Content-Type', 'value': '...'}, ...]
- headers = kwargs.get('responseHeaders', [])
- original_body = ""
- try:
- # 尝试获取原始 Response Body
- res = self.page.run_cdp('Fetch.getResponseBody', requestId=request_id)
- original_body = res.get('body', '')
- if res.get('base64Encoded'):
- original_body = base64.b64decode(original_body).decode('utf-8')
- except Exception:
- pass # 有些请求(如图片/流)可能获取不到body
- try:
- # 调用用户自定义的处理函数
- new_data = handler(url, original_body)
- # 如果返回的是字典,自动转成 JSON 字符串
- if isinstance(new_data, dict):
- new_data = json.dumps(new_data)
- if new_data is not None:
- encoded_body = base64.b64encode(str(new_data).encode('utf-8')).decode('utf-8')
- # 修改响应体必须使用 Fetch.fulfillRequest
- self.page.run_cdp('Fetch.fulfillRequest',
- requestId=request_id,
- responseCode=response_status,
- responseHeaders=headers,
- body=encoded_body)
- print(f"[Interceptor] 成功修改 Response Body -> {url}")
- return
- except Exception as e:
- print(f"[Interceptor] 修改 Response 时发生错误: {e}")
- # 兜底:如果报错,原样放行请求
- self.page.run_cdp('Fetch.continueRequest', requestId=request_id)
|