# toolkit/vs_cloud_api.py
import json
import time
import urllib.parse
from typing import Any, Dict, Optional

import requests

from vs_log_macros import VSC_ERROR, VSC_INFO, VSC_DEBUG  # type: ignore


class VSCloudApi:
    """Python port of the C++ ``VSCloudApi`` cloud-service client.

    The class is a process-wide singleton: every ``VSCloudApi()`` /
    ``VSCloudApi.Instance()`` call returns the same object, which owns a
    single shared ``requests.Session``.

    Most methods follow the C++ calling convention: they return ``bool``
    for success and write the response into a caller-supplied "out"
    container (a ``dict`` for JSON, a ``list`` for raw text).

    NOTE(review): the previous description of this class named port 8888
    for the service, but ``base_url`` below is configured with port 8000 --
    confirm which one is correct.
    """

    _instance = None

    def __new__(cls, *args, **kwargs):
        """Create the singleton on first use and return it afterwards."""
        if cls._instance is None:
            instance = super(VSCloudApi, cls).__new__(cls)
            # Default configuration, mirrors the C++ constructor arguments.
            instance.base_url = "http://45.137.220.138:8000"
            # NOTE(review): credential is hard-coded in source; consider
            # loading it from configuration or the environment instead.
            instance.api_token = "7x9EjFpmv7GjZc6AfVeqxuUBANpqkpkHAtxJM7CAW5oZhs0nEyCJBy39N4XXs5hgfYWXw3jFrcgXqQ42HAx9Qvwtk9vC2GvKBbWz"
            instance.session = requests.Session()
            # Publish only once fully configured, so a failure above cannot
            # leave a half-initialised singleton behind.
            cls._instance = instance
        return cls._instance

    @staticmethod
    def Instance() -> "VSCloudApi":
        """Return the shared ``VSCloudApi`` instance (C++-style accessor)."""
        return VSCloudApi()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _get_headers(self, content_type: str = "application/json") -> Dict[str, str]:
        """Build the default request headers, including the API token."""
        return {
            "Authorization": self.api_token,
            "Content-Type": content_type,
            "Accept": "application/json, text/plain, */*",
        }

    def _handle_request(self, method: str, endpoint: str, **kwargs) -> Optional[Any]:
        """Send a request to ``base_url + endpoint`` and decode the reply.

        Args:
            method: HTTP verb, e.g. ``"GET"``.
            endpoint: Path appended verbatim to ``self.base_url``.
            **kwargs: Forwarded to ``Session.request`` (``params``,
                ``headers``, ``json``, ...). ``timeout`` defaults to 30
                seconds unless the caller supplies one.

        Returns:
            The decoded JSON body on HTTP 200; the raw response text when
            the body is not valid JSON; ``None`` on any non-200 status or
            on an exception (both are logged via ``VSC_ERROR``).
        """
        url = f"{self.base_url}{endpoint}"
        # setdefault avoids a "multiple values for keyword argument" error
        # when a caller passes its own timeout.
        kwargs.setdefault("timeout", 30)
        try:
            resp = self.session.request(method, url, **kwargs)
            if resp.status_code != 200:
                VSC_ERROR("vs_cloud", f"Request failed: {method} {url} [{resp.status_code}] {resp.text}")
                return None
            try:
                return resp.json()
            except ValueError:
                # Some endpoints may return plain text or an empty body.
                # ValueError is the common base class of every JSON decode
                # error requests can raise (json / simplejson / requests).
                return resp.text
        except Exception as e:
            VSC_ERROR("vs_cloud", f"Request exception: {method} {url} - {str(e)}")
        return None

    @staticmethod
    def _merge_result(result: Optional[Any], out: Dict[str, Any]) -> bool:
        """Copy a decoded response into a caller-supplied out-dict.

        Returns ``False`` only when the request itself failed
        (``result is None``). A successful response that is not a JSON
        object (plain text, empty body, list) still counts as success but
        leaves ``out`` untouched, instead of crashing in ``dict.update``.
        """
        if result is None:
            return False
        if isinstance(result, dict):
            out.update(result)
        return True

    def _post_for_text(self, endpoint: str, params: Dict[str, str], content_out: list, caller: str) -> bool:
        """POST with query parameters and an empty body; capture raw text.

        Args:
            endpoint: Path appended to ``self.base_url``.
            params: Query-string parameters (requests appends them to the URL).
            content_out: List used as an out-parameter; on success it is
                cleared and receives the response text as its only element.
            caller: Public method name, used as the prefix of log messages.

        Returns:
            ``True`` on HTTP 200, ``False`` otherwise (failure is logged).
        """
        url = f"{self.base_url}{endpoint}"
        try:
            resp = self.session.post(
                url, headers=self._get_headers(), params=params, data="", timeout=30
            )
            if resp.status_code == 200:
                # C++: content.assign(response_string.data(), response_string.length());
                if isinstance(content_out, list):
                    content_out.clear()
                    content_out.append(resp.text)
                return True
            VSC_ERROR("vs_cloud", f"{caller} failed: {resp.status_code} {resp.text}")
        except Exception as e:
            VSC_ERROR("vs_cloud", f"{caller} exception: {e}")
        return False

    # ------------------------------------------------------------------
    # Auto-booking users
    # ------------------------------------------------------------------

    def query_user_pending(self, tech_provider: str, pending_users_out: Dict[str, Any]) -> bool:
        """Query pending users for ``tech_provider``.

        On success the JSON object returned by the service is merged into
        ``pending_users_out``. Returns ``True`` on HTTP 200.
        """
        result = self._handle_request(
            "GET",
            "/api/autobooking/pending",
            params={"tech_provider": tech_provider},
            headers=self._get_headers(),
        )
        return self._merge_result(result, pending_users_out)

    def query_user_info(self, uid: str, userinfo_out: Dict[str, Any]) -> bool:
        """Query the VFS user record ``uid`` into ``userinfo_out``."""
        result = self._handle_request(
            "GET", f"/api/autobooking/{uid}", headers=self._get_headers()
        )
        return self._merge_result(result, userinfo_out)

    def update_user_info(self, uid: str, uinfo: Dict[str, Any], updated_userinfo_out: Dict[str, Any]) -> bool:
        """Update the VFS user record ``uid`` with ``uinfo`` (sent as JSON).

        The service's reply is merged into ``updated_userinfo_out``.
        """
        # C++: if (!uinfo.is_null()) put_data = uinfo.dump();
        result = self._handle_request(
            "PUT", f"/api/autobooking/{uid}", headers=self._get_headers(), json=uinfo
        )
        return self._merge_result(result, updated_userinfo_out)

    # ------------------------------------------------------------------
    # Anti-Turnstile tasks
    # ------------------------------------------------------------------

    def submit_anti_turnstile_task(self, proxy: str, website_url: str, task_out: Dict[str, Any]) -> bool:
        """Submit an ``AntiCloudflareTurnstileTask``.

        As in the C++ implementation, the ``args`` field is itself a
        JSON-encoded *string*, not a nested object. The created task, as
        returned by the service, is merged into ``task_out``.
        """
        args = {"proxy": proxy, "websiteUrl": website_url}
        payload = {
            "command": "AntiCloudflareTurnstileTask",
            "args": json.dumps(args),  # C++: data["args"] = args.dump();
            "status": 0,
        }
        result = self._handle_request(
            "POST", "/api/tasks", headers=self._get_headers(), json=payload
        )
        return self._merge_result(result, task_out)

    def get_anti_turnstile_result(self, task_id: str, result_out: Dict[str, Any]) -> bool:
        """Fetch task ``task_id`` once (no polling) into ``result_out``."""
        result = self._handle_request(
            "GET", f"/api/tasks/{task_id}", headers=self._get_headers()
        )
        return self._merge_result(result, result_out)

    # ------------------------------------------------------------------
    # AntiCloudflare tasks (TLSContact 5-second shield)
    # ------------------------------------------------------------------

    def submit_anticloudflare_task(self, proxy: str, website_url: str) -> Optional[Dict]:
        """Submit an ``AntiCloudflareTask`` (used for the TLSContact 5s shield).

        Returns:
            The decoded JSON response on HTTP 200, otherwise ``None``
            (non-200 status, invalid JSON, or any exception; all logged).
        """
        # NOTE: based on the earlier C++ port the path is /api/tasks. If the
        # service really expects /common/api/tasks, change it here;
        # /api/tasks is used to stay compatible with the current base_url.
        url = f"{self.base_url}/api/tasks"
        args = {"proxy": proxy, "websiteUrl": website_url}
        data = {
            "command": "AntiCloudflareTask",  # key difference: the command name
            "args": json.dumps(args),
            "status": 0,
        }
        headers = {
            "Authorization": self.api_token,
            "Content-Type": "application/json",
        }
        try:
            response = self.session.post(url, headers=headers, json=data, timeout=30)
            if response.status_code != 200:
                VSC_ERROR("vs_cloud", f"submit_anticloudflare_task failed: {response.status_code}")
                return None
            return response.json()
        except Exception as e:
            VSC_ERROR("vs_cloud", f"submit_anticloudflare_task exception: {e}")
            return None

    def get_anticloudflare_result(self, task_id, retry_interval=5, max_retries=20) -> Optional[Dict]:
        """Poll task ``task_id`` until it finishes.

        Args:
            task_id: Identifier interpolated into ``/api/tasks/{task_id}``.
            retry_interval: Seconds to sleep between polls.
            max_retries: Maximum number of polls.

        Returns:
            The task JSON once its ``status`` is 2 (success). ``None`` when
            ``status`` is 3 (failed), on a non-200 response, when the retry
            budget is exhausted, or on any exception.
        """
        url = f"{self.base_url}/api/tasks/{task_id}"
        headers = {"Authorization": self.api_token}
        try:
            for attempt in range(max_retries):
                response = self.session.get(url, headers=headers, timeout=30)
                if response.status_code != 200:
                    # Return right away so the "max retries" message below
                    # is not logged for what was actually an HTTP error.
                    VSC_ERROR("vs_cloud", f"Error getting task result: {response.text}")
                    return None

                data = response.json()
                status = data.get("status")
                if status == 2:  # 2 == finished successfully
                    return data
                if status == 3:  # 3 == task failed
                    VSC_ERROR("vs_cloud", f"AntiCloudflareTask failed: {data.get('result')}")
                    return None

                # Not ready yet; wait, but skip the pointless sleep after
                # the final attempt.
                if attempt + 1 < max_retries:
                    time.sleep(retry_interval)

            VSC_ERROR("vs_cloud", "Max retries reached, AntiCloudflareTask not completed.")
        except Exception as e:
            VSC_ERROR("vs_cloud", f"get_anticloudflare_result exception: {e}")
        return None

    # ------------------------------------------------------------------
    # HTTP sessions
    # ------------------------------------------------------------------

    def create_http_session(
        self,
        session_id: str,
        cookies: str,
        local_storage: str,
        user_agent: str,
        proxy: str,
        page: str,
        created_session_out: Dict[str, Any]
    ) -> bool:
        """Create an HTTP session record on the service.

        The reply is merged into ``created_session_out`` when it is a JSON
        object. Returns ``True`` on HTTP 200.
        """
        payload = {
            "local_storage": local_storage,
            "cookies": cookies,
            "user_agent": user_agent,
            "proxy": proxy,
            "page": page,
            "session_id": session_id,
        }
        result = self._handle_request(
            "POST", "/api/http-session", headers=self._get_headers(), json=payload
        )
        return self._merge_result(result, created_session_out)

    # ------------------------------------------------------------------
    # Mail
    # ------------------------------------------------------------------

    def fetch_mail_content(
        self,
        email: str,
        sender: str,
        recipient: str,
        subject_keywords: str,
        body_keywords: str,
        sent_date: str,
        expiry: int,
        content_out: list  # str is immutable; a list emulates the C++ out-pointer
    ) -> bool:
        """Fetch mail content matching the given filters.

        Mirrors the C++ logic: filters travel as URL query parameters on a
        POST request with an empty body. On success ``content_out`` is
        cleared and receives the raw response text as its only element.
        """
        params = {
            "email": email,
            "sender": sender,
            "recipient": recipient,
            "subjectKeywords": subject_keywords,
            "bodyKeywords": body_keywords,
            "sentDate": sent_date,
            "expiry": str(expiry),
        }
        # C++: curl_easy_setopt(curl, CURLOPT_POST, 1L);
        #      curl_easy_setopt(curl, CURLOPT_POSTFIELDS, "");
        return self._post_for_text(
            "/api/email-authorizations/fetch", params, content_out, "fetch_mail_content"
        )

    def fetch_mail_content_from_top(
        self,
        email: str,
        sender: str,
        recipient: str,
        subject_keywords: str,
        body_keywords: str,
        top: int,
        content_out: list
    ) -> bool:
        """Fetch mail content via the ``fetch-top`` endpoint.

        Same request shape as ``fetch_mail_content`` but sends ``top``
        instead of ``sentDate``/``expiry``. On success ``content_out`` is
        cleared and receives the raw response text as its only element.
        """
        params = {
            "email": email,
            "sender": sender,
            "recipient": recipient,
            "subjectKeywords": subject_keywords,
            "bodyKeywords": body_keywords,
            "top": str(top),
        }
        return self._post_for_text(
            "/api/email-authorizations/fetch-top",
            params,
            content_out,
            "fetch_mail_content_from_top",
        )