# core/app_manager.py import json import os import threading from typing import Dict, List, Optional # 引入核心组件 from vs_types import GroupConfig from gco_wrapper import GCOWrapper from vs_log_macros import VSC_INFO, VSC_ERROR, VSC_WARN from core.plugin_reloader import reload_plugin_module class AppManager: _instance = None _lock = threading.Lock() def __new__(cls): with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._init() return cls._instance @staticmethod def Instance(): return AppManager() def _init(self): self.executors: Dict[str, GCOWrapper] = {} # group_id -> GCOWrapper self.configs: Dict[str, GroupConfig] = {} # group_id -> GroupConfig self.config_file = "config/groups.json" def load_configs(self): """读取并解析配置文件""" if not os.path.exists(self.config_file): VSC_ERROR("app_mgr", f"Config file not found: {self.config_file}") return try: with open(self.config_file, 'r', encoding='utf-8') as f: data = json.load(f) for item in data: # JSON -> GroupConfig 转换 grp_cfg = GroupConfig.from_json(item) print(grp_cfg.free_config) self.configs[grp_cfg.identifier] = grp_cfg VSC_INFO("app_mgr", f"Loaded {len(self.configs)} group configurations.") except Exception as e: VSC_ERROR("app_mgr", f"Failed to load configs: {e}") def start_all(self): """启动所有 enable=True 的组""" for gid, cfg in self.configs.items(): if cfg.enable and gid not in self.executors: self.start_group(gid) def start_group(self, group_id: str) -> bool: with self._lock: if group_id not in self.configs: VSC_ERROR("app_mgr", f"Group {group_id} not found in config") return False if group_id in self.executors: VSC_WARN("app_mgr", f"Group {group_id} is already running") return True cfg = self.configs[group_id] try: gco = GCOWrapper(cfg) gco.load() gco.start() self.executors[group_id] = gco VSC_INFO("app_mgr", f"Started group: {group_id}") return True except Exception as e: VSC_ERROR("app_mgr", f"Failed to start group {group_id}: {e}") return False def stop_group(self, group_id: str) -> bool: with self._lock: if group_id not in self.executors: return False VSC_INFO("app_mgr", f"Stopping group: {group_id}...") try: self.executors[group_id].stop() del self.executors[group_id] VSC_INFO("app_mgr", f"Stopped group: {group_id}") return True except Exception as e: VSC_ERROR("app_mgr", f"Error stopping group {group_id}: {e}") return False def restart_group(self, group_id: str) -> bool: self.stop_group(group_id) return self.start_group(group_id) def ota_upgrade_plugin(self, plugin_name: str) -> List[str]: """ OTA 热更新流程: 1. 找到所有使用该插件的运行中组 2. 停止这些组 3. 清除 Python 模块缓存 (热重载) 4. 重新启动这些组 """ affected_groups = [] # 1. 查找受影响的组 for gid, exec in self.executors.items(): # 注意:这里我们访问 coord 私有成员,实际工程中建议添加 getter # 假设 config 存储在 m_cfg if exec.m_cfg.plugin_config.plugin_name == plugin_name: affected_groups.append(gid) VSC_INFO("app_mgr", f"OTA Update for '{plugin_name}'. Affected groups: {affected_groups}") # 2. 停止 for gid in affected_groups: self.stop_group(gid) # 3. 卸载模块 reload_plugin_module(plugin_name) # 4. 重启 restarted = [] for gid in affected_groups: if self.start_group(gid): restarted.append(gid) return restarted def get_group_config(self, group_id: str): with self._lock: if group_id not in self.configs: return False group_config = self.configs.get(group_id) return group_config.to_json() # ----------------- 更新配置(局部) ----------------- def ota_update_plugin_config(self, group_id: str, new_config_str: str) -> bool: """更新某个 Executor 的配置(只影响单个 Executor)""" new_config = GroupConfig.from_json(json.loads(new_config_str)) with self._lock: old_exe = self.executors.get(group_id) if not old_exe: # Executor 没运行,直接创建启动 exe = GCOWrapper(new_config) exe.load() exe.start() self.executors[group_id] = exe return True # 创建新 Executor(插件不变,只更新配置) new_exe = GCOWrapper(new_config) new_exe.load() try: new_exe.start() old_exe.stop() self.executors[group_id] = new_exe VSC_INFO("app_mgr", f"Config update applied for {group_id}") return True except Exception as e: VSC_ERROR("app_mgr", f"Failed to update config for {group_id}: {e}") return False def get_status(self): """获取所有组的状态""" status_list = [] for gid, cfg in self.configs.items(): running = gid in self.executors status_list.append({ "id": gid, "plugin": cfg.plugin_config.plugin_name, "running": running, "instances": cfg.target_instances if running else 0, "account_pool": cfg.account_pool }) return status_list def subscribe_executor_logs(self, group_id: str, callback): """ 订阅某个 Executor 的日志 callback: Callable[[str], None] """ if group_id not in self.executors: raise ValueError(f"Executor {group_id} not running") exe = self.executors[group_id] exe.subscribe_logs(callback) def unsubscribe_executor_logs(self, group_id: str, callback): """ 取消订阅日志 """ exe = self.executors.get(group_id) if not exe: return exe.unsubscribe_logs(callback)