jerry 2 tygodni temu
rodzic
commit
1943a72fa3
2 zmienionych plików z 30 dodań i 23 usunięć
  1. 11 9
      booker_builtin.py
  2. 19 14
      booker_order.py

+ 11 - 9
booker_builtin.py

@@ -30,10 +30,9 @@ class BuiltinBookerGCO:
         self.m_pending_builtin = 0
         
         self.m_tracker_key = f"vs:worker:tasks_tracker:{self.m_cfg.identifier}"
-        # 组级别退避 (控制 spawn 频率):起步 1 分钟,封顶 10 分钟
         self.group_backoff = ExponentialBackoff(base_delay=60.0, max_delay=10*60.0, factor=2.0)
-        # 订单级别退避 (控制特定单据被风控的重试频率):起步 5 分钟,封顶 2 小时
         self.task_backoff = ExponentialBackoff(base_delay=5*60.0, max_delay=2*60*60.0, factor=2.0)
+        self.m_last_spawn_time = 0.0
 
     def _log(self, message):
         if self.m_logger:
@@ -248,17 +247,20 @@ class BuiltinBookerGCO:
 
     def _creator_loop(self):
         self._log("Creator loop started.")
+        spawn_interval = 10.0
         group_cd_key = f"vs:group:cooldown:{self.m_cfg.identifier}"
-        
         while not self.m_stop_event.is_set():
-            time.sleep(5.0)
+            time.sleep(2.0)
+            if self.redis_client.exists(group_cd_key):
+                continue
             with self.m_lock:
-                
-                if self.redis_client.exists(group_cd_key):
-                    continue
-                
                 current = len(self.m_tasks)
-                if (current + self.m_pending_builtin) < self.m_cfg.booker.target_instances:
+                pending = self.m_pending_builtin
+                target = self.m_cfg.booker.target_instances
+            if (current + pending) < target:
+                now = time.time()
+                if now - self.m_last_spawn_time >= spawn_interval:
+                    self.m_last_spawn_time = now 
                     self._spawn_worker()
 
     def _spawn_worker(self):

+ 19 - 14
booker_order.py

@@ -31,10 +31,9 @@ class OrderBookerGCO:
         self.m_pending_order_by_queue: Dict[str, int] = {}
         
         self.m_tracker_key = f"vs:worker:tasks_tracker:{self.m_cfg.identifier}"
-        # 1. 队列级退避:失败起步1分钟,封顶10分钟
         self.queue_backoff = ExponentialBackoff(base_delay=1*60.0, max_delay=10*60.0, factor=2.0)
-        # 2. 账号级退避:失败起步5分钟,封顶2小时
         self.account_backoff = ExponentialBackoff(base_delay=5*60.0, max_delay=2*60*60.0, factor=2.0)
+        self.m_last_spawn_time = 0.0
 
     def _log(self, message):
         if self.m_logger:
@@ -249,22 +248,28 @@ class OrderBookerGCO:
 
     def _creator_loop(self):
         self._log("Creator loop started.")
+        spawn_interval = 10.0
         while not self.m_stop_event.is_set():
-            time.sleep(5.0)
-            with self.m_lock:
-                for apt in self.m_cfg.appointment_types:
-                    r_key = apt.routing_key
-                    
-                    queue_cd_key = f"vs:queue:cooldown:{r_key}"
-                    if self.redis_client.exists(queue_cd_key):
-                        continue # 仍在冷却中,跳过取单
-                    
+            time.sleep(2.0)
+            for apt in self.m_cfg.appointment_types:
+                r_key = apt.routing_key
+                
+                queue_cd_key = f"vs:queue:cooldown:{r_key}"
+                if self.redis_client.exists(queue_cd_key):
+                    continue
+                
+                with self.m_lock:
                     active = sum(1 for t in self.m_tasks if getattr(t, 'source_queue', '') == r_key)
                     pending = self.m_pending_order_by_queue.get(r_key, 0)
-                    
-                    if (active + pending) < self.m_cfg.booker.target_instances:
+                    target = self.m_cfg.booker.target_instances
+                
+                if (active + pending) < target:
+                    now = time.time()
+                    if now - self.m_last_spawn_time >= spawn_interval:
+                        self.m_last_spawn_time = now 
+                        self._log(f"Staggered: Spawning booker for [{r_key}]. Next in {spawn_interval}s.")
                         self._spawn_worker(r_key)
-                        time.sleep(0.5)
+                        break
 
     def _spawn_worker(self, target_routing_key: str):
         with self.m_lock: