#!/usr/bin/env python3
import asyncio
import json
import statistics
import time
from collections import Counter
from typing import Any, Dict, List, Optional
from urllib.parse import quote

import aiohttp


def build_proxy_url(item: Dict[str, Any]) -> Optional[str]:
    """Convert one proxy entry from the JSON config into a proxy URL.

    The result is meant to be passed to aiohttp as a proxy URL and has one
    of these shapes::

        http://user:pass@host:port
        http://user@host:port
        http://host:port

    Args:
        item: Proxy entry. Recognised keys are ``scheme`` (defaults to
            ``"http"``), ``server``/``host``/``ip``, ``port``,
            ``username``/``user`` and ``password``/``pass``.

    Returns:
        The proxy URL, or ``None`` when the entry has no host or no port.
    """
    # `or` (rather than a .get default) also covers an explicit null scheme.
    scheme = item.get("scheme") or "http"
    host = item.get("server") or item.get("host") or item.get("ip")
    port = item.get("port")
    if not host or not port:
        return None

    user = item.get("username") or item.get("user")
    if not user:
        return f"{scheme}://{host}:{port}"

    # Percent-encode the credentials so characters such as '@', ':' or '/'
    # cannot be mistaken for URL delimiters. '%' is left untouched so that
    # credentials already stored percent-encoded keep working unchanged.
    auth = quote(str(user), safe="%")
    pwd = item.get("password") or item.get("pass")
    if pwd:
        # Only append the password when there is one; otherwise the URL
        # would contain the literal text "None" as the password.
        auth = f"{auth}:{quote(str(pwd), safe='%')}"
    return f"{scheme}://{auth}@{host}:{port}"


def load_proxies_from_json(path: str, pool_name: str) -> List[str]:
    """Load the proxy URLs of one named pool from a JSON file.

    The file is expected to hold a JSON object mapping pool names to arrays
    of proxy entries (see ``build_proxy_url`` for the entry keys). Entries
    whose ``enable`` key is falsy are skipped; entries without that key
    count as enabled. Entries that are not JSON objects, or that lack a
    host or port, are skipped as well.

    This function is best-effort: a missing or unreadable file, invalid
    JSON, or an unknown pool prints a message and yields an empty list
    instead of raising.

    Args:
        path: Path of the JSON file.
        pool_name: Key of the pool to read inside the JSON object.

    Returns:
        The proxy URLs of all usable entries, in file order.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            pools = json.load(f)
    except FileNotFoundError:
        print(f"Proxy file {path} not found. Continuing without proxies.")
        return []
    except Exception as e:
        # Deliberately broad: any read/parse problem just means "no proxies".
        print(f"Failed to load proxies from {path}: {e}")
        return []

    if not isinstance(pools, dict) or pool_name not in pools:
        print(f"Failed to load proxies from {path}: pool {pool_name!r} not found.")
        return []

    arr = pools[pool_name]
    if not isinstance(arr, list):
        print(f"{path} does not contain a JSON array.")
        return []

    proxies: List[str] = []
    for entry in arr:
        # A malformed (non-object) entry must not abort the whole pool.
        if not isinstance(entry, dict) or not entry.get("enable", True):
            continue
        url = build_proxy_url(entry)
        if url:
            proxies.append(url)
    return proxies


def select_good_iplist(proxies):
    """Placeholder that is not implemented yet; does nothing and returns None."""
    # TODO: implement. NOTE(review): the name suggests filtering `proxies`
    # down to the working ones -- confirm the intended contract.
    pass