| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- import heapq
- import itertools
# Sentinel written into a heap entry's task slot when that task is removed;
# pop() discards such entries instead of returning them.
REMOVED = object()


class SimplePQ:
    """Max-priority queue of task ids with FIFO tie-breaking.

    Each heap entry is a mutable list ``[-priority, sequence, task_id]``.
    ``heapq`` is a min-heap, so the priority is stored negated to make the
    largest priority surface first; the monotonically increasing sequence
    number keeps equal priorities in insertion order and guarantees that
    ``task_id`` values themselves are never compared.

    Removal is lazy: the entry stays in the heap with its task slot set to
    ``REMOVED``. To keep the heap from growing without bound under frequent
    removals / re-prioritisation, it is rebuilt from the live entries once
    stale entries clearly outnumber live ones.
    """

    # Number of stale entries tolerated on top of the live count before the
    # heap is rebuilt; avoids rebuilding tiny heaps on every removal.
    _COMPACT_SLACK = 8

    def __init__(self):
        # Heap of [-priority, sequence, task_id] lists (may contain stale ones).
        self.heap = []
        # task_id -> its live heap entry (the same list object as in the heap).
        self.entries = {}
        # Source of insertion sequence numbers.
        self.counter = itertools.count()

    def size(self):
        """Return the number of live (not removed, not popped) tasks."""
        return len(self.entries)

    def put(self, task_id, priority):
        """Insert ``task_id`` or replace its existing entry.

        Args:
            task_id: Hashable identifier of the task.
            priority: Numeric priority; a larger value pops earlier.

        Re-putting an existing ``task_id`` drops the old entry and enqueues a
        new one, so the task takes a fresh position among equal priorities.
        """
        if task_id in self.entries:
            self.remove(task_id)
        # Negate so the min-heap yields the highest priority first.
        entry = [-priority, next(self.counter), task_id]
        self.entries[task_id] = entry
        heapq.heappush(self.heap, entry)

    def remove(self, task_id):
        """Remove ``task_id`` if present; unknown ids are ignored."""
        entry = self.entries.pop(task_id, None)
        if entry is None:
            return
        # Lazy deletion: mark in place rather than searching the heap.
        entry[2] = REMOVED
        self._compact_if_stale()

    def _compact_if_stale(self):
        """Rebuild the heap from live entries when stale ones dominate."""
        if len(self.heap) > 2 * len(self.entries) + self._COMPACT_SLACK:
            # Slice-assign so the list object held in ``self.heap`` is kept.
            self.heap[:] = self.entries.values()
            heapq.heapify(self.heap)

    def pop(self):
        """Pop and return the task id with the highest priority.

        Among equal priorities the earliest enqueued task is returned.
        Returns ``None`` when no live task is left.
        """
        while self.heap:
            _, _, task_id = heapq.heappop(self.heap)
            if task_id is not REMOVED:
                self.entries.pop(task_id, None)
                return task_id
        return None

    def clear(self):
        """Drop every entry, live or stale. The sequence counter is not reset."""
        self.heap.clear()
        self.entries.clear()

    def dump(self):
        """Return the live tasks in pop order without modifying the queue.

        Returns:
            A list of dicts with keys ``"task_id"``, ``"priority"`` (the value
            originally passed to ``put``) and ``"order"`` (0-based position in
            pop order).
        """
        # Entries are [-priority, sequence, task_id]; their natural ordering
        # is exactly the pop order, and the unique sequence number settles
        # every tie before task_id would be compared.
        ordered = sorted(self.entries.values())
        return [
            {
                "task_id": task_id,
                "priority": -negated,
                "order": idx,
            }
            for idx, (negated, _, task_id) in enumerate(ordered)
        ]
class NamedQueueManager:
    """Registry of ``SimplePQ`` queues addressed by name.

    Also tracks a set of names flagged as "initialized"; that flag is kept
    independently of whether a queue with the same name exists.
    """

    def __init__(self):
        # queue name -> SimplePQ
        self.queues = {}
        # Names passed to mark_initialized().
        self.initialized = set()

    def get_queue(self, name):
        """Return the queue called ``name``, creating an empty one if needed."""
        if name not in self.queues:
            self.queues[name] = SimplePQ()
        return self.queues[name]

    def mark_initialized(self, name):
        """Flag ``name`` as initialized."""
        self.initialized.add(name)

    def is_initialized(self, name) -> bool:
        """Return whether ``name`` has been flagged via ``mark_initialized``."""
        return name in self.initialized

    def put(self, queue_name, task_id, priority):
        """Enqueue ``task_id`` with ``priority`` on ``queue_name``.

        The queue is created on first use.
        """
        self.get_queue(queue_name).put(task_id, priority)

    def pop(self, queue_name):
        """Pop the next task id from ``queue_name``.

        Returns ``None`` when the queue is empty or does not exist. A missing
        queue is not created, so a read-only call cannot add names to
        ``list_queues()`` / ``dump_all()``.
        """
        queue = self.queues.get(queue_name)
        if queue is None:
            return None
        return queue.pop()

    def remove(self, queue_name, task_id):
        """Remove ``task_id`` from ``queue_name``; unknown queues are ignored."""
        queue = self.queues.get(queue_name)
        # Explicit None check: must not depend on the queue object's truthiness.
        if queue is not None:
            queue.remove(task_id)

    def list_queues(self):
        """Return the names of all existing queues as a list."""
        return list(self.queues)

    def dump_all(self):
        """Return ``{name: {"size": ..., "tasks": ...}}`` for every queue.

        ``"size"`` comes from the queue's ``size()`` and ``"tasks"`` from its
        ``dump()``.
        """
        return {
            name: {
                "size": queue.size(),
                "tasks": queue.dump(),
            }
            for name, queue in self.queues.items()
        }
|