app_manager.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. # core/app_manager.py
  2. import json
  3. import os
  4. import threading
  5. from typing import Dict, List, Optional
  6. # 引入核心组件
  7. from vs_types import GroupConfig, QueryWaitConfig, PluginConfig, QueryWaitMode
  8. from group_coordinator import GroupCoordinator
  9. from vs_log_macros import VSC_INFO, VSC_ERROR, VSC_WARN
  10. from core.plugin_reloader import reload_plugin_module
  11. class AppManager:
  12. _instance = None
  13. _lock = threading.Lock()
  14. def __new__(cls):
  15. with cls._lock:
  16. if cls._instance is None:
  17. cls._instance = super().__new__(cls)
  18. cls._instance._init()
  19. return cls._instance
  20. @staticmethod
  21. def Instance():
  22. return AppManager()
  23. def _init(self):
  24. self.coordinators: Dict[str, GroupCoordinator] = {}
  25. self.configs: Dict[str, GroupConfig] = {}
  26. self.config_file = "config/groups.json"
  27. def load_configs(self):
  28. """读取并解析配置文件"""
  29. if not os.path.exists(self.config_file):
  30. VSC_ERROR("app_mgr", f"Config file not found: {self.config_file}")
  31. return
  32. try:
  33. with open(self.config_file, 'r', encoding='utf-8') as f:
  34. data = json.load(f)
  35. for item in data:
  36. # JSON -> GroupConfig 转换
  37. qw_data = item.get("query_wait", {})
  38. qw_cfg = QueryWaitConfig(
  39. mode=QueryWaitMode(qw_data.get("mode", 0)),
  40. fixed_wait=qw_data.get("fixed_wait", 0),
  41. random_min=qw_data.get("random_min", 0),
  42. random_max=qw_data.get("random_max", 0)
  43. )
  44. plg_data = item.get("plugin_config", {})
  45. plg_cfg = PluginConfig(
  46. lib_path=plg_data.get("lib_path", "plugins"),
  47. plugin_name=plg_data.get("plugin_name", ""),
  48. plugin_bin=plg_data.get("plugin_bin", ""),
  49. plugin_proto=plg_data.get("plugin_proto", "IVSPlg")
  50. )
  51. grp_cfg = GroupConfig(
  52. identifier=item["identifier"],
  53. enable=item.get("enable", False),
  54. need_account=item.get("need_account", False),
  55. account_pool=item.get("account_pool", ""),
  56. need_proxy=item.get("need_proxy", False),
  57. proxy_pool=item.get("proxy_pool", ""),
  58. target_instances=item.get("target_instances", 1),
  59. account_login_interval=item.get("account_login_interval", 0),
  60. query_wait=qw_cfg,
  61. plugin_config=plg_cfg,
  62. free_config=json.dumps(item.get("free_config", "{}"))
  63. )
  64. self.configs[grp_cfg.identifier] = grp_cfg
  65. VSC_INFO("app_mgr", f"Loaded {len(self.configs)} group configurations.")
  66. except Exception as e:
  67. VSC_ERROR("app_mgr", f"Failed to load configs: {e}")
  68. def start_all(self):
  69. """启动所有 enable=True 的组"""
  70. for gid, cfg in self.configs.items():
  71. if cfg.enable and gid not in self.coordinators:
  72. self.start_group(gid)
  73. def start_group(self, group_id: str) -> bool:
  74. if group_id not in self.configs:
  75. VSC_ERROR("app_mgr", f"Group {group_id} not found in config")
  76. return False
  77. if group_id in self.coordinators:
  78. VSC_WARN("app_mgr", f"Group {group_id} is already running")
  79. return True
  80. cfg = self.configs[group_id]
  81. try:
  82. coord = GroupCoordinator(cfg)
  83. # 设置推送回调,这里可以连接到 WebSocket
  84. coord.set_push_callback(lambda t, d, s: print(f"[{group_id} PUSH] {d.decode()}"))
  85. coord.start()
  86. self.coordinators[group_id] = coord
  87. VSC_INFO("app_mgr", f"Started group: {group_id}")
  88. return True
  89. except Exception as e:
  90. VSC_ERROR("app_mgr", f"Failed to start group {group_id}: {e}")
  91. return False
  92. def stop_group(self, group_id: str) -> bool:
  93. if group_id not in self.coordinators:
  94. return False
  95. VSC_INFO("app_mgr", f"Stopping group: {group_id}...")
  96. try:
  97. self.coordinators[group_id].stop()
  98. del self.coordinators[group_id]
  99. VSC_INFO("app_mgr", f"Stopped group: {group_id}")
  100. return True
  101. except Exception as e:
  102. VSC_ERROR("app_mgr", f"Error stopping group {group_id}: {e}")
  103. return False
  104. def restart_group(self, group_id: str) -> bool:
  105. self.stop_group(group_id)
  106. return self.start_group(group_id)
  107. def ota_update_plugin(self, plugin_name: str) -> List[str]:
  108. """
  109. OTA 热更新流程:
  110. 1. 找到所有使用该插件的运行中组
  111. 2. 停止这些组
  112. 3. 清除 Python 模块缓存 (热重载)
  113. 4. 重新启动这些组
  114. """
  115. affected_groups = []
  116. # 1. 查找受影响的组
  117. for gid, coord in self.coordinators.items():
  118. # 注意:这里我们访问 coord 私有成员,实际工程中建议添加 getter
  119. # 假设 config 存储在 m_cfg
  120. if coord.m_cfg.plugin_config.plugin_name == plugin_name:
  121. affected_groups.append(gid)
  122. VSC_INFO("app_mgr", f"OTA Update for '{plugin_name}'. Affected groups: {affected_groups}")
  123. # 2. 停止
  124. for gid in affected_groups:
  125. self.stop_group(gid)
  126. # 3. 卸载模块
  127. reload_plugin_module(plugin_name)
  128. # 4. 重启
  129. restarted = []
  130. for gid in affected_groups:
  131. if self.start_group(gid):
  132. restarted.append(gid)
  133. return restarted
  134. def get_status(self):
  135. """获取所有组的状态"""
  136. status_list = []
  137. for gid, cfg in self.configs.items():
  138. running = gid in self.coordinators
  139. status_list.append({
  140. "id": gid,
  141. "plugin": cfg.plugin_config.plugin_name,
  142. "running": running,
  143. "instances": cfg.target_instances if running else 0,
  144. "account_pool": cfg.account_pool
  145. })
  146. return status_list