slot_snapshot_service.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. # app/services/slot_snapshot_service.py
  2. from sqlalchemy.ext.asyncio import AsyncSession
  3. from sqlalchemy import select, func, desc, and_
  4. from redis.asyncio import Redis
  5. from typing import List, Dict, Any
  6. from datetime import datetime, timedelta
  7. from app.models.vas_task import VasTask
  8. from app.models.slot_snapshot import VasSlotSnapshot
  9. from app.schemas.slot_snapshot import SlotSnapshotCreate, SlotSnapshotOut
  10. from app.services.notification_service import NotificationService
  11. from app.models.slot_snapshot import VasSlotSnapshot
  12. from app.models.slot_refresh_status import VasSlotRefreshStatus
  13. from app.utils.throttler import RedisThrottler, BusinessRateLimiter
  14. class SlotSnapshotService:
  15. @staticmethod
  16. async def report(
  17. db: AsyncSession,
  18. redis_client: Redis,
  19. data: SlotSnapshotCreate
  20. ) -> VasSlotSnapshot:
  21. rec = VasSlotSnapshot(**data.dict())
  22. db.add(rec)
  23. await db.commit()
  24. await db.refresh(rec)
  25. # 1. 准备序列化数据
  26. earliest_date_str = rec.earliest_date.isoformat() if rec.earliest_date else None
  27. snapshot_at_str = rec.snapshot_at.isoformat() if rec.snapshot_at else None
  28. # 2. 使用通用 Redis 限流器
  29. # 只要 状态 或 日期 变了,Signature 就会变,从而触发推送
  30. throttle_key = f"throttle:slot_snapshot:{rec.routing_key}"
  31. signature = f"{rec.availability_status}|{earliest_date_str or ''}"
  32. is_throttled = await RedisThrottler.should_throttle(redis_client, throttle_key, signature, expire_seconds=1800)
  33. if not is_throttled:
  34. await NotificationService.post_message(
  35. db=db,
  36. channel="wechat",
  37. payload={
  38. "template_id": "slot_snapshot",
  39. "payload": {
  40. **data.dict(), # 简化写法
  41. "earliest_date": earliest_date_str,
  42. "snapshot_at": snapshot_at_str
  43. }
  44. },
  45. )
  46. # 3. 订阅通知
  47. await SlotSnapshotService._notify_subscribers(
  48. db=db,
  49. rec=rec,
  50. earliest_date_str=earliest_date_str
  51. )
  52. return rec
  53. @staticmethod
  54. async def _notify_subscribers(
  55. db: AsyncSession,
  56. rec: VasSlotSnapshot,
  57. earliest_date_str: str
  58. ) -> None:
  59. stmt = select(VasTask).where(VasTask.status == "pending", VasTask.routing_key == "sub.slot")
  60. tasks = (await db.execute(stmt)).scalars().all()
  61. email_receivers = []
  62. whatsapp_receivers = []
  63. # 业务逻辑 ID:优先用日期,没有日期用状态
  64. current_biz_id = earliest_date_str or rec.availability_status
  65. for task in tasks:
  66. user_inputs = task.user_inputs or {}
  67. if user_inputs.get("slot_routing_key") != rec.routing_key:
  68. continue
  69. # 使用通用业务限流器
  70. can_notify, updated_meta = BusinessRateLimiter.check_notification_limit(
  71. meta=dict(task.meta) if task.meta else {},
  72. current_id=current_biz_id,
  73. cooldown_hours=8,
  74. daily_max=3
  75. )
  76. if not can_notify:
  77. continue
  78. # 更新数据库状态
  79. task.meta = updated_meta
  80. task.notify_count = updated_meta["notify_count"]
  81. # 收集接收者
  82. message_payload = {
  83. "slot_routing_key": rec.routing_key,
  84. "country": rec.country,
  85. "city": rec.city,
  86. "visa_type": rec.visa_type,
  87. "earliest_date": earliest_date_str,
  88. "website": rec.website,
  89. "message": f"Slot update: {rec.country}-{rec.city} {rec.visa_type} {earliest_date_str or rec.availability_status}",
  90. }
  91. if email := user_inputs.get("email"):
  92. email_receivers.append(email)
  93. if phone := user_inputs.get("phone"):
  94. # 1. 确保国家代码不带 +
  95. code = str(user_inputs.get("phone_country_code") or "").replace("+", "")
  96. # 2. 去掉手机号开头的 0 (使用 lstrip)
  97. clean_phone = str(phone).lstrip('0')
  98. # 3. 组合,不加 + 前缀
  99. whatsapp_receivers.append(f"{code}{clean_phone}")
  100. # 批量发送通知
  101. if email_receivers:
  102. await NotificationService.post_message(db=db, channel="email", payload={
  103. "template_id": "slot_subscription", "receivers": list(set(email_receivers)), "payload": message_payload
  104. })
  105. if whatsapp_receivers:
  106. await NotificationService.post_message(db=db, channel="whatsapp", payload={
  107. "template_id": "slot_subscription", "receivers": list(set(whatsapp_receivers)), "payload": message_payload
  108. })
  109. await db.commit()
  110. @staticmethod
  111. async def latest_for(
  112. db: AsyncSession,
  113. country: str,
  114. city: str,
  115. visa_type: str
  116. ) -> VasSlotSnapshot:
  117. stmt = (
  118. select(VasSlotSnapshot)
  119. .where(
  120. VasSlotSnapshot.country == country,
  121. VasSlotSnapshot.city == city,
  122. VasSlotSnapshot.visa_type == visa_type,
  123. )
  124. .order_by(VasSlotSnapshot.snapshot_at.desc())
  125. .limit(1)
  126. )
  127. return await db.scalar(stmt)
  128. async def get_slot_overview(db: AsyncSession, city: str) -> List[Dict[str, Any]]:
  129. """
  130. 异步获取指定城市的最新 Slot 快照 dashboard 数据
  131. 修正逻辑:按 (Country, City, VisaType) 这一组业务主键去重,只取最新的一条。
  132. """
  133. # 1. 子查询:找出该城市下,每个【业务类型】最新的一条记录 ID
  134. # 也就是:在同一个城市、同一个国家、同一个签证类型下,只取 ID 最大的那条
  135. subquery = (
  136. select(func.max(VasSlotSnapshot.id).label("max_id"))
  137. .where(VasSlotSnapshot.city == city)
  138. .group_by(
  139. VasSlotSnapshot.country,
  140. VasSlotSnapshot.city,
  141. VasSlotSnapshot.visa_type
  142. )
  143. .subquery()
  144. )
  145. # 2. 主查询
  146. # Join 逻辑修改:
  147. # - Inner Join 子查询:确保只拿到最新的 snapshot
  148. # - Outer Join 状态表:不再仅依赖 routing_key (因为 snapshot 里可能是 null),
  149. # 而是通过 country/city/visa_type 强关联,这样即使 routing_key 丢失也能匹配到状态。
  150. stmt = (
  151. select(VasSlotSnapshot, VasSlotRefreshStatus)
  152. .join(subquery, VasSlotSnapshot.id == subquery.c.max_id)
  153. .outerjoin(
  154. VasSlotRefreshStatus,
  155. and_(
  156. VasSlotSnapshot.country == VasSlotRefreshStatus.country,
  157. VasSlotSnapshot.city == VasSlotRefreshStatus.city,
  158. VasSlotSnapshot.visa_type == VasSlotRefreshStatus.visa_type
  159. )
  160. )
  161. .order_by(VasSlotSnapshot.country)
  162. )
  163. # 3. 执行查询
  164. result = await db.execute(stmt)
  165. rows = result.all()
  166. # 4. 组装数据
  167. dashboard_data = []
  168. for row in rows:
  169. snap: VasSlotSnapshot = row[0]
  170. status: VasSlotRefreshStatus = row[1]
  171. item = {
  172. "id": snap.id,
  173. "country": snap.country,
  174. "city": snap.city,
  175. "visa_type": snap.visa_type,
  176. "routing_key": snap.routing_key, # 即使是 null 也没关系,展示用
  177. "availability_status": snap.availability_status,
  178. "earliest_date": snap.earliest_date,
  179. "snapshot_at": snap.snapshot_at,
  180. "website": snap.website,
  181. # 优先从 status 表取心跳,如果关联不上,就为 null
  182. "last_check_at": status.last_success_at if status else None
  183. }
  184. if snap.availability:
  185. item["availability"] = snap.availability
  186. dashboard_data.append(item)
  187. return dashboard_data