import threading import collections from enum import Enum, auto from typing import Callable, Type, Any, Dict from vs_types import GroupConfig from vs_log_macros import VSC_INFO class State(Enum): CREATED = auto() LOADED = auto() RUNNING = auto() STOPPING = auto() STOPPED = auto() FAILED = auto() class GCOWrapper: def __init__(self, gco_class: Type[Any], gco_cfg: GroupConfig, redis_conf: Dict[str, Any]): """ @param gco_class: 传入你要包装的具体类 (SentinelGCO 或 BookerGCO) @param gco_cfg: 分组配置 @param redis_conf: Redis 连接配置 (字典) """ self.gco_class = gco_class self.m_cfg = gco_cfg self.redis_conf = redis_conf self._gco = None self._state = State.CREATED self._log_queue = collections.deque(maxlen=1000) self._log_subscribers = set() self._lock = threading.Lock() self._push_callback: Callable = lambda t, d, s: None def set_push_callback(self, callback: Callable): self._push_callback = callback def _log(self, msg: str): with self._lock: VSC_INFO("-", msg) self._log_queue.append(msg) for sub in self._log_subscribers: try: sub(msg) except Exception: pass # 原有 push callback self._push_callback("LOG", msg.encode(), self._state) def get_logs(self, last_n=50): with self._lock: return list(self._log_queue)[-last_n:] def subscribe_logs(self, callback: Callable): with self._lock: self._log_subscribers.add(callback) def unsubscribe_logs(self, callback: Callable): with self._lock: self._log_subscribers.discard(callback) def _transition(self, from_state, to_state): if self._state != from_state: raise Exception(f"invalid transition {self._state.name} -> {to_state.name}") self._state = to_state def load(self): self._transition(State.CREATED, State.LOADED) def start(self): self._transition(State.LOADED, State.RUNNING) try: # 动态实例化传入的类,并传入 Redis 配置 self._gco = self.gco_class(self.m_cfg, self.redis_conf, logger=self._log) self._gco.start() # 调用 Sentinel 或 Booker 的 start except Exception: self._state = State.FAILED raise def stop(self): if self._state != State.RUNNING: return self._transition(State.RUNNING, State.STOPPING) try: if self._gco: self._gco.stop() # 调用 Sentinel 或 Booker 的 stop finally: self._transition(State.STOPPING, State.STOPPED)