proxy_utils.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. #!/usr/bin/env python3
  2. import asyncio
  3. import aiohttp
  4. import time
  5. import json
  6. from collections import Counter
  7. import statistics
  8. from typing import List, Dict, Any
  9. def build_proxy_url(item: Dict[str, Any]) -> str:
  10. """
  11. 将 JSON 中的代理条目(scheme/server/port/username/password)转换为
  12. aiohttp 可接受的 proxy URL,例如:
  13. http://user:pass@host:port
  14. http://host:port
  15. """
  16. scheme = item.get("scheme", "http")
  17. host = item.get("server") or item.get("host") or item.get("ip")
  18. port = item.get("port")
  19. user = item.get("username") or item.get("user")
  20. pwd = item.get("password") or item.get("pass")
  21. if not host or not port:
  22. return None
  23. if user:
  24. return f"{scheme}://{user}:{pwd}@{host}:{port}"
  25. else:
  26. return f"{scheme}://{host}:{port}"
  27. def load_proxies_from_json(path: str, pool_name) -> List[str]:
  28. proxies: List[str] = []
  29. try:
  30. with open(path, "r", encoding="utf-8") as f:
  31. pools = json.load(f)
  32. arr = pools[pool_name]
  33. if not isinstance(arr, list):
  34. print(f"{path} does not contain a JSON array.")
  35. return []
  36. for entry in arr:
  37. if entry.get('enable', True):
  38. try:
  39. p = build_proxy_url(entry)
  40. if p:
  41. proxies.append(p)
  42. except Exception:
  43. continue
  44. except FileNotFoundError:
  45. print(f"Proxy file {path} not found. Continuing without proxies.")
  46. except Exception as e:
  47. print(f"Failed to load proxies from {path}: {e}")
  48. return proxies
  49. def select_good_iplist(proxies):
  50. pass