import heapq
import itertools

# Sentinel written over the task-id slot of a heap entry that has been
# logically deleted; such entries are skipped when they surface in ``pop``.
REMOVED = object()


class SimplePQ:
    """Max-priority queue of task ids with FIFO tie-breaking.

    Entries are stored in a ``heapq`` min-heap as mutable lists
    ``[-priority, insertion_count, task_id]``.  Negating the priority makes
    the largest priority surface first, and the monotonically increasing
    insertion count makes equal priorities pop in insertion order (it also
    guarantees comparisons never reach the ``task_id`` slot).

    Removal is lazy: the entry stays in the heap with its task id replaced by
    ``REMOVED``.  Stale entries are reclaimed either when they are popped or
    when they start to outnumber live entries (see ``_compact_if_stale``).
    """

    # Number of stale entries tolerated on top of "as many stale as live"
    # before the heap is rebuilt; keeps small queues from compacting often.
    _COMPACT_SLACK = 16

    def __init__(self):
        self.heap = []  # heap of [-priority, count, task_id] lists
        self.entries = {}  # task_id -> its live entry list in ``heap``
        self.counter = itertools.count()  # insertion-order tie-breaker

    def size(self):
        """Return the number of live (not removed) tasks."""
        return len(self.entries)

    def put(self, task_id, priority):
        """Insert ``task_id``, or re-insert it with a new priority.

        A larger ``priority`` value means the task is popped earlier.  If the
        task is already queued, its old entry is invalidated and the task is
        queued again as if newly inserted (it goes behind existing tasks of
        the same priority).
        """
        if task_id in self.entries:
            self.remove(task_id)
        # Store the negated priority: heapq is a min-heap.
        entry = [-priority, next(self.counter), task_id]
        self.entries[task_id] = entry
        heapq.heappush(self.heap, entry)

    def remove(self, task_id):
        """Remove ``task_id`` from the queue; unknown ids are ignored."""
        entry = self.entries.pop(task_id, None)
        if entry is not None:
            # Mark in place; the heap still references this same list.
            entry[2] = REMOVED
            self._compact_if_stale()

    def pop(self):
        """Pop the task id with the highest priority, earliest inserted first.

        Returns ``None`` when no live task is left.
        """
        while self.heap:
            _, _, task_id = heapq.heappop(self.heap)
            if task_id is not REMOVED:
                self.entries.pop(task_id, None)
                return task_id
        return None

    def clear(self):
        """Drop every task, including stale heap entries."""
        self.heap.clear()
        self.entries.clear()

    def dump(self):
        """Return the queue contents in pop order without modifying the queue.

        Each item is a dict with the keys ``task_id``, ``priority`` (the value
        originally passed to ``put``) and ``order`` (0-based pop position).
        """
        live = [entry for entry in self.heap if entry[2] is not REMOVED]
        # Ascending (-priority, count) is exactly the order pop() yields.
        live.sort(key=lambda entry: (entry[0], entry[1]))
        return [
            {
                "task_id": task_id,
                "priority": -neg_priority,
                "order": idx,
            }
            for idx, (neg_priority, _, task_id) in enumerate(live)
        ]

    def _compact_if_stale(self):
        """Rebuild the heap once stale entries clearly outnumber live ones.

        Without this, repeated ``put`` calls for the same task id (priority
        updates) or ``remove`` calls that are never followed by ``pop`` would
        let the heap grow without bound.  The rebuild is linear in the heap
        size and only runs after a comparable number of removals.
        """
        if len(self.heap) > 2 * len(self.entries) + self._COMPACT_SLACK:
            # Slice-assign so the ``heap`` list object keeps its identity.
            self.heap[:] = [
                entry for entry in self.heap if entry[2] is not REMOVED
            ]
            heapq.heapify(self.heap)


class NamedQueueManager:
    """Registry of ``SimplePQ`` instances addressed by name.

    Also tracks a set of queue names that have been marked as initialized;
    that set is independent of whether a queue object exists for the name.
    """

    def __init__(self):
        self.queues = {}  # queue name -> SimplePQ
        self.initialized = set()  # names passed to mark_initialized()

    def get_queue(self, name):
        """Return the queue registered under ``name``, creating it if absent."""
        if name not in self.queues:
            self.queues[name] = SimplePQ()
        return self.queues[name]

    def mark_initialized(self, name):
        """Record ``name`` as initialized."""
        self.initialized.add(name)

    def is_initialized(self, name) -> bool:
        """Return whether ``name`` was passed to ``mark_initialized``."""
        return name in self.initialized

    def put(self, queue_name, task_id, priority):
        """Queue ``task_id`` on ``queue_name``, creating the queue if needed."""
        self.get_queue(queue_name).put(task_id, priority)

    def pop(self, queue_name):
        """Pop the next task id from ``queue_name``.

        Returns ``None`` when the queue is empty or does not exist.  An
        unknown name is not registered as a side effect, so read-only polling
        does not add empty queues to ``list_queues()`` / ``dump_all()``.
        """
        queue = self.queues.get(queue_name)
        if queue is None:
            return None
        return queue.pop()

    def remove(self, queue_name, task_id):
        """Remove ``task_id`` from ``queue_name``; unknown names are ignored."""
        queue = self.queues.get(queue_name)
        if queue is not None:
            queue.remove(task_id)

    def list_queues(self):
        """Return the names of all existing queues."""
        return list(self.queues)

    def dump_all(self):
        """Return ``{name: {"size": ..., "tasks": ...}}`` for every queue."""
        return {
            name: {
                "size": queue.size(),
                "tasks": queue.dump(),
            }
            for name, queue in self.queues.items()
        }