# toolkit/vs_cloud_api.py
import json
import time
import urllib.parse
from datetime import datetime
from typing import Any, Dict, Optional

import requests

from vs_log_macros import VSC_DEBUG, VSC_ERROR, VSC_INFO, VSC_WARN
from vs_types import (
    BizLogicError,
    NotFoundError,
    PermissionDeniedError,
    RateLimiteddError,
    SessionExpiredOrInvalidError,
)


class VSCloudApi:
    """Python implementation of VSCloudApi.

    Client for the cloud service (captcha solving, mail, session storage,
    task scheduling). The class is a singleton: ``VSCloudApi()`` and
    ``VSCloudApi.Instance()`` always return the same object, which owns one
    shared ``requests.Session``.

    Unless stated otherwise, every public method:

    * returns the ``data`` field of the JSON response when the response's
      ``code`` field is ``0``;
    * raises ``BizLogicError`` when ``code`` is not ``0`` or the HTTP status
      is an unexpected non-200 value;
    * raises ``SessionExpiredOrInvalidError`` (401), ``PermissionDeniedError``
      (403) or ``RateLimiteddError`` (429) for those HTTP statuses.
    """

    _instance = None

    # HTTP statuses that map to a dedicated exception type. Any other
    # non-200 status is reported as a BizLogicError.
    _STATUS_ERRORS = {
        401: SessionExpiredOrInvalidError,
        403: PermissionDeniedError,
        429: RateLimiteddError,
    }

    # Floor (seconds) for the per-request timeout used while polling, so the
    # last poll before the deadline still gets a realistic chance to finish.
    _MIN_POLL_REQUEST_TIMEOUT = 1.0

    def __new__(cls, *args, **kwargs):
        """Create the singleton on first use and return it on every call."""
        if cls._instance is None:
            instance = super().__new__(cls)
            # Default configuration.
            instance.base_url = "https://visafly.top"
            # NOTE(review): hard-coded credential. It should be loaded from
            # configuration or a secret store instead of living in source.
            instance.api_token = "Bearer tok_e946329a60ff45ba807f3f41b0e8b7fc"
            instance.session = requests.Session()
            # Publish only after the instance is fully initialised.
            cls._instance = instance
        return cls._instance

    @staticmethod
    def Instance():
        """Return the shared VSCloudApi instance."""
        return VSCloudApi()

    def _get_headers(self, content_type: str = "application/json") -> Dict[str, str]:
        """Build the request headers, including the Authorization token."""
        return {
            "Authorization": self.api_token,
            "Content-Type": content_type,
            "Accept": "application/json, text/plain, */*",
        }

    def _perform_request(self, method, url, headers=None, data=None,
                         json_data=None, params=None, timeout=None):
        """Send one HTTP request through the shared session.

        Only the actual request is sent; no OPTIONS preflight is issued.

        :param method: HTTP method name (e.g. ``'GET'``, ``'POST'``)
        :param url: absolute request URL
        :param headers: request headers
        :param data: raw request body
        :param json_data: object sent as the JSON request body
        :param params: query-string parameters
        :param timeout: ``requests`` timeout in seconds; ``None`` (default)
            waits indefinitely
        :return: the ``requests.Response`` when the status code is 200
        :raises SessionExpiredOrInvalidError: on HTTP 401
        :raises PermissionDeniedError: on HTTP 403
        :raises RateLimiteddError: on HTTP 429
        :raises BizLogicError: on any other non-200 status
        """
        resp = self.session.request(
            method,
            url,
            headers=headers,
            data=data,
            json=json_data,
            params=params,
            timeout=timeout,
        )
        VSC_DEBUG('vs_cloud', f'[perform request] {method} {url} {data} {json_data} {params} {resp.text}')

        if resp.status_code == 200:
            return resp

        error_cls = self._STATUS_ERRORS.get(resp.status_code)
        if error_cls is not None:
            raise error_cls()
        raise BizLogicError(message=f"HTTP Error {resp.status_code}: {resp.text[:100]}")

    def _request_data(self, method: str, url: str, error_prefix: str, **request_kwargs):
        """Send a request with the default headers and unwrap the JSON envelope.

        :param method: HTTP method name
        :param url: absolute request URL
        :param error_prefix: text placed before ``": <message>"`` in the
            BizLogicError raised when the response ``code`` is not ``0``
        :param request_kwargs: forwarded to ``_perform_request``
            (``params``, ``json_data``, ``data``, ``timeout``)
        :return: the ``data`` field of the response (``{}`` when the field is
            absent)
        :raises BizLogicError: when the response ``code`` is not ``0``
        """
        resp = self._perform_request(
            method, url, headers=self._get_headers(), **request_kwargs
        )
        result = resp.json()
        if result.get("code") == 0:
            return result.get("data", {})
        raise BizLogicError(message=f"{error_prefix}: {result.get('message')}")

    # =========================================================================
    # VAS Task Management (newly added API)
    # =========================================================================

    def get_vas_task_pop(self, routing_key: str) -> Optional[Dict]:
        """Fetch task information.

        API: GET /api/vas/task/pop

        :param routing_key: sent as the ``queue_name`` query parameter
        """
        return self._request_data(
            'GET',
            f"{self.base_url}/api/vas/task/pop",
            "Get vas task pop biz error",
            params={"queue_name": routing_key},
        )

    def get_vas_task(self, task_id: str) -> dict:
        """Fetch a task by its ID.

        API: GET /api/vas/task/detail?task_id=<task_id>
        """
        return self._request_data(
            'GET',
            f"{self.base_url}/api/vas/task/detail",
            f"Get Task={task_id} error",
            params={"task_id": task_id},
        )

    def update_vas_task(self, task_id: int, update_data: Dict[str, Any]) -> Optional[Dict]:
        """Update a task.

        API: POST /api/vas/task/update?id=1
        Body: update_data (json)
        """
        return self._request_data(
            'POST',
            f"{self.base_url}/api/vas/task/update",
            "Update vas task biz error",
            params={"id": task_id},
            json_data=update_data,
        )

    def return_vas_task_to_queue(self, task_id: int):
        """Put a task back into the queue.

        API: POST /api/vas/task/return_to_queue?task_id=1
        """
        return self._request_data(
            'POST',
            f"{self.base_url}/api/vas/task/return_to_queue",
            "Return vas task to queue biz error",
            params={"task_id": task_id},
        )

    def push_weixin_text(self, text: str):
        """Push a WeChat text message.

        API: POST /api/wechat/send_no_token
        """
        return self._request_data(
            'POST',
            f"{self.base_url}/api/wechat/send_no_token",
            "Push weixin text biz error",
            json_data={"message": text},
        )

    def create_task(self, command: str, args: Dict) -> str:
        """[Core] Create a task.

        :param command: task command (e.g. "AntiCloudflareTurnstileTask",
            "AntiCloudflareTask")
        :param args: task argument dict (e.g. {"proxy": "...", "websiteUrl": "..."})
        :return: the task ID as a string
        :raises BizLogicError: when creation fails or no ID is returned
        """
        payload = {
            "command": command,
            "args": args,
            "status": 0,
        }
        # A null "data" field is treated like an empty one so the missing-ID
        # case below raises BizLogicError instead of AttributeError.
        data = self._request_data(
            'POST',
            f"{self.base_url}/api/tasks",
            f"Create task failed ({command})",
            json_data=payload,
        ) or {}

        task_id = data.get("id")
        if not task_id:
            raise BizLogicError(message=f"Task created but no ID returned. Resp: {data}")
        return str(task_id)

    def get_task_result(self, task_id: str, timeout: int = 120, interval: int = 3) -> Dict:
        """[Core] Poll until the task finishes and return its result.

        :param task_id: task ID
        :param timeout: maximum time to wait, in seconds
        :param interval: delay between polls, in seconds
        :return: the task's ``data`` dict once its status is 2 (success)
        :raises BizLogicError: on timeout, on an API-level error, or when the
            task status is 3 (failed)
        """
        url = f"{self.base_url}/api/tasks/{task_id}"
        # Monotonic clock: the deadline is immune to wall-clock adjustments.
        deadline = time.monotonic() + timeout

        while True:
            # 1. Check the overall deadline.
            remaining = deadline - time.monotonic()
            if remaining < 0:
                raise BizLogicError(message=f"Wait for task result timeout ({timeout}s). TaskID: {task_id}")

            try:
                # 2./3. Query the task; API-level errors raise BizLogicError.
                # The request itself is bounded by the remaining budget so a
                # stalled connection cannot outlive the polling deadline.
                data = self._request_data(
                    'GET',
                    url,
                    "API Error fetching task",
                    timeout=max(remaining, self._MIN_POLL_REQUEST_TIMEOUT),
                )
                status = data.get("status")

                # 4. Inspect the task status.
                if status == 2:  # success
                    return data
                if status == 3:  # failed
                    error_msg = data.get("result", "Unknown error")
                    raise BizLogicError(message=f"Task execution failed: {error_msg}")
                # status 0 (pending) or 1 (running): keep waiting.
            except BizLogicError:
                # Business errors are final; do not retry.
                raise
            except Exception as e:
                # Network hiccups and other errors: log and retry.
                VSC_WARN("vs_cloud", f"Polling exception: {str(e)}")

            # Wait before the next poll.
            time.sleep(interval)

    def create_http_session(
        self,
        session_id: str,
        cookies: Optional[str] = None,
        local_storage: Optional[str] = None,
        user_agent: Optional[str] = None,
        proxy: Optional[str] = None,
        page: Optional[str] = None,
        session_storage: Optional[str] = None
    ) -> Optional[Dict]:
        """Create an HTTP session record.

        API: POST /api/http-session
        """
        payload = {
            "local_storage": local_storage,
            "session_storage": session_storage,
            "cookies": cookies,
            "user_agent": user_agent,
            "proxy": proxy,
            "page": page,
            "session_id": session_id,
        }
        return self._request_data(
            'POST',
            f"{self.base_url}/api/http-session",
            "Create http session biz error",
            json_data=payload,
        )

    def slot_refresh_start(
        self,
        routing_key: str,
        country: str = "",
        city: str = "",
        visa_type: str = "Tourist",
        snapshot_source: str = "worker",
    ):
        """Report the start of a slot refresh.

        API: POST /api/slot_refresh/start
        """
        payload = {
            "routing_key": routing_key,
            "country": country,
            "city": city,
            "visa_type": visa_type,
            "snapshot_source": snapshot_source,
        }
        return self._request_data(
            'POST',
            f"{self.base_url}/api/slot_refresh/start",
            "Slot refresh start biz error",
            json_data=payload,
        )

    def slot_refresh_success(
        self,
        routing_key: str,
        snapshot_source: str = "worker",
    ):
        """Report a successful slot refresh.

        API: POST /api/slot_refresh/success
        """
        payload = {
            "routing_key": routing_key,
            "snapshot_source": snapshot_source,
        }
        return self._request_data(
            'POST',
            f'{self.base_url}/api/slot_refresh/success',
            "Slot refresh success biz error",
            json_data=payload,
        )

    def slot_refresh_fail(
        self,
        routing_key: str,
        error: str,
        snapshot_source: str = "worker",
    ):
        """Report a failed slot refresh together with its error text.

        API: POST /api/slot_refresh/fail
        """
        payload = {
            "routing_key": routing_key,
            "snapshot_source": snapshot_source,
            "error": error,
        }
        return self._request_data(
            'POST',
            f'{self.base_url}/api/slot_refresh/fail',
            "Slot refresh fail biz error",
            json_data=payload,
        )

    def get_next_account(
        self,
        pool_name: str,
        lock_duration: float = 60
    ):
        """Request the next account from a pool.

        API: GET /api/account/next

        :param pool_name: sent as the ``pool_name`` query parameter
        :param lock_duration: sent as the ``lock_duration`` query parameter
        """
        params = {
            "pool_name": pool_name,
            "lock_duration": lock_duration,
        }
        return self._request_data(
            'GET',
            f'{self.base_url}/api/account/next',
            "Get next account biz error",
            params=params,
        )

    def slot_snapshot_report(
        self,
        query_payload: Optional[Dict[str, Any]] = None
    ):
        """Report a slot snapshot.

        API: POST /api/slots/report

        :param query_payload: JSON body to send; an empty object is sent when
            omitted
        """
        # A fresh dict per call avoids sharing one mutable default object.
        payload = {} if query_payload is None else query_payload
        return self._request_data(
            "POST",
            f"{self.base_url}/api/slots/report",
            "Slot snapshot report biz error",
            json_data=payload,
        )

    def fetch_mail_content(
        self,
        email: str,
        sender: str,
        recipient: str,
        subject_keywords: str,
        body_keywords: str,
        sent_date: str,
        expiry: int
    ) -> Optional[str]:
        """Fetch the body of a matching mail.

        API: POST /api/email-authorizations/fetch (filters are sent as query
        parameters, with an empty request body)

        :return: the ``body`` field of the response data, or ``''`` when absent
        """
        params = {
            "email": email,
            "sender": sender,
            "recipient": recipient,
            "subjectKeywords": subject_keywords,
            "bodyKeywords": body_keywords,
            "sentDate": sent_date,
            "expiry": str(expiry),
        }
        # A null "data" field is treated like an empty one.
        data = self._request_data(
            'POST',
            f"{self.base_url}/api/email-authorizations/fetch",
            "Fetch mail content biz error",
            params=params,
            data="",
        ) or {}
        return data.get('body', '')

    def fetch_mail_content_from_top(
        self,
        email: str,
        sender: str,
        recipient: str,
        subject_keywords: str,
        body_keywords: str,
        top: int
    ) -> Optional[str]:
        """Fetch the body of a matching mail, searching from the top.

        API: POST /api/email-authorizations/fetch-top (filters are sent as
        query parameters, with an empty request body)

        :return: the ``body`` field of the response data, or ``''`` when absent
        """
        params = {
            "email": email,
            "sender": sender,
            "recipient": recipient,
            "subjectKeywords": subject_keywords,
            "bodyKeywords": body_keywords,
            "top": str(top),
        }
        # A null "data" field is treated like an empty one.
        data = self._request_data(
            'POST',
            f"{self.base_url}/api/email-authorizations/fetch-top",
            "Fetch mail content from top biz error",
            params=params,
            data="",
        ) or {}
        return data.get('body', "")