jerry 3 mēneši atpakaļ
vecāks
revīzija
a2c6f82124

+ 25 - 1
app/api/router.py

@@ -46,6 +46,7 @@ from app.schemas.webhook import SMSHelperWebhookPayload
 from app.schemas.vas_task import VasTaskCreate, VasTaskUpdate, VasTaskOut
 from app.schemas.ticket import VasTicketCreate, VasTicketOut, VasTicketStatusUpdate, VasTicketMessageCreate, VasTicketMessageOut
 from app.schemas.slot_snapshot import SlotSnapshotCreate, SlotSnapshotOut
+from app.schemas.slot_refresh_status import RefreshBase, RefreshFail, RefreshStatusOut
 from app.schemas.telegram import TelegramIn
 from app.schemas.wechat import WechatIn
 from app.schemas.resource import FileUploadOut
@@ -81,7 +82,7 @@ from app.services.wechat_service import WechatService
 from app.services.slot_snapshot_service import SlotSnapshotService
 from app.services.statistics_service import StatisticsService
 from app.services.llm_service import LlmService
-
+from app.services.slot_refresh_status_service import SlotRefreshStatusService
 
 # 公共路由
 public_router = APIRouter()
@@ -680,6 +681,29 @@ async def slots_report(
     res = await SlotSnapshotService.report(db, redis_client, payload)
     return success(data=res)
 
+@admin_required_router.post("/slot_refresh/start", summary="刷新slot开始", tags=["Slot Monitor 监控"], response_model=ApiResponse[RefreshStatusOut])
+async def slot_refresh_start(data: RefreshBase, db: AsyncSession = Depends(get_db)
+):
+    data = await SlotRefreshStatusService.refresh_start(db, data)
+    return success(data=data)
+
+@admin_required_router.post("/slot_refresh/success", summary="刷新slot成功", tags=["Slot Monitor 监控"], response_model=ApiResponse[RefreshStatusOut])
+async def  slot_refresh_success(data: RefreshBase, db: AsyncSession = Depends(get_db)
+):
+    data = await SlotRefreshStatusService.refresh_success(db, data)
+    return success(data=data)
+
+@admin_required_router.post("/slot_refresh/fail", summary="刷新slot失败", tags=["Slot Monitor 监控"], response_model=ApiResponse[RefreshStatusOut])
+async def slot_refresh_fail(data: RefreshFail,db: AsyncSession = Depends(get_db)
+):
+    data = await SlotRefreshStatusService.refresh_fail(db, data)
+    return success(data=data)
+
+@admin_required_router.get("/slot_refresh/status", summary="查询刷新纪录", tags=["Slot Monitor 监控"], response_model=ApiResponse[List[RefreshStatusOut]])
+async def slot_refresh_status(db: AsyncSession = Depends(get_db)):
+    data = await SlotRefreshStatusService.list_all(db)
+    return success(data=data)
+
 @public_router.post("/webhook/smshelper", summary="双开助手Webhook", tags=["webhook"], response_model=ApiResponse)
 async def webhook_smshelper(
     payload: SMSHelperWebhookPayload,

+ 31 - 0
app/models/slot_refresh_status.py

@@ -0,0 +1,31 @@
+from sqlalchemy import Column, Integer, String, DateTime, Enum, Text
+from datetime import datetime
+from app.core.database import Base
+
+
+class VasSlotRefreshStatus(Base):
+    __tablename__ = "vas_slot_refresh_status"
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+
+    routing_key = Column(String(255), nullable=False, unique=True)
+    snapshot_source = Column(
+        Enum('worker', 'manual', 'sync'),
+        nullable=False
+    )
+    
+    country = Column(String(100), nullable=False)
+    city = Column(String(100), nullable=False)
+    visa_type = Column(String(100), nullable=False)
+
+    last_refresh_at = Column(DateTime, nullable=False)
+    last_success_at = Column(DateTime)
+
+    last_error = Column(Text)
+
+    updated_at = Column(
+        DateTime,
+        nullable=False,
+        default=datetime.utcnow,
+        onupdate=datetime.utcnow
+    )

+ 36 - 0
app/schemas/slot_refresh_status.py

@@ -0,0 +1,36 @@
+from pydantic import BaseModel
+from datetime import datetime
+from typing import Optional
+
+
+class RefreshBase(BaseModel):
+    routing_key: str
+    country: Optional[str] = None
+    city: Optional[str] = None
+    visa_type: Optional[str] = None
+    snapshot_source: str
+
+class RefreshStart(BaseModel):
+    routing_key: str
+    country: str
+    city: str
+    visa_type: str
+    snapshot_source: str
+
+class RefreshFail(RefreshBase):
+    error: str
+
+class RefreshStatusOut(BaseModel):
+    routing_key: str
+    country: str
+    city: str
+    visa_type: str
+    snapshot_source: str
+    last_refresh_at: datetime
+    last_success_at: Optional[datetime]
+    last_error: Optional[str]
+    updated_at: datetime
+
+    model_config = {
+        "from_attributes": True
+    }

+ 92 - 0
app/services/slot_refresh_status_service.py

@@ -0,0 +1,92 @@
+
+
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy import select
+from redis.asyncio import Redis
+from datetime import datetime
+from typing import List
+from app.core.biz_exception import NotFoundError
+from app.models.slot_refresh_status import VasSlotRefreshStatus
+from app.schemas.slot_refresh_status import RefreshBase, RefreshFail, RefreshStatusOut
+
+
+class SlotRefreshStatusService:
+    
+    @staticmethod
+    async def refresh_start(
+        db: AsyncSession,
+        data: RefreshBase
+    ) -> VasSlotRefreshStatus:
+        now = datetime.utcnow()
+
+        stmt = select(VasSlotRefreshStatus).where(
+            VasSlotRefreshStatus.routing_key == data.routing_key
+        )
+        result = await db.execute(stmt)
+        record = result.scalar_one_or_none()
+
+        if record:
+            record.last_refresh_at = now
+            record.snapshot_source = data.snapshot_source
+            record.last_error = None
+        else:
+            record = VasSlotRefreshStatus(
+                routing_key=data.routing_key,
+                snapshot_source=data.snapshot_source,
+                country=data.country,
+                city=data.city,
+                visa_type=data.visa_type,
+                last_refresh_at=now
+            )
+            db.add(record)
+
+        await db.commit()
+        return record
+    
+    @staticmethod
+    async def refresh_success(
+        db: AsyncSession,
+        data: RefreshBase
+    ) -> VasSlotRefreshStatus:
+        stmt = select(VasSlotRefreshStatus).where(
+            VasSlotRefreshStatus.routing_key == data.routing_key
+        )
+        result = await db.execute(stmt)
+        record = result.scalar_one_or_none()
+
+        if not record:
+            raise NotFoundError(message="refresh record not found")
+
+        now = datetime.utcnow()
+        record.last_success_at = now
+        record.last_error = None
+
+        await db.commit()
+        return record
+    
+    @staticmethod
+    async def refresh_fail(
+        db: AsyncSession,
+        data: RefreshFail
+    ) -> VasSlotRefreshStatus:
+        stmt = select(VasSlotRefreshStatus).where(
+            VasSlotRefreshStatus.routing_key == data.routing_key
+        )
+        result = await db.execute(stmt)
+        record = result.scalar_one_or_none()
+
+        if not record:
+            raise NotFoundError(message="refresh record not found")
+
+        record.last_error = data.error
+
+        await db.commit()
+        return record
+
+    @staticmethod
+    async def list_all(
+        db: AsyncSession
+    ) -> List[VasSlotRefreshStatus]:
+        stmt = select(VasSlotRefreshStatus)
+        result = await db.execute(stmt)
+        return result.scalars().all()

+ 2 - 2
app/services/statistics_service.py

@@ -79,7 +79,7 @@ class StatisticsService:
         total_orders = (
             await db.scalar(
                 select(func.count(VasOrder.id))
-                .where(VasOrder.status != "closed")
+                .where(VasOrder.status == "paid")
             )
         ) or 0
 
@@ -132,7 +132,7 @@ class StatisticsService:
                 .where(
                     VasOrder.created_at >= start_dt,
                     VasOrder.created_at <= end_dt,
-                    VasOrder.status.in_(["paid", "completed", "succeeded"])
+                    VasOrder.status.in_(["paid", "completed"])
                 )
                 .group_by(VasOrder.base_currency)
             )