# toolkit/vs_cloud_api.py
import json
import time
import urllib.parse
from typing import Any, Dict, Optional

import requests

from vs_log_macros import VSC_ERROR, VSC_INFO, VSC_DEBUG


class VSCloudApi:
    """Python implementation of VSCloudApi.

    Client for the cloud service (captcha solving, e-mail, session storage,
    task scheduling).

    The class is a singleton: ``VSCloudApi()`` and ``VSCloudApi.Instance()``
    always return the same object, which carries ``base_url``, ``api_token``
    and one shared ``requests.Session``.  Constructor arguments are accepted
    but ignored.

    Every request method is best-effort: failures are logged through
    ``VSC_ERROR`` and reported to the caller as ``None``; no exception
    escapes.  Each request uses a 30 second timeout.
    """

    _instance = None

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super(VSCloudApi, cls).__new__(cls)
            # Initialise the default configuration.
            # NOTE(review): the endpoint and bearer token are hard-coded and
            # the endpoint is plain HTTP, so the token travels unencrypted.
            # Consider loading both from configuration/environment instead.
            cls._instance.base_url = "http://45.137.220.138:8888"
            cls._instance.api_token = "Bearer tok_8cb26cf337cb405784eb346dfafb7f54"
            cls._instance.session = requests.Session()
        return cls._instance

    @staticmethod
    def Instance():
        """Return the shared ``VSCloudApi`` instance."""
        return VSCloudApi()

    def _get_headers(self, content_type: str = "application/json") -> Dict[str, str]:
        """Build the request headers (authorization, content type, accept)."""
        return {
            "Authorization": self.api_token,
            "Content-Type": content_type,
            "Accept": "application/json, text/plain, */*"
        }

    def _read_envelope(self, op: str, resp: Any, missing_code: Any = None) -> tuple:
        """Validate a response and extract the ``data`` member of its JSON body.

        Centralising this keeps the log prefix tied to the calling operation.

        Args:
            op: Name of the calling operation; used as the log-message prefix.
            resp: Response object returned by ``self.session``.
            missing_code: Value assumed for ``code`` when the JSON body has no
                such key.  ``None`` (the default) makes a missing ``code``
                count as a business error.

        Returns:
            ``(True, data)`` when the HTTP status is 200 and ``code`` is 0;
            ``data`` is ``{}`` when the body has no ``data`` key.
            ``(False, None)`` otherwise, after logging the reason.

        Raises:
            Whatever ``resp.json()`` raises for a non-JSON body; callers wrap
            this call in their own ``try``/``except``.
        """
        if resp.status_code != 200:
            VSC_ERROR("vs_cloud", f"{op} failed: {resp.status_code} {resp.text[:100]}")
            return False, None
        result = resp.json()
        if result.get("code", missing_code) != 0:
            VSC_ERROR("vs_cloud", f"{op} biz error: {result.get('message')}")
            return False, None
        return True, result.get("data", {})

    # =========================================================================
    # VAS Task Management (newly added API)
    # =========================================================================

    def get_vas_task_pop(self, routing_key: str) -> Optional[Dict]:
        """Call the task-pop endpoint for a queue.

        API: GET /api/vas/task/pop?queue_name=<routing_key>

        Args:
            routing_key: Sent as the ``queue_name`` query parameter.

        Returns:
            The ``data`` member of the response on success, ``None`` on any
            failure.
        """
        url = f"{self.base_url}/api/vas/task/pop"
        params = {"queue_name": routing_key}
        headers = self._get_headers()
        try:
            resp = self.session.get(url, params=params, headers=headers, timeout=30)
            _, data = self._read_envelope("get_vas_task_pop", resp)
            return data
        except Exception as e:
            VSC_ERROR("vs_cloud", f"get_vas_task_pop exception: {e}")
        # Failure is reported as None (not False) to match the annotation and
        # every other method of this class.
        return None

    def update_vas_task(self, task_id: int, update_data: Dict[str, Any]) -> Optional[Dict]:
        """Update a task.

        API: POST /api/vas/task/update?id=<task_id>
        Body: ``update_data`` serialised as JSON.

        Returns:
            The ``data`` member of the response on success, ``None`` on any
            failure.
        """
        url = f"{self.base_url}/api/vas/task/update"
        params = {"id": task_id}
        headers = self._get_headers()
        try:
            resp = self.session.post(url, params=params, json=update_data, headers=headers, timeout=30)
            _, data = self._read_envelope("update_vas_task", resp)
            return data
        except Exception as e:
            VSC_ERROR("vs_cloud", f"update_vas_task exception: {e}")
        return None

    def submit_anti_turnstile_task(self, proxy: str, website_url: str) -> Optional[Dict]:
        """Submit an anti-Turnstile task.

        API: POST /api/tasks with command ``AntiCloudflareTurnstileTask``.
        ``proxy`` and ``website_url`` are sent JSON-encoded in ``args``.

        Returns:
            The ``data`` member of the response on success, ``None`` on any
            failure.
        """
        url = f"{self.base_url}/api/tasks"
        headers = self._get_headers()
        args = {
            "proxy": proxy,
            "websiteUrl": website_url
        }
        payload = {
            "command": "AntiCloudflareTurnstileTask",
            "args": json.dumps(args),
            "status": 0
        }
        try:
            resp = self.session.post(url, headers=headers, json=payload, timeout=30)
            _, data = self._read_envelope("submit_anti_turnstile_task", resp)
            return data
        except Exception as e:
            VSC_ERROR("vs_cloud", f"submit_anti_turnstile_task exception: {e}")
        return None

    def get_anti_turnstile_result(self, task_id: str) -> Optional[Dict]:
        """Fetch the anti-Turnstile task record (single request, no polling).

        API: GET /api/tasks/<task_id>

        Returns:
            The ``data`` member of the response on success, ``None`` on any
            failure.
        """
        url = f"{self.base_url}/api/tasks/{task_id}"
        headers = self._get_headers()
        try:
            resp = self.session.get(url, headers=headers, timeout=30)
            _, data = self._read_envelope("get_anti_turnstile_result", resp)
            return data
        except Exception as e:
            VSC_ERROR("vs_cloud", f"get_anti_turnstile_result exception: {e}")
        return None

    def submit_anticloudflare_task(self, proxy: str, website_url: str) -> Optional[Dict]:
        """Submit an AntiCloudflareTask (used for the TLSContact 5-second shield).

        API: POST /api/tasks with command ``AntiCloudflareTask``.
        ``proxy`` and ``website_url`` are sent JSON-encoded in ``args``.

        Returns:
            The ``data`` member of the response on success, ``None`` on any
            failure.
        """
        url = f"{self.base_url}/api/tasks"
        args = {
            'proxy': proxy,
            'websiteUrl': website_url
        }
        data = {
            "command": "AntiCloudflareTask",
            "args": json.dumps(args),
            "status": 0
        }
        headers = self._get_headers()
        try:
            response = self.session.post(url, headers=headers, json=data, timeout=30)
            _, task = self._read_envelope("submit_anticloudflare_task", response)
            return task
        except Exception as e:
            VSC_ERROR("vs_cloud", f"submit_anticloudflare_task exception: {e}")
        return None

    def get_anticloudflare_result(self, task_id, retry_interval: float = 5, max_retries: int = 20) -> Optional[Dict]:
        """Poll for the result of an AntiCloudflareTask.

        API: GET /api/tasks/<task_id>, repeated up to ``max_retries`` times.

        Args:
            task_id: Identifier interpolated into the request URL.
            retry_interval: Seconds to wait between two attempts.
            max_retries: Maximum number of requests to issue.

        Returns:
            The task ``data`` dict once its ``status`` is 2.  ``None`` when
            the status is 3, when the attempts are exhausted, or when an
            exception occurs (an exception aborts the polling immediately).
        """
        url = f"{self.base_url}/api/tasks/{task_id}"
        headers = self._get_headers()
        try:
            for attempt in range(max_retries):
                response = self.session.get(url, headers=headers, timeout=30)
                ok, data = self._read_envelope("get_anticloudflare_result", response)
                if ok:
                    status = data.get("status")
                    # status 2 means success
                    if status == 2:
                        return data
                    # status 3 is treated as a terminal failure
                    if status == 3:
                        VSC_ERROR("vs_cloud", f"AntiCloudflareTask failed: {data.get('result')}")
                        return None
                # Wait before the next attempt whatever the reason for the
                # retry (pending task, HTTP error, business error) so a
                # failing server is not hit back-to-back; no wait is needed
                # after the final attempt.
                if attempt < max_retries - 1:
                    time.sleep(retry_interval)
            VSC_ERROR("vs_cloud", "Max retries reached, AntiCloudflareTask not completed.")
        except Exception as e:
            VSC_ERROR("vs_cloud", f"get_anticloudflare_result exception: {e}")
        return None

    def create_http_session(
        self,
        session_id: str,
        cookies: str,
        local_storage: str,
        user_agent: str,
        proxy: str,
        page: str
    ) -> Optional[Dict]:
        """Create an HTTP session record.

        API: POST /api/http-session with all arguments sent as a JSON body.

        Returns:
            The ``data`` member of the response on success, ``None`` on any
            failure.
        """
        url = f"{self.base_url}/api/http-session"
        headers = self._get_headers()
        payload = {
            "local_storage": local_storage,
            "cookies": cookies,
            "user_agent": user_agent,
            "proxy": proxy,
            "page": page,
            "session_id": session_id
        }
        try:
            resp = self.session.post(url, headers=headers, json=payload, timeout=30)
            _, data = self._read_envelope("create_http_session", resp)
            return data
        except Exception as e:
            VSC_ERROR("vs_cloud", f"create_http_session exception: {e}")
        return None

    def fetch_mail_content(
        self,
        email: str,
        sender: str,
        recipient: str,
        subject_keywords: str,
        body_keywords: str,
        sent_date: str,
        expiry: int
    ) -> Optional[str]:
        """Fetch the body of a matching e-mail.

        API: POST /api/email-authorizations/fetch.  All filters are sent as
        query parameters with an empty request body.

        Returns:
            The ``body`` field of the response data (``""`` when absent) on
            success, ``None`` on any failure.
        """
        params = {
            "email": email,
            "sender": sender,
            "recipient": recipient,
            "subjectKeywords": subject_keywords,
            "bodyKeywords": body_keywords,
            "sentDate": sent_date,
            "expiry": str(expiry)
        }
        url = f"{self.base_url}/api/email-authorizations/fetch"
        headers = self._get_headers()
        try:
            resp = self.session.post(url, headers=headers, params=params, data="", timeout=30)
            # This endpoint treats a missing ``code`` as success.
            ok, data = self._read_envelope("fetch_mail_content", resp, missing_code=0)
            if ok:
                return data.get('body', "")
        except Exception as e:
            VSC_ERROR("vs_cloud", f"fetch_mail_content exception: {e}")
        return None

    def fetch_mail_content_from_top(
        self,
        email: str,
        sender: str,
        recipient: str,
        subject_keywords: str,
        body_keywords: str,
        top: int
    ) -> Optional[str]:
        """Fetch the body of a matching e-mail, searching from the top.

        API: POST /api/email-authorizations/fetch-top.  All filters are sent
        as query parameters with an empty request body.

        Returns:
            The ``body`` field of the response data (``""`` when absent) on
            success, ``None`` on any failure.
        """
        params = {
            "email": email,
            "sender": sender,
            "recipient": recipient,
            "subjectKeywords": subject_keywords,
            "bodyKeywords": body_keywords,
            "top": str(top)
        }
        url = f"{self.base_url}/api/email-authorizations/fetch-top"
        headers = self._get_headers()
        try:
            resp = self.session.post(url, headers=headers, params=params, data="", timeout=30)
            # This endpoint treats a missing ``code`` as success.
            ok, data = self._read_envelope("fetch_mail_content_from_top", resp, missing_code=0)
            if ok:
                return data.get('body', "")
        except Exception as e:
            VSC_ERROR("vs_cloud", f"fetch_mail_content_from_top exception: {e}")
        return None