| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316 |
- # toolkit/vs_cloud_api.py
- import requests
- import json
- import time
- import urllib.parse
- from typing import Dict, Any, Optional
- from vs_log_macros import VSC_ERROR, VSC_INFO, VSC_DEBUG # type: ignore
class VSCloudApi:
    """Python implementation of VSCloudApi.

    Singleton client for the cloud service (captcha solving, mail fetching,
    HTTP-session storage and task scheduling). All requests are sent through
    one shared ``requests.Session`` and carry the API token in the
    ``Authorization`` header.

    The public request methods catch any ``Exception`` raised while sending
    or parsing a request, log it through ``VSC_ERROR`` and return ``None``.
    A non-200 HTTP status or a non-zero ``code`` in the JSON response is
    handled the same way.
    """

    _instance = None

    # Timeout passed to every HTTP call (requests interprets it in seconds).
    _REQUEST_TIMEOUT = 30

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating and configuring it on first use."""
        if cls._instance is None:
            instance = super().__new__(cls)
            # Default configuration.
            # NOTE(review): the endpoint and bearer token are hard-coded in
            # source; consider loading them from configuration instead.
            instance.base_url = "http://45.137.220.138:8888"
            instance.api_token = "Bearer tok_8cb26cf337cb405784eb346dfafb7f54"
            instance.session = requests.Session()
            # Publish only after configuration succeeded, so a failure above
            # cannot leave a half-initialised singleton behind.
            cls._instance = instance
        return cls._instance

    @staticmethod
    def Instance():
        """Return the shared ``VSCloudApi`` instance."""
        return VSCloudApi()

    def _get_headers(self, content_type: str = "application/json") -> Dict[str, str]:
        """Build the request headers, including the ``Authorization`` token.

        Args:
            content_type: Value for the ``Content-Type`` header.
        """
        return {
            "Authorization": self.api_token,
            "Content-Type": content_type,
            "Accept": "application/json, text/plain, */*"
        }

    # =========================================================================
    # Internal helpers
    # =========================================================================
    def _log_error(self, message: str) -> None:
        """Log ``message`` as an error under the ``vs_cloud`` tag."""
        VSC_ERROR("vs_cloud", message)

    def _fetch_data(
        self,
        op: str,
        method: str,
        url: str,
        *,
        missing_code: Any = None,
        extract: Any = None,
        **request_kwargs: Any
    ) -> Optional[Any]:
        """Send one request and unwrap the ``data`` field of the JSON response.

        Args:
            op: Name of the calling operation; used as the log-message prefix.
            method: Name of the session method to call (``"get"`` or ``"post"``).
            url: Absolute request URL.
            missing_code: Value assumed for ``code`` when the response has no
                such key. ``None`` makes a missing code count as a failure;
                ``0`` makes it count as success.
            extract: Optional callable applied to the ``data`` value; its
                result is returned instead of ``data`` itself.
            **request_kwargs: Forwarded to the session method (``params``,
                ``json``, ``data``, ``headers``).

        Returns:
            ``data`` (``{}`` when the key is absent), or ``extract(data)``;
            ``None`` on a non-200 status, a non-zero ``code`` or an exception.
        """
        try:
            send = getattr(self.session, method)
            resp = send(url, timeout=self._REQUEST_TIMEOUT, **request_kwargs)
            if resp.status_code != 200:
                self._log_error(f"{op} failed: {resp.status_code} {resp.text[:100]}")
                return None
            result = resp.json()
            if result.get("code", missing_code) != 0:
                self._log_error(f"{op} biz error: {result.get('message')}")
                return None
            data = result.get("data", {})
            return data if extract is None else extract(data)
        except Exception as e:
            self._log_error(f"{op} exception: {e}")
        return None

    def _submit_task(self, op: str, command: str, proxy: str, website_url: str) -> Optional[Dict]:
        """POST a task with the given ``command`` to ``/api/tasks``."""
        task_args = {
            "proxy": proxy,
            "websiteUrl": website_url
        }
        payload = {
            "command": command,
            # The arguments are sent as a JSON-encoded string, not as a nested object.
            "args": json.dumps(task_args),
            "status": 0
        }
        return self._fetch_data(
            op,
            "post",
            f"{self.base_url}/api/tasks",
            headers=self._get_headers(),
            json=payload,
        )

    def _fetch_mail_body(self, op: str, path: str, params: Dict[str, str]) -> Optional[str]:
        """POST a mail query to ``path`` and return the ``body`` of its ``data``."""
        return self._fetch_data(
            op,
            "post",
            f"{self.base_url}{path}",
            # These endpoints treat a response without "code" as success.
            missing_code=0,
            extract=lambda data: data.get("body", ""),
            headers=self._get_headers(),
            params=params,
            data="",
        )

    # =========================================================================
    # VAS Task Management
    # =========================================================================
    def get_vas_task_pop(self, routing_key: str) -> Optional[Dict]:
        """Pop a task from a VAS queue.

        API: GET /api/vas/task/pop?queue_name=<routing_key>

        Args:
            routing_key: Sent as the ``queue_name`` query parameter.

        Returns:
            The ``data`` field of the response (``{}`` when absent), or
            ``None`` on failure.
        """
        return self._fetch_data(
            "get_vas_task_pop",
            "get",
            f"{self.base_url}/api/vas/task/pop",
            params={"queue_name": routing_key},
            headers=self._get_headers(),
        )

    def update_vas_task(self,
                        task_id: int,
                        update_data: Dict[str, Any]) -> Optional[Dict]:
        """Update a VAS task.

        API: POST /api/vas/task/update?id=<task_id>
        Body: ``update_data`` as JSON.

        Returns:
            The ``data`` field of the response (``{}`` when absent), or
            ``None`` on failure.
        """
        return self._fetch_data(
            "update_vas_task",
            "post",
            f"{self.base_url}/api/vas/task/update",
            params={"id": task_id},
            json=update_data,
            headers=self._get_headers(),
        )

    # =========================================================================
    # Anti-bot tasks
    # =========================================================================
    def submit_anti_turnstile_task(self, proxy: str, website_url: str) -> Optional[Dict]:
        """Submit an ``AntiCloudflareTurnstileTask``.

        Returns:
            The ``data`` field of the response, or ``None`` on failure.
        """
        return self._submit_task(
            "submit_anti_turnstile_task",
            "AntiCloudflareTurnstileTask",
            proxy,
            website_url,
        )

    def get_anti_turnstile_result(self, task_id: str) -> Optional[Dict]:
        """Fetch the current record of an anti-Turnstile task (single request).

        Returns:
            The ``data`` field of the response, or ``None`` on failure.
        """
        return self._fetch_data(
            "get_anti_turnstile_result",
            "get",
            f"{self.base_url}/api/tasks/{task_id}",
            headers=self._get_headers(),
        )

    def submit_anticloudflare_task(self, proxy: str, website_url: str) -> Optional[Dict]:
        """Submit an ``AntiCloudflareTask`` (used for the TLSContact 5-second shield).

        Returns:
            The ``data`` field of the response, or ``None`` on failure.
        """
        return self._submit_task(
            "submit_anticloudflare_task",
            "AntiCloudflareTask",
            proxy,
            website_url,
        )

    def get_anticloudflare_result(self, task_id, retry_interval=5, max_retries=20) -> Optional[Dict]:
        """Poll an ``AntiCloudflareTask`` until it finishes.

        Args:
            task_id: Identifier of the task to poll.
            retry_interval: Value passed to ``time.sleep`` between attempts.
            max_retries: Maximum number of requests to send.

        Returns:
            The task ``data`` once its ``status`` is 2 (success). ``None`` when
            the status is 3 (failure), when ``max_retries`` attempts are used
            up, or when an exception occurs (which stops the polling).
        """
        op = "get_anticloudflare_result"
        url = f"{self.base_url}/api/tasks/{task_id}"
        headers = self._get_headers()
        try:
            for attempt in range(max_retries):
                response = self.session.get(url, headers=headers, timeout=self._REQUEST_TIMEOUT)

                if response.status_code != 200:
                    self._log_error(f"{op} failed: {response.status_code} {response.text[:100]}")
                else:
                    result = response.json()
                    if result.get("code") != 0:
                        self._log_error(f"{op} biz error: {result.get('message')}")
                    else:
                        data = result.get("data", {})
                        status = data.get("status")
                        # status 2 means success.
                        if status == 2:
                            return data
                        if status == 3:
                            self._log_error(f"AntiCloudflareTask failed: {data.get('result')}")
                            return None

                # Wait after every non-terminal outcome (pending task or error
                # response) so the server is never hit in a tight loop; no
                # wait is needed once the last attempt has been made.
                if attempt < max_retries - 1:
                    time.sleep(retry_interval)
            self._log_error("Max retries reached, AntiCloudflareTask not completed.")
        except Exception as e:
            self._log_error(f"{op} exception: {e}")
        return None

    # =========================================================================
    # HTTP session storage
    # =========================================================================
    def create_http_session(
        self,
        session_id: str,
        cookies: str,
        local_storage: str,
        user_agent: str,
        proxy: str,
        page: str
    ) -> Optional[Dict]:
        """Create an HTTP session record.

        API: POST /api/http-session with all arguments as a JSON body.

        Returns:
            The ``data`` field of the response, or ``None`` on failure.
        """
        payload = {
            "local_storage": local_storage,
            "cookies": cookies,
            "user_agent": user_agent,
            "proxy": proxy,
            "page": page,
            "session_id": session_id
        }
        return self._fetch_data(
            "create_http_session",
            "post",
            f"{self.base_url}/api/http-session",
            headers=self._get_headers(),
            json=payload,
        )

    # =========================================================================
    # Mail
    # =========================================================================
    def fetch_mail_content(
        self,
        email: str,
        sender: str,
        recipient: str,
        subject_keywords: str,
        body_keywords: str,
        sent_date: str,
        expiry: int
    ) -> Optional[str]:
        """Fetch the body of a mail matching the given filters.

        API: POST /api/email-authorizations/fetch; the filters are sent as
        query parameters with an empty request body.

        Returns:
            The ``body`` of the response ``data`` (``""`` when absent), or
            ``None`` on failure.
        """
        params = {
            "email": email,
            "sender": sender,
            "recipient": recipient,
            "subjectKeywords": subject_keywords,
            "bodyKeywords": body_keywords,
            "sentDate": sent_date,
            "expiry": str(expiry)
        }
        return self._fetch_mail_body(
            "fetch_mail_content",
            "/api/email-authorizations/fetch",
            params,
        )

    def fetch_mail_content_from_top(
        self,
        email: str,
        sender: str,
        recipient: str,
        subject_keywords: str,
        body_keywords: str,
        top: int
    ) -> Optional[str]:
        """Fetch the body of a matching mail via the ``fetch-top`` endpoint.

        API: POST /api/email-authorizations/fetch-top; the filters and ``top``
        are sent as query parameters with an empty request body.

        Returns:
            The ``body`` of the response ``data`` (``""`` when absent), or
            ``None`` on failure.
        """
        params = {
            "email": email,
            "sender": sender,
            "recipient": recipient,
            "subjectKeywords": subject_keywords,
            "bodyKeywords": body_keywords,
            "top": str(top)
        }
        return self._fetch_mail_body(
            "fetch_mail_content_from_top",
            "/api/email-authorizations/fetch-top",
            params,
        )
|