| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 |
- import threading
- import collections
- from enum import Enum, auto
- from typing import Callable
- from gco import GCO
- from vs_types import GroupConfig
- from vs_log_macros import VSC_INFO
- class State(Enum):
- CREATED = auto()
- LOADED = auto()
- RUNNING = auto()
- STOPPING = auto()
- STOPPED = auto()
- FAILED = auto()
- class GCOWrapper:
- def __init__(self, gco_cfg: GroupConfig):
- self.m_cfg = gco_cfg
- self._gco = None
- self._state = State.CREATED
- self._log_queue = collections.deque(maxlen=1000)
- self._log_subscribers = set()
- self._lock = threading.Lock()
- self._push_callback: Callable = lambda t,d,s: None
-
- def set_push_callback(self, callback: Callable):
- self._push_callback = callback
-
- def _log(self, msg: str):
- with self._lock:
- VSC_INFO("-", msg)
- self._log_queue.append(msg)
- for sub in self._log_subscribers:
- try:
- sub(msg)
- except Exception:
- pass
- # 原有 push callback
- self._push_callback("LOG", msg.encode(), self._state)
-
- def get_logs(self, last_n=50):
- with self._lock:
- return list(self._log_queue)[-last_n:]
- def subscribe_logs(self, callback: Callable):
- with self._lock:
- self._log_subscribers.add(callback)
- def unsubscribe_logs(self, callback: Callable):
- with self._lock:
- self._log_subscribers.discard(callback)
- def _transition(self, from_state, to_state):
- if self._state != from_state:
- raise Exception(f"invalid transition {self._state} -> {to_state}")
- print(f"[STATE] {self._state.name} -> {to_state.name}")
- self._state = to_state
- def load(self):
- self._transition(State.CREATED, State.LOADED)
- def start(self):
- self._transition(State.LOADED, State.RUNNING)
- try:
- self._gco = GCO(self.m_cfg, logger=self._log)
- self._gco.start() # 真正的 start 工作
- except Exception:
- self._state = State.FAILED
- raise
- def stop(self):
- self._transition(State.RUNNING, State.STOPPING)
- try:
- self._gco.stop() # 原逻辑 stop
- finally:
- self._transition(State.STOPPING, State.STOPPED)
|