# app/services/slot_snapshot_service.py
from typing import Any, Dict, List, Optional

from redis.asyncio import Redis
from sqlalchemy import and_, desc, func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.slot_refresh_status import VasSlotRefreshStatus
from app.models.slot_snapshot import VasSlotSnapshot
from app.schemas.slot_snapshot import SlotSnapshotCreate, SlotSnapshotOut
from app.services.notification_service import NotificationService
class SlotSnapshotService:
    """Write and read helpers for ``VasSlotSnapshot`` rows."""

    @staticmethod
    async def report(
        db: AsyncSession,
        redis_client: Redis,
        data: SlotSnapshotCreate,
    ) -> VasSlotSnapshot:
        """Store a new slot snapshot and send a WeChat notification about it.

        Args:
            db: Async session used to add, commit and refresh the new row.
            redis_client: Redis client passed through to
                ``NotificationService.post_wechat``.
            data: Snapshot payload; ``data.dict()`` supplies the model's
                constructor keyword arguments.

        Returns:
            The persisted snapshot, refreshed from the database after commit.
        """
        rec = VasSlotSnapshot(**data.dict())
        db.add(rec)
        await db.commit()
        await db.refresh(rec)

        # Date/datetime values are converted to ISO-8601 strings by hand
        # before being placed in the notification payload.
        earliest_date_str = rec.earliest_date.isoformat() if rec.earliest_date else None
        snapshot_at_str = rec.snapshot_at.isoformat() if rec.snapshot_at else None

        # NOTE(review): the notification is sent after the commit, so an
        # exception raised by post_wechat propagates to the caller even though
        # the row has already been stored.
        await NotificationService.post_wechat(
            redis_client=redis_client,
            template_id="slot_snapshot",
            payload={
                "country": rec.country,
                "city": rec.city,
                "visa_type": rec.visa_type,
                "routing_key": rec.routing_key,
                "availability_status": rec.availability_status,
                # Use the pre-converted string, not the raw date object.
                "earliest_date": earliest_date_str,
                "availability": rec.availability,
                "snapshot_source": rec.snapshot_source,
                "snapshot_at": snapshot_at_str,
            },
        )
        return rec

    @staticmethod
    async def latest_for(
        db: AsyncSession,
        country: str,
        city: str,
        visa_type: str,
    ) -> Optional[VasSlotSnapshot]:
        """Fetch the most recent snapshot for one (country, city, visa_type).

        Args:
            db: Async session used to run the query.
            country: Value matched against ``VasSlotSnapshot.country``.
            city: Value matched against ``VasSlotSnapshot.city``.
            visa_type: Value matched against ``VasSlotSnapshot.visa_type``.

        Returns:
            The snapshot with the greatest ``snapshot_at`` (ties resolved by
            the greatest ``id``), or ``None`` when no row matches.
        """
        stmt = (
            select(VasSlotSnapshot)
            .where(
                VasSlotSnapshot.country == country,
                VasSlotSnapshot.city == city,
                VasSlotSnapshot.visa_type == visa_type,
            )
            .order_by(
                VasSlotSnapshot.snapshot_at.desc(),
                # Tie-breaker so equal timestamps give a deterministic result.
                VasSlotSnapshot.id.desc(),
            )
            .limit(1)
        )
        return await db.scalar(stmt)
-
async def get_slot_overview(db: AsyncSession, city: str) -> List[Dict[str, Any]]:
    """Build dashboard rows from the newest slot snapshot of each group in a city.

    Snapshots are de-duplicated by the business key (country, city, visa_type):
    within each group only the row with the largest ``id`` is kept.

    Args:
        db: Async session used to run the query.
        city: Value matched against ``VasSlotSnapshot.city``.

    Returns:
        One dict per joined row, ordered by country, then visa type, then
        snapshot id. ``last_check_at`` is the matching refresh-status row's
        ``last_success_at``, or ``None`` when no status row matches. The
        ``availability`` key is present only when the snapshot's
        ``availability`` is truthy.
    """
    # 1. Subquery: the largest snapshot id per (country, city, visa_type)
    #    among the rows of the requested city.
    latest_ids = (
        select(func.max(VasSlotSnapshot.id).label("max_id"))
        .where(VasSlotSnapshot.city == city)
        .group_by(
            VasSlotSnapshot.country,
            VasSlotSnapshot.city,
            VasSlotSnapshot.visa_type,
        )
        .subquery()
    )

    # 2. Main query.
    #    - Inner join on the subquery keeps only the newest snapshot per group.
    #    - Outer join on the status table uses country/city/visa_type rather
    #      than routing_key, because a snapshot's routing_key may be null; the
    #      status row is still found when routing_key is missing.
    # NOTE(review): if the status table can hold several rows for the same
    # (country, city, visa_type), this join repeats the snapshot once per
    # status row — TODO confirm that combination is unique there.
    stmt = (
        select(VasSlotSnapshot, VasSlotRefreshStatus)
        .join(latest_ids, VasSlotSnapshot.id == latest_ids.c.max_id)
        .outerjoin(
            VasSlotRefreshStatus,
            and_(
                VasSlotSnapshot.country == VasSlotRefreshStatus.country,
                VasSlotSnapshot.city == VasSlotRefreshStatus.city,
                VasSlotSnapshot.visa_type == VasSlotRefreshStatus.visa_type,
            ),
        )
        # Secondary keys keep the row order stable within one country.
        .order_by(
            VasSlotSnapshot.country,
            VasSlotSnapshot.visa_type,
            VasSlotSnapshot.id,
        )
    )

    # 3. Execute the query.
    result = await db.execute(stmt)

    # 4. Assemble the dashboard payload.
    dashboard_data: List[Dict[str, Any]] = []
    for snap, status in result.all():
        item: Dict[str, Any] = {
            "id": snap.id,
            "country": snap.country,
            "city": snap.city,
            "visa_type": snap.visa_type,
            # A null routing_key is fine here; it is only displayed.
            "routing_key": snap.routing_key,
            "availability_status": snap.availability_status,
            "earliest_date": snap.earliest_date,
            "snapshot_at": snap.snapshot_at,
            "website": snap.website,
            # The heartbeat comes from the status table; null when the outer
            # join found no status row.
            "last_check_at": status.last_success_at if status is not None else None,
        }
        # Kept as in the original contract: the key is omitted when empty.
        if snap.availability:
            item["availability"] = snap.availability
        dashboard_data.append(item)
    return dashboard_data
|