throttler.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. # app/utils/throttler.py
  2. from redis.asyncio import Redis
  3. from datetime import datetime, timedelta
  4. from typing import Optional, Any, Tuple
  5. class RedisThrottler:
  6. """基于 Redis 的变化感知限流器"""
  7. @staticmethod
  8. async def should_throttle(
  9. redis: Redis,
  10. key: str,
  11. current_signature: str,
  12. expire_seconds: int = 1800
  13. ) -> bool:
  14. """
  15. 如果 signature 没变且未过期,则建议限流 (Return True)
  16. 如果 signature 变了,则更新 Redis 并允许通过 (Return False)
  17. """
  18. try:
  19. last_val = await redis.get(key)
  20. if last_val and last_val == current_signature:
  21. return True
  22. # 记录新状态并设置过期时间
  23. await redis.set(key, current_signature, ex=expire_seconds)
  24. return False
  25. except Exception:
  26. # Redis 异常时,默认不限流(保证业务可用性)
  27. return False
  28. class BusinessRateLimiter:
  29. """基于业务元数据的复杂限流器(冷却时间、每日上限、状态变更检查)"""
  30. @staticmethod
  31. def check_notification_limit(
  32. meta: dict,
  33. current_id: str, # 例如 earliest_date 或 status
  34. cooldown_hours: int = 8,
  35. daily_max: int = 3
  36. ) -> Tuple[bool, dict]:
  37. """
  38. 返回: (是否允许发送, 更新后的 meta)
  39. """
  40. now = datetime.utcnow()
  41. today = now.date().isoformat()
  42. last_notify_at = meta.get("last_notify_at")
  43. last_id = meta.get("last_id")
  44. daily_count_date = meta.get("daily_count_date")
  45. daily_count = int(meta.get("daily_count") or 0)
  46. # 1. 内容变化检查: 如果 ID 没变,直接拦截
  47. if last_id == current_id:
  48. return False, meta
  49. # 2. 冷却时间检查
  50. if last_notify_at:
  51. last_dt = datetime.fromisoformat(last_notify_at)
  52. if (now - last_dt) < timedelta(hours=cooldown_hours):
  53. return False, meta
  54. # 3. 每日上限检查
  55. if daily_count_date == today and daily_count >= daily_max:
  56. return False, meta
  57. # 更新元数据
  58. new_meta = meta.copy()
  59. new_meta["last_notify_at"] = now.isoformat()
  60. new_meta["last_id"] = current_id
  61. new_meta["daily_count_date"] = today
  62. new_meta["daily_count"] = daily_count + 1 if daily_count_date == today else 1
  63. new_meta["notify_count"] = int(new_meta.get("notify_count") or 0) + 1
  64. return True, new_meta