# slot_refresh_status_service.py
from datetime import datetime, timezone
from typing import List, Optional

from redis.asyncio import Redis
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.biz_exception import NotFoundError
from app.models.slot_refresh_status import VasSlotRefreshStatus
from app.schemas.slot_refresh_status import RefreshBase, RefreshFail, RefreshStatusOut


  9. class SlotRefreshStatusService:
  10. @staticmethod
  11. async def refresh_start(
  12. db: AsyncSession,
  13. data: RefreshBase
  14. ) -> VasSlotRefreshStatus:
  15. now = datetime.utcnow()
  16. stmt = select(VasSlotRefreshStatus).where(
  17. VasSlotRefreshStatus.routing_key == data.routing_key
  18. )
  19. result = await db.execute(stmt)
  20. record = result.scalar_one_or_none()
  21. if record:
  22. record.last_refresh_at = now
  23. record.snapshot_source = data.snapshot_source
  24. record.last_error = None
  25. else:
  26. record = VasSlotRefreshStatus(
  27. routing_key=data.routing_key,
  28. snapshot_source=data.snapshot_source,
  29. country=data.country,
  30. city=data.city,
  31. visa_type=data.visa_type,
  32. last_refresh_at=now
  33. )
  34. db.add(record)
  35. await db.commit()
  36. return record
  37. @staticmethod
  38. async def refresh_success(
  39. db: AsyncSession,
  40. data: RefreshBase
  41. ) -> VasSlotRefreshStatus:
  42. stmt = select(VasSlotRefreshStatus).where(
  43. VasSlotRefreshStatus.routing_key == data.routing_key
  44. )
  45. result = await db.execute(stmt)
  46. record = result.scalar_one_or_none()
  47. if not record:
  48. raise NotFoundError(message="refresh record not found")
  49. now = datetime.utcnow()
  50. record.last_success_at = now
  51. record.last_error = None
  52. await db.commit()
  53. return record
  54. @staticmethod
  55. async def refresh_fail(
  56. db: AsyncSession,
  57. data: RefreshFail
  58. ) -> VasSlotRefreshStatus:
  59. stmt = select(VasSlotRefreshStatus).where(
  60. VasSlotRefreshStatus.routing_key == data.routing_key
  61. )
  62. result = await db.execute(stmt)
  63. record = result.scalar_one_or_none()
  64. if not record:
  65. raise NotFoundError(message="refresh record not found")
  66. record.last_error = data.error
  67. await db.commit()
  68. return record
  69. @staticmethod
  70. async def list_all(
  71. db: AsyncSession
  72. ) -> List[VasSlotRefreshStatus]:
  73. stmt = select(VasSlotRefreshStatus)
  74. result = await db.execute(stmt)
  75. return result.scalars().all()