thread_pool.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. # toolkit/thread_pool.py
  2. import threading
  3. from concurrent.futures import ThreadPoolExecutor, Future
  4. from vs_log_macros import VSC_INFO, VSC_ERROR # type: ignore
  5. class ThreadPool:
  6. """
  7. @brief 线程池单例类
  8. 模拟C++的ThreadPool,使用Python的ThreadPoolExecutor。
  9. """
  10. _instance = None
  11. _lock = threading.Lock()
  12. def __new__(cls, max_workers: int = 10):
  13. with cls._lock:
  14. if cls._instance is None:
  15. cls._instance = super().__new__(cls)
  16. cls._instance._executor = ThreadPoolExecutor(max_workers=max_workers)
  17. VSC_INFO("thread_pool", "ThreadPool initialized with %d workers", max_workers)
  18. return cls._instance
  19. @staticmethod
  20. def getInstance(max_workers: int = 10):
  21. """获取线程池单例实例。"""
  22. return ThreadPool(max_workers)
  23. def enqueue(self, func, *args, **kwargs) -> Future:
  24. """
  25. @brief 提交任务到线程池。
  26. @param func 要执行的函数
  27. @param args 函数的位置参数
  28. @param kwargs 函数的关键字参数
  29. @return Future 对象,表示异步操作的结果
  30. """
  31. try:
  32. future = self._executor.submit(func, *args, **kwargs)
  33. return future
  34. except Exception as e:
  35. VSC_ERROR("thread_pool", "Failed to enqueue task: %s", str(e))
  36. raise
  37. def shutdown(self, wait: bool = True):
  38. """
  39. @brief 关闭线程池。
  40. @param wait 如果为True,则等待所有待处理的future完成。
  41. """
  42. if self._executor:
  43. VSC_INFO("thread_pool", "Shutting down ThreadPool (wait=%s)...", wait)
  44. self._executor.shutdown(wait=wait)
  45. self._executor = None