| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- # core/app_manager.py
- import json
- import os
- import threading
- from typing import Dict, List, Optional
- # 引入核心组件
- from vs_types import GroupConfig, QueryWaitConfig, PluginConfig, QueryWaitMode
- from group_coordinator import GroupCoordinator
- from vs_log_macros import VSC_INFO, VSC_ERROR, VSC_WARN
- from core.plugin_reloader import reload_plugin_module
- class AppManager:
- _instance = None
- _lock = threading.Lock()
- def __new__(cls):
- with cls._lock:
- if cls._instance is None:
- cls._instance = super().__new__(cls)
- cls._instance._init()
- return cls._instance
- @staticmethod
- def Instance():
- return AppManager()
- def _init(self):
- self.coordinators: Dict[str, GroupCoordinator] = {}
- self.configs: Dict[str, GroupConfig] = {}
- self.config_file = "config/groups.json"
- def load_configs(self):
- """读取并解析配置文件"""
- if not os.path.exists(self.config_file):
- VSC_ERROR("app_mgr", f"Config file not found: {self.config_file}")
- return
- try:
- with open(self.config_file, 'r', encoding='utf-8') as f:
- data = json.load(f)
-
- for item in data:
- # JSON -> GroupConfig 转换
- qw_data = item.get("query_wait", {})
- qw_cfg = QueryWaitConfig(
- mode=QueryWaitMode(qw_data.get("mode", 0)),
- fixed_wait=qw_data.get("fixed_wait", 0),
- random_min=qw_data.get("random_min", 0),
- random_max=qw_data.get("random_max", 0)
- )
- plg_data = item.get("plugin_config", {})
- plg_cfg = PluginConfig(
- lib_path=plg_data.get("lib_path", "plugins"),
- plugin_name=plg_data.get("plugin_name", ""),
- plugin_bin=plg_data.get("plugin_bin", ""),
- plugin_proto=plg_data.get("plugin_proto", "IVSPlg")
- )
- grp_cfg = GroupConfig(
- identifier=item["identifier"],
- enable=item.get("enable", False),
- need_account=item.get("need_account", False),
- account_pool=item.get("account_pool", ""),
- need_proxy=item.get("need_proxy", False),
- proxy_pool=item.get("proxy_pool", ""),
- target_instances=item.get("target_instances", 1),
- account_login_interval=item.get("account_login_interval", 0),
- query_wait=qw_cfg,
- plugin_config=plg_cfg,
- free_config=json.dumps(item.get("free_config", "{}"))
- )
-
- self.configs[grp_cfg.identifier] = grp_cfg
-
- VSC_INFO("app_mgr", f"Loaded {len(self.configs)} group configurations.")
-
- except Exception as e:
- VSC_ERROR("app_mgr", f"Failed to load configs: {e}")
- def start_all(self):
- """启动所有 enable=True 的组"""
- for gid, cfg in self.configs.items():
- if cfg.enable and gid not in self.coordinators:
- self.start_group(gid)
- def start_group(self, group_id: str) -> bool:
- if group_id not in self.configs:
- VSC_ERROR("app_mgr", f"Group {group_id} not found in config")
- return False
-
- if group_id in self.coordinators:
- VSC_WARN("app_mgr", f"Group {group_id} is already running")
- return True
- cfg = self.configs[group_id]
- try:
- coord = GroupCoordinator(cfg)
- # 设置推送回调,这里可以连接到 WebSocket
- coord.set_push_callback(lambda t, d, s: print(f"[{group_id} PUSH] {d.decode()}"))
-
- coord.start()
- self.coordinators[group_id] = coord
- VSC_INFO("app_mgr", f"Started group: {group_id}")
- return True
- except Exception as e:
- VSC_ERROR("app_mgr", f"Failed to start group {group_id}: {e}")
- return False
- def stop_group(self, group_id: str) -> bool:
- if group_id not in self.coordinators:
- return False
-
- VSC_INFO("app_mgr", f"Stopping group: {group_id}...")
- try:
- self.coordinators[group_id].stop()
- del self.coordinators[group_id]
- VSC_INFO("app_mgr", f"Stopped group: {group_id}")
- return True
- except Exception as e:
- VSC_ERROR("app_mgr", f"Error stopping group {group_id}: {e}")
- return False
- def restart_group(self, group_id: str) -> bool:
- self.stop_group(group_id)
- return self.start_group(group_id)
- def ota_update_plugin(self, plugin_name: str) -> List[str]:
- """
- OTA 热更新流程:
- 1. 找到所有使用该插件的运行中组
- 2. 停止这些组
- 3. 清除 Python 模块缓存 (热重载)
- 4. 重新启动这些组
- """
- affected_groups = []
-
- # 1. 查找受影响的组
- for gid, coord in self.coordinators.items():
- # 注意:这里我们访问 coord 私有成员,实际工程中建议添加 getter
- # 假设 config 存储在 m_cfg
- if coord.m_cfg.plugin_config.plugin_name == plugin_name:
- affected_groups.append(gid)
-
- VSC_INFO("app_mgr", f"OTA Update for '{plugin_name}'. Affected groups: {affected_groups}")
-
- # 2. 停止
- for gid in affected_groups:
- self.stop_group(gid)
-
- # 3. 卸载模块
- reload_plugin_module(plugin_name)
-
- # 4. 重启
- restarted = []
- for gid in affected_groups:
- if self.start_group(gid):
- restarted.append(gid)
-
- return restarted
- def get_status(self):
- """获取所有组的状态"""
- status_list = []
- for gid, cfg in self.configs.items():
- running = gid in self.coordinators
- status_list.append({
- "id": gid,
- "plugin": cfg.plugin_config.plugin_name,
- "running": running,
- "instances": cfg.target_instances if running else 0,
- "account_pool": cfg.account_pool
- })
- return status_list
|