| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687 |
- import threading
- import collections
- from enum import Enum, auto
- from typing import Callable, Type, Any, Dict
- from vs_types import GroupConfig
- from vs_log_macros import VSC_INFO
- class State(Enum):
- CREATED = auto()
- LOADED = auto()
- RUNNING = auto()
- STOPPING = auto()
- STOPPED = auto()
- FAILED = auto()
- class GCOWrapper:
- def __init__(self, gco_class: Type[Any], gco_cfg: GroupConfig, redis_conf: Dict[str, Any]):
- """
- @param gco_class: 传入你要包装的具体类 (SentinelGCO 或 BookerGCO)
- @param gco_cfg: 分组配置
- @param redis_conf: Redis 连接配置 (字典)
- """
- self.gco_class = gco_class
- self.m_cfg = gco_cfg
- self.redis_conf = redis_conf
-
- self._gco = None
- self._state = State.CREATED
- self._log_queue = collections.deque(maxlen=1000)
- self._log_subscribers = set()
- self._lock = threading.Lock()
- self._push_callback: Callable = lambda t, d, s: None
-
- def set_push_callback(self, callback: Callable):
- self._push_callback = callback
-
- def _log(self, msg: str):
- with self._lock:
- VSC_INFO("-", msg)
- self._log_queue.append(msg)
- for sub in self._log_subscribers:
- try:
- sub(msg)
- except Exception:
- pass
- # 原有 push callback
- self._push_callback("LOG", msg.encode(), self._state)
-
- def get_logs(self, last_n=50):
- with self._lock:
- return list(self._log_queue)[-last_n:]
- def subscribe_logs(self, callback: Callable):
- with self._lock:
- self._log_subscribers.add(callback)
- def unsubscribe_logs(self, callback: Callable):
- with self._lock:
- self._log_subscribers.discard(callback)
- def _transition(self, from_state, to_state):
- if self._state != from_state:
- raise Exception(f"invalid transition {self._state.name} -> {to_state.name}")
- self._state = to_state
- def load(self):
- self._transition(State.CREATED, State.LOADED)
- def start(self):
- self._transition(State.LOADED, State.RUNNING)
- try:
- # 动态实例化传入的类,并传入 Redis 配置
- self._gco = self.gco_class(self.m_cfg, self.redis_conf, logger=self._log)
- self._gco.start() # 调用 Sentinel 或 Booker 的 start
- except Exception:
- self._state = State.FAILED
- raise
- def stop(self):
- if self._state != State.RUNNING:
- return
- self._transition(State.RUNNING, State.STOPPING)
- try:
- if self._gco:
- self._gco.stop() # 调用 Sentinel 或 Booker 的 stop
- finally:
- self._transition(State.STOPPING, State.STOPPED)
|