# app/services/slot_snapshot_service.py from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func, desc, and_ from redis.asyncio import Redis from typing import List, Dict, Any from app.models.slot_snapshot import VasSlotSnapshot from app.schemas.slot_snapshot import SlotSnapshotCreate, SlotSnapshotOut from app.services.notification_service import NotificationService from app.models.slot_snapshot import VasSlotSnapshot from app.models.slot_refresh_status import VasSlotRefreshStatus class SlotSnapshotService: @staticmethod async def report( db: AsyncSession, redis_client: Redis, data: SlotSnapshotCreate ) -> VasSlotSnapshot: rec = VasSlotSnapshot(**data.dict()) db.add(rec) await db.commit() await db.refresh(rec) # 修复点在这里:手动将 datetime 对象转为 string earliest_date_str = rec.earliest_date.isoformat() if rec.earliest_date else None snapshot_at_str = rec.snapshot_at.isoformat() if rec.snapshot_at else None await NotificationService.post_wechat( redis_client=redis_client, template_id="slot_snapshot", payload={ "country": rec.country, "city": rec.city, "visa_type": rec.visa_type, "routing_key": rec.routing_key, "availability_status": rec.availability_status, # 使用转换后的字符串 "earliest_date": earliest_date_str, "availability": rec.availability, "snapshot_source": rec.snapshot_source, "snapshot_at": snapshot_at_str } ) return rec @staticmethod async def latest_for( db: AsyncSession, country: str, city: str, visa_type: str ) -> VasSlotSnapshot: stmt = ( select(VasSlotSnapshot) .where( VasSlotSnapshot.country == country, VasSlotSnapshot.city == city, VasSlotSnapshot.visa_type == visa_type, ) .order_by(VasSlotSnapshot.snapshot_at.desc()) .limit(1) ) return await db.scalar(stmt) async def get_slot_overview(db: AsyncSession, city: str) -> List[Dict[str, Any]]: """ 异步获取指定城市的最新 Slot 快照 dashboard 数据 修正逻辑:按 (Country, City, VisaType) 这一组业务主键去重,只取最新的一条。 """ # 1. 子查询:找出该城市下,每个【业务类型】最新的一条记录 ID # 也就是:在同一个城市、同一个国家、同一个签证类型下,只取 ID 最大的那条 subquery = ( select(func.max(VasSlotSnapshot.id).label("max_id")) .where(VasSlotSnapshot.city == city) .group_by( VasSlotSnapshot.country, VasSlotSnapshot.city, VasSlotSnapshot.visa_type ) .subquery() ) # 2. 主查询 # Join 逻辑修改: # - Inner Join 子查询:确保只拿到最新的 snapshot # - Outer Join 状态表:不再仅依赖 routing_key (因为 snapshot 里可能是 null), # 而是通过 country/city/visa_type 强关联,这样即使 routing_key 丢失也能匹配到状态。 stmt = ( select(VasSlotSnapshot, VasSlotRefreshStatus) .join(subquery, VasSlotSnapshot.id == subquery.c.max_id) .outerjoin( VasSlotRefreshStatus, and_( VasSlotSnapshot.country == VasSlotRefreshStatus.country, VasSlotSnapshot.city == VasSlotRefreshStatus.city, VasSlotSnapshot.visa_type == VasSlotRefreshStatus.visa_type ) ) .order_by(VasSlotSnapshot.country) ) # 3. 执行查询 result = await db.execute(stmt) rows = result.all() # 4. 组装数据 dashboard_data = [] for row in rows: snap: VasSlotSnapshot = row[0] status: VasSlotRefreshStatus = row[1] item = { "id": snap.id, "country": snap.country, "city": snap.city, "visa_type": snap.visa_type, "routing_key": snap.routing_key, # 即使是 null 也没关系,展示用 "availability_status": snap.availability_status, "earliest_date": snap.earliest_date, "snapshot_at": snap.snapshot_at, "website": snap.website, # 优先从 status 表取心跳,如果关联不上,就为 null "last_check_at": status.last_success_at if status else None } if snap.availability: item["availability"] = snap.availability dashboard_data.append(item) return dashboard_data