slot_snapshot_service.py 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. # app/services/slot_snapshot_service.py
  2. from sqlalchemy.ext.asyncio import AsyncSession
  3. from sqlalchemy import select
  4. from redis.asyncio import Redis
  5. from app.models.slot_snapshot import VasSlotSnapshot
  6. from app.schemas.slot_snapshot import SlotSnapshotCreate, SlotSnapshotOut
  7. from app.services.notification_service import NotificationService
  8. class SlotSnapshotService:
  9. @staticmethod
  10. async def report(
  11. db: AsyncSession,
  12. redis_client: Redis,
  13. data: SlotSnapshotCreate
  14. ) -> VasSlotSnapshot:
  15. rec = VasSlotSnapshot(**data.dict())
  16. db.add(rec)
  17. await db.commit()
  18. await db.refresh(rec)
  19. # 修复点在这里:手动将 datetime 对象转为 string
  20. earliest_date_str = rec.earliest_date.isoformat() if rec.earliest_date else None
  21. snapshot_at_str = rec.snapshot_at.isoformat() if rec.snapshot_at else None
  22. await NotificationService.post_wechat(
  23. redis_client=redis_client,
  24. template_id="slot_snapshot",
  25. payload={
  26. "country": rec.country,
  27. "city": rec.city,
  28. "visa_type": rec.visa_type,
  29. "routing_key": rec.routing_key,
  30. "availability_status": rec.availability_status,
  31. # 使用转换后的字符串
  32. "earliest_date": earliest_date_str,
  33. "availability": rec.availability,
  34. "snapshot_source": rec.snapshot_source,
  35. "snapshot_at": snapshot_at_str
  36. }
  37. )
  38. return rec
  39. @staticmethod
  40. async def latest_for(
  41. db: AsyncSession,
  42. country: str,
  43. city: str,
  44. visa_type: str
  45. ) -> VasSlotSnapshot:
  46. stmt = (
  47. select(VasSlotSnapshot)
  48. .where(
  49. VasSlotSnapshot.country == country,
  50. VasSlotSnapshot.city == city,
  51. VasSlotSnapshot.visa_type == visa_type,
  52. )
  53. .order_by(VasSlotSnapshot.snapshot_at.desc())
  54. .limit(1)
  55. )
  56. return await db.scalar(stmt)