import threading
import collections
from enum import Enum, auto
from typing import Callable

from gco import GCO
from vs_types import GroupConfig
from vs_log_macros import VSC_INFO


class State(Enum):
    """Lifecycle states tracked by :class:`GCOWrapper`."""

    CREATED = auto()
    LOADED = auto()
    RUNNING = auto()
    STOPPING = auto()
    STOPPED = auto()
    FAILED = auto()


class GCOWrapper:
    """Wrap a ``GCO`` instance with a small state machine and a log buffer.

    The wrapper moves through ``CREATED -> LOADED -> RUNNING -> STOPPING ->
    STOPPED`` (or ``FAILED`` when starting the GCO raises), keeps the most
    recent 1000 log messages, and fans each message out to registered
    subscribers and to a single push callback.
    """

    def __init__(self, gco_cfg: GroupConfig):
        """Store the configuration; the GCO itself is created in ``start``.

        Args:
            gco_cfg: Configuration handed to ``GCO`` when ``start`` is called.
        """
        self.m_cfg = gco_cfg
        self._gco = None
        self._state = State.CREATED
        # Bounded buffer: the oldest messages are dropped past 1000 entries.
        self._log_queue = collections.deque(maxlen=1000)
        self._log_subscribers = set()
        # Guards _log_queue and _log_subscribers. It is a plain (non-reentrant)
        # Lock, so it must never be held while user callbacks run.
        self._lock = threading.Lock()
        # No-op default so _log can call the callback unconditionally.
        self._push_callback: Callable = lambda t, d, s: None

    def set_push_callback(self, callback: Callable) -> None:
        """Replace the push callback.

        Args:
            callback: Called by ``_log`` as
                ``callback("LOG", encoded_message, current_state)``.
        """
        self._push_callback = callback

    def _log(self, msg: str) -> None:
        """Record ``msg`` and deliver it to subscribers and the push callback.

        The message is emitted via ``VSC_INFO`` and appended to the buffer
        while holding the lock. Callbacks are invoked *after* the lock is
        released, on a snapshot of the subscriber set, so a callback may call
        ``get_logs``/``subscribe_logs``/``unsubscribe_logs`` without
        deadlocking or mutating the set mid-iteration. The trade-off is that
        concurrent ``_log`` calls may reach subscribers interleaved.

        Args:
            msg: The log message.
        """
        with self._lock:
            VSC_INFO("-", msg)
            self._log_queue.append(msg)
            subscribers = tuple(self._log_subscribers)

        for sub in subscribers:
            try:
                sub(msg)
            except Exception as exc:
                # Delivery stays best-effort: one failing subscriber must not
                # block the others, but the failure is no longer silent. It is
                # reported directly (not through _log) to avoid recursion.
                VSC_INFO("-", f"log subscriber raised: {exc!r}")

        # Pre-existing push callback.
        self._push_callback("LOG", msg.encode(), self._state)

    def get_logs(self, last_n=50):
        """Return up to ``last_n`` of the most recent buffered messages.

        Args:
            last_n: Maximum number of messages to return. Zero or a negative
                value yields an empty list.

        Returns:
            A new list of messages, oldest first.
        """
        # Guard explicitly: seq[-0:] is the whole sequence, not an empty one.
        if last_n <= 0:
            return []
        with self._lock:
            return list(self._log_queue)[-last_n:]

    def subscribe_logs(self, callback: Callable) -> None:
        """Register ``callback`` to be called with each future log message."""
        with self._lock:
            self._log_subscribers.add(callback)

    def unsubscribe_logs(self, callback: Callable) -> None:
        """Remove ``callback``; does nothing if it was not registered."""
        with self._lock:
            self._log_subscribers.discard(callback)

    def _transition(self, from_state, to_state) -> None:
        """Move from ``from_state`` to ``to_state``.

        Args:
            from_state: State the wrapper must currently be in.
            to_state: State to switch to.

        Raises:
            Exception: If the current state is not ``from_state``.
        """
        if self._state != from_state:
            raise Exception(f"invalid transition {self._state} -> {to_state}")
        print(f"[STATE] {self._state.name} -> {to_state.name}")
        self._state = to_state

    def load(self) -> None:
        """Transition ``CREATED -> LOADED``."""
        self._transition(State.CREATED, State.LOADED)

    def start(self) -> None:
        """Transition ``LOADED -> RUNNING``, then create and start the GCO.

        If constructing or starting the GCO raises, the state is set to
        ``FAILED`` and the exception is re-raised.
        """
        self._transition(State.LOADED, State.RUNNING)
        try:
            self._gco = GCO(self.m_cfg, logger=self._log)
            self._gco.start()  # the actual start work
        except Exception:
            self._state = State.FAILED
            raise

    def stop(self) -> None:
        """Transition ``RUNNING -> STOPPING``, stop the GCO, then ``STOPPED``.

        The final transition runs in ``finally``, so the wrapper ends up
        ``STOPPED`` even when the GCO's ``stop`` raises (the exception still
        propagates).
        """
        self._transition(State.RUNNING, State.STOPPING)
        try:
            self._gco.stop()  # original stop logic
        finally:
            self._transition(State.STOPPING, State.STOPPED)