gco_wrapper.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. import threading
  2. import collections
  3. from enum import Enum, auto
  4. from typing import Callable
  5. from gco import GCO
  6. from vs_types import GroupConfig
  7. from vs_log_macros import VSC_INFO
  8. class State(Enum):
  9. CREATED = auto()
  10. LOADED = auto()
  11. RUNNING = auto()
  12. STOPPING = auto()
  13. STOPPED = auto()
  14. FAILED = auto()
  15. class GCOWrapper:
  16. def __init__(self, gco_cfg: GroupConfig):
  17. self.m_cfg = gco_cfg
  18. self._gco = None
  19. self._state = State.CREATED
  20. self._log_queue = collections.deque(maxlen=1000)
  21. self._log_subscribers = set()
  22. self._lock = threading.Lock()
  23. self._push_callback: Callable = lambda t,d,s: None
  24. def set_push_callback(self, callback: Callable):
  25. self._push_callback = callback
  26. def _log(self, msg: str):
  27. with self._lock:
  28. VSC_INFO("-", msg)
  29. self._log_queue.append(msg)
  30. for sub in self._log_subscribers:
  31. try:
  32. sub(msg)
  33. except Exception:
  34. pass
  35. # 原有 push callback
  36. self._push_callback("LOG", msg.encode(), self._state)
  37. def get_logs(self, last_n=50):
  38. with self._lock:
  39. return list(self._log_queue)[-last_n:]
  40. def subscribe_logs(self, callback: Callable):
  41. with self._lock:
  42. self._log_subscribers.add(callback)
  43. def unsubscribe_logs(self, callback: Callable):
  44. with self._lock:
  45. self._log_subscribers.discard(callback)
  46. def _transition(self, from_state, to_state):
  47. if self._state != from_state:
  48. raise Exception(f"invalid transition {self._state} -> {to_state}")
  49. print(f"[STATE] {self._state.name} -> {to_state.name}")
  50. self._state = to_state
  51. def load(self):
  52. self._transition(State.CREATED, State.LOADED)
  53. def start(self):
  54. self._transition(State.LOADED, State.RUNNING)
  55. try:
  56. self._gco = GCO(self.m_cfg, logger=self._log)
  57. self._gco.start() # 真正的 start 工作
  58. except Exception:
  59. self._state = State.FAILED
  60. raise
  61. def stop(self):
  62. self._transition(State.RUNNING, State.STOPPING)
  63. try:
  64. self._gco.stop() # 原逻辑 stop
  65. finally:
  66. self._transition(State.STOPPING, State.STOPPED)