app_manager.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. # core/app_manager.py
  2. import json
  3. import os
  4. import threading
  5. from typing import Dict, List, Optional
  6. # 引入核心组件
  7. from vs_types import GroupConfig, NotFoundError, BizLogicError
  8. from gco_wrapper import GCOWrapper
  9. from vs_log_macros import VSC_INFO, VSC_ERROR, VSC_WARN
  10. from core.plugin_reloader import reload_plugin_module
  11. class AppManager:
  12. _instance = None
  13. _lock = threading.Lock()
  14. def __new__(cls):
  15. with cls._lock:
  16. if cls._instance is None:
  17. cls._instance = super().__new__(cls)
  18. cls._instance._init()
  19. return cls._instance
  20. @staticmethod
  21. def Instance():
  22. return AppManager()
  23. def _init(self):
  24. self.executors: Dict[str, GCOWrapper] = {} # group_id -> GCOWrapper
  25. self.configs: Dict[str, GroupConfig] = {} # group_id -> GroupConfig
  26. self.config_file = "config/groups.json"
  def load_configs(self):
      """Read the group configuration file and register every group in it.

      The file at ``self.config_file`` is parsed as JSON and iterated. Each
      item is converted with ``GroupConfig.from_json`` and stored in
      ``self.configs`` under its ``identifier``; an existing entry with the
      same identifier is overwritten.

      Raises:
          NotFoundError: if the configuration file does not exist.
      """
      try:
          # Open directly instead of checking os.path.exists() first, so the
          # file cannot disappear between the check and the open.
          with open(self.config_file, 'r', encoding='utf-8') as f:
              data = json.load(f)
      except (FileNotFoundError, NotADirectoryError) as err:
          raise NotFoundError(message=f'Config file not found: {self.config_file}') from err

      # Convert every entry before touching self.configs, so one malformed
      # entry cannot leave the registry half-updated.
      parsed = [GroupConfig.from_json(item) for item in data]
      for grp_cfg in parsed:
          self.configs[grp_cfg.identifier] = grp_cfg

      VSC_INFO("app_mgr", f"Loaded {len(self.configs)} group configurations.")
  39. def start_all(self):
  40. """启动所有 enable=True 的组"""
  41. for gid, cfg in self.configs.items():
  42. if cfg.enable and gid not in self.executors:
  43. self.start_group(gid)
  def start_group(self, group_id: str):
      """Create, load and start the executor for one configured group.

      Runs entirely under ``self._lock``. If the group is already running, a
      warning is logged and nothing else happens. The new executor is only
      registered after ``load()`` and ``start()`` have both returned.

      Raises:
          NotFoundError: if ``group_id`` has no entry in ``self.configs``.
      """
      with self._lock:
          is_configured = group_id in self.configs
          if not is_configured:
              raise NotFoundError(message=f"Group {group_id} not found in config")

          already_running = group_id in self.executors
          if already_running:
              VSC_WARN("app_mgr", f"Group {group_id} is already running")
              return

          wrapper = GCOWrapper(self.configs[group_id])
          wrapper.load()
          wrapper.start()
          self.executors[group_id] = wrapper
          VSC_INFO("app_mgr", f"Started group: {group_id}")
  def stop_group(self, group_id: str):
      """Stop a running group and remove it from the executor registry.

      Runs entirely under ``self._lock``. The registry entry is deleted only
      after the executor's ``stop()`` has returned, so if ``stop()`` raises
      the executor stays registered.

      Raises:
          NotFoundError: if no executor is registered for ``group_id``.
      """
      with self._lock:
          if group_id not in self.executors:
              # The lookup is against the running executors, not the loaded
              # configs, so the message must say "not running".
              raise NotFoundError(message=f"Group {group_id} is not running")
          VSC_INFO("app_mgr", f"Stopping group: {group_id}...")
          self.executors[group_id].stop()
          del self.executors[group_id]
          VSC_INFO("app_mgr", f"Stopped group: {group_id}")
  65. def restart_group(self, group_id: str):
  66. self.stop_group(group_id)
  67. self.start_group(group_id)
  def ota_upgrade_plugin(self, plugin_name: str) -> List[str]:
      """Hot-reload a plugin and restart every running group that uses it.

      Steps:
        1. Collect the running groups whose plugin name equals ``plugin_name``.
        2. Stop those groups.
        3. Call ``reload_plugin_module`` for the plugin.
        4. Start the same groups again.

      ``reload_plugin_module`` is called even when no running group uses the
      plugin.

      Returns:
          The ids of the groups that were started again, in the order they
          were found. Empty when no running group uses the plugin.

      Any exception raised while stopping, reloading or starting propagates
      to the caller; groups already stopped at that point stay stopped.
      """
      with self._lock:
          # NOTE: this reads the wrapper's m_cfg attribute directly (assumed
          # to hold the group's config); a getter would be cleaner.
          affected_groups = [
              gid
              for gid, executor in self.executors.items()
              if executor.m_cfg.plugin_config.plugin_name == plugin_name
          ]
      VSC_INFO("app_mgr", f"OTA Update for '{plugin_name}'. Affected groups: {affected_groups}")

      # stop_group/start_group acquire self._lock themselves and the lock is a
      # plain (non re-entrant) threading.Lock, so they run after its release.
      for gid in affected_groups:
          self.stop_group(gid)

      reload_plugin_module(plugin_name)

      restarted = []
      for gid in affected_groups:
          # start_group reports failure by raising, so its return value is
          # not a success flag; returning normally is the success signal.
          self.start_group(gid)
          restarted.append(gid)
      return restarted
  96. def get_group_config(self, group_id: str):
  97. with self._lock:
  98. if group_id not in self.configs:
  99. return False
  100. group_config = self.configs.get(group_id)
  101. return group_config.to_json()
  102. # ----------------- 更新配置(局部) -----------------
  def ota_update_plugin_config(self, group_id: str, new_config_str: str):
      """Apply a new configuration to a single group.

      ``new_config_str`` is parsed as JSON and converted with
      ``GroupConfig.from_json``. Only the executor of ``group_id`` is touched.

      * Group not running: the config is stored and a new executor is
        created, loaded, started and registered.
      * Group running: a replacement executor is created, loaded and started
        first; then the old one is stopped and the replacement takes its
        place in the registry.

      In both cases ``self.configs[group_id]`` ends up holding the new config.

      NOTE(review): the identifier inside the new config is not compared with
      ``group_id`` — confirm callers always pass matching values.
      """
      with self._lock:
          new_config = GroupConfig.from_json(json.loads(new_config_str))
          old_exe = self.executors.get(group_id)
          if not old_exe:
              # Not running: store the config and start a fresh executor.
              self.configs[group_id] = new_config
              exe = GCOWrapper(new_config)
              exe.load()
              exe.start()
              self.executors[group_id] = exe
              return

          # Running: bring the replacement up before taking the old one down.
          new_exe = GCOWrapper(new_config)
          new_exe.load()
          new_exe.start()
          old_exe.stop()
          self.executors[group_id] = new_exe
          # Record the new config as well; otherwise get_status,
          # get_group_config and a later restart would use the old one.
          self.configs[group_id] = new_config
          VSC_INFO("app_mgr", f"Config update applied for {group_id}")
  123. def get_status(self):
  124. """获取所有组的状态"""
  125. with self._lock:
  126. status_list = []
  127. for gid, cfg in self.configs.items():
  128. running = gid in self.executors
  129. status_list.append({
  130. "id": gid,
  131. "plugin": cfg.plugin_config.plugin_name,
  132. "running": running,
  133. "instances": cfg.target_instances if running else 0,
  134. "local_account_pool": cfg.local_account_pool,
  135. "proxies_pool": cfg.proxy_pool
  136. })
  137. return status_list
  138. def subscribe_executor_logs(self, group_id: str, callback):
  139. """
  140. 订阅某个 Executor 的日志
  141. callback: Callable[[str], None]
  142. """
  143. with self._lock:
  144. if group_id not in self.executors:
  145. raise ValueError(f"Executor {group_id} not running")
  146. exe = self.executors[group_id]
  147. exe.subscribe_logs(callback)
  148. def unsubscribe_executor_logs(self, group_id: str, callback):
  149. """
  150. 取消订阅日志
  151. """
  152. with self._lock:
  153. exe = self.executors.get(group_id)
  154. if not exe:
  155. return
  156. exe.unsubscribe_logs(callback)