app_manager.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. # core/app_manager.py
  2. import json
  3. import os
  4. import threading
  5. from typing import Dict, List, Optional
  6. # 引入核心组件
  7. from vs_types import GroupConfig, NotFoundError, BizLogicError
  8. from gco_wrapper import GCOWrapper
  9. from vs_log_macros import VSC_INFO, VSC_ERROR, VSC_WARN
  10. from core.plugin_reloader import reload_plugin_module
  11. class AppManager:
  12. _instance = None
  13. _lock = threading.Lock()
  14. def __new__(cls):
  15. with cls._lock:
  16. if cls._instance is None:
  17. cls._instance = super().__new__(cls)
  18. cls._instance._init()
  19. return cls._instance
  20. @staticmethod
  21. def Instance():
  22. return AppManager()
  23. def _init(self):
  24. self.executors: Dict[str, GCOWrapper] = {} # group_id -> GCOWrapper
  25. self.configs: Dict[str, GroupConfig] = {} # group_id -> GroupConfig
  26. self.config_file = "config/groups.json"
  27. def load_configs(self):
  28. """读取并解析配置文件"""
  29. if not os.path.exists(self.config_file):
  30. raise NotFoundError(message=f'Config file not found: {self.config_file}')
  31. with open(self.config_file, 'r', encoding='utf-8') as f:
  32. data = json.load(f)
  33. for item in data:
  34. # JSON -> GroupConfig 转换
  35. grp_cfg = GroupConfig.from_json(item)
  36. self.configs[grp_cfg.identifier] = grp_cfg
  37. VSC_INFO("app_mgr", f"Loaded {len(self.configs)} group configurations.")
  38. def start_all(self):
  39. """启动所有 enable=True 的组"""
  40. for gid, cfg in self.configs.items():
  41. if cfg.enable and gid not in self.executors:
  42. self.start_group(gid)
  43. def start_group(self, group_id: str):
  44. with self._lock:
  45. if group_id not in self.configs:
  46. raise NotFoundError(message=f"Group {group_id} not found in config")
  47. if group_id in self.executors:
  48. VSC_WARN("app_mgr", f"Group {group_id} is already running")
  49. return
  50. cfg = self.configs[group_id]
  51. gco = GCOWrapper(cfg)
  52. gco.load()
  53. gco.start()
  54. self.executors[group_id] = gco
  55. VSC_INFO("app_mgr", f"Started group: {group_id}")
  56. def stop_group(self, group_id: str):
  57. with self._lock:
  58. if group_id not in self.executors:
  59. raise NotFoundError(message=f"Group {group_id} not found in config")
  60. VSC_INFO("app_mgr", f"Stopping group: {group_id}...")
  61. self.executors[group_id].stop()
  62. del self.executors[group_id]
  63. VSC_INFO("app_mgr", f"Stopped group: {group_id}")
  64. def restart_group(self, group_id: str):
  65. self.stop_group(group_id)
  66. self.start_group(group_id)
  67. def ota_upgrade_plugin(self, plugin_name: str) -> List[str]:
  68. """
  69. OTA 热更新流程:
  70. 1. 找到所有使用该插件的运行中组
  71. 2. 停止这些组
  72. 3. 清除 Python 模块缓存 (热重载)
  73. 4. 重新启动这些组
  74. """
  75. affected_groups = []
  76. with self._lock:
  77. # 1. 查找受影响的组
  78. for gid, exec in self.executors.items():
  79. # 注意:这里我们访问 coord 私有成员,实际工程中建议添加 getter
  80. # 假设 config 存储在 m_cfg
  81. if exec.m_cfg.plugin_config.plugin_name == plugin_name:
  82. affected_groups.append(gid)
  83. VSC_INFO("app_mgr", f"OTA Update for '{plugin_name}'. Affected groups: {affected_groups}")
  84. # 2. 停止
  85. for gid in affected_groups:
  86. self.stop_group(gid)
  87. # 3. 卸载模块
  88. reload_plugin_module(plugin_name)
  89. # 4. 重启
  90. restarted = []
  91. for gid in affected_groups:
  92. if self.start_group(gid):
  93. restarted.append(gid)
  94. return restarted
  95. def get_group_config(self, group_id: str):
  96. with self._lock:
  97. if group_id not in self.configs:
  98. return False
  99. group_config = self.configs.get(group_id)
  100. return group_config.to_json()
  101. # ----------------- 更新配置(局部) -----------------
  102. def ota_update_plugin_config(self, group_id: str, new_config_str: str):
  103. """更新某个 Executor 的配置(只影响单个 Executor)"""
  104. with self._lock:
  105. new_config = GroupConfig.from_json(json.loads(new_config_str))
  106. old_exe = self.executors.get(group_id)
  107. if not old_exe:
  108. # Executor 没运行,直接创建启动
  109. self.configs[group_id] = new_config
  110. exe = GCOWrapper(new_config)
  111. exe.load()
  112. exe.start()
  113. self.executors[group_id] = exe
  114. return
  115. # 创建新 Executor(插件不变,只更新配置)
  116. new_exe = GCOWrapper(new_config)
  117. new_exe.load()
  118. new_exe.start()
  119. old_exe.stop()
  120. self.executors[group_id] = new_exe
  121. VSC_INFO("app_mgr", f"Config update applied for {group_id}")
  122. def get_status(self):
  123. """获取所有组的状态"""
  124. with self._lock:
  125. status_list = []
  126. for gid, cfg in self.configs.items():
  127. running = gid in self.executors
  128. status_list.append({
  129. "id": gid,
  130. "plugin": cfg.plugin_config.plugin_name,
  131. "running": running,
  132. "instances": cfg.target_instances if running else 0,
  133. "local_account_pool": cfg.local_account_pool,
  134. "proxies_pool": cfg.proxy_pool
  135. })
  136. return status_list
  137. def subscribe_executor_logs(self, group_id: str, callback):
  138. """
  139. 订阅某个 Executor 的日志
  140. callback: Callable[[str], None]
  141. """
  142. with self._lock:
  143. if group_id not in self.executors:
  144. raise ValueError(f"Executor {group_id} not running")
  145. exe = self.executors[group_id]
  146. exe.subscribe_logs(callback)
  147. def unsubscribe_executor_logs(self, group_id: str, callback):
  148. """
  149. 取消订阅日志
  150. """
  151. with self._lock:
  152. exe = self.executors.get(group_id)
  153. if not exe:
  154. return
  155. exe.unsubscribe_logs(callback)