app_manager.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. # core/app_manager.py
  2. import json
  3. import os
  4. import threading
  5. from typing import Dict, List, Optional
  6. # 引入核心组件
  7. from vs_types import GroupConfig
  8. from gco_wrapper import GCOWrapper
  9. from vs_log_macros import VSC_INFO, VSC_ERROR, VSC_WARN
  10. from core.plugin_reloader import reload_plugin_module
  11. class AppManager:
  12. _instance = None
  13. _lock = threading.Lock()
  14. def __new__(cls):
  15. with cls._lock:
  16. if cls._instance is None:
  17. cls._instance = super().__new__(cls)
  18. cls._instance._init()
  19. return cls._instance
  20. @staticmethod
  21. def Instance():
  22. return AppManager()
  23. def _init(self):
  24. self.executors: Dict[str, GCOWrapper] = {} # group_id -> GCOWrapper
  25. self.configs: Dict[str, GroupConfig] = {} # group_id -> GroupConfig
  26. self.config_file = "config/groups.json"
  27. def load_configs(self):
  28. """读取并解析配置文件"""
  29. if not os.path.exists(self.config_file):
  30. VSC_ERROR("app_mgr", f"Config file not found: {self.config_file}")
  31. return
  32. try:
  33. with open(self.config_file, 'r', encoding='utf-8') as f:
  34. data = json.load(f)
  35. for item in data:
  36. # JSON -> GroupConfig 转换
  37. grp_cfg = GroupConfig.from_json(item)
  38. print(grp_cfg.free_config)
  39. self.configs[grp_cfg.identifier] = grp_cfg
  40. VSC_INFO("app_mgr", f"Loaded {len(self.configs)} group configurations.")
  41. except Exception as e:
  42. VSC_ERROR("app_mgr", f"Failed to load configs: {e}")
  43. def start_all(self):
  44. """启动所有 enable=True 的组"""
  45. for gid, cfg in self.configs.items():
  46. if cfg.enable and gid not in self.executors:
  47. self.start_group(gid)
  48. def start_group(self, group_id: str) -> bool:
  49. with self._lock:
  50. if group_id not in self.configs:
  51. VSC_ERROR("app_mgr", f"Group {group_id} not found in config")
  52. return False
  53. if group_id in self.executors:
  54. VSC_WARN("app_mgr", f"Group {group_id} is already running")
  55. return True
  56. cfg = self.configs[group_id]
  57. try:
  58. gco = GCOWrapper(cfg)
  59. gco.load()
  60. gco.start()
  61. self.executors[group_id] = gco
  62. VSC_INFO("app_mgr", f"Started group: {group_id}")
  63. return True
  64. except Exception as e:
  65. VSC_ERROR("app_mgr", f"Failed to start group {group_id}: {e}")
  66. return False
  67. def stop_group(self, group_id: str) -> bool:
  68. with self._lock:
  69. if group_id not in self.executors:
  70. return False
  71. VSC_INFO("app_mgr", f"Stopping group: {group_id}...")
  72. try:
  73. self.executors[group_id].stop()
  74. del self.executors[group_id]
  75. VSC_INFO("app_mgr", f"Stopped group: {group_id}")
  76. return True
  77. except Exception as e:
  78. VSC_ERROR("app_mgr", f"Error stopping group {group_id}: {e}")
  79. return False
  80. def restart_group(self, group_id: str) -> bool:
  81. self.stop_group(group_id)
  82. return self.start_group(group_id)
  83. def ota_upgrade_plugin(self, plugin_name: str) -> List[str]:
  84. """
  85. OTA 热更新流程:
  86. 1. 找到所有使用该插件的运行中组
  87. 2. 停止这些组
  88. 3. 清除 Python 模块缓存 (热重载)
  89. 4. 重新启动这些组
  90. """
  91. affected_groups = []
  92. # 1. 查找受影响的组
  93. for gid, exec in self.executors.items():
  94. # 注意:这里我们访问 coord 私有成员,实际工程中建议添加 getter
  95. # 假设 config 存储在 m_cfg
  96. if exec.m_cfg.plugin_config.plugin_name == plugin_name:
  97. affected_groups.append(gid)
  98. VSC_INFO("app_mgr", f"OTA Update for '{plugin_name}'. Affected groups: {affected_groups}")
  99. # 2. 停止
  100. for gid in affected_groups:
  101. self.stop_group(gid)
  102. # 3. 卸载模块
  103. reload_plugin_module(plugin_name)
  104. # 4. 重启
  105. restarted = []
  106. for gid in affected_groups:
  107. if self.start_group(gid):
  108. restarted.append(gid)
  109. return restarted
  110. def get_group_config(self, group_id: str):
  111. with self._lock:
  112. if group_id not in self.configs:
  113. return False
  114. group_config = self.configs.get(group_id)
  115. return group_config.to_json()
  116. # ----------------- 更新配置(局部) -----------------
  117. def ota_update_plugin_config(self, group_id: str, new_config_str: str) -> bool:
  118. """更新某个 Executor 的配置(只影响单个 Executor)"""
  119. new_config = GroupConfig.from_json(json.loads(new_config_str))
  120. with self._lock:
  121. old_exe = self.executors.get(group_id)
  122. if not old_exe:
  123. # Executor 没运行,直接创建启动
  124. exe = GCOWrapper(new_config)
  125. exe.load()
  126. exe.start()
  127. self.executors[group_id] = exe
  128. return True
  129. # 创建新 Executor(插件不变,只更新配置)
  130. new_exe = GCOWrapper(new_config)
  131. new_exe.load()
  132. try:
  133. new_exe.start()
  134. old_exe.stop()
  135. self.executors[group_id] = new_exe
  136. VSC_INFO("app_mgr", f"Config update applied for {group_id}")
  137. return True
  138. except Exception as e:
  139. VSC_ERROR("app_mgr", f"Failed to update config for {group_id}: {e}")
  140. return False
  141. def get_status(self):
  142. """获取所有组的状态"""
  143. status_list = []
  144. for gid, cfg in self.configs.items():
  145. running = gid in self.executors
  146. status_list.append({
  147. "id": gid,
  148. "plugin": cfg.plugin_config.plugin_name,
  149. "running": running,
  150. "instances": cfg.target_instances if running else 0,
  151. "account_pool": cfg.account_pool
  152. })
  153. return status_list
  154. def subscribe_executor_logs(self, group_id: str, callback):
  155. """
  156. 订阅某个 Executor 的日志
  157. callback: Callable[[str], None]
  158. """
  159. if group_id not in self.executors:
  160. raise ValueError(f"Executor {group_id} not running")
  161. exe = self.executors[group_id]
  162. exe.subscribe_logs(callback)
  163. def unsubscribe_executor_logs(self, group_id: str, callback):
  164. """
  165. 取消订阅日志
  166. """
  167. exe = self.executors.get(group_id)
  168. if not exe:
  169. return
  170. exe.unsubscribe_logs(callback)