|
|
@@ -26,7 +26,8 @@ class OrderBookerGCO:
|
|
|
self.m_tasks: List[Task] = []
|
|
|
self.m_lock = threading.RLock()
|
|
|
self.m_stop_event = threading.Event()
|
|
|
- self.redis_client = redis.Redis(**redis_conf)
|
|
|
+ self.redis_com = redis.Redis(**redis_conf)
|
|
|
+ self.redis_sub = redis.Redis(**redis_conf)
|
|
|
self.m_pending_order_by_queue: Dict[str, int] = {}
|
|
|
self.m_last_spawn_times: Dict[str, float] = {}
|
|
|
self.m_task_data_cache: Dict[str, dict] = {}
|
|
|
@@ -132,7 +133,7 @@ class OrderBookerGCO:
|
|
|
|
|
|
if healthy_tasks:
|
|
|
try:
|
|
|
- pipeline = self.redis_client.pipeline()
|
|
|
+ pipeline = self.redis_com.pipeline()
|
|
|
new_deadline = time.time() + self.heartbeat_ttl
|
|
|
for t in healthy_tasks:
|
|
|
if t.task_ref is not None:
|
|
|
@@ -144,7 +145,7 @@ class OrderBookerGCO:
|
|
|
|
|
|
if dead_tasks:
|
|
|
try:
|
|
|
- pipeline = self.redis_client.pipeline()
|
|
|
+ pipeline = self.redis_com.pipeline()
|
|
|
for t in dead_tasks:
|
|
|
if t.task_ref is not None:
|
|
|
pipeline.zadd(self.m_tracker_key, {str(t.task_ref): 0})
|
|
|
@@ -205,7 +206,7 @@ class OrderBookerGCO:
|
|
|
while not self.m_stop_event.is_set():
|
|
|
try:
|
|
|
if pubsub is None:
|
|
|
- pubsub = self.redis_client.pubsub(ignore_subscribe_messages=False)
|
|
|
+ pubsub = self.redis_sub.pubsub(ignore_subscribe_messages=False)
|
|
|
channels_to_sub = list(channel_to_routing_key.keys())
|
|
|
self._log(f"⏳ Sending SUBSCRIBE command to Redis for: {channels_to_sub}")
|
|
|
pubsub.subscribe(*channels_to_sub)
|
|
|
@@ -284,7 +285,7 @@ class OrderBookerGCO:
|
|
|
if not task_data or task_data.get('status') in ['grabbed', 'pause', 'completed', 'cancelled']:
|
|
|
self._log(f"Bound Task={task_id} is no longer valid or already processed. Removing instance.")
|
|
|
self._remove_task(task, "bound task no longer valid")
|
|
|
- self.redis_client.zrem(self.m_tracker_key, task_id)
|
|
|
+ self.redis_com.zrem(self.m_tracker_key, task_id)
|
|
|
return
|
|
|
|
|
|
order_id = task_data.get('order_id')
|
|
|
@@ -323,7 +324,7 @@ class OrderBookerGCO:
|
|
|
|
|
|
ThreadPool.getInstance().enqueue(_update_cloud_success)
|
|
|
|
|
|
- self.redis_client.zrem(self.m_tracker_key, task_id)
|
|
|
+ self.redis_com.zrem(self.m_tracker_key, task_id)
|
|
|
self._remove_task(task, "booking success")
|
|
|
else:
|
|
|
self._log(f"❌ BOOK FAILED for Order: {order_id}. Will retry on next signal.")
|
|
|
@@ -350,7 +351,7 @@ class OrderBookerGCO:
|
|
|
ThreadPool.getInstance().enqueue(_update_cloud_meta)
|
|
|
t_cd = self.task_backoff.calculate(t_fails)
|
|
|
self._log(f"⏳ Task={task_id} (Booking Attempt {t_fails}) suspended for {t_cd:.1f}s.")
|
|
|
- self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + t_cd})
|
|
|
+ self.redis_com.zadd(self.m_tracker_key, {str(task_id): time.time() + t_cd})
|
|
|
|
|
|
def _creator_loop(self):
|
|
|
self._log("Creator loop started.")
|
|
|
@@ -362,7 +363,7 @@ class OrderBookerGCO:
|
|
|
r_key = apt.routing_key
|
|
|
|
|
|
queue_cd_key = f"vs:queue:cooldown:{r_key}"
|
|
|
- if self.redis_client.exists(queue_cd_key):
|
|
|
+ if self.redis_com.exists(queue_cd_key):
|
|
|
continue
|
|
|
|
|
|
with self.m_lock:
|
|
|
@@ -387,7 +388,7 @@ class OrderBookerGCO:
|
|
|
|
|
|
try:
|
|
|
queue_name = f"auto.{target_routing_key}"
|
|
|
- task_data = VSCloudApi.Instance().get_vas_task_pop(queue_name, test=False)
|
|
|
+ task_data = VSCloudApi.Instance().get_vas_task_pop(queue_name)
|
|
|
if not task_data:
|
|
|
return
|
|
|
|
|
|
@@ -396,7 +397,7 @@ class OrderBookerGCO:
|
|
|
with self.m_lock:
|
|
|
self.m_task_data_cache[str(task_id)] = task_data
|
|
|
|
|
|
- self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + 5*60.0})
|
|
|
+ self.redis_com.zadd(self.m_tracker_key, {str(task_id): time.time() + 5*60.0})
|
|
|
user_inputs = task_data.get('user_inputs', {})
|
|
|
|
|
|
plg_cfg = VSPlgConfig()
|
|
|
@@ -410,7 +411,7 @@ class OrderBookerGCO:
|
|
|
|
|
|
acceptable_keys = [target_routing_key]
|
|
|
if self.m_cfg.need_proxy:
|
|
|
- proxy = VSCloudApi.Instance().get_next_proxy(self.m_cfg.proxy_pool, self.m_cfg.proxy_cd, test=False)
|
|
|
+ proxy = VSCloudApi.Instance().get_next_proxy(self.m_cfg.proxy_pool, self.m_cfg.proxy_cd)
|
|
|
plg_cfg.proxy.id = proxy['id']
|
|
|
plg_cfg.proxy.ip = proxy['ip']
|
|
|
plg_cfg.proxy.port = proxy['port']
|
|
|
@@ -437,7 +438,7 @@ class OrderBookerGCO:
|
|
|
)
|
|
|
)
|
|
|
queue_fail_key = f"vs:queue:failures:{target_routing_key}"
|
|
|
- self.redis_client.delete(queue_fail_key)
|
|
|
+ self.redis_com.delete(queue_fail_key)
|
|
|
success = True
|
|
|
self._log(f"+++ Order Booker spawned: {plg_cfg.account.username} (Target: {acceptable_keys})")
|
|
|
except Exception as e:
|
|
|
@@ -460,9 +461,9 @@ class OrderBookerGCO:
|
|
|
is_rate_limited = True
|
|
|
queue_fail_key = f"vs:queue:failures:{target_routing_key}"
|
|
|
queue_cd_key = f"vs:queue:cooldown:{target_routing_key}"
|
|
|
- q_fails = self.redis_client.incr(queue_fail_key)
|
|
|
+ q_fails = self.redis_com.incr(queue_fail_key)
|
|
|
q_cd = self.queue_backoff.calculate(q_fails)
|
|
|
- self.redis_client.set(queue_cd_key, "1", ex=int(q_cd))
|
|
|
+ self.redis_com.set(queue_cd_key, "1", ex=int(q_cd))
|
|
|
self._log(f"📉 [Rate Limited] Queue '{target_routing_key}' failed {q_fails} times. Global Backoff: {q_cd:.1f}s.")
|
|
|
if task_id is not None:
|
|
|
task_meta = task_data.get('meta') or {}
|
|
|
@@ -476,13 +477,13 @@ class OrderBookerGCO:
|
|
|
|
|
|
t_cd = self.account_backoff.calculate(t_fails)
|
|
|
self._log(f"⏳ Task={task_id} (Attempt {t_fails}) suspended for {t_cd:.1f}s.")
|
|
|
- self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + t_cd})
|
|
|
+ self.redis_com.zadd(self.m_tracker_key, {str(task_id): time.time() + t_cd})
|
|
|
finally:
|
|
|
with self.m_lock:
|
|
|
self.m_pending_order_by_queue[target_routing_key] = max(0, self.m_pending_order_by_queue[target_routing_key] - 1)
|
|
|
|
|
|
if not success and task_id is not None and not is_rate_limited:
|
|
|
- self.redis_client.zadd(self.m_tracker_key, {str(task_id): 0})
|
|
|
+ self.redis_com.zadd(self.m_tracker_key, {str(task_id): 0})
|
|
|
self._log(f"♻️ Task={task_id} failed normal spawn. Instantly handed over to Sweeper.")
|
|
|
|
|
|
with self.m_lock:
|