slot_snapshot_service.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. # app/services/slot_snapshot_service.py
  2. from sqlalchemy.ext.asyncio import AsyncSession
  3. from sqlalchemy import select, func, desc, and_
  4. from redis.asyncio import Redis
  5. from typing import List, Dict, Any
  6. from app.models.slot_snapshot import VasSlotSnapshot
  7. from app.schemas.slot_snapshot import SlotSnapshotCreate, SlotSnapshotOut
  8. from app.services.notification_service import NotificationService
  9. from app.models.slot_snapshot import VasSlotSnapshot
  10. from app.models.slot_refresh_status import VasSlotRefreshStatus
  11. class SlotSnapshotService:
  12. @staticmethod
  13. async def report(
  14. db: AsyncSession,
  15. redis_client: Redis,
  16. data: SlotSnapshotCreate
  17. ) -> VasSlotSnapshot:
  18. rec = VasSlotSnapshot(**data.dict())
  19. db.add(rec)
  20. await db.commit()
  21. await db.refresh(rec)
  22. # 修复点在这里:手动将 datetime 对象转为 string
  23. earliest_date_str = rec.earliest_date.isoformat() if rec.earliest_date else None
  24. snapshot_at_str = rec.snapshot_at.isoformat() if rec.snapshot_at else None
  25. await NotificationService.post_wechat(
  26. redis_client=redis_client,
  27. template_id="slot_snapshot",
  28. payload={
  29. "country": rec.country,
  30. "city": rec.city,
  31. "visa_type": rec.visa_type,
  32. "routing_key": rec.routing_key,
  33. "availability_status": rec.availability_status,
  34. # 使用转换后的字符串
  35. "earliest_date": earliest_date_str,
  36. "availability": rec.availability,
  37. "snapshot_source": rec.snapshot_source,
  38. "snapshot_at": snapshot_at_str
  39. }
  40. )
  41. return rec
  42. @staticmethod
  43. async def latest_for(
  44. db: AsyncSession,
  45. country: str,
  46. city: str,
  47. visa_type: str
  48. ) -> VasSlotSnapshot:
  49. stmt = (
  50. select(VasSlotSnapshot)
  51. .where(
  52. VasSlotSnapshot.country == country,
  53. VasSlotSnapshot.city == city,
  54. VasSlotSnapshot.visa_type == visa_type,
  55. )
  56. .order_by(VasSlotSnapshot.snapshot_at.desc())
  57. .limit(1)
  58. )
  59. return await db.scalar(stmt)
  60. async def get_slot_overview(db: AsyncSession, city: str) -> List[Dict[str, Any]]:
  61. """
  62. 异步获取指定城市的最新 Slot 快照 dashboard 数据
  63. 修正逻辑:按 (Country, City, VisaType) 这一组业务主键去重,只取最新的一条。
  64. """
  65. # 1. 子查询:找出该城市下,每个【业务类型】最新的一条记录 ID
  66. # 也就是:在同一个城市、同一个国家、同一个签证类型下,只取 ID 最大的那条
  67. subquery = (
  68. select(func.max(VasSlotSnapshot.id).label("max_id"))
  69. .where(VasSlotSnapshot.city == city)
  70. .group_by(
  71. VasSlotSnapshot.country,
  72. VasSlotSnapshot.city,
  73. VasSlotSnapshot.visa_type
  74. )
  75. .subquery()
  76. )
  77. # 2. 主查询
  78. # Join 逻辑修改:
  79. # - Inner Join 子查询:确保只拿到最新的 snapshot
  80. # - Outer Join 状态表:不再仅依赖 routing_key (因为 snapshot 里可能是 null),
  81. # 而是通过 country/city/visa_type 强关联,这样即使 routing_key 丢失也能匹配到状态。
  82. stmt = (
  83. select(VasSlotSnapshot, VasSlotRefreshStatus)
  84. .join(subquery, VasSlotSnapshot.id == subquery.c.max_id)
  85. .outerjoin(
  86. VasSlotRefreshStatus,
  87. and_(
  88. VasSlotSnapshot.country == VasSlotRefreshStatus.country,
  89. VasSlotSnapshot.city == VasSlotRefreshStatus.city,
  90. VasSlotSnapshot.visa_type == VasSlotRefreshStatus.visa_type
  91. )
  92. )
  93. .order_by(VasSlotSnapshot.country)
  94. )
  95. # 3. 执行查询
  96. result = await db.execute(stmt)
  97. rows = result.all()
  98. # 4. 组装数据
  99. dashboard_data = []
  100. for row in rows:
  101. snap: VasSlotSnapshot = row[0]
  102. status: VasSlotRefreshStatus = row[1]
  103. item = {
  104. "id": snap.id,
  105. "country": snap.country,
  106. "city": snap.city,
  107. "visa_type": snap.visa_type,
  108. "routing_key": snap.routing_key, # 即使是 null 也没关系,展示用
  109. "availability_status": snap.availability_status,
  110. "earliest_date": snap.earliest_date,
  111. "snapshot_at": snap.snapshot_at,
  112. "website": snap.website,
  113. # 优先从 status 表取心跳,如果关联不上,就为 null
  114. "last_check_at": status.last_success_at if status else None
  115. }
  116. if snap.availability:
  117. item["availability"] = snap.availability
  118. dashboard_data.append(item)
  119. return dashboard_data