app_manager.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. # core/app_manager.py
  2. import json
  3. import os
  4. import threading
  5. from typing import Dict, List, Optional
  6. # 引入核心组件
  7. from vs_types import GroupConfig, QueryWaitConfig, PluginConfig, QueryWaitMode
  8. from group_coordinator import GroupCoordinator
  9. from vs_log_macros import VSC_INFO, VSC_ERROR, VSC_WARN
  10. from core.plugin_reloader import reload_plugin_module
  11. class AppManager:
  12. _instance = None
  13. _lock = threading.Lock()
  14. def __new__(cls):
  15. with cls._lock:
  16. if cls._instance is None:
  17. cls._instance = super().__new__(cls)
  18. cls._instance._init()
  19. return cls._instance
  20. @staticmethod
  21. def Instance():
  22. return AppManager()
  23. def _init(self):
  24. self.coordinators: Dict[str, GroupCoordinator] = {}
  25. self.configs: Dict[str, GroupConfig] = {}
  26. self.config_file = "config/groups.json"
  27. def load_configs(self):
  28. """读取并解析配置文件"""
  29. if not os.path.exists(self.config_file):
  30. VSC_ERROR("app_mgr", f"Config file not found: {self.config_file}")
  31. return
  32. try:
  33. with open(self.config_file, 'r', encoding='utf-8') as f:
  34. data = json.load(f)
  35. for item in data:
  36. # JSON -> GroupConfig 转换
  37. qw_data = item.get("query_wait", {})
  38. qw_cfg = QueryWaitConfig(
  39. mode=QueryWaitMode(qw_data.get("mode", 0)),
  40. fixed_wait=qw_data.get("fixed_wait", 0),
  41. random_min=qw_data.get("random_min", 0),
  42. random_max=qw_data.get("random_max", 0)
  43. )
  44. plg_data = item.get("plugin_config", {})
  45. plg_cfg = PluginConfig(
  46. lib_path=plg_data.get("lib_path", "plugins"),
  47. plugin_name=plg_data.get("plugin_name", ""),
  48. plugin_bin=plg_data.get("plugin_bin", ""),
  49. plugin_proto=plg_data.get("plugin_proto", "IVSPlg")
  50. )
  51. grp_cfg = GroupConfig(
  52. identifier=item["identifier"],
  53. debug=item.get("debug", False),
  54. enable=item.get("enable", False),
  55. need_account=item.get("need_account", False),
  56. account_built_in=item.get("account_built_in", True),
  57. account_pool=item.get("account_pool", ""),
  58. need_proxy=item.get("need_proxy", False),
  59. proxy_pool=item.get("proxy_pool", ""),
  60. target_instances=item.get("target_instances", 1),
  61. account_login_interval=item.get("account_login_interval", 0),
  62. query_wait=qw_cfg,
  63. plugin_config=plg_cfg,
  64. free_config=json.dumps(item.get("free_config", "{}"))
  65. )
  66. self.configs[grp_cfg.identifier] = grp_cfg
  67. VSC_INFO("app_mgr", f"Loaded {len(self.configs)} group configurations.")
  68. except Exception as e:
  69. VSC_ERROR("app_mgr", f"Failed to load configs: {e}")
  70. def start_all(self):
  71. """启动所有 enable=True 的组"""
  72. for gid, cfg in self.configs.items():
  73. if cfg.enable and gid not in self.coordinators:
  74. self.start_group(gid)
  75. def start_group(self, group_id: str) -> bool:
  76. if group_id not in self.configs:
  77. VSC_ERROR("app_mgr", f"Group {group_id} not found in config")
  78. return False
  79. if group_id in self.coordinators:
  80. VSC_WARN("app_mgr", f"Group {group_id} is already running")
  81. return True
  82. cfg = self.configs[group_id]
  83. try:
  84. coord = GroupCoordinator(cfg)
  85. # 设置推送回调,这里可以连接到 WebSocket
  86. coord.set_push_callback(lambda t, d, s: print(f"[{group_id} PUSH] {d.decode()}"))
  87. coord.start()
  88. self.coordinators[group_id] = coord
  89. VSC_INFO("app_mgr", f"Started group: {group_id}")
  90. return True
  91. except Exception as e:
  92. VSC_ERROR("app_mgr", f"Failed to start group {group_id}: {e}")
  93. return False
  94. def stop_group(self, group_id: str) -> bool:
  95. if group_id not in self.coordinators:
  96. return False
  97. VSC_INFO("app_mgr", f"Stopping group: {group_id}...")
  98. try:
  99. self.coordinators[group_id].stop()
  100. del self.coordinators[group_id]
  101. VSC_INFO("app_mgr", f"Stopped group: {group_id}")
  102. return True
  103. except Exception as e:
  104. VSC_ERROR("app_mgr", f"Error stopping group {group_id}: {e}")
  105. return False
  106. def restart_group(self, group_id: str) -> bool:
  107. self.stop_group(group_id)
  108. return self.start_group(group_id)
  109. def ota_update_plugin(self, plugin_name: str) -> List[str]:
  110. """
  111. OTA 热更新流程:
  112. 1. 找到所有使用该插件的运行中组
  113. 2. 停止这些组
  114. 3. 清除 Python 模块缓存 (热重载)
  115. 4. 重新启动这些组
  116. """
  117. affected_groups = []
  118. # 1. 查找受影响的组
  119. for gid, coord in self.coordinators.items():
  120. # 注意:这里我们访问 coord 私有成员,实际工程中建议添加 getter
  121. # 假设 config 存储在 m_cfg
  122. if coord.m_cfg.plugin_config.plugin_name == plugin_name:
  123. affected_groups.append(gid)
  124. VSC_INFO("app_mgr", f"OTA Update for '{plugin_name}'. Affected groups: {affected_groups}")
  125. # 2. 停止
  126. for gid in affected_groups:
  127. self.stop_group(gid)
  128. # 3. 卸载模块
  129. reload_plugin_module(plugin_name)
  130. # 4. 重启
  131. restarted = []
  132. for gid in affected_groups:
  133. if self.start_group(gid):
  134. restarted.append(gid)
  135. return restarted
  136. def get_status(self):
  137. """获取所有组的状态"""
  138. status_list = []
  139. for gid, cfg in self.configs.items():
  140. running = gid in self.coordinators
  141. status_list.append({
  142. "id": gid,
  143. "plugin": cfg.plugin_config.plugin_name,
  144. "running": running,
  145. "instances": cfg.target_instances if running else 0,
  146. "account_pool": cfg.account_pool
  147. })
  148. return status_list