# network_interceptor.py
#
# Usage example (kept commented out; uncomment to run):
# import time
# import json
# from DrissionPage import ChromiumPage
# from network_interceptor import DrissionFetchInterceptor  # the helper class defined in this module
#
# # Initialise the browser
# page = ChromiumPage()
# # Create the interceptor
# interceptor = DrissionFetchInterceptor(page)
#
# # ==========================================
# # Scenario 1: rewrite the request body on the fly
# # ==========================================
# def custom_request_handler(url, original_post_data):
#     print(f"原始请求体: {original_post_data}")
#     # Parse and modify original_post_data, or simply return brand-new data.
#     # If the endpoint expects JSON you can return a plain Python dict; the
#     # helper class converts it to JSON and Base64 automatically.
#     new_request_data = {
#         "username": "admin",
#         "password": "modified_password_by_tool",
#         "timestamp": time.time()
#     }
#     return new_request_data
#
# # Every request whose URL contains "api/submit" is passed through
# # custom_request_handler before it is sent.
# interceptor.intercept_request("api/submit", custom_request_handler)
#
# # ==========================================
# # Scenario 2: modify / fake the response body on the fly
# # ==========================================
# def custom_response_handler(url, original_body):
#     print(f"收到服务器原始响应: {original_body[:100]}...")  # print the first 100 characters
#     # Inject our own fake data on top of the original response
#     try:
#         data = json.loads(original_body)
#         data['message'] = "这是被DrissionPage拦截并篡改的Mock数据!"
#         data['vip_status'] = True
#         return data  # returning the dict directly is enough
#     except json.JSONDecodeError:
#         # Not JSON: return a brand-new string instead
#         return '{"code": 200, "message": "全量替换的Mock数据"}'
#
# # Every response whose URL contains "api/data" is passed through
# # custom_response_handler before the page receives it.
# interceptor.intercept_response("api/data", custom_response_handler)
#
# # ==========================================
# # Start intercepting and run the test scenario
# # ==========================================
# interceptor.start()
# print("拦截器已生效,开始访问页面...")
# page.get('https://example.com')  # replace with the URL under test
# # Simulate waiting while the page does its work
# time.sleep(100)
# # Optionally call stop() once interception is no longer needed
# # interceptor.stop()
  50. import base64
  51. import json
  52. from typing import Callable, Union, Dict, Any
  53. class DrissionFetchInterceptor:
  54. """
  55. DrissionPage Fetch 请求/响应拦截修改器
  56. 基于 Chrome DevTools Protocol (CDP) 的 Fetch 域
  57. """
  58. def __init__(self, page):
  59. self.page = page
  60. self.patterns = []
  61. self._request_handlers = {}
  62. self._response_handlers = {}
  63. self._is_running = False
  64. def intercept_request(self, url_keyword: str, handler: Callable):
  65. """
  66. 添加拦截 Request Body 的规则
  67. :param url_keyword: URL中包含的关键字 (例如 'api/submit')
  68. :param handler: 回调函数,接收 (url, original_post_data),需返回新的 post_data 字符串或字典
  69. """
  70. self.patterns.append({
  71. 'urlPattern': f'*{url_keyword}*',
  72. 'requestStage': 'Request'
  73. })
  74. self._request_handlers[url_keyword] = handler
  75. def intercept_response(self, url_keyword: str, handler: Callable):
  76. """
  77. 添加拦截 Response Body 的规则
  78. :param url_keyword: URL中包含的关键字 (例如 'api/data')
  79. :param handler: 回调函数,接收 (url, original_body),需返回新的 body 字符串或字典
  80. """
  81. self.patterns.append({
  82. 'urlPattern': f'*{url_keyword}*',
  83. 'requestStage': 'Response'
  84. })
  85. self._response_handlers[url_keyword] = handler
  86. def start(self):
  87. """启动拦截器"""
  88. if not self.patterns:
  89. print("[Interceptor] 没有配置任何拦截规则。")
  90. return
  91. # 开启 Fetch 域并应用规则
  92. self.page.run_cdp('Fetch.enable', patterns=self.patterns)
  93. # 绑定核心回调
  94. self.page.driver.set_callback('Fetch.requestPaused', self._on_request_paused)
  95. self._is_running = True
  96. print(f"[Interceptor] 已启动,共 {len(self.patterns)} 条规则生效。")
  97. def stop(self):
  98. """停止拦截器"""
  99. if self._is_running:
  100. self.page.run_cdp('Fetch.disable')
  101. self.page.driver.set_callback('Fetch.requestPaused', None)
  102. self._is_running = False
  103. print("[Interceptor] 已停止。")
  104. def _on_request_paused(self, **kwargs):
  105. """底层的 CDP 暂停事件路由器"""
  106. request_id = kwargs.get('requestId')
  107. request = kwargs.get('request', {})
  108. url = request.get('url')
  109. response_status = kwargs.get('responseStatusCode')
  110. # === 阶段 1: 拦截并修改 Response (服务器返回后) ===
  111. if response_status:
  112. for keyword, handler in self._response_handlers.items():
  113. if keyword in url:
  114. self._handle_response_modification(kwargs, handler)
  115. return
  116. # 没匹配上,放行
  117. self.page.run_cdp('Fetch.continueRequest', requestId=request_id)
  118. return
  119. # === 阶段 2: 拦截并修改 Request (发往服务器前) ===
  120. for keyword, handler in self._request_handlers.items():
  121. if keyword in url:
  122. self._handle_request_modification(kwargs, handler)
  123. return
  124. # 没匹配上,放行
  125. self.page.run_cdp('Fetch.continueRequest', requestId=request_id)
  126. def _handle_request_modification(self, kwargs: Dict, handler: Callable):
  127. """处理 Request 的修改逻辑"""
  128. request_id = kwargs.get('requestId')
  129. request = kwargs.get('request', {})
  130. url = request.get('url')
  131. original_post_data = request.get('postData', '')
  132. try:
  133. # 调用用户自定义的处理函数
  134. new_data = handler(url, original_post_data)
  135. # 如果返回的是字典,自动转成 JSON 字符串
  136. if isinstance(new_data, dict):
  137. new_data = json.dumps(new_data)
  138. if new_data is not None:
  139. encoded_body = base64.b64encode(str(new_data).encode('utf-8')).decode('utf-8')
  140. self.page.run_cdp('Fetch.continueRequest', requestId=request_id, postData=encoded_body)
  141. print(f"[Interceptor] 成功修改 Request Body -> {url}")
  142. return
  143. except Exception as e:
  144. print(f"[Interceptor] 修改 Request 时发生错误: {e}")
  145. # 如果发生异常或未返回新数据,兜底原样放行,防止浏览器卡死
  146. self.page.run_cdp('Fetch.continueRequest', requestId=request_id)
  147. def _handle_response_modification(self, kwargs: Dict, handler: Callable):
  148. """处理 Response 的修改逻辑"""
  149. request_id = kwargs.get('requestId')
  150. url = kwargs.get('request', {}).get('url')
  151. response_status = kwargs.get('responseStatusCode', 200)
  152. # CDP 中 headers 的格式是一个字典列表 [{'name': 'Content-Type', 'value': '...'}, ...]
  153. headers = kwargs.get('responseHeaders', [])
  154. original_body = ""
  155. try:
  156. # 尝试获取原始 Response Body
  157. res = self.page.run_cdp('Fetch.getResponseBody', requestId=request_id)
  158. original_body = res.get('body', '')
  159. if res.get('base64Encoded'):
  160. original_body = base64.b64decode(original_body).decode('utf-8')
  161. except Exception:
  162. pass # 有些请求(如图片/流)可能获取不到body
  163. try:
  164. # 调用用户自定义的处理函数
  165. new_data = handler(url, original_body)
  166. # 如果返回的是字典,自动转成 JSON 字符串
  167. if isinstance(new_data, dict):
  168. new_data = json.dumps(new_data)
  169. if new_data is not None:
  170. encoded_body = base64.b64encode(str(new_data).encode('utf-8')).decode('utf-8')
  171. # 修改响应体必须使用 Fetch.fulfillRequest
  172. self.page.run_cdp('Fetch.fulfillRequest',
  173. requestId=request_id,
  174. responseCode=response_status,
  175. responseHeaders=headers,
  176. body=encoded_body)
  177. print(f"[Interceptor] 成功修改 Response Body -> {url}")
  178. return
  179. except Exception as e:
  180. print(f"[Interceptor] 修改 Response 时发生错误: {e}")
  181. # 兜底:如果报错,原样放行请求
  182. self.page.run_cdp('Fetch.continueRequest', requestId=request_id)