jerry 3 månader sedan
förälder
incheckning
1b272991d9

+ 11 - 2
app/api/router.py

@@ -43,7 +43,7 @@ from app.schemas.payment_confirmation import VasPaymentConfirmationCreate, VasPa
 from app.schemas.payment_qr import VasPaymentQrCreate, VasPaymentQrSetEnableIn, VasPaymentQrOut
 from app.schemas.payment_provider import VasPaymentProviderCreate, VasPaymentProviderUpdate, VasPaymentProviderOut
 from app.schemas.webhook import SMSHelperWebhookPayload
-from app.schemas.vas_task import VasTaskCreate, VasTaskUpdate, VasTaskOut
+from app.schemas.vas_task import VasTaskCreate, VasTaskUpdate, VasExpiringTaskItem, VasTaskOut
 from app.schemas.ticket import VasTicketCreate, VasTicketOut, VasTicketStatusUpdate, VasTicketMessageCreate, VasTicketMessageOut
 from app.schemas.slot_snapshot import SlotSnapshotCreate, SlotSnapshotOut, SlotOverviewOut
 from app.schemas.slot_refresh_status import RefreshBase, RefreshFail, RefreshStatusOut
@@ -522,7 +522,7 @@ async def email_authorizations_forward_email2(
 
 @admin_required_router.post("/email-authorizations/sendmail", summary="发送邮件", tags=["邮箱接口"], response_model=ApiResponse[EmailContent])
 async def email_authorizations_send_email(
-    emailAccount: str = Query(..., description="收件邮箱账号, 格式: xxx@xxx.xxx"),
+    emailAccount: str = Query(..., description="发件账号, 格式: xxx@xxx.xxx"),
     sendTo: str = Query(..., description="收件人邮箱账号"),
     subject: str = Query(..., description="邮件主题"),
     contentType: str = Query("text", description="内容格式,支持 text 和 html"),
@@ -1222,6 +1222,15 @@ async def vas_task_list(
     tasks = await VasTaskService.list_task(db, status, routing_key, script_version, keyword, page, size)
     return success(data=tasks)
 
+@admin_required_router.get("/vas/task/expiring", summary="获取即将过期的任务列表", tags=["Visafly签证系统"], response_model=ApiResponse[List[VasExpiringTaskItem]])
+async def vas_task_get_expiring_tasks(
+    days: int = 3,
+    db: AsyncSession = Depends(get_db)
+):
+    """获取即将过期的任务列表"""
+    tasks = await VasTaskService.get_expiring_tasks(db, threshold_days=days)
+    return success(data=tasks)
+
 @admin_required_router.post("/vas/task/update", summary="更新任务数据", tags=["Visafly签证系统"], response_model=ApiResponse[VasTaskOut])
 async def vas_task_update(
     id: int,

+ 6 - 1
app/schemas/statistics.py

@@ -25,6 +25,10 @@ class RevenueTrendItem(BaseModel):
 class ProductDistItem(BaseModel):
     name: str = Field(..., description="商品名称")
     value: int = Field(..., description="销量")
+    
+class ActiveTaskGroupItem(BaseModel):
+    routing_key: str = Field(..., description="任务队列/Routing Key")
+    count: int = Field(..., description="活跃任务数量")
 
 # 4. 最新动态
 class ActivityItem(BaseModel):
@@ -38,4 +42,5 @@ class VasStatisticsOverviewOut(BaseModel):
     stats: StatsData
     revenue_trend: List[RevenueTrendItem]
     product_dist: List[ProductDistItem]
-    recent_activities: List[ActivityItem]
+    recent_activities: List[ActivityItem]
+    active_tasks_dist: List[ActiveTaskGroupItem]

+ 11 - 0
app/schemas/vas_task.py

@@ -48,3 +48,14 @@ class VasTaskOut(VasTaskBase):
     model_config = {
         "from_attributes": True
     }
+    
+class VasExpiringTaskItem(BaseModel):
+    id: int
+    order_id: str
+    routing_key: str
+    status: str
+    social_media_account: str 
+    customer_name: str  # 拼装 first_name + last_name
+    expected_end_date: str
+    email: str
+    days_left: int      # 剩余天数 (负数代表已过期)

+ 22 - 1
app/services/statistics_service.py

@@ -312,6 +312,26 @@ class StatisticsService:
                 act["time"] = f"{diff.seconds // 60}分钟前"
             else:
                 act["time"] = "刚刚"
+                
+        active_task_stmt = (
+            select(
+                VasTask.routing_key, 
+                func.count(VasTask.id).label("count")
+            )
+            .where(
+                VasTask.status.in_(["pending", "grabbed", "running"])
+            )
+            .group_by(VasTask.routing_key)
+            .order_by(desc("count")) # 按数量倒序排列
+        )
+
+        active_task_rows = (await db.execute(active_task_stmt)).all()
+        
+        # 组装为 List[ActiveTaskGroupItem] 格式
+        active_tasks_dist_data = [
+            {"routing_key": r_key, "count": count} 
+            for r_key, count in active_task_rows
+        ]
 
         # --------------------------------------------------
         # 7. 组装返回结果
@@ -331,5 +351,6 @@ class StatisticsService:
             },
             "revenue_trend": revenue_trend_chart,       # 图表数据
             "product_dist": product_dist,               # 饼图数据
-            "recent_activities": activities             # 动态列表
+            "recent_activities": activities,            # 动态列表
+            "active_tasks_dist": active_tasks_dist_data
         }

+ 58 - 1
app/services/vas_task_service.py

@@ -1,6 +1,6 @@
 # app/services/task_service.py
 
-from datetime import datetime, timedelta
+from datetime import datetime, date, timedelta
 from typing import List, Optional
 
 from sqlalchemy.ext.asyncio import AsyncSession
@@ -87,6 +87,63 @@ class VasTaskService:
         except Exception as e:
             # 记录日志
             raise e
+        
+    @staticmethod
+    async def get_expiring_tasks(db: AsyncSession, threshold_days: int = 3):
+        """
+        获取即将过期或已过期的活跃任务
+        :param threshold_days: 预警阈值,默认 7 天内到期
+        """
+        # 1. 查出所有活跃任务
+        stmt = select(VasTask).where(
+            VasTask.status.in_(['pending', 'running', 'grabbed'])
+        )
+        tasks = (await db.execute(stmt)).scalars().all()
+        
+        results = []
+        today = date.today()
+
+        for task in tasks:
+            user_inputs = task.user_inputs or {}
+            end_date_str = user_inputs.get("expected_end_date")
+            
+            # 如果没有截止日期,跳过
+            if not end_date_str:
+                continue
+
+            try:
+                # 解析日期
+                end_date = datetime.strptime(end_date_str, "%Y-%m-%d").date()
+                
+                # 计算剩余天数
+                delta = (end_date - today).days
+                
+                # 筛选条件: 已过期 (delta < 0) 或 即将过期 (delta <= threshold)
+                if delta <= threshold_days:
+                    # 获取客户姓名
+                    first = user_inputs.get("first_name", "")
+                    last = user_inputs.get("last_name", "")
+                    email = user_inputs.get("email", "")
+                    social_media_account = user_inputs.get("social_media_account", "")
+                    full_name = f"{first} {last}".strip() or "未知客户"
+
+                    results.append({
+                        "id": task.id,
+                        "order_id": task.order_id,
+                        "routing_key": task.routing_key,
+                        "status": task.status,
+                        "social_media_account": social_media_account,
+                        "customer_name": full_name,
+                        "expected_end_date": end_date_str,
+                        "email": email,
+                        "days_left": delta
+                    })
+            except ValueError:
+                continue # 日期格式错误则忽略
+
+        # 按剩余天数升序排列 (最急的在前面)
+        results.sort(key=lambda x: x["days_left"])
+        return results
 
     @staticmethod
     async def list_task(

+ 11 - 6
app/services/webhook_service.py

@@ -99,6 +99,7 @@ class WebhookService:
         amount_yuan = Decimal(amount_match.group(1))
         amount_cent = int(amount_yuan * 100)
 
+        # 修正点:实例化 Event 时字段名需匹配模型
         event = VasPaymentEvent(
             provider=provider,
             event_type="payment_received",
@@ -107,7 +108,7 @@ class WebhookService:
             parsed_amount=amount_cent,
             parsed_currency="CNY",
             parsed_device=device_id,
-            raw_payload=payload.dict(),
+            raw_payload=payload.dict(),  # 对应模型 raw_payload
             status="received",
         )
         db.add(event)
@@ -217,8 +218,9 @@ class WebhookService:
                     provider="stripe",
                     event_id=event_id,
                     event_type=event_type,
-                    payload=event,
+                    raw_payload=event,  # 修正:payload -> raw_payload
                     created_at=datetime.utcnow(),
+                    status="ignored"    # 补充状态,虽有默认值但明确更好
                 )
             )
             await db.commit()
@@ -244,7 +246,9 @@ class WebhookService:
                     provider="stripe",
                     event_id=event_id,
                     event_type=event_type,
-                    payment_id=payment.id,
+                    matched_payment_id=payment.id,  # 修正:payment_id -> matched_payment_id
+                    raw_payload=event,              # 补充:记录 raw_payload
+                    status="duplicate",             # 补充:记录状态
                     created_at=datetime.utcnow(),
                 )
             )
@@ -280,9 +284,10 @@ class WebhookService:
                 provider="stripe",
                 event_id=event_id,
                 event_type=event_type,
-                payment_id=payment.id,
-                order_id=order_id,
-                payload=event,
+                matched_payment_id=payment.id,  # 修正:payment_id -> matched_payment_id
+                matched_order_id=order_id,      # 修正:order_id -> matched_order_id
+                raw_payload=event,              # 修正:payload -> raw_payload
+                status="applied",               # 明确状态
                 created_at=now,
             )
         )

+ 1 - 6
app/services/wechat_service.py

@@ -122,13 +122,8 @@ class WechatService:
 
         TEMPLATE = (
             "# {emoji} {headline}\n"
-            "> **Visa Type**: <font color=\"comment\">{visa_type}</font>\n"
-            "> **Earliest**: <font color=\"{color}\">{date_str}</font>\n"
-            "> **Details**: <font color=\"comment\">{slots_summary}</font>\n"
             "\n"
-            "👉 [Tap to Book Appointment]({website})\n"
-            "\n"
-            "<font color=\"comment\">Updated: {updated_at}</font>"
+            "👉 [Book now]({website})\n"
         )
 
         markdown_content = TEMPLATE.format_map({

+ 1 - 1
app/tasks/notification_task.py

@@ -80,7 +80,7 @@ async def notification_consumer(session_factory, redis_client: Redis):
                     
                     # 4. 判断是否需要跳过
                     # 如果记录存在,且 earliest_date 没有变化,则跳过推送
-                    if last_sent_val and last_sent_val.decode('utf-8') == str(earliest_date):
+                    if last_sent_val and last_sent_val == earliest_date:
                         print(f"⏭️  Skipped redundant Wechat notification for {country}-{city} (In Cooling Period)")
                         continue