| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424 |
- # toolkit/vs_cloud_api.py
- import requests
- import json
- import time
- import urllib.parse
- import configure
- from datetime import datetime
- from typing import Dict, Any, List, Optional
- from vs_types import NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError
- from vs_log_macros import VSC_ERROR, VSC_INFO, VSC_WARN, VSC_DEBUG
- from tools.clash_api import switch_next_node
class VSCloudApi:
    """Python client for the VS cloud service, exposed as a singleton.

    Wraps the cloud HTTP endpoints used for captcha-solving tasks, mail
    retrieval, HTTP-session storage and task scheduling.

    Every endpoint method sends one request through ``_perform_request`` and
    then expects a JSON envelope with ``code``, ``message`` and ``data`` keys;
    a ``code`` other than ``0`` is raised as ``BizLogicError``.
    """

    # Shared instance, created lazily by __new__.
    _instance = None

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, building and configuring it on first use."""
        if cls._instance is None:
            instance = super().__new__(cls)
            # Default configuration.
            instance.base_url = "https://visafly.top"
            # NOTE(review): a bearer credential is committed in source here;
            # it should be loaded from configuration or the environment.
            instance.api_token = "Bearer tok_e946329a60ff45ba807f3f41b0e8b7fc"
            instance.session = requests.Session()
            # Publish only once fully configured, so a failure above cannot
            # leave a half-initialised singleton behind for later callers.
            cls._instance = instance
        return cls._instance

    @staticmethod
    def Instance():
        """Return the shared ``VSCloudApi`` instance."""
        return VSCloudApi()

    def _get_headers(self, content_type: str = "application/json") -> Dict[str, str]:
        """Build the request headers carrying the API token.

        Args:
            content_type: Value for the ``Content-Type`` header.

        Returns:
            A new header dictionary.
        """
        return {
            "Authorization": self.api_token,
            "Content-Type": content_type,
            "Accept": "application/json, text/plain, */*"
        }

    def _perform_request(self, method, url, headers=None, data=None, json_data=None,
                         params=None, timeout=None):
        """Send one HTTP request on the shared session and check its status.

        Only the actual request is sent; no preflight OPTIONS request is made.

        Args:
            method: HTTP verb, e.g. ``'GET'`` or ``'POST'``.
            url: Absolute request URL.
            headers: Optional request headers.
            data: Optional raw request body.
            json_data: Optional object sent as the JSON body.
            params: Optional query-string parameters.
            timeout: Optional timeout in seconds handed to ``requests``.
                ``None`` (the default) waits without limit.

        Returns:
            The response object when the status code is 200.

        Raises:
            SessionExpiredOrInvalidError: On HTTP 401.
            PermissionDeniedError: On HTTP 403.
            RateLimiteddError: On HTTP 429.
            BizLogicError: On any other non-200 status.
        """
        resp = self.session.request(method, url, headers=headers, data=data, json=json_data,
                                    params=params, timeout=timeout)
        VSC_DEBUG('vs_cloud', f'[perform request] {method} {url} {data} {json_data} {params} {resp.text}')
        status = resp.status_code
        if status == 200:
            return resp
        if status == 401:
            raise SessionExpiredOrInvalidError()
        if status == 403:
            raise PermissionDeniedError()
        if status == 429:
            raise RateLimiteddError()
        raise BizLogicError(message=f"HTTP Error {resp.status_code}: {resp.text[:100]}")

    @staticmethod
    def _unwrap_data(resp, error_prefix):
        """Return the ``data`` field of a JSON envelope, or raise on a business error.

        Args:
            resp: Response object exposing ``json()``.
            error_prefix: Text placed before the server message in the error.

        Returns:
            ``result["data"]``; ``{}`` when the key is absent.

        Raises:
            BizLogicError: When the envelope's ``code`` is not ``0``.
        """
        result = resp.json()
        if result.get("code") == 0:
            return result.get("data", {})
        raise BizLogicError(message=f"{error_prefix}: {result.get('message')}")

    def _call_api(self, method, path, error_prefix, **request_kwargs):
        """Send an authenticated request to ``base_url + path`` and unwrap its data.

        ``request_kwargs`` are forwarded to ``_perform_request``.
        """
        url = f"{self.base_url}{path}"
        resp = self._perform_request(method, url, headers=self._get_headers(), **request_kwargs)
        return self._unwrap_data(resp, error_prefix)

    def get_vas_task_pop(self, routing_key: str) -> Optional[Dict]:
        """Pop the next task from the queue named ``routing_key``.

        Returns ``configure.TEST_TASK`` without any request when it is truthy.

        Raises:
            BizLogicError: When the service reports a non-zero code.
        """
        if configure.TEST_TASK:
            return configure.TEST_TASK
        return self._call_api(
            'GET', "/api/vas/task/pop", "Get vas task pop biz error",
            params={"queue_name": routing_key},
        )

    def get_vas_task(self, task_id: str) -> dict:
        """Fetch the detail record of the task identified by ``task_id``.

        Raises:
            BizLogicError: When the service reports a non-zero code.
        """
        return self._call_api(
            'GET', "/api/vas/task/detail", f"Get Task={task_id} error",
            params={"task_id": task_id},
        )

    def update_vas_task(self,
                        task_id: str,
                        update_data: Dict[str, Any]) -> Optional[Dict]:
        """Update a task.

        API: POST /api/vas/task/update?id=<task_id>, JSON body ``update_data``.

        Raises:
            BizLogicError: When the service reports a non-zero code.
        """
        return self._call_api(
            'POST', "/api/vas/task/update", "Update vas task biz error",
            params={"id": task_id}, json_data=update_data,
        )

    def return_vas_task_to_queue(self, task_id: int):
        """Put a task back into its queue.

        API: POST /api/vas/task/return_to_queue?task_id=<task_id>

        Raises:
            BizLogicError: When the service reports a non-zero code.
        """
        return self._call_api(
            'POST', "/api/vas/task/return_to_queue", "Return vas task to queue biz error",
            params={"task_id": task_id},
        )

    def push_weixin_text(self, text: str):
        """Push a WeChat text message.

        API: POST /api/wechat/send_no_token with JSON body ``{"message": text}``.

        Raises:
            BizLogicError: When the service reports a non-zero code.
        """
        return self._call_api(
            'POST', "/api/wechat/send_no_token", "Push weixin text biz error",
            json_data={"message": text},
        )

    def create_task(self, command: str, args: Dict) -> str:
        """Create a task and return its ID.

        Args:
            command: Task command (e.g. "AntiCloudflareTurnstileTask",
                "AntiCloudflareTask").
            args: Task argument dictionary (e.g. {"proxy": "...", "websiteUrl": "..."}).

        Returns:
            The new task's ID as a string.

        Raises:
            BizLogicError: When the service reports a non-zero code, or when
                the response carries no task ID.
        """
        payload = {
            "command": command,
            "args": args,
            "status": 0
        }
        # `or {}` turns a null `data` field into the "no ID returned" error
        # below instead of an AttributeError.
        data = self._call_api(
            'POST', "/api/tasks", f"Create task failed ({command})",
            json_data=payload,
        ) or {}
        task_id = data.get("id")
        if not task_id:
            raise BizLogicError(message=f"Task created but no ID returned. Resp: {data}")
        return str(task_id)

    def get_task_result(self, task_id: str, timeout: int = 120, interval: int = 3) -> Dict:
        """Poll a task until it finishes.

        Args:
            task_id: Task ID.
            timeout: Maximum time to wait, in seconds.
            interval: Pause between polls, in seconds.

        Returns:
            The task's ``data`` dictionary once its status is 2 (success).

        Raises:
            BizLogicError: On timeout, on a non-zero API code, or when the
                task reports status 3 (failure).
        """
        url = f"{self.base_url}/api/tasks/{task_id}"
        headers = self._get_headers()

        # A monotonic clock keeps the deadline immune to wall-clock changes.
        deadline = time.monotonic() + timeout

        while True:
            # 1. Give up once the deadline has passed.
            remaining = deadline - time.monotonic()
            if remaining < 0:
                raise BizLogicError(message=f"Wait for task result timeout ({timeout}s). TaskID: {task_id}")
            try:
                # 2. Query the task. The request is bounded by the time left
                #    (at least 1s) so a hung connection cannot overrun the
                #    deadline indefinitely.
                resp = self._perform_request('GET', url, headers=headers,
                                             timeout=max(remaining, 1))

                # 3. Reject API-level errors; treat a null `data` as "no status yet".
                data = self._unwrap_data(resp, "API Error fetching task") or {}
                status = data.get("status")

                # 4. Inspect the task status.
                if status == 2:  # success
                    return data
                if status == 3:  # failure
                    error_msg = data.get("result", "Unknown error")
                    raise BizLogicError(message=f"Task execution failed: {error_msg}")
                # Status 0 (pending) or 1 (running): keep waiting.
            except BizLogicError:
                # Business errors are final; do not retry.
                raise
            except Exception as e:
                # Network hiccups and other errors: log and retry.
                VSC_WARN("vs_cloud", f"Polling exception: {str(e)}")
            # Wait before the next poll.
            time.sleep(interval)

    def create_http_session(
        self,
        session_id: str,
        cookies: Optional[str] = None,
        local_storage: Optional[str] = None,
        user_agent: Optional[str] = None,
        proxy: Optional[str] = None,
        page: Optional[str] = None,
        session_storage: Optional[str] = None
    ) -> Optional[Dict]:
        """Create an HTTP session record via POST /api/http-session.

        All arguments are sent as-is in the JSON body.

        Raises:
            BizLogicError: When the service reports a non-zero code.
        """
        payload = {
            "local_storage": local_storage,
            "session_storage": session_storage,
            "cookies": cookies,
            "user_agent": user_agent,
            "proxy": proxy,
            "page": page,
            "session_id": session_id
        }
        return self._call_api(
            'POST', "/api/http-session", "Create http session biz error",
            json_data=payload,
        )

    def slot_refresh_start(
        self,
        routing_key: str,
        country: str = "",
        city: str = "",
        visa_type: str = "Tourist",
        snapshot_source: str = "worker",
    ):
        """Report the start of a slot refresh via POST /api/slot_refresh/start.

        Raises:
            BizLogicError: When the service reports a non-zero code.
        """
        payload = {
            "routing_key": routing_key,
            "country": country,
            "city": city,
            "visa_type": visa_type,
            "snapshot_source": snapshot_source,
        }
        return self._call_api(
            'POST', "/api/slot_refresh/start", "Slot refresh start biz error",
            json_data=payload,
        )

    def slot_refresh_success(
        self,
        routing_key: str,
        snapshot_source: str = "worker",
    ):
        """Report a successful slot refresh via POST /api/slot_refresh/success.

        Raises:
            BizLogicError: When the service reports a non-zero code.
        """
        payload = {
            "routing_key": routing_key,
            "snapshot_source": snapshot_source,
        }
        return self._call_api(
            'POST', "/api/slot_refresh/success", "Slot refresh success biz error",
            json_data=payload,
        )

    def slot_refresh_fail(
        self,
        routing_key: str,
        error: str,
        snapshot_source: str = "worker",
    ):
        """Report a failed slot refresh via POST /api/slot_refresh/fail.

        Raises:
            BizLogicError: When the service reports a non-zero code.
        """
        payload = {
            "routing_key": routing_key,
            "snapshot_source": snapshot_source,
            "error": error,
        }
        return self._call_api(
            'POST', "/api/slot_refresh/fail", "Slot refresh fail biz error",
            json_data=payload,
        )

    def get_next_account(
        self,
        pool_name: str,
        account_cd: int = 60
    ):
        """Request the next account from ``pool_name`` via POST /api/account/next.

        Returns ``configure.TEST_ACCOUNT`` without any request when it is truthy.

        Raises:
            BizLogicError: When the service reports a non-zero code.
        """
        if configure.TEST_ACCOUNT:
            return configure.TEST_ACCOUNT
        payload = {
            "pool_name": pool_name,
            "account_cd": account_cd
        }
        return self._call_api(
            'POST', "/api/account/next", "Get next account biz error",
            json_data=payload,
        )

    def get_next_proxy(
        self,
        pools: List[str],
        proxy_cd: int = 60
    ):
        """Request the next proxy from ``pools`` via POST /api/proxy/next-ip.

        When ``configure.TEST_PROXY`` is truthy it is returned without any
        request; if ``configure.CLASH_SWITCH_NODE`` is also truthy,
        ``switch_next_node()`` is called first and its result is logged.

        Raises:
            BizLogicError: When the service reports a non-zero code.
        """
        if configure.TEST_PROXY:
            if configure.CLASH_SWITCH_NODE:
                node = switch_next_node()
                VSC_INFO('-', f'proxy node={node}')
            return configure.TEST_PROXY
        payload = {
            "pools": pools,
            "proxy_cd": proxy_cd
        }
        return self._call_api(
            'POST', "/api/proxy/next-ip", "Get next proxy biz error",
            json_data=payload,
        )

    def slot_snapshot_report(
        self,
        query_payload: Optional[Dict[str, Any]] = None
    ):
        """Send a slot snapshot report via POST /api/slots/report.

        Args:
            query_payload: JSON body to send; an empty object is sent when omitted.

        Raises:
            BizLogicError: When the service reports a non-zero code.
        """
        # A fresh dict per call replaces the shared mutable default.
        body = {} if query_payload is None else query_payload
        return self._call_api(
            "POST", "/api/slots/report", "Slot snapshot report biz error",
            json_data=body,
        )

    def fetch_mail_content(
        self,
        email: str,
        sender: str,
        recipient: str,
        subject_keywords: str,
        body_keywords: str,
        sent_date: str,
        expiry: int
    ) -> Optional[str]:
        """Fetch the body of a matching mail.

        The filters are sent as query parameters of
        POST /api/email-authorizations/fetch with an empty request body.

        Returns:
            The ``body`` field of the response data; ``''`` when it is absent.

        Raises:
            BizLogicError: When the service reports a non-zero code.
        """
        params = {
            "email": email,
            "sender": sender,
            "recipient": recipient,
            "subjectKeywords": subject_keywords,
            "bodyKeywords": body_keywords,
            "sentDate": sent_date,
            "expiry": str(expiry)
        }
        # `or {}` keeps a null `data` field from raising AttributeError.
        data = self._call_api(
            'POST', "/api/email-authorizations/fetch", "Fetch mail content biz error",
            params=params, data="",
        ) or {}
        return data.get('body', '')

    def fetch_mail_content_from_top(
        self,
        email: str,
        sender: str,
        recipient: str,
        subject_keywords: str,
        body_keywords: str,
        top: int
    ) -> Optional[str]:
        """Fetch the body of a matching mail, searching from the top.

        The filters are sent as query parameters of
        POST /api/email-authorizations/fetch-top with an empty request body.

        Returns:
            The ``body`` field of the response data; ``''`` when it is absent.

        Raises:
            BizLogicError: When the service reports a non-zero code.
        """
        params = {
            "email": email,
            "sender": sender,
            "recipient": recipient,
            "subjectKeywords": subject_keywords,
            "bodyKeywords": body_keywords,
            "top": str(top)
        }
        # `or {}` keeps a null `data` field from raising AttributeError.
        data = self._call_api(
            'POST', "/api/email-authorizations/fetch-top", "Fetch mail content from top biz error",
            params=params, data="",
        ) or {}
        return data.get('body', "")
|