|
|
@@ -32,10 +32,10 @@ class OrderBookerGCO:
|
|
|
self.m_pending_order_by_queue: Dict[str, int] = {}
|
|
|
|
|
|
self.m_tracker_key = f"vs:worker:tasks_tracker:{self.m_cfg.identifier}"
|
|
|
- # 1. 队列级退避:失败起步5分钟,封顶1小时
|
|
|
- self.queue_backoff = ExponentialBackoff(base_delay=5*60.0, max_delay=1*60*60.0, factor=2.0)
|
|
|
- # 2. 账号级退避:失败起步30分钟,封顶3小时
|
|
|
- self.account_backoff = ExponentialBackoff(base_delay=30*60.0, max_delay=3*60*60.0, factor=2.0)
|
|
|
+ # 1. 队列级退避:失败起步1分钟,封顶10分钟
|
|
|
+ self.queue_backoff = ExponentialBackoff(base_delay=1*60.0, max_delay=10*60.0, factor=2.0)
|
|
|
+ # 2. 账号级退避:失败起步5分钟,封顶2小时
|
|
|
+ self.account_backoff = ExponentialBackoff(base_delay=5*60.0, max_delay=2*60*60.0, factor=2.0)
|
|
|
|
|
|
def _log(self, message):
|
|
|
if self.m_logger:
|
|
|
@@ -169,7 +169,6 @@ class OrderBookerGCO:
|
|
|
def _execute_book_job(self, task: Task, query_result: VSQueryResult):
|
|
|
task_id = task.task_ref
|
|
|
task_data = None
|
|
|
- booking_success = False
|
|
|
|
|
|
try:
|
|
|
task_data = VSCloudApi.Instance().get_vas_task(task_id)
|
|
|
@@ -187,7 +186,6 @@ class OrderBookerGCO:
|
|
|
book_res = task.instance.book(query_result, user_input)
|
|
|
|
|
|
if book_res.success:
|
|
|
- booking_success = True
|
|
|
self._log(f"✅ BOOK SUCCESS! Order: {order_id}. Destroying instance.")
|
|
|
grab_info = {
|
|
|
"account": book_res.account,
|
|
|
@@ -351,17 +349,15 @@ class OrderBookerGCO:
|
|
|
self.redis_client.set(queue_cd_key, "1", ex=int(q_cd))
|
|
|
self._log(f"📉 [Rate Limited] Queue '{target_routing_key}' failed {q_fails} times. Global Backoff: {q_cd:.1f}s.")
|
|
|
if task_id is not None:
|
|
|
- task_meta = task_data.get('meta', {})
|
|
|
+ task_meta = task_data.get('meta') or {}
|
|
|
t_fails = task_meta.get('spawn_failures', 0) + 1
|
|
|
task_meta['spawn_failures'] = t_fails
|
|
|
|
|
|
- # 立即持久化到云端
|
|
|
try:
|
|
|
VSCloudApi.Instance().update_vas_task(task_id, {"meta": task_meta})
|
|
|
except Exception as cloud_err:
|
|
|
self._log(f"Failed to update task meta: {cloud_err}")
|
|
|
|
|
|
- # 使用工具计算该任务账号应该挂起的惩罚时间
|
|
|
t_cd = self.account_backoff.calculate(t_fails)
|
|
|
self._log(f"⏳ Task={task_id} (Attempt {t_fails}) suspended for {t_cd:.1f}s.")
|
|
|
|
|
|
@@ -372,7 +368,7 @@ class OrderBookerGCO:
|
|
|
t = threading.Thread(target=delayed_return, args=(task_id, t_cd, f"Spawn failed: rate limited (fails={t_fails})"), daemon=True)
|
|
|
t.start()
|
|
|
|
|
|
- task_id = None # 置空防秒归还
|
|
|
+ task_id = None
|
|
|
|
|
|
finally:
|
|
|
with self.m_lock:
|