| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- # toolkit/account_manager.py
- import threading
- import time
- import random
- from typing import Optional, Dict, Any
- from vs_log_macros import VSC_DEBUG, VSC_WARN, VSC_INFO # type: ignore
class AccountManager:
    """Simulated account manager used to fetch and lock accounts.

    Constructing the class always yields the same shared instance. Account
    pools are kept in an in-memory dict keyed by pool name; each account is a
    dict with ``id``, ``username``, ``password`` and ``lock_until`` (a
    ``time.time()`` timestamp before which the account is skipped).
    """

    _instance = None
    _lock = threading.Lock()  # serialises creation of the shared instance

    def __new__(cls):
        """Return the shared instance, building it on the first call."""
        with cls._lock:
            if cls._instance is not None:
                return cls._instance

            manager = super().__new__(cls)

            # NOTE(review): credentials are embedded in source as plaintext;
            # consider moving them to external configuration.
            ie_nl_seed = (
                (580, "sg_fr_onoy9i@gmail-app.com", "y1wm@hf0Bn"),
                (581, "sg_fr_k6vuhb@gmail-app.com", "p7@@0NnOR8"),
                (582, "sg_fr_cvq08b@gmail-app.com", "D@4OCh2j3I"),
                (583, "sg_fr_4cn9xj@gmail-app.com", "xT01gsHw@j"),
                (584, "sg_fr_m32a2e@gmail-app.com", "@aprZh7NbA"),
                (585, "sg_fr_p0nz5l@gmail-app.com", "SR2hnX@ho5"),
            )
            manager._accounts = {
                "ie_nl": [
                    {
                        "id": account_id,
                        "lock_until": 0,
                        "password": password,
                        "username": username,
                    }
                    for account_id, username, password in ie_nl_seed
                ],
                "gb_fr": [
                    {
                        "id": 0,
                        "username": "arket_zz@163.com",
                        "password": "Visafly@111",
                        "lock_until": 0,
                    },
                ],
                "ie_es": [
                    {
                        "id": 0,
                        "username": "arket_zz@163.com",
                        "password": "dx4ua@!.X.i8Xn8",
                        "lock_until": 0,
                    },
                ],
            }
            # Guards reads and writes of per-account state.
            manager._account_lock = threading.Lock()

            cls._instance = manager
            return manager

    @staticmethod
    def Instance():
        """Return the shared ``AccountManager`` instance."""
        return AccountManager()

    def get_next_account(self, pool_name: str) -> Optional[Dict[str, Any]]:
        """Pick an available account from the named pool.

        An account is available when its ``lock_until`` is not later than the
        current time. The choice among available accounts is random.

        Args:
            pool_name: Key of the pool to draw from; an unknown name is
                treated as an empty pool.

        Returns:
            The selected account dict (the stored object itself, not a copy),
            or ``None`` when no account in the pool is available. The account
            is not locked by this call.
        """
        with self._account_lock:
            current_time = time.time()
            pool = self._accounts.get(pool_name, [])

            candidates = []
            for entry in pool:
                if entry["lock_until"] <= current_time:
                    candidates.append(entry)

            if candidates:
                # Simply pick one of the available accounts at random.
                chosen = random.choice(candidates)
                VSC_DEBUG("account_manager", "Selected account ID %d from pool '%s'", chosen["id"], pool_name)
                return chosen

            VSC_WARN("account_manager", "No available accounts in pool '%s'", pool_name)
            return None

    def lock_account(self, pool_name: str, account_id: int, duration_seconds: int):
        """Lock one account for a period of time.

        Sets ``lock_until`` on the first account in the pool whose ``id``
        equals ``account_id`` to now plus ``duration_seconds``. Logs a
        warning and changes nothing when no such account exists.

        Args:
            pool_name: Key of the pool that holds the account.
            account_id: ``id`` value of the account to lock.
            duration_seconds: How long, from now, the account stays locked.
        """
        with self._account_lock:
            pool = self._accounts.get(pool_name, [])
            target = next((entry for entry in pool if entry["id"] == account_id), None)

            if target is None:
                VSC_WARN("account_manager", "Account ID %d not found in pool '%s' for locking", account_id, pool_name)
                return

            target["lock_until"] = time.time() + duration_seconds
            VSC_INFO("account_manager", "Locked account ID %d in pool '%s' until %s",
                     account_id, pool_name, time.ctime(target["lock_until"]))
|