# app/services/slot_snapshot_service.py
"""Persist slot snapshots, fan out notifications and build dashboard data."""
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

from redis.asyncio import Redis
from sqlalchemy import and_, desc, func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.vas_task import VasTask
from app.models.slot_snapshot import VasSlotSnapshot
from app.schemas.slot_snapshot import SlotSnapshotCreate, SlotSnapshotOut
from app.services.notification_service import NotificationService
from app.models.slot_refresh_status import VasSlotRefreshStatus
from app.utils.throttler import RedisThrottler, BusinessRateLimiter

# Window handed to RedisThrottler for the channel-level snapshot push (2 hours).
_SNAPSHOT_THROTTLE_SECONDS = 2 * 60 * 60

# Per-subscriber limits handed to BusinessRateLimiter.
_SUBSCRIBER_COOLDOWN_HOURS = 8
_SUBSCRIBER_DAILY_MAX = 3


class SlotSnapshotService:
    """Stateless helpers around ``VasSlotSnapshot`` records."""

    @staticmethod
    async def report(
        db: AsyncSession,
        redis_client: Redis,
        data: SlotSnapshotCreate,
    ) -> VasSlotSnapshot:
        """Store a reported snapshot and trigger the related notifications.

        Steps, in order:

        1. Insert the snapshot, commit, and refresh it from the database.
        2. Unless the Redis throttler says otherwise, post a ``wechat``
           message using the ``slot_snapshot`` template.
        3. Notify matching subscription tasks (see ``_notify_subscribers``);
           this step is not gated by the Redis throttle.

        Args:
            db: Async session used for persistence and passed on to
                ``NotificationService``.
            redis_client: Redis connection handed to ``RedisThrottler``.
            data: Incoming snapshot payload.

        Returns:
            The persisted ``VasSlotSnapshot`` instance.
        """
        rec = VasSlotSnapshot(**data.dict())
        db.add(rec)
        await db.commit()
        await db.refresh(rec)

        # 1. Serialised timestamps, used for display in the payloads.
        earliest_date_str = rec.earliest_date.isoformat() if rec.earliest_date else None
        snapshot_at_str = rec.snapshot_at.isoformat() if rec.snapshot_at else None

        throttle_key = f"throttle:slot_snapshot:{rec.routing_key}"

        # The signature carries only day precision (YYYY-MM-DD) so that small
        # time-of-day changes do not count as a new state and trigger a push.
        sig_date = ""
        if rec.earliest_date:
            # date and datetime both provide strftime; anything else is
            # stringified as-is.
            if hasattr(rec.earliest_date, "strftime"):
                sig_date = rec.earliest_date.strftime("%Y-%m-%d")
            else:
                sig_date = str(rec.earliest_date)
        signature = f"{rec.availability_status}|{sig_date}"

        # 2. Generic Redis throttler.
        # NOTE(review): presumably returns True when the stored signature for
        # this key equals `signature` and is still within the window - confirm
        # against RedisThrottler.
        is_throttled = await RedisThrottler.should_throttle(
            redis_client,
            throttle_key,
            signature,
            expire_seconds=_SNAPSHOT_THROTTLE_SECONDS,
        )

        if not is_throttled:
            await NotificationService.post_message(
                db=db,
                channel="wechat",
                payload={
                    "template_id": "slot_snapshot",
                    "payload": {
                        **data.dict(),
                        # Override the raw date objects with their ISO strings.
                        "earliest_date": earliest_date_str,
                        "snapshot_at": snapshot_at_str,
                    },
                },
            )

        # 3. Subscription notifications.
        await SlotSnapshotService._notify_subscribers(
            db=db,
            rec=rec,
            earliest_date_str=earliest_date_str,
        )

        # NOTE(review): `rec` is returned after further commits on `db`; if the
        # session expires objects on commit the caller may see expired
        # attributes - confirm the session is created with
        # expire_on_commit=False.
        return rec

    @staticmethod
    async def _notify_subscribers(
        db: AsyncSession,
        rec: VasSlotSnapshot,
        earliest_date_str: Optional[str],
    ) -> None:
        """Notify pending ``sub.slot`` tasks subscribed to ``rec.routing_key``.

        Each matching task is checked with ``BusinessRateLimiter``; tasks that
        pass get their ``meta`` / ``notify_count`` updated, and their email
        address and phone number are collected. One ``email`` and one
        ``whatsapp`` message are then posted to the de-duplicated receiver
        lists, and the session is committed.

        Args:
            db: Async session used for the query, the task updates and the
                notification calls.
            rec: The snapshot that was just stored.
            earliest_date_str: ISO string of ``rec.earliest_date`` or ``None``.
        """
        stmt = select(VasTask).where(
            VasTask.status == "pending",
            VasTask.routing_key == "sub.slot",
        )
        tasks = (await db.execute(stmt)).scalars().all()

        email_receivers: List[str] = []
        whatsapp_receivers: List[str] = []

        # Business id for rate limiting: the date when present, else the status.
        current_biz_id = earliest_date_str or rec.availability_status

        for task in tasks:
            user_inputs = task.user_inputs or {}
            if user_inputs.get("slot_routing_key") != rec.routing_key:
                continue

            # Generic per-task business rate limiter.
            can_notify, updated_meta = BusinessRateLimiter.check_notification_limit(
                meta=dict(task.meta) if task.meta else {},
                current_id=current_biz_id,
                cooldown_hours=_SUBSCRIBER_COOLDOWN_HOURS,
                daily_max=_SUBSCRIBER_DAILY_MAX,
            )
            if not can_notify:
                continue

            # Persist the limiter state on the task.
            task.meta = updated_meta
            task.notify_count = updated_meta["notify_count"]

            # Collect receivers.
            if email := user_inputs.get("email"):
                email_receivers.append(email)

            if phone := user_inputs.get("phone"):
                # Country code without "+", local number without leading
                # zeros, joined with no "+" prefix.
                code = str(user_inputs.get("phone_country_code") or "").replace("+", "")
                clean_phone = str(phone).lstrip("0")
                whatsapp_receivers.append(f"{code}{clean_phone}")

        if email_receivers or whatsapp_receivers:
            # The payload does not depend on the task, so build it once.
            message_payload = {
                "slot_routing_key": rec.routing_key,
                "country": rec.country,
                "city": rec.city,
                "visa_type": rec.visa_type,
                "earliest_date": earliest_date_str,
                "website": rec.website,
                "message": f"Slot update: {rec.country}-{rec.city} {rec.visa_type} {earliest_date_str or rec.availability_status}",
            }

            # Batch send; dict.fromkeys de-duplicates while keeping a
            # deterministic, first-seen order.
            if email_receivers:
                await NotificationService.post_message(
                    db=db,
                    channel="email",
                    payload={
                        "template_id": "slot_subscription",
                        "receivers": list(dict.fromkeys(email_receivers)),
                        "payload": message_payload,
                    },
                )

            if whatsapp_receivers:
                await NotificationService.post_message(
                    db=db,
                    channel="whatsapp",
                    payload={
                        "template_id": "slot_subscription",
                        "receivers": list(dict.fromkeys(whatsapp_receivers)),
                        "payload": message_payload,
                    },
                )

        await db.commit()

    @staticmethod
    async def latest_for(
        db: AsyncSession,
        country: str,
        city: str,
        visa_type: str,
    ) -> Optional[VasSlotSnapshot]:
        """Return the newest snapshot (by ``snapshot_at``) for the given key.

        Returns:
            The matching ``VasSlotSnapshot``, or ``None`` when no row exists
            for the ``country`` / ``city`` / ``visa_type`` combination.
        """
        stmt = (
            select(VasSlotSnapshot)
            .where(
                VasSlotSnapshot.country == country,
                VasSlotSnapshot.city == city,
                VasSlotSnapshot.visa_type == visa_type,
            )
            .order_by(VasSlotSnapshot.snapshot_at.desc())
            .limit(1)
        )
        return await db.scalar(stmt)

    @staticmethod
    async def get_slot_overview(db: AsyncSession, city: str) -> List[Dict[str, Any]]:
        """Build the dashboard rows with the latest snapshot per business key.

        Snapshots of ``city`` are grouped by (country, city, visa_type) and
        only the row with the highest ``id`` in each group is kept.

        Args:
            db: Async session used to run the query.
            city: City whose snapshots are listed.

        Returns:
            One dict per (country, city, visa_type) group, ordered by country.
            ``last_check_at`` is ``None`` when no refresh-status row matches;
            ``availability`` is only present when the snapshot has a truthy
            value for it.
        """
        # 1. Sub-query: highest snapshot id per business key within the city.
        subquery = (
            select(func.max(VasSlotSnapshot.id).label("max_id"))
            .where(VasSlotSnapshot.city == city)
            .group_by(
                VasSlotSnapshot.country,
                VasSlotSnapshot.city,
                VasSlotSnapshot.visa_type,
            )
            .subquery()
        )

        # 2. Main query.
        #    - Inner join on the sub-query keeps only the latest snapshots.
        #    - Outer join on the status table matches on country/city/visa_type
        #      rather than routing_key, because routing_key may be null on a
        #      snapshot.
        stmt = (
            select(VasSlotSnapshot, VasSlotRefreshStatus)
            .join(subquery, VasSlotSnapshot.id == subquery.c.max_id)
            .outerjoin(
                VasSlotRefreshStatus,
                and_(
                    VasSlotSnapshot.country == VasSlotRefreshStatus.country,
                    VasSlotSnapshot.city == VasSlotRefreshStatus.city,
                    VasSlotSnapshot.visa_type == VasSlotRefreshStatus.visa_type,
                ),
            )
            .order_by(VasSlotSnapshot.country)
        )

        # 3. Execute.
        rows = (await db.execute(stmt)).all()

        # 4. Assemble the response items.
        dashboard_data: List[Dict[str, Any]] = []
        for snap, status in rows:
            item: Dict[str, Any] = {
                "id": snap.id,
                "country": snap.country,
                "city": snap.city,
                "visa_type": snap.visa_type,
                "routing_key": snap.routing_key,  # may be null; display only
                "availability_status": snap.availability_status,
                "earliest_date": snap.earliest_date,
                "snapshot_at": snap.snapshot_at,
                "website": snap.website,
                # Heartbeat comes from the status table; null when unmatched.
                "last_check_at": status.last_success_at if status else None,
            }
            if snap.availability:
                item["availability"] = snap.availability
            dashboard_data.append(item)

        return dashboard_data


# Module-level alias so that both `get_slot_overview(db, city)` and
# `SlotSnapshotService.get_slot_overview(db, city)` resolve to the same code.
get_slot_overview = SlotSnapshotService.get_slot_overview