# app/utils/throttler.py from redis.asyncio import Redis from datetime import datetime, timedelta from typing import Optional, Any, Tuple class RedisThrottler: """基于 Redis 的变化感知限流器""" @staticmethod async def should_throttle( redis: Redis, key: str, current_signature: str, expire_seconds: int = 1800 ) -> bool: """ 如果 signature 没变且未过期,则建议限流 (Return True) 如果 signature 变了,则更新 Redis 并允许通过 (Return False) """ try: last_val = await redis.get(key) if last_val and last_val == current_signature: return True # 记录新状态并设置过期时间 await redis.set(key, current_signature, ex=expire_seconds) return False except Exception: # Redis 异常时,默认不限流(保证业务可用性) return False class BusinessRateLimiter: """基于业务元数据的复杂限流器(冷却时间、每日上限、状态变更检查)""" @staticmethod def check_notification_limit( meta: dict, current_id: str, # 例如 earliest_date 或 status cooldown_hours: int = 8, daily_max: int = 3 ) -> Tuple[bool, dict]: """ 返回: (是否允许发送, 更新后的 meta) """ now = datetime.utcnow() today = now.date().isoformat() last_notify_at = meta.get("last_notify_at") last_id = meta.get("last_id") daily_count_date = meta.get("daily_count_date") daily_count = int(meta.get("daily_count") or 0) # 1. 内容变化检查: 如果 ID 没变,直接拦截 if last_id == current_id: return False, meta # 2. 冷却时间检查 if last_notify_at: last_dt = datetime.fromisoformat(last_notify_at) if (now - last_dt) < timedelta(hours=cooldown_hours): return False, meta # 3. 每日上限检查 if daily_count_date == today and daily_count >= daily_max: return False, meta # 更新元数据 new_meta = meta.copy() new_meta["last_notify_at"] = now.isoformat() new_meta["last_id"] = current_id new_meta["daily_count_date"] = today new_meta["daily_count"] = daily_count + 1 if daily_count_date == today else 1 new_meta["notify_count"] = int(new_meta.get("notify_count") or 0) + 1 return True, new_meta