|
|
@@ -54,28 +54,28 @@ class OrderBookerGCO:
|
|
|
def _get_redis_key(self, routing_key: str) -> str:
|
|
|
return f"vs:signal:{routing_key}"
|
|
|
|
|
|
- def _safe_return_task(self, task_id: int, reason: str = ""):
|
|
|
+ def _safe_return_task(self, order_id: str, reason: str = ""):
|
|
|
"""安全地将订单归还给云端队列,防止复活已被取消或已抢成功的订单"""
|
|
|
- if not task_id:
|
|
|
+ if not order_id:
|
|
|
return
|
|
|
|
|
|
try:
|
|
|
- task_data = VSCloudApi.Instance().get_vas_task(task_id)
|
|
|
+ task_data = VSCloudApi.Instance().get_vas_task(order_id)
|
|
|
if not task_data:
|
|
|
- self._log(f"Task {task_id} not found in cloud, cannot return.")
|
|
|
+ self._log(f"Task {order_id} not found in cloud, cannot return.")
|
|
|
return
|
|
|
-
|
|
|
+ task_id = task_data['id']
|
|
|
current_status = task_data.get('status', '')
|
|
|
# 如果订单已经被客户取消,或者已经成功,绝对不能还回队列!
|
|
|
if current_status in ['cancelled', 'grabbed', 'success']:
|
|
|
- self._log(f"Task {task_id} is already '{current_status}'. Skipping return.")
|
|
|
+ self._log(f"Task {order_id} is already '{current_status}'. Skipping return.")
|
|
|
return
|
|
|
|
|
|
- self._log(f"Returning task {task_id} to queue. Reason: {reason}")
|
|
|
+ self._log(f"Returning task {order_id} to queue. Reason: {reason}")
|
|
|
VSCloudApi.Instance().return_vas_task_to_queue(task_id)
|
|
|
|
|
|
except Exception as ex:
|
|
|
- self._log(f"Failed to safely return task {task_id}: {ex}")
|
|
|
+ self._log(f"Failed to safely return task for order_id {order_id}: {ex}")
|
|
|
|
|
|
def _maintain_loop(self):
|
|
|
self._log("Maintain loop started.")
|
|
|
@@ -121,12 +121,19 @@ class OrderBookerGCO:
|
|
|
now = time.time()
|
|
|
for apt_type in self.m_cfg.appointment_types:
|
|
|
redis_key = self._get_redis_key(apt_type.routing_key)
|
|
|
- if not self.redis_client.get(redis_key): continue
|
|
|
-
|
|
|
- data = json.loads(self.redis_client.get(redis_key))
|
|
|
- query_result = VSQueryResult.model_validate(data['query_result'])
|
|
|
- query_result.apt_type = AppointmentType.model_validate(data['apt_type'])
|
|
|
+ raw_data = self.redis_client.get(redis_key)
|
|
|
+ if not raw_data:
|
|
|
+ continue
|
|
|
+ try:
|
|
|
+ data = json.loads(raw_data)
|
|
|
+ query_result = VSQueryResult.model_validate(data['query_result'])
|
|
|
+ query_result.apt_type = AppointmentType.model_validate(data['apt_type'])
|
|
|
+ except Exception as parse_err:
|
|
|
+ self._log(f"Data parsing error for {redis_key}: {parse_err}. Deleting corrupted signal.")
|
|
|
+ self.redis_client.delete(redis_key)
|
|
|
+ continue
|
|
|
|
|
|
+ matching_tasks = []
|
|
|
with self.m_lock:
|
|
|
for task in self.m_tasks:
|
|
|
if now < task.next_run or not task.book_allowed:
|
|
|
@@ -134,27 +141,38 @@ class OrderBookerGCO:
|
|
|
if apt_type.routing_key not in task.acceptable_routing_keys:
|
|
|
continue
|
|
|
|
|
|
- self._log(f"🚀 Triggering BOOK for {apt_type.routing_key}")
|
|
|
task.next_run = now + self.m_cfg.booker.booking_cooldown
|
|
|
- ThreadPool.getInstance().enqueue(self._execute_book_job, task, query_result)
|
|
|
+ matching_tasks.append(task)
|
|
|
+
|
|
|
+ if matching_tasks:
|
|
|
+ threads = []
|
|
|
+ for task in matching_tasks:
|
|
|
+ self._log(f"🚀 Triggering BOOK for {apt_type.routing_key} | Order Ref: {task.task_ref}")
|
|
|
+ t = threading.Thread(target=self._execute_book_job, args=(task, query_result))
|
|
|
+ threads.append(t)
|
|
|
+ t.start()
|
|
|
+
|
|
|
+ for t in threads:
|
|
|
+ t.join()
|
|
|
+
|
|
|
except Exception as e:
|
|
|
self._log(f"Trigger loop error: {e}")
|
|
|
time.sleep(2)
|
|
|
|
|
|
def _execute_book_job(self, task: Task, query_result: VSQueryResult):
|
|
|
- task_id = task.task_ref
|
|
|
- if not task_id:
|
|
|
+ order_id = task.task_ref
|
|
|
+ if not order_id:
|
|
|
return
|
|
|
|
|
|
try:
|
|
|
- task_data = VSCloudApi.Instance().get_vas_task(task_id)
|
|
|
+ task_data = VSCloudApi.Instance().get_vas_task(order_id)
|
|
|
if not task_data or task_data.get('status') in ['grabbed', 'cancelled']:
|
|
|
- self._log(f"Bound Task {task_id} is no longer valid or already processed. Removing instance.")
|
|
|
+ self._log(f"Bound Task {order_id} is no longer valid or already processed. Removing instance.")
|
|
|
with self.m_lock:
|
|
|
if task in self.m_tasks: self.m_tasks.remove(task)
|
|
|
return
|
|
|
|
|
|
- order_id = task_data.get('order_id')
|
|
|
+ task_id = task_data.get('id')
|
|
|
user_input = task_data.get('user_inputs', {})
|
|
|
|
|
|
book_res = task.instance.book(query_result, user_input)
|
|
|
@@ -201,7 +219,7 @@ class OrderBookerGCO:
|
|
|
|
|
|
def _job():
|
|
|
success = False
|
|
|
- task_id = None
|
|
|
+ order_id = None
|
|
|
try:
|
|
|
queue_name = f"auto.{target_routing_key}"
|
|
|
task_data = VSCloudApi.Instance().get_vas_task_pop(queue_name)
|
|
|
@@ -209,6 +227,7 @@ class OrderBookerGCO:
|
|
|
return
|
|
|
|
|
|
task_id = task_data['id']
|
|
|
+ order_id = task_data['order_id']
|
|
|
user_inputs = task_data.get('user_inputs', {})
|
|
|
|
|
|
plg_cfg = VSPlgConfig()
|
|
|
@@ -242,9 +261,9 @@ class OrderBookerGCO:
|
|
|
instance=instance,
|
|
|
qw_cfg=self.m_cfg.query_wait,
|
|
|
next_run=time.time(),
|
|
|
- task_ref=task_id,
|
|
|
+ task_ref=order_id,
|
|
|
acceptable_routing_keys=acceptable_keys,
|
|
|
- ource_queue=target_routing_key,
|
|
|
+ source_queue=target_routing_key,
|
|
|
book_allowed=True
|
|
|
)
|
|
|
)
|
|
|
@@ -261,7 +280,7 @@ class OrderBookerGCO:
|
|
|
self.m_pending_order_by_queue[target_routing_key] = max(0, self.m_pending_order_by_queue[target_routing_key] - 1)
|
|
|
|
|
|
# 创建/登录失败,调用安全归还函数
|
|
|
- if not success and task_id is not None:
|
|
|
- self._safe_return_task(task_id, reason="Instance spawn/login failed")
|
|
|
+ if not success and order_id is not None:
|
|
|
+ self._safe_return_task(order_id, reason="Instance spawn/login failed")
|
|
|
|
|
|
ThreadPool.getInstance().enqueue(_job)
|