gco_wrapper.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. import threading
  2. import collections
  3. from enum import Enum, auto
  4. from typing import Callable, Type, Any, Dict
  5. from vs_types import GroupConfig
  6. from vs_log_macros import VSC_INFO
  class State(Enum):
      """Lifecycle states a wrapped GCO can be in.

      The numeric values carry no meaning of their own; they simply follow
      declaration order, starting at 1.
      """

      CREATED = 1   # wrapper constructed, nothing loaded yet
      LOADED = 2    # load() succeeded, ready to be started
      RUNNING = 3   # start() has been entered
      STOPPING = 4  # stop() is in progress
      STOPPED = 5   # stop() has finished
      FAILED = 6    # start() raised
  class GCOWrapper:
      """Lifecycle and log fan-out wrapper around a pluggable GCO class.

      The wrapper drives a small state machine
      (CREATED -> LOADED -> RUNNING -> STOPPING -> STOPPED, or FAILED when
      start() raises), keeps the most recent 1000 log messages in memory and
      forwards every message to the registered log subscribers and to a single
      push callback.
      """

      def __init__(self, gco_class: Type[Any], gco_cfg: "GroupConfig", redis_conf: Dict[str, Any]):
          """Store the configuration; the wrapped object is only created in start().

          @param gco_class: concrete class to wrap (SentinelGCO or BookerGCO)
          @param gco_cfg: group configuration
          @param redis_conf: Redis connection configuration (dict)
          """
          self.gco_class = gco_class
          self.m_cfg = gco_cfg
          self.redis_conf = redis_conf
          self._gco = None
          self._state = State.CREATED
          # Bounded buffer: once 1000 messages are stored the oldest is dropped.
          self._log_queue = collections.deque(maxlen=1000)
          self._log_subscribers = set()
          # Guards _log_queue and _log_subscribers. It is a plain (non-reentrant)
          # Lock, so it must never be held while user callbacks run.
          self._lock = threading.Lock()
          # No-op until set_push_callback() installs a real one.
          self._push_callback: Callable = lambda t, d, s: None

      def set_push_callback(self, callback: Callable):
          """Install the callable invoked as callback("LOG", payload_bytes, state) per message."""
          self._push_callback = callback

      def _log(self, msg: str):
          """Record *msg* and forward it to the subscribers and the push callback.

          The message is written through VSC_INFO and appended to the in-memory
          buffer under the lock. Callbacks run after the lock has been released,
          so a subscriber may safely call get_logs(), subscribe_logs(),
          unsubscribe_logs() or log again without deadlocking.

          Exceptions raised by subscribers are reported and otherwise ignored;
          exceptions raised by the push callback propagate to the caller.
          """
          with self._lock:
              VSC_INFO("-", msg)
              self._log_queue.append(msg)
              # Snapshot so subscribers can (un)subscribe while being notified.
              subscribers = tuple(self._log_subscribers)

          for sub in subscribers:
              try:
                  sub(msg)
              except Exception as exc:
                  # Best effort: one broken subscriber must not stop the others.
                  # Reported through VSC_INFO directly (not _log) to avoid recursion.
                  VSC_INFO("-", f"log subscriber {sub!r} failed: {exc!r}")

          # Original push callback
          self._push_callback("LOG", msg.encode(), self._state)

      def get_logs(self, last_n=50):
          """Return up to *last_n* of the most recent log messages, oldest first.

          A non-positive *last_n* yields an empty list.
          """
          # Guard needed because a slice of [-0:] would return the whole buffer.
          if last_n <= 0:
              return []
          with self._lock:
              return list(self._log_queue)[-last_n:]

      def subscribe_logs(self, callback: Callable):
          """Register *callback* to be called with every new log message."""
          with self._lock:
              self._log_subscribers.add(callback)

      def unsubscribe_logs(self, callback: Callable):
          """Remove *callback*; unknown callbacks are ignored."""
          with self._lock:
              self._log_subscribers.discard(callback)

      def _transition(self, from_state, to_state):
          """Move to *to_state*, but only if the current state is *from_state*.

          @raise RuntimeError: when the current state is not *from_state*
          """
          if self._state != from_state:
              raise RuntimeError(f"invalid transition {self._state.name} -> {to_state.name}")
          self._state = to_state

      def load(self):
          """Mark the wrapper as loaded (CREATED -> LOADED)."""
          self._transition(State.CREATED, State.LOADED)

      def start(self):
          """Instantiate the wrapped class and start it (LOADED -> RUNNING).

          If construction or start() of the wrapped object raises, the state
          becomes FAILED and the exception is re-raised.
          """
          self._transition(State.LOADED, State.RUNNING)
          try:
              # Dynamically instantiate the supplied class, passing the Redis config
              self._gco = self.gco_class(self.m_cfg, self.redis_conf, logger=self._log)
              self._gco.start()  # calls Sentinel's or Booker's start
          except Exception:
              # NOTE(review): a partially started object stays in self._gco and
              # stop() will not touch it in the FAILED state - confirm intended.
              self._state = State.FAILED
              raise

      def stop(self):
          """Stop the wrapped object (RUNNING -> STOPPING -> STOPPED).

          Does nothing unless the wrapper is currently RUNNING. The state ends
          up STOPPED even if the wrapped object's stop() raises; that exception
          still propagates.
          """
          if self._state != State.RUNNING:
              return
          self._transition(State.RUNNING, State.STOPPING)
          try:
              # Identity check: do not depend on the wrapped object's truthiness.
              if self._gco is not None:
                  self._gco.stop()  # calls Sentinel's or Booker's stop
          finally:
              self._transition(State.STOPPING, State.STOPPED)