# app/services/slot_snapshot_service.py
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

from redis.asyncio import Redis
from sqlalchemy import and_, desc, func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.slot_refresh_status import VasSlotRefreshStatus
from app.models.slot_snapshot import VasSlotSnapshot
from app.models.vas_task import VasTask
from app.schemas.slot_snapshot import SlotSnapshotCreate, SlotSnapshotOut
from app.services.notification_service import NotificationService
from app.utils.throttler import BusinessRateLimiter, RedisThrottler
  14. class SlotSnapshotService:
  15. @staticmethod
  16. async def report(
  17. db: AsyncSession,
  18. redis_client: Redis,
  19. data: SlotSnapshotCreate
  20. ) -> VasSlotSnapshot:
  21. rec = VasSlotSnapshot(**data.dict())
  22. db.add(rec)
  23. await db.commit()
  24. await db.refresh(rec)
  25. # 1. 准备序列化数据 (用于 payload 展示)
  26. earliest_date_str = rec.earliest_date.isoformat() if rec.earliest_date else None
  27. snapshot_at_str = rec.snapshot_at.isoformat() if rec.snapshot_at else None
  28. throttle_key = f"throttle:slot_snapshot:{rec.routing_key}"
  29. # 修复 2:针对 Signature,日期只精确到“天”(YYYY-MM-DD),防止时分秒的微小变动触发疯狂推送
  30. sig_date = ""
  31. if rec.earliest_date:
  32. # 如果 earliest_date 有 date() 方法说明它是 datetime,否则认为是 date 对象
  33. sig_date = rec.earliest_date.strftime("%Y-%m-%d") if hasattr(rec.earliest_date, "strftime") else str(rec.earliest_date)
  34. signature = f"{rec.availability_status}|{sig_date}"
  35. # 2. 使用通用 Redis 限流器
  36. # 这里逻辑:如果 signature 和 redis 里的旧 signature 完全一致,且在 1800 秒内,则 should_throttle 返回 True
  37. is_throttled = await RedisThrottler.should_throttle(
  38. redis_client,
  39. throttle_key,
  40. signature,
  41. expire_seconds=1800
  42. )
  43. # ==================== 修复结束 ====================
  44. if not is_throttled:
  45. await NotificationService.post_message(
  46. db=db,
  47. channel="wechat",
  48. payload={
  49. "template_id": "slot_snapshot",
  50. "payload": {
  51. **data.dict(),
  52. "earliest_date": earliest_date_str,
  53. "snapshot_at": snapshot_at_str
  54. }
  55. },
  56. )
  57. # 3. 订阅通知
  58. await SlotSnapshotService._notify_subscribers(
  59. db=db,
  60. rec=rec,
  61. earliest_date_str=earliest_date_str
  62. )
  63. return rec
  64. @staticmethod
  65. async def _notify_subscribers(
  66. db: AsyncSession,
  67. rec: VasSlotSnapshot,
  68. earliest_date_str: str
  69. ) -> None:
  70. stmt = select(VasTask).where(VasTask.status == "pending", VasTask.routing_key == "sub.slot")
  71. tasks = (await db.execute(stmt)).scalars().all()
  72. email_receivers = []
  73. whatsapp_receivers = []
  74. # 业务逻辑 ID:优先用日期,没有日期用状态
  75. current_biz_id = earliest_date_str or rec.availability_status
  76. for task in tasks:
  77. user_inputs = task.user_inputs or {}
  78. if user_inputs.get("slot_routing_key") != rec.routing_key:
  79. continue
  80. # 使用通用业务限流器
  81. can_notify, updated_meta = BusinessRateLimiter.check_notification_limit(
  82. meta=dict(task.meta) if task.meta else {},
  83. current_id=current_biz_id,
  84. cooldown_hours=8,
  85. daily_max=3
  86. )
  87. if not can_notify:
  88. continue
  89. # 更新数据库状态
  90. task.meta = updated_meta
  91. task.notify_count = updated_meta["notify_count"]
  92. # 收集接收者
  93. message_payload = {
  94. "slot_routing_key": rec.routing_key,
  95. "country": rec.country,
  96. "city": rec.city,
  97. "visa_type": rec.visa_type,
  98. "earliest_date": earliest_date_str,
  99. "website": rec.website,
  100. "message": f"Slot update: {rec.country}-{rec.city} {rec.visa_type} {earliest_date_str or rec.availability_status}",
  101. }
  102. if email := user_inputs.get("email"):
  103. email_receivers.append(email)
  104. if phone := user_inputs.get("phone"):
  105. # 1. 确保国家代码不带 +
  106. code = str(user_inputs.get("phone_country_code") or "").replace("+", "")
  107. # 2. 去掉手机号开头的 0 (使用 lstrip)
  108. clean_phone = str(phone).lstrip('0')
  109. # 3. 组合,不加 + 前缀
  110. whatsapp_receivers.append(f"{code}{clean_phone}")
  111. # 批量发送通知
  112. if email_receivers:
  113. await NotificationService.post_message(db=db, channel="email", payload={
  114. "template_id": "slot_subscription", "receivers": list(set(email_receivers)), "payload": message_payload
  115. })
  116. if whatsapp_receivers:
  117. await NotificationService.post_message(db=db, channel="whatsapp", payload={
  118. "template_id": "slot_subscription", "receivers": list(set(whatsapp_receivers)), "payload": message_payload
  119. })
  120. await db.commit()
  121. @staticmethod
  122. async def latest_for(
  123. db: AsyncSession,
  124. country: str,
  125. city: str,
  126. visa_type: str
  127. ) -> VasSlotSnapshot:
  128. stmt = (
  129. select(VasSlotSnapshot)
  130. .where(
  131. VasSlotSnapshot.country == country,
  132. VasSlotSnapshot.city == city,
  133. VasSlotSnapshot.visa_type == visa_type,
  134. )
  135. .order_by(VasSlotSnapshot.snapshot_at.desc())
  136. .limit(1)
  137. )
  138. return await db.scalar(stmt)
  139. async def get_slot_overview(db: AsyncSession, city: str) -> List[Dict[str, Any]]:
  140. """
  141. 异步获取指定城市的最新 Slot 快照 dashboard 数据
  142. 修正逻辑:按 (Country, City, VisaType) 这一组业务主键去重,只取最新的一条。
  143. """
  144. # 1. 子查询:找出该城市下,每个【业务类型】最新的一条记录 ID
  145. # 也就是:在同一个城市、同一个国家、同一个签证类型下,只取 ID 最大的那条
  146. subquery = (
  147. select(func.max(VasSlotSnapshot.id).label("max_id"))
  148. .where(VasSlotSnapshot.city == city)
  149. .group_by(
  150. VasSlotSnapshot.country,
  151. VasSlotSnapshot.city,
  152. VasSlotSnapshot.visa_type
  153. )
  154. .subquery()
  155. )
  156. # 2. 主查询
  157. # Join 逻辑修改:
  158. # - Inner Join 子查询:确保只拿到最新的 snapshot
  159. # - Outer Join 状态表:不再仅依赖 routing_key (因为 snapshot 里可能是 null),
  160. # 而是通过 country/city/visa_type 强关联,这样即使 routing_key 丢失也能匹配到状态。
  161. stmt = (
  162. select(VasSlotSnapshot, VasSlotRefreshStatus)
  163. .join(subquery, VasSlotSnapshot.id == subquery.c.max_id)
  164. .outerjoin(
  165. VasSlotRefreshStatus,
  166. and_(
  167. VasSlotSnapshot.country == VasSlotRefreshStatus.country,
  168. VasSlotSnapshot.city == VasSlotRefreshStatus.city,
  169. VasSlotSnapshot.visa_type == VasSlotRefreshStatus.visa_type
  170. )
  171. )
  172. .order_by(VasSlotSnapshot.country)
  173. )
  174. # 3. 执行查询
  175. result = await db.execute(stmt)
  176. rows = result.all()
  177. # 4. 组装数据
  178. dashboard_data = []
  179. for row in rows:
  180. snap: VasSlotSnapshot = row[0]
  181. status: VasSlotRefreshStatus = row[1]
  182. item = {
  183. "id": snap.id,
  184. "country": snap.country,
  185. "city": snap.city,
  186. "visa_type": snap.visa_type,
  187. "routing_key": snap.routing_key, # 即使是 null 也没关系,展示用
  188. "availability_status": snap.availability_status,
  189. "earliest_date": snap.earliest_date,
  190. "snapshot_at": snap.snapshot_at,
  191. "website": snap.website,
  192. # 优先从 status 表取心跳,如果关联不上,就为 null
  193. "last_check_at": status.last_success_at if status else None
  194. }
  195. if snap.availability:
  196. item["availability"] = snap.availability
  197. dashboard_data.append(item)
  198. return dashboard_data