| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188 |
- # core/app_manager.py
- import json
- import os
- import threading
- from typing import Dict, List, Optional
- # 引入核心组件
- from vs_types import GroupConfig, NotFoundError, BizLogicError
- from gco_wrapper import GCOWrapper
- from vs_log_macros import VSC_INFO, VSC_ERROR, VSC_WARN
- from core.plugin_reloader import reload_plugin_module
- class AppManager:
- _instance = None
- _lock = threading.Lock()
- def __new__(cls):
- with cls._lock:
- if cls._instance is None:
- cls._instance = super().__new__(cls)
- cls._instance._init()
- return cls._instance
- @staticmethod
- def Instance():
- return AppManager()
- def _init(self):
- self.executors: Dict[str, GCOWrapper] = {} # group_id -> GCOWrapper
- self.configs: Dict[str, GroupConfig] = {} # group_id -> GroupConfig
- self.config_file = "config/groups.json"
- def load_configs(self):
- """读取并解析配置文件"""
- if not os.path.exists(self.config_file):
- raise NotFoundError(message=f'Config file not found: {self.config_file}')
- with open(self.config_file, 'r', encoding='utf-8') as f:
- data = json.load(f)
-
- for item in data:
- # JSON -> GroupConfig 转换
- grp_cfg = GroupConfig.from_json(item)
- self.configs[grp_cfg.identifier] = grp_cfg
-
- VSC_INFO("app_mgr", f"Loaded {len(self.configs)} group configurations.")
- def start_all(self):
- """启动所有 enable=True 的组"""
- for gid, cfg in self.configs.items():
- if cfg.enable and gid not in self.executors:
- self.start_group(gid)
- def start_group(self, group_id: str):
- with self._lock:
- if group_id not in self.configs:
- raise NotFoundError(message=f"Group {group_id} not found in config")
-
- if group_id in self.executors:
- VSC_WARN("app_mgr", f"Group {group_id} is already running")
- return
- cfg = self.configs[group_id]
- gco = GCOWrapper(cfg)
- gco.load()
- gco.start()
- self.executors[group_id] = gco
- VSC_INFO("app_mgr", f"Started group: {group_id}")
- def stop_group(self, group_id: str):
- with self._lock:
- if group_id not in self.executors:
- raise NotFoundError(message=f"Group {group_id} not found in config")
-
- VSC_INFO("app_mgr", f"Stopping group: {group_id}...")
- self.executors[group_id].stop()
- del self.executors[group_id]
- VSC_INFO("app_mgr", f"Stopped group: {group_id}")
- def restart_group(self, group_id: str):
- self.stop_group(group_id)
- self.start_group(group_id)
- def ota_upgrade_plugin(self, plugin_name: str) -> List[str]:
- """
- OTA 热更新流程:
- 1. 找到所有使用该插件的运行中组
- 2. 停止这些组
- 3. 清除 Python 模块缓存 (热重载)
- 4. 重新启动这些组
- """
- affected_groups = []
- with self._lock:
- # 1. 查找受影响的组
- for gid, exec in self.executors.items():
- # 注意:这里我们访问 coord 私有成员,实际工程中建议添加 getter
- # 假设 config 存储在 m_cfg
- if exec.m_cfg.plugin_config.plugin_name == plugin_name:
- affected_groups.append(gid)
-
- VSC_INFO("app_mgr", f"OTA Update for '{plugin_name}'. Affected groups: {affected_groups}")
-
- # 2. 停止
- for gid in affected_groups:
- self.stop_group(gid)
-
- # 3. 卸载模块
- reload_plugin_module(plugin_name)
-
- # 4. 重启
- restarted = []
- for gid in affected_groups:
- if self.start_group(gid):
- restarted.append(gid)
-
- return restarted
-
- def get_group_config(self, group_id: str):
- with self._lock:
- if group_id not in self.configs:
- return False
- group_config = self.configs.get(group_id)
- return group_config.to_json()
-
- # ----------------- 更新配置(局部) -----------------
- def ota_update_plugin_config(self, group_id: str, new_config_str: str):
- """更新某个 Executor 的配置(只影响单个 Executor)"""
- with self._lock:
- new_config = GroupConfig.from_json(json.loads(new_config_str))
- old_exe = self.executors.get(group_id)
- if not old_exe:
- # Executor 没运行,直接创建启动
- self.configs[group_id] = new_config
- exe = GCOWrapper(new_config)
- exe.load()
- exe.start()
- self.executors[group_id] = exe
- return
- # 创建新 Executor(插件不变,只更新配置)
- new_exe = GCOWrapper(new_config)
- new_exe.load()
- new_exe.start()
- old_exe.stop()
- self.executors[group_id] = new_exe
- VSC_INFO("app_mgr", f"Config update applied for {group_id}")
-
- def get_status(self):
- """获取所有组的状态"""
- with self._lock:
- status_list = []
- for gid, cfg in self.configs.items():
- running = gid in self.executors
- status_list.append({
- "id": gid,
- "plugin": cfg.plugin_config.plugin_name,
- "running": running,
- "instances": cfg.target_instances if running else 0,
- "local_account_pool": cfg.local_account_pool,
- "proxies_pool": cfg.proxy_pool
- })
- return status_list
-
- def subscribe_executor_logs(self, group_id: str, callback):
- """
- 订阅某个 Executor 的日志
- callback: Callable[[str], None]
- """
- with self._lock:
- if group_id not in self.executors:
- raise ValueError(f"Executor {group_id} not running")
- exe = self.executors[group_id]
- exe.subscribe_logs(callback)
- def unsubscribe_executor_logs(self, group_id: str, callback):
- """
- 取消订阅日志
- """
- with self._lock:
- exe = self.executors.get(group_id)
- if not exe:
- return
- exe.unsubscribe_logs(callback)
-
|