account_manager.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. # toolkit/account_manager.py
  2. import threading
  3. import time
  4. import random
  5. from typing import Optional, Dict, Any
  6. from vs_log_macros import VSC_DEBUG, VSC_WARN, VSC_INFO # type: ignore
  7. class AccountManager:
  8. """模拟账户管理器,用于获取和锁定账户。"""
  9. _instance = None
  10. _lock = threading.Lock()
  11. def __new__(cls):
  12. with cls._lock:
  13. if cls._instance is None:
  14. cls._instance = super().__new__(cls)
  15. cls._instance._accounts = {
  16. "ie_nl": [
  17. {
  18. "id": 580,
  19. "lock_until": 0,
  20. "password": "y1wm@hf0Bn",
  21. "username": "sg_fr_onoy9i@gmail-app.com"
  22. },
  23. {
  24. "id": 581,
  25. "lock_until": 0,
  26. "password": "p7@@0NnOR8",
  27. "username": "sg_fr_k6vuhb@gmail-app.com"
  28. },
  29. {
  30. "id": 582,
  31. "lock_until": 0,
  32. "password": "D@4OCh2j3I",
  33. "username": "sg_fr_cvq08b@gmail-app.com"
  34. },
  35. {
  36. "id": 583,
  37. "lock_until": 0,
  38. "password": "xT01gsHw@j",
  39. "username": "sg_fr_4cn9xj@gmail-app.com"
  40. },
  41. {
  42. "id": 584,
  43. "lock_until": 0,
  44. "password": "@aprZh7NbA",
  45. "username": "sg_fr_m32a2e@gmail-app.com"
  46. },
  47. {
  48. "id": 585,
  49. "lock_until": 0,
  50. "password": "SR2hnX@ho5",
  51. "username": "sg_fr_p0nz5l@gmail-app.com"
  52. }
  53. ],
  54. "gb_fr": [
  55. {
  56. "id": 0,
  57. "username":"arket_zz@163.com",
  58. "password": "Visafly@111",
  59. "lock_until": 0
  60. },
  61. ],
  62. "ie_es": [
  63. {
  64. "id": 0,
  65. "username":"arket_zz@163.com",
  66. "password": "dx4ua@!.X.i8Xn8",
  67. "lock_until": 0
  68. },
  69. ]
  70. }
  71. cls._instance._account_lock = threading.Lock() # 用于保护账户状态
  72. return cls._instance
  73. @staticmethod
  74. def Instance():
  75. return AccountManager()
  76. def get_next_account(self, pool_name: str) -> Optional[Dict[str, Any]]:
  77. """从指定池中获取下一个可用账户。"""
  78. with self._account_lock:
  79. accounts = self._accounts.get(pool_name, [])
  80. now = time.time()
  81. available_accounts = [acc for acc in accounts if acc["lock_until"] <= now]
  82. if not available_accounts:
  83. VSC_WARN("account_manager", "No available accounts in pool '%s'", pool_name)
  84. return None
  85. # 简单地随机选择一个可用账户
  86. account = random.choice(available_accounts)
  87. VSC_DEBUG("account_manager", "Selected account ID %d from pool '%s'", account["id"], pool_name)
  88. return account
  89. def lock_account(self, pool_name: str, account_id: int, duration_seconds: int):
  90. """锁定指定账户一段时间。"""
  91. with self._account_lock:
  92. accounts = self._accounts.get(pool_name, [])
  93. for acc in accounts:
  94. if acc["id"] == account_id:
  95. acc["lock_until"] = time.time() + duration_seconds
  96. VSC_INFO("account_manager", "Locked account ID %d in pool '%s' until %s",
  97. account_id, pool_name, time.ctime(acc["lock_until"]))
  98. return
  99. VSC_WARN("account_manager", "Account ID %d not found in pool '%s' for locking", account_id, pool_name)