backoff.py 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738
  1. import random
  2. class ExponentialBackoff:
  3. """
  4. 通用指数退避计算工具
  5. """
  6. def __init__(self, base_delay: float, max_delay: float, factor: float = 2.0, jitter_range: tuple = (10.0, 45.0)):
  7. """
  8. :param base_delay: 基础退避时间(秒)
  9. :param max_delay: 最大退避时间(秒)
  10. :param factor: 指数乘数 (默认 2.0 即翻倍)
  11. :param jitter_range: 随机抖动范围(最小秒数,最大秒数)
  12. """
  13. self.base_delay = base_delay
  14. self.max_delay = max_delay
  15. self.factor = factor
  16. self.jitter_range = jitter_range
  17. def calculate(self, failure_count: int) -> float:
  18. """
  19. 根据连续失败次数计算下一次的冷却时间
  20. :param failure_count: 连续失败次数 (1开始)
  21. """
  22. if failure_count <= 0:
  23. return 0.0
  24. # 核心公式: 基础延迟 * (乘数 ^ (失败次数 - 1))
  25. multiplier = self.factor ** (failure_count - 1)
  26. delay = self.base_delay * multiplier
  27. # 限制上限
  28. delay = min(delay, self.max_delay)
  29. # 增加随机抖动
  30. if self.jitter_range:
  31. delay += random.uniform(self.jitter_range[0], self.jitter_range[1])
  32. return delay