# core/app_manager.py import json import os import threading from typing import Dict, List, Optional # 引入核心组件 from vs_types import GroupConfig, QueryWaitConfig, PluginConfig, QueryWaitMode from group_coordinator import GroupCoordinator from vs_log_macros import VSC_INFO, VSC_ERROR, VSC_WARN from core.plugin_reloader import reload_plugin_module class AppManager: _instance = None _lock = threading.Lock() def __new__(cls): with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._init() return cls._instance @staticmethod def Instance(): return AppManager() def _init(self): self.coordinators: Dict[str, GroupCoordinator] = {} self.configs: Dict[str, GroupConfig] = {} self.config_file = "config/groups.json" def load_configs(self): """读取并解析配置文件""" if not os.path.exists(self.config_file): VSC_ERROR("app_mgr", f"Config file not found: {self.config_file}") return try: with open(self.config_file, 'r', encoding='utf-8') as f: data = json.load(f) for item in data: # JSON -> GroupConfig 转换 qw_data = item.get("query_wait", {}) qw_cfg = QueryWaitConfig( mode=QueryWaitMode(qw_data.get("mode", 0)), fixed_wait=qw_data.get("fixed_wait", 0), random_min=qw_data.get("random_min", 0), random_max=qw_data.get("random_max", 0) ) plg_data = item.get("plugin_config", {}) plg_cfg = PluginConfig( lib_path=plg_data.get("lib_path", "plugins"), plugin_name=plg_data.get("plugin_name", ""), plugin_bin=plg_data.get("plugin_bin", ""), plugin_proto=plg_data.get("plugin_proto", "IVSPlg") ) grp_cfg = GroupConfig( identifier=item["identifier"], enable=item.get("enable", False), need_account=item.get("need_account", False), account_pool=item.get("account_pool", ""), need_proxy=item.get("need_proxy", False), proxy_pool=item.get("proxy_pool", ""), target_instances=item.get("target_instances", 1), account_login_interval=item.get("account_login_interval", 0), query_wait=qw_cfg, plugin_config=plg_cfg, free_config=json.dumps(item.get("free_config", "{}")) ) self.configs[grp_cfg.identifier] = grp_cfg VSC_INFO("app_mgr", f"Loaded {len(self.configs)} group configurations.") except Exception as e: VSC_ERROR("app_mgr", f"Failed to load configs: {e}") def start_all(self): """启动所有 enable=True 的组""" for gid, cfg in self.configs.items(): if cfg.enable and gid not in self.coordinators: self.start_group(gid) def start_group(self, group_id: str) -> bool: if group_id not in self.configs: VSC_ERROR("app_mgr", f"Group {group_id} not found in config") return False if group_id in self.coordinators: VSC_WARN("app_mgr", f"Group {group_id} is already running") return True cfg = self.configs[group_id] try: coord = GroupCoordinator(cfg) # 设置推送回调,这里可以连接到 WebSocket coord.set_push_callback(lambda t, d, s: print(f"[{group_id} PUSH] {d.decode()}")) coord.start() self.coordinators[group_id] = coord VSC_INFO("app_mgr", f"Started group: {group_id}") return True except Exception as e: VSC_ERROR("app_mgr", f"Failed to start group {group_id}: {e}") return False def stop_group(self, group_id: str) -> bool: if group_id not in self.coordinators: return False VSC_INFO("app_mgr", f"Stopping group: {group_id}...") try: self.coordinators[group_id].stop() del self.coordinators[group_id] VSC_INFO("app_mgr", f"Stopped group: {group_id}") return True except Exception as e: VSC_ERROR("app_mgr", f"Error stopping group {group_id}: {e}") return False def restart_group(self, group_id: str) -> bool: self.stop_group(group_id) return self.start_group(group_id) def ota_update_plugin(self, plugin_name: str) -> List[str]: """ OTA 热更新流程: 1. 找到所有使用该插件的运行中组 2. 停止这些组 3. 清除 Python 模块缓存 (热重载) 4. 重新启动这些组 """ affected_groups = [] # 1. 查找受影响的组 for gid, coord in self.coordinators.items(): # 注意:这里我们访问 coord 私有成员,实际工程中建议添加 getter # 假设 config 存储在 m_cfg if coord.m_cfg.plugin_config.plugin_name == plugin_name: affected_groups.append(gid) VSC_INFO("app_mgr", f"OTA Update for '{plugin_name}'. Affected groups: {affected_groups}") # 2. 停止 for gid in affected_groups: self.stop_group(gid) # 3. 卸载模块 reload_plugin_module(plugin_name) # 4. 重启 restarted = [] for gid in affected_groups: if self.start_group(gid): restarted.append(gid) return restarted def get_status(self): """获取所有组的状态""" status_list = [] for gid, cfg in self.configs.items(): running = gid in self.coordinators status_list.append({ "id": gid, "plugin": cfg.plugin_config.plugin_name, "running": running, "instances": cfg.target_instances if running else 0, "account_pool": cfg.account_pool }) return status_list