priority_queue_utils.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. import heapq
  2. import itertools
  3. REMOVED = object()
  4. class SimplePQ:
  5. def __init__(self):
  6. self.heap = []
  7. self.entries = {}
  8. self.counter = itertools.count()
  9. def size(self):
  10. return len(self.entries)
  11. def put(self, task_id, priority):
  12. """
  13. priority: 数值越大,优先级越高
  14. """
  15. if task_id in self.entries:
  16. self.remove(task_id)
  17. # 👇 关键:使用 -priority
  18. entry = [-priority, next(self.counter), task_id]
  19. self.entries[task_id] = entry
  20. heapq.heappush(self.heap, entry)
  21. def remove(self, task_id):
  22. entry = self.entries.pop(task_id, None)
  23. if entry:
  24. entry[2] = REMOVED
  25. def pop(self):
  26. """
  27. 弹出 priority 最大、且最早入队的 task_id
  28. """
  29. while self.heap:
  30. _, _, task_id = heapq.heappop(self.heap)
  31. if task_id is not REMOVED:
  32. self.entries.pop(task_id, None)
  33. return task_id
  34. return None
  35. def clear(self):
  36. self.heap.clear()
  37. self.entries.clear()
  38. def dump(self):
  39. """
  40. 以 pop 顺序返回队列内容(不破坏原队列)
  41. """
  42. valid = [
  43. (-priority, count, task_id)
  44. for priority, count, task_id in self.heap
  45. if task_id is not REMOVED
  46. ]
  47. # priority 大的排前面;同 priority 按 FIFO
  48. valid.sort(key=lambda x: (-x[0], x[1]))
  49. return [
  50. {
  51. "task_id": task_id,
  52. "priority": priority,
  53. "order": idx,
  54. }
  55. for idx, (priority, _, task_id) in enumerate(valid)
  56. ]
  57. class NamedQueueManager:
  58. def __init__(self):
  59. self.queues = {}
  60. self.initialized = set()
  61. def get_queue(self, name):
  62. if name not in self.queues:
  63. self.queues[name] = SimplePQ()
  64. return self.queues[name]
  65. def mark_initialized(self, name):
  66. self.initialized.add(name)
  67. def is_initialized(self, name) -> bool:
  68. return name in self.initialized
  69. def put(self, queue_name, task_id, priority):
  70. self.get_queue(queue_name).put(task_id, priority)
  71. def pop(self, queue_name):
  72. return self.get_queue(queue_name).pop()
  73. def remove(self, queue_name, task_id):
  74. q = self.queues.get(queue_name)
  75. if q:
  76. q.remove(task_id)
  77. def list_queues(self):
  78. return list(self.queues.keys())
  79. def dump_all(self):
  80. data = {}
  81. for name, queue in self.queues.items():
  82. data[name] = {
  83. "size": queue.size(),
  84. "tasks": queue.dump(),
  85. }
  86. return data