# core/app_manager.py
import json
import os
import threading
from typing import Dict, List, Optional

# Core components
from vs_types import GroupConfig, NotFoundError, BizLogicError
from gco_wrapper import GCOWrapper
from vs_log_macros import VSC_INFO, VSC_ERROR, VSC_WARN
from core.plugin_reloader import reload_plugin_module


class AppManager:
    """Singleton that owns group configurations and their running executors.

    ``configs`` maps a group id to its ``GroupConfig``; ``executors`` maps a
    group id to the ``GCOWrapper`` currently running for it. A group id that
    is present in ``configs`` but absent from ``executors`` is not running.
    """

    _instance = None
    # Re-entrant on purpose: compound operations (restart_group,
    # ota_upgrade_plugin) hold the lock while calling stop_group/start_group,
    # which acquire it again. A plain Lock would deadlock there.
    _lock = threading.RLock()

    def __new__(cls):
        with cls._lock:
            if cls._instance is None:
                # Initialise before publishing so that a failing _init()
                # never leaves a half-built singleton behind.
                instance = super().__new__(cls)
                instance._init()
                cls._instance = instance
            return cls._instance

    @staticmethod
    def Instance():
        """Return the shared ``AppManager`` instance."""
        return AppManager()

    def _init(self):
        """Set up the empty registries; called once from ``__new__``."""
        self.executors: Dict[str, GCOWrapper] = {}  # group_id -> GCOWrapper
        self.configs: Dict[str, GroupConfig] = {}  # group_id -> GroupConfig
        self.config_file = "config/groups.json"

    def load_configs(self):
        """Read ``self.config_file`` and merge its entries into ``self.configs``.

        Every item of the top-level JSON sequence is converted with
        ``GroupConfig.from_json`` and stored under its ``identifier``;
        existing entries with the same id are overwritten, others are kept.

        Raises:
            NotFoundError: if the configuration file does not exist.
        """
        if not os.path.exists(self.config_file):
            raise NotFoundError(message=f'Config file not found: {self.config_file}')

        with open(self.config_file, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # Parse everything first so a malformed entry does not leave the
        # registry partially updated.
        parsed = [GroupConfig.from_json(item) for item in data]

        with self._lock:
            for grp_cfg in parsed:
                self.configs[grp_cfg.identifier] = grp_cfg
            VSC_INFO("app_mgr", f"Loaded {len(self.configs)} group configurations.")

    def start_all(self):
        """Start every configured group with ``enable`` set that is not running."""
        with self._lock:
            # Snapshot the ids so the registry is not iterated while
            # start_group mutates ``self.executors``.
            pending = [
                gid
                for gid, cfg in self.configs.items()
                if cfg.enable and gid not in self.executors
            ]
            for gid in pending:
                self.start_group(gid)

    def start_group(self, group_id: str) -> bool:
        """Create, load and start the executor for ``group_id``.

        Returns:
            True if the group was started by this call, False if it was
            already running (a warning is logged and nothing changes).

        Raises:
            NotFoundError: if ``group_id`` has no configuration.
        """
        with self._lock:
            if group_id not in self.configs:
                raise NotFoundError(message=f"Group {group_id} not found in config")

            if group_id in self.executors:
                VSC_WARN("app_mgr", f"Group {group_id} is already running")
                return False

            gco = GCOWrapper(self.configs[group_id])
            gco.load()
            gco.start()
            # Register only after a successful start so a failed start does
            # not leave a dead executor in the registry.
            self.executors[group_id] = gco
            VSC_INFO("app_mgr", f"Started group: {group_id}")
            return True

    def stop_group(self, group_id: str):
        """Stop the running executor of ``group_id`` and unregister it.

        Raises:
            NotFoundError: if the group has no running executor.
        """
        with self._lock:
            if group_id not in self.executors:
                raise NotFoundError(message=f"Group {group_id} is not running")

            VSC_INFO("app_mgr", f"Stopping group: {group_id}...")
            self.executors[group_id].stop()
            del self.executors[group_id]
            VSC_INFO("app_mgr", f"Stopped group: {group_id}")

    def restart_group(self, group_id: str):
        """Stop and then start ``group_id`` as one locked operation.

        Raises:
            NotFoundError: if the group is not running or has no configuration.
        """
        with self._lock:
            self.stop_group(group_id)
            self.start_group(group_id)

    def ota_upgrade_plugin(self, plugin_name: str) -> List[str]:
        """Hot-reload a plugin and restart the groups that use it.

        Steps:
            1. Find all running groups that use the plugin.
            2. Stop those groups.
            3. Reload the plugin module via ``reload_plugin_module``.
            4. Start those groups again.

        The lock is held for the whole sequence so no other caller can
        start or stop a group in between.

        Returns:
            The ids of the groups that were restarted.
        """
        with self._lock:
            # 1. Find affected groups.
            # NOTE: this reads the executor's ``m_cfg`` attribute directly;
            # a public accessor on GCOWrapper would be preferable.
            affected_groups = [
                gid
                for gid, executor in self.executors.items()
                if executor.m_cfg.plugin_config.plugin_name == plugin_name
            ]

            VSC_INFO(
                "app_mgr",
                f"OTA Update for '{plugin_name}'. Affected groups: {affected_groups}",
            )

            # 2. Stop
            for gid in affected_groups:
                self.stop_group(gid)

            # 3. Reload the module
            reload_plugin_module(plugin_name)

            # 4. Restart
            restarted = []
            for gid in affected_groups:
                if self.start_group(gid):
                    restarted.append(gid)

            return restarted

    def get_group_config(self, group_id: str):
        """Return the JSON form of a group's configuration.

        Returns:
            ``GroupConfig.to_json()`` of the group, or ``False`` when the
            group id is unknown.
        """
        with self._lock:
            group_config = self.configs.get(group_id)
            if group_id not in self.configs:
                return False
            return group_config.to_json()

    # ----------------- Config update (single group) -----------------
    def ota_update_plugin_config(self, group_id: str, new_config_str: str):
        """Apply a new configuration to one group and (re)start its executor.

        ``new_config_str`` is a JSON document accepted by
        ``GroupConfig.from_json``. A new executor is created and started
        with the new configuration; if one was already running for the
        group it is stopped afterwards. Only this group is affected.
        """
        new_config = GroupConfig.from_json(json.loads(new_config_str))

        with self._lock:
            old_exe = self.executors.get(group_id)

            new_exe = GCOWrapper(new_config)
            new_exe.load()
            new_exe.start()

            # Commit the new state before stopping the old executor: the
            # stored config must match what is running, and the new
            # executor must stay tracked even if stopping the old one fails.
            self.configs[group_id] = new_config
            self.executors[group_id] = new_exe

            if old_exe:
                old_exe.stop()

            VSC_INFO("app_mgr", f"Config update applied for {group_id}")

    def get_status(self):
        """Return one status dict per configured group.

        ``instances`` is the configured ``target_instances`` when the group
        is running and 0 otherwise.
        """
        with self._lock:
            status_list = []
            for gid, cfg in self.configs.items():
                running = gid in self.executors
                status_list.append({
                    "id": gid,
                    "plugin": cfg.plugin_config.plugin_name,
                    "running": running,
                    "instances": cfg.target_instances if running else 0,
                    "local_account_pool": cfg.local_account_pool,
                    "proxies_pool": cfg.proxy_pool,
                })
            return status_list

    def subscribe_executor_logs(self, group_id: str, callback):
        """Subscribe ``callback`` to the logs of a running executor.

        Args:
            group_id: id of the group whose executor is subscribed to.
            callback: Callable[[str], None] handed to the executor's
                ``subscribe_logs``.

        Raises:
            ValueError: if the group has no running executor.
        """
        with self._lock:
            if group_id not in self.executors:
                raise ValueError(f"Executor {group_id} not running")
            self.executors[group_id].subscribe_logs(callback)

    def unsubscribe_executor_logs(self, group_id: str, callback):
        """Unsubscribe ``callback`` from a group's executor logs.

        Does nothing when the group has no running executor.
        """
        with self._lock:
            exe = self.executors.get(group_id)
            if not exe:
                return
            exe.unsubscribe_logs(callback)