| 1234567891011121314151617181920212223242526272829303132333435363738 |
- import random
- class ExponentialBackoff:
- """
- 通用指数退避计算工具
- """
- def __init__(self, base_delay: float, max_delay: float, factor: float = 2.0, jitter_range: tuple = (10.0, 45.0)):
- """
- :param base_delay: 基础退避时间(秒)
- :param max_delay: 最大退避时间(秒)
- :param factor: 指数乘数 (默认 2.0 即翻倍)
- :param jitter_range: 随机抖动范围(最小秒数,最大秒数)
- """
- self.base_delay = base_delay
- self.max_delay = max_delay
- self.factor = factor
- self.jitter_range = jitter_range
- def calculate(self, failure_count: int) -> float:
- """
- 根据连续失败次数计算下一次的冷却时间
- :param failure_count: 连续失败次数 (1开始)
- """
- if failure_count <= 0:
- return 0.0
-
- # 核心公式: 基础延迟 * (乘数 ^ (失败次数 - 1))
- multiplier = self.factor ** (failure_count - 1)
- delay = self.base_delay * multiplier
-
- # 限制上限
- delay = min(delay, self.max_delay)
-
- # 增加随机抖动
- if self.jitter_range:
- delay += random.uniform(self.jitter_range[0], self.jitter_range[1])
-
- return delay
|