# app/services/slot_snapshot_service.py
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

from redis.asyncio import Redis
from sqlalchemy import and_, desc, func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.slot_refresh_status import VasSlotRefreshStatus
from app.models.slot_snapshot import VasSlotSnapshot
from app.models.vas_task import VasTask
from app.schemas.slot_snapshot import SlotSnapshotCreate, SlotSnapshotOut
from app.services.notification_service import NotificationService
from app.utils.throttler import BusinessRateLimiter, RedisThrottler
class SlotSnapshotService:
    """Store slot snapshots and send the notifications they trigger.

    Every operation is a static method that receives the database session
    explicitly; the class itself holds no state.
    """

    # An unchanged broadcast signature is suppressed for this many seconds.
    _BROADCAST_THROTTLE_SECONDS = 1800
    # Per-subscriber limits handed to BusinessRateLimiter.
    _SUBSCRIBER_COOLDOWN_HOURS = 8
    _SUBSCRIBER_DAILY_MAX = 3

    @staticmethod
    async def report(
        db: AsyncSession,
        redis_client: Redis,
        data: SlotSnapshotCreate,
    ) -> VasSlotSnapshot:
        """Persist a reported snapshot, then send the related notifications.

        The snapshot is committed and refreshed first. A "wechat" broadcast
        is posted unless ``RedisThrottler.should_throttle`` returns true for
        the snapshot's (status, day) signature; subscriber notifications are
        then handled by ``_notify_subscribers``.

        Args:
            db: Session used for persistence and passed to NotificationService.
            redis_client: Redis connection handed to the throttler.
            data: Snapshot fields as reported by the caller.

        Returns:
            The stored and refreshed ``VasSlotSnapshot`` row.
        """
        rec = VasSlotSnapshot(**data.dict())
        db.add(rec)
        await db.commit()
        await db.refresh(rec)

        # ISO strings for the notification payloads; None when the field is unset.
        earliest_date_str = rec.earliest_date.isoformat() if rec.earliest_date else None
        snapshot_at_str = rec.snapshot_at.isoformat() if rec.snapshot_at else None

        # The throttler is expected to return True while the stored signature
        # for this key is unchanged and has not yet expired.
        is_throttled = await RedisThrottler.should_throttle(
            redis_client,
            f"throttle:slot_snapshot:{rec.routing_key}",
            SlotSnapshotService._build_signature(rec),
            expire_seconds=SlotSnapshotService._BROADCAST_THROTTLE_SECONDS,
        )
        if not is_throttled:
            await NotificationService.post_message(
                db=db,
                channel="wechat",
                payload={
                    "template_id": "slot_snapshot",
                    "payload": {
                        **data.dict(),
                        "earliest_date": earliest_date_str,
                        "snapshot_at": snapshot_at_str,
                    },
                },
            )

        # NOTE(review): the original indentation was lost; this call is
        # reconstructed at function level (subscribers have their own rate
        # limiter), i.e. it also runs when the broadcast was throttled -
        # confirm against the deployed version.
        await SlotSnapshotService._notify_subscribers(
            db=db,
            rec=rec,
            earliest_date_str=earliest_date_str,
        )
        return rec

    @staticmethod
    def _build_signature(rec: VasSlotSnapshot) -> str:
        """Return the throttle signature "<availability_status>|<YYYY-MM-DD>".

        Only day precision is used so that a change in the time-of-day part
        alone does not produce a new signature. The date part is empty when
        the snapshot has no earliest date.
        """
        earliest = rec.earliest_date
        if not earliest:
            sig_date = ""
        elif hasattr(earliest, "strftime"):
            # date and datetime objects both provide strftime.
            sig_date = earliest.strftime("%Y-%m-%d")
        else:
            sig_date = str(earliest)
        return f"{rec.availability_status}|{sig_date}"

    @staticmethod
    def _format_whatsapp_receiver(user_inputs: Dict[str, Any]) -> Optional[str]:
        """Build a WhatsApp receiver id from a task's user inputs.

        The id is the country code without "+" followed by the phone number
        without leading zeros. Returns None when there is no phone or nothing
        is left of it after cleaning, so no empty receiver is ever queued.
        """
        phone = user_inputs.get("phone")
        if not phone:
            return None
        # Strip whitespace first so a leading space cannot shield the zeros.
        local_number = str(phone).strip().lstrip("0")
        if not local_number:
            return None
        country_code = str(user_inputs.get("phone_country_code") or "").replace("+", "").strip()
        return f"{country_code}{local_number}"

    @staticmethod
    async def _notify_subscribers(
        db: AsyncSession,
        rec: VasSlotSnapshot,
        earliest_date_str: Optional[str],
    ) -> None:
        """Notify pending "sub.slot" tasks subscribed to the snapshot's routing key.

        Each matching task is checked with ``BusinessRateLimiter``; tasks that
        pass get their ``meta`` / ``notify_count`` updated and contribute their
        email and/or WhatsApp number. One batched message per channel is then
        posted and the task updates are committed.

        Args:
            db: Session used to load tasks, post messages and commit.
            rec: Snapshot that triggered the notification.
            earliest_date_str: ISO form of the earliest date, or None when the
                snapshot has none (the availability status is used instead).
        """
        stmt = select(VasTask).where(VasTask.status == "pending", VasTask.routing_key == "sub.slot")
        tasks = (await db.execute(stmt)).scalars().all()

        email_receivers = []
        whatsapp_receivers = []
        # Business id for the limiter: the date when known, else the status.
        current_biz_id = earliest_date_str or rec.availability_status

        for task in tasks:
            user_inputs = task.user_inputs or {}
            if user_inputs.get("slot_routing_key") != rec.routing_key:
                continue

            can_notify, updated_meta = BusinessRateLimiter.check_notification_limit(
                meta=dict(task.meta) if task.meta else {},
                current_id=current_biz_id,
                cooldown_hours=SlotSnapshotService._SUBSCRIBER_COOLDOWN_HOURS,
                daily_max=SlotSnapshotService._SUBSCRIBER_DAILY_MAX,
            )
            if not can_notify:
                continue

            # Limiter state is recorded for every task that passes, whether or
            # not it provides a usable contact (unchanged behaviour).
            task.meta = updated_meta
            task.notify_count = updated_meta["notify_count"]

            if email := user_inputs.get("email"):
                email_receivers.append(email)
            if whatsapp_id := SlotSnapshotService._format_whatsapp_receiver(user_inputs):
                whatsapp_receivers.append(whatsapp_id)

        if email_receivers or whatsapp_receivers:
            # The payload depends only on the snapshot, so it is built once
            # here instead of once per task.
            message_payload = {
                "slot_routing_key": rec.routing_key,
                "country": rec.country,
                "city": rec.city,
                "visa_type": rec.visa_type,
                "earliest_date": earliest_date_str,
                "website": rec.website,
                "message": f"Slot update: {rec.country}-{rec.city} {rec.visa_type} {earliest_date_str or rec.availability_status}",
            }
            # dict.fromkeys removes duplicates while keeping first-seen order,
            # so the receiver list is deterministic.
            if email_receivers:
                await NotificationService.post_message(db=db, channel="email", payload={
                    "template_id": "slot_subscription",
                    "receivers": list(dict.fromkeys(email_receivers)),
                    "payload": message_payload,
                })
            if whatsapp_receivers:
                await NotificationService.post_message(db=db, channel="whatsapp", payload={
                    "template_id": "slot_subscription",
                    "receivers": list(dict.fromkeys(whatsapp_receivers)),
                    "payload": message_payload,
                })

        # NOTE(review): reconstructed at function level so the limiter state
        # is persisted regardless of which channels were used - confirm.
        await db.commit()

    @staticmethod
    async def latest_for(
        db: AsyncSession,
        country: str,
        city: str,
        visa_type: str,
    ) -> Optional[VasSlotSnapshot]:
        """Return the newest snapshot for a (country, city, visa_type) triple.

        "Newest" is decided by ``snapshot_at`` in descending order.

        Returns:
            The matching ``VasSlotSnapshot``, or None when no row matches.
        """
        stmt = (
            select(VasSlotSnapshot)
            .where(
                VasSlotSnapshot.country == country,
                VasSlotSnapshot.city == city,
                VasSlotSnapshot.visa_type == visa_type,
            )
            .order_by(VasSlotSnapshot.snapshot_at.desc())
            .limit(1)
        )
        return await db.scalar(stmt)
async def get_slot_overview(db: AsyncSession, city: str) -> List[Dict[str, Any]]:
    """Return dashboard rows for a city: one row per (country, city, visa_type).

    For each such group only the snapshot with the highest id is kept. Each
    row also carries ``last_check_at`` from the refresh-status record that
    matches on country / city / visa_type, or None when there is no match.
    The ``availability`` key is present only when the snapshot has a truthy
    value for it.
    """
    # Highest snapshot id per business key within the requested city.
    latest_ids = (
        select(func.max(VasSlotSnapshot.id).label("max_id"))
        .where(VasSlotSnapshot.city == city)
        .group_by(
            VasSlotSnapshot.country,
            VasSlotSnapshot.city,
            VasSlotSnapshot.visa_type,
        )
        .subquery()
    )

    # Status rows are matched on the business key rather than routing_key,
    # because a snapshot's routing_key may be null.
    same_business_key = and_(
        VasSlotSnapshot.country == VasSlotRefreshStatus.country,
        VasSlotSnapshot.city == VasSlotRefreshStatus.city,
        VasSlotSnapshot.visa_type == VasSlotRefreshStatus.visa_type,
    )

    # Inner join keeps only the newest snapshots; the outer join keeps
    # snapshots that have no status row.
    query = (
        select(VasSlotSnapshot, VasSlotRefreshStatus)
        .join(latest_ids, VasSlotSnapshot.id == latest_ids.c.max_id)
        .outerjoin(VasSlotRefreshStatus, same_business_key)
        .order_by(VasSlotSnapshot.country)
    )
    fetched = (await db.execute(query)).all()

    overview = []
    for snap, status in fetched:
        entry = {
            "id": snap.id,
            "country": snap.country,
            "city": snap.city,
            "visa_type": snap.visa_type,
            # May be null; included for display only.
            "routing_key": snap.routing_key,
            "availability_status": snap.availability_status,
            "earliest_date": snap.earliest_date,
            "snapshot_at": snap.snapshot_at,
            "website": snap.website,
            # Heartbeat from the status table, null when nothing matched.
            "last_check_at": status.last_success_at if status else None,
        }
        if snap.availability:
            entry["availability"] = snap.availability
        overview.append(entry)

    return overview
|