jerry hace 3 meses
padre
commit
5e694a4146

+ 21 - 8
app/api/router.py

@@ -37,7 +37,7 @@ from app.schemas.user import VasUserCreate, VasUserUpdate, VasUserSetProfiles, V
 from app.schemas.product import VasProductCreate, VasProductUpdate, VasProductOut
 from app.schemas.product_routing import VasProductRoutingCreate, VasProductRoutingOut
 from app.schemas.schema import VasSchemaCreate, VasSchemaUpdate, VasSchemaOut
-from app.schemas.order import VasOrderCreate, VasOrderPatchUserInputs, VasOrderOut
+from app.schemas.order import VasOrderCreate, VasOrderAdjustPrice, VasOrderPatchUserInputs, VasOrderOut
 from app.schemas.payment import VasPaymentCreate, AdminUpdateStatusPayload, VasPaymentOut
 from app.schemas.payment_confirmation import VasPaymentConfirmationCreate, VasPaymentConfirmationUpdate, VasPaymentConfirmationOut
 from app.schemas.payment_qr import VasPaymentQrCreate, VasPaymentQrSetEnableIn, VasPaymentQrOut
@@ -45,7 +45,7 @@ from app.schemas.payment_provider import VasPaymentProviderCreate, VasPaymentPro
 from app.schemas.webhook import SMSHelperWebhookPayload
 from app.schemas.vas_task import VasTaskCreate, VasTaskUpdate, VasTaskOut
 from app.schemas.ticket import VasTicketCreate, VasTicketOut, VasTicketStatusUpdate, VasTicketMessageCreate, VasTicketMessageOut
-from app.schemas.slot_snapshot import SlotSnapshotCreate, SlotSnapshotOut
+from app.schemas.slot_snapshot import SlotSnapshotCreate, SlotSnapshotOut, SlotOverviewOut
 from app.schemas.slot_refresh_status import RefreshBase, RefreshFail, RefreshStatusOut
 from app.schemas.telegram import TelegramIn
 from app.schemas.wechat import WechatIn
@@ -672,6 +672,14 @@ async def slots_latest_get(
     res = await SlotSnapshotService.latest_for(db, country, city, visa_type)
     return success(data=res)
 
+@public_router.get("/slots/overview", summary="查询最近的slot", tags=["Slot数据"], response_model=ApiResponse[List[SlotOverviewOut]])
+async def slots_latest_get(
+    city: str = Query("", description="递交城市"),
+    db: AsyncSession = Depends(get_db)
+):
+    res = await SlotSnapshotService.get_slot_overview(db, city)
+    return success(data=res)
+
 @admin_required_router.post("/slots/report", summary="上报最近的slot", tags=["Slot数据"], response_model=ApiResponse[SlotSnapshotOut])
 async def slots_report(
     payload: SlotSnapshotCreate,
@@ -1000,6 +1008,16 @@ async def vas_order_create_by_admin(
     created_order = await OrderService.create_by_admin(db, payload, product, current_user, redis_client)
     return success(data=created_order)
 
+@admin_required_router.post("/vas/order/adjust-price", summary="管理员调整订单价格", tags=["Visafly签证系统"], response_model=ApiResponse[VasOrderOut])
+async def vas_order_adjust_price(
+    order_id: str,
+    payload: VasOrderAdjustPrice,
+    db: AsyncSession = Depends(get_db),
+    current_user: VasUser = Depends(get_current_user)
+):
+    order = await OrderService.adjust_order_price(db, order_id, payload)
+    return success(data=order)
+
 @admin_required_router.get("/vas/order/detail", summary="查询订单", tags=["Visafly签证系统"], response_model=ApiResponse[VasOrderOut])
 async def vas_order_create(
     order_id: str,
@@ -1093,12 +1111,7 @@ async def vas_payment_create(
     db: AsyncSession = Depends(get_db),
     redis_client: Redis = Depends(get_redis_client)
 ):
-    rate_table = {
-        "EUR->EUR": "1",
-        "EUR->CNY": "8.3174",
-        "EUR->USD": "1.0842",
-    }
-    res = await PaymentService.create_payment(db, payload, rate_table, redis_client)
+    res = await PaymentService.create_payment(db, payload, redis_client)
     return success(data=res)
 
 @protected_router.get("/vas/payment/detail", summary="获取支付详情", tags=["Visafly签证系统"], response_model=ApiResponse[VasPaymentOut])

+ 6 - 0
app/models/order.py

@@ -13,6 +13,12 @@ class VasOrder(Base):
     product_name = Column(String(100))
     user_name = Column(String(100))
     base_amount = Column(Integer, nullable=False)
+    adjustment_delta = Column(Integer, default=0)  
+    # 负数 = 优惠,正数 = 加价
+
+    # 订单最终价(冗余字段,方便查询)
+    final_amount = Column(Integer, nullable=False)
+    
     base_currency = Column(String(10), nullable=False)
 
     status = Column(

+ 2 - 0
app/models/payment.py

@@ -30,6 +30,8 @@ class VasPayment(Base):
     )
     
     base_amount = Column(Integer, nullable=False)
+    adjustment_delta = Column(Integer, default=0)
+    final_amount = Column(Integer, nullable=False)
     base_currency = Column(String(10), nullable=False)
 
     amount = Column(Integer, nullable=False)

+ 42 - 5
app/schemas/configuration.py

@@ -1,5 +1,6 @@
-from pydantic import BaseModel
-from typing import Optional
+import json
+from pydantic import BaseModel, model_validator
+from typing import Optional, Any
 from datetime import datetime
 
 
@@ -23,9 +24,45 @@ class ConfigurationUpdate(ConfigurationBase):
 
 class ConfigurationOut(ConfigurationBase):
     id: int
-    created_at: datetime
-    updated_at: datetime
-
+    # 关键点 1: 必须覆盖 config_value 的类型注解为 Any,
+    # 否则 Pydantic 会在输出时强行把它转换回字符串,或者报错
+    config_value: Any 
+    
     model_config = {
         "from_attributes": True
     }
+
+    @model_validator(mode='after')
+    def parse_config_value_by_type(self) -> 'ConfigurationOut':
+        """
+        根据 type 字段自动转换 config_value 的类型
+        """
+        if self.config_value is None or self.type is None:
+            return self
+
+        # 获取原始字符串值
+        raw_value = str(self.config_value)
+        target_type = self.type.lower()
+
+        try:
+            if target_type == 'int' or target_type == 'integer':
+                self.config_value = int(raw_value)
+            
+            elif target_type == 'float':
+                self.config_value = float(raw_value)
+            
+            elif target_type == 'bool' or target_type == 'boolean':
+                # 处理 "true", "1", "yes" 等情况
+                self.config_value = raw_value.lower() in ('true', '1', 'yes', 'on')
+            
+            elif target_type in ('json', 'list', 'dict', 'array', 'object'):
+                # 尝试解析 JSON
+                self.config_value = json.loads(raw_value)
+                
+            # 如果是 string 或其他未定义类型,保持原样
+        except (ValueError, json.JSONDecodeError):
+            # 如果转换失败(比如类型是 int 但值是 "abc"),
+            # 这里选择保持原始字符串不报错,或者你可以选择抛出错误
+            pass
+
+        return self

+ 6 - 0
app/schemas/order.py

@@ -26,6 +26,10 @@ class VasOrderCreate(BaseModel):
 class VasOrderUpdate(VasOrderBase):
     pass
 
+class VasOrderAdjustPrice(BaseModel):
+    adjustment_delta: int
+    reason: Optional[str] = None
+
 class VasOrderPatchUserInputs(BaseModel):
     user_inputs: Dict[str, Any]
 
@@ -35,6 +39,8 @@ class VasOrderOut(VasOrderBase):
     product_name: Optional[str]
     user_name: Optional[str]
     base_amount: int
+    adjustment_delta: int
+    final_amount: int
     base_currency: str
     created_at: datetime
     updated_at: datetime

+ 3 - 0
app/schemas/payment.py

@@ -44,6 +44,9 @@ class VasPaymentOut(VasPaymentBase):
     
     base_amount: int
     base_currency: str
+    
+    adjustment_delta: int
+    final_amount: int
 
     amount: int
     currency: str

+ 9 - 0
app/schemas/slot_snapshot.py

@@ -23,4 +23,13 @@ class SlotSnapshotOut(SlotSnapshotBase):
     model_config = {
         "from_attributes": True
     }
+    
+class SlotOverviewOut(SlotSnapshotBase):
+    id: int
+    routing_key:  Optional[str]=None
+    snapshot_source: Optional[str]=None
+    last_check_at: Optional[datetime]=None
+    model_config = {
+        "from_attributes": True
+    }
 

+ 21 - 15
app/schemas/statistics.py

@@ -1,31 +1,37 @@
-from pydantic import BaseModel
+# app/schemas/statistics.py (或者定义在你的 schemas 文件中)
+
+from pydantic import BaseModel, Field
 from typing import List
 
 # 1. 核心指标统计
 class StatsData(BaseModel):
-    totalOrders: int
-    totalRevenue: int    # 单位:分
-    activeUsers: int
-    pendingTickets: int
-    successRate: str
+    totalOrders: int = Field(..., description="本月订单总数")
+    totalOrdersTrend: float = Field(..., description="订单环比增长率,例如 12.5 代表 +12.5%") # 新增字段
+    
+    totalRevenue: int = Field(..., description="本月总营收 (单位:分)")
+    totalRevenueTrend: float = Field(..., description="营收环比增长率,例如 -5.0 代表 -5.0%") # 新增字段
+    
+    activeUsers: int = Field(..., description="总活跃用户数")
+    pendingTickets: int = Field(..., description="待处理工单数")
+    successRate: str = Field(..., description="机器人任务成功率")
 
 # 2. 营收趋势 (最近7天)
 class RevenueTrendItem(BaseModel):
-    date: str            # 格式: MM-DD
-    amount: float        # 单位:元 (Service层做了 /100 处理)
-    orders: int
+    date: str = Field(..., description="日期,格式 MM-DD")
+    amount: float = Field(..., description="营收金额 (单位:元)") # 注意:Service层已除以100
+    orders: int = Field(..., description="订单数")
 
 # 3. 商品销量分布
 class ProductDistItem(BaseModel):
-    name: str            # 商品标题
-    value: int           # 销量
+    name: str = Field(..., description="商品名称")
+    value: int = Field(..., description="销量")
 
 # 4. 最新动态
 class ActivityItem(BaseModel):
-    id: str              # 唯一标识 (e.g., "order_123")
-    text: str            # 显示文本
-    time: str            # 相对时间 (e.g., "10分钟前")
-    type: str            # 类型: order, money, ticket, system
+    id: str = Field(..., description="唯一标识")
+    text: str = Field(..., description="显示文本")
+    time: str = Field(..., description="相对时间描述,如'10分钟前'")
+    type: str = Field(..., description="类型: order, money, ticket, system")
 
 # === 主响应模型 ===
 class VasStatisticsOverviewOut(BaseModel):

+ 38 - 19
app/services/order_service.py

@@ -15,16 +15,14 @@ from app.models.user import VasUser
 from app.models.order import VasOrder
 from app.models.vas_task import VasTask
 from app.models.product import VasProduct
+from app.models.payment import VasPayment
 from app.models.product_routing import VasProductRouting
-from app.schemas.order import VasOrderCreate, VasOrderPatchUserInputs
+from app.schemas.order import VasOrderCreate, VasOrderAdjustPrice, VasOrderPatchUserInputs
 from app.services.webhook_service import WebhookService
 
 
 class OrderService:
 
-    # --------------------------------------------------
-    # 创建订单
-    # --------------------------------------------------
     @staticmethod
     async def create(
         db: AsyncSession,
@@ -47,6 +45,8 @@ class OrderService:
             product_name=product.title,
             base_amount=product.price_amount,
             base_currency=product.price_currency,
+            adjustment_delta=0,
+            final_amount=product.price_amount,
             user_id=auth_user.id,
         )
 
@@ -56,9 +56,6 @@ class OrderService:
 
         return order
     
-    # --------------------------------------------------
-    # 取消订单
-    # --------------------------------------------------
     @staticmethod
     async def cancel(
         db: AsyncSession,
@@ -100,6 +97,8 @@ class OrderService:
             product_name=product.title,
             base_amount=product.price_amount,
             base_currency=product.price_currency,
+            adjustment_delta=0,
+            final_amount=product.price_amount,
             user_id=auth_user.id,
         )
         # ===== user_inputs 安全修复 =====
@@ -127,18 +126,44 @@ class OrderService:
         await db.commit()
         await db.refresh(order)
         return order
+    
+    @staticmethod
+    async def adjust_order_price(db: AsyncSession, order_id: str,  payload: VasOrderAdjustPrice) -> VasOrder:
+        stmt = select(VasOrder).where(VasOrder.id == order_id)
+        order = (await db.execute(stmt)).scalar_one_or_none()
+
+        if not order:
+            raise NotFoundError("Order not exist")
+
+        if order.status != "pending":
+            raise BizLogicError(message="Order not adjustable")
+
+        # 2. 更新订单价格
+        order.adjustment_delta = payload.adjustment_delta
+        order.final_amount = order.base_amount + payload.adjustment_delta
+
+        if order.final_amount <= 0:
+            raise BizLogicError(message="final_amount must be > 0")
+
+        # ② 是否已有 pending payment(幂等)
+        stmt = select(VasPayment).where(
+            VasPayment.order_id == order.id,
+            VasPayment.status == "pending",
+        )
+        active_payment = (await db.execute(stmt)).scalar_one_or_none()
+
+        if active_payment:
+            active_payment.status = "expired"
+
+        await db.commit()
+        await db.refresh(order)
+        return order
 
-    # --------------------------------------------------
-    # 获取订单
-    # --------------------------------------------------
     @staticmethod
     async def get(db: AsyncSession, order_id: str) -> Optional[VasOrder]:
         stmt = select(VasOrder).where(VasOrder.id == order_id)
         return (await db.execute(stmt)).scalar_one_or_none()
 
-    # --------------------------------------------------
-    # 用户订单列表
-    # --------------------------------------------------
     @staticmethod
     async def list_by_user(
         db: AsyncSession,
@@ -157,9 +182,6 @@ class OrderService:
 
         return await paginate(db, stmt, page, size)
 
-    # --------------------------------------------------
-    # 管理员订单列表
-    # --------------------------------------------------
     @staticmethod
     async def list_all(
         db: AsyncSession,
@@ -177,9 +199,6 @@ class OrderService:
 
         return await paginate(db, query, page, size)
 
-    # --------------------------------------------------
-    # 更新 user_inputs
-    # --------------------------------------------------
     @staticmethod
     async def patch_user_inputs(
         db: AsyncSession,

+ 137 - 32
app/services/payment_service.py

@@ -4,7 +4,8 @@ import time
 import stripe
 import random
 import uuid
-from typing import Dict, List, Optional
+import json
+from typing import Dict, List, Optional, Any
 from redis.asyncio import Redis
 from decimal import Decimal, ROUND_HALF_UP
 from datetime import datetime, timedelta
@@ -25,24 +26,84 @@ from app.models.verification_token import VasVerificationToken
 from app.models.payment_provider import VasPaymentProvider
 from app.models.payment_qr import VasPaymentQR
 from app.models.payment_confirmation import VasPaymentConfirmation
+from app.models.configuration import Configuration
 from app.schemas.payment import VasPaymentCreate, AdminUpdateStatusPayload
 from app.schemas.payment_confirmation import VasPaymentConfirmationCreate, VasPaymentConfirmationUpdate
 from app.services.notification_service import NotificationService
 from app.services.webhook_service import WebhookService
 
 
+def convert_currency(
+    amount: int,
+    from_currency: str,
+    to_currency: str,
+    exchange_config: Dict[str, Any],
+) -> Decimal:
+    """
+    根据配置进行货币转换。
+    amount: 最小单位(如分/cents)
+    exchange_config: { "base": "CNY", "rates": { "CNY": 1.0, "USD": 7.25, ... } }
+    计算公式: Target = Source * (Rate_From / Rate_To)
+    """
+    if from_currency == to_currency:
+        return Decimal(amount)
+
+    rates = exchange_config.get("rates", {})
+    
+    # 校验货币是否支持
+    if from_currency not in rates:
+        raise BizLogicError(f"Exchange rate for {from_currency} not found in configuration")
+    if to_currency not in rates:
+        raise BizLogicError(f"Exchange rate for {to_currency} not found in configuration")
+
+    # 获取相对于 Base 的汇率
+    from_rate = Decimal(str(rates[from_currency]))
+    to_rate = Decimal(str(rates[to_currency]))
+
+    if to_rate == 0:
+        raise BizLogicError(f"Invalid exchange rate for {to_currency} (0)")
+
+    # 交叉汇率计算
+    converted = (Decimal(amount) * from_rate) / to_rate
+    
+    # 四舍五入保留整数(支付系统通常使用最小单位)
+    return converted.quantize(Decimal("1"), rounding=ROUND_HALF_UP)
+
+
 class PaymentService:
 
-    # --------------------------------------------------
-    # 创建支付(统一入口)
-    # --------------------------------------------------
+    @staticmethod
+    async def _get_exchange_config(db: AsyncSession) -> Dict:
+        """
+        内部辅助方法:从 Configuration 表读取汇率配置
+        """
+        stmt = select(Configuration).where(Configuration.config_key == "EXCHANGE_RATES")
+        config_obj = (await db.execute(stmt)).scalar_one_or_none()
+        
+        if not config_obj:
+            raise BizLogicError("System configuration 'EXCHANGE_RATES' is missing")
+        
+        # 处理 config_value,可能是字符串也可能是已转换的字典(取决于ORM配置)
+        val = config_obj.config_value
+        if isinstance(val, str):
+            try:
+                return json.loads(val)
+            except json.JSONDecodeError:
+                raise BizLogicError("Invalid JSON format in 'EXCHANGE_RATES'")
+        elif isinstance(val, dict):
+            return val
+        else:
+            raise BizLogicError("Invalid type for 'EXCHANGE_RATES' configuration")
+
     @staticmethod
     async def create_payment(
         db: AsyncSession,
         payload: VasPaymentCreate,
-        rate_table: Dict,
         redis_client: Redis
     ) -> VasPayment:
+        """
+        创建支付单的主入口
+        """
 
         # ① 锁住订单(防并发)
         stmt = (
@@ -65,15 +126,18 @@ class PaymentService:
             if active_payment.provider == payload.provider:
                 return active_payment
             else:
-                active_payment.status = "failed"
+                active_payment.status = "expired"
+
+        # ③ 获取最新的汇率配置
+        exchange_config = await PaymentService._get_exchange_config(db)
 
-        # ③ 根据 provider 创建
+        #  根据 provider 创建
         if payload.provider in ("wechat", "alipay"):
             payment = await PaymentService.create_offline_payment(
                 db=db,
                 order=order,
                 provider_name=payload.provider,
-                rate_table=rate_table,
+                exchange_config=exchange_config,
             )
             await db.commit()
             return payment
@@ -82,7 +146,7 @@ class PaymentService:
             payment = await PaymentService.create_stripe_payment(
                 db=db,
                 order=order,
-                rate_table=rate_table,
+                exchange_config=exchange_config,
             )
             await db.commit()
             return payment
@@ -184,7 +248,7 @@ class PaymentService:
         payload: AdminUpdateStatusPayload
     ):
         """
-        管理员确认用户的支付
+        管理员强制更新支付状态
         """
         if payload.status == "succeeded":
             payment = await PaymentService._confirm_payment_action(db, payment_id, payload.remark)
@@ -316,7 +380,7 @@ class PaymentService:
         db: AsyncSession,
         order: VasOrder,
         provider_name: str,
-        rate_table: Dict,
+        exchange_config: Dict,
     ) -> VasPayment:
 
         payment = (
@@ -341,21 +405,42 @@ class PaymentService:
         if not qrs:
             raise BizLogicError("No payment QR available")
 
+        # 选择QR码也需要稳定吗?如果是,也可以用 rng.choice(qrs)
+        # 这里暂时只改 random_offset
         qr = random.choice(qrs)
         payment.qr_id = qr.id
 
-        rate_key = f"{order.base_currency}->{provider.currency}".upper()
-        exchange_rate = Decimal(rate_table[rate_key])
-
-        converted = (
-            Decimal(payment.base_amount) * exchange_rate
-        ).quantize(Decimal("1"), rounding=ROUND_HALF_UP)
+        # --- 汇率计算 ---
+        amount_converted = convert_currency(
+            amount=payment.final_amount,
+            from_currency=order.base_currency,
+            to_currency=provider.currency,
+            exchange_config=exchange_config,
+        )
+        
+        rates = exchange_config.get("rates", {})
+        rate_from = Decimal(str(rates.get(order.base_currency, 1.0)))
+        rate_to = Decimal(str(rates.get(provider.currency, 1.0)))
+        
+        if rate_to == 0:
+            raise BizLogicError("Invalid target currency rate (0)")
+        
+        current_exchange_rate = rate_from / rate_to
 
-        max_discount = min(99, int(converted * Decimal("0.01")))
-        discount = random.randint(1, max_discount) if max_discount >= 1 else 0
+        # --- 稳定的随机立减逻辑 ---
+        # 规则:最大减免为金额的 1% 或 99分(取小值)
+        max_discount = min(99, int(amount_converted * Decimal("0.01")))
+        
+        if max_discount >= 1:
+            # 关键修改:使用 order.id 作为种子
+            # 这确保了同一个订单号,无论计算多少次,得到的 discount 是一样的
+            rng = random.Random(order.id)
+            discount = rng.randint(1, max_discount)
+        else:
+            discount = 0
 
-        payment.exchange_rate = exchange_rate
-        payment.amount = int(converted) - discount
+        payment.exchange_rate = current_exchange_rate
+        payment.amount = int(amount_converted) - discount
         payment.currency = provider.currency
         payment.random_offset = discount
 
@@ -365,7 +450,7 @@ class PaymentService:
     async def create_stripe_payment(
         db: AsyncSession,
         order: VasOrder,
-        rate_table: Dict,
+        exchange_config: Dict,
     ) -> VasPayment:
 
         payment = await PaymentService._create_stripe_payment(db, order)
@@ -378,17 +463,26 @@ class PaymentService:
         if not provider:
             raise BizLogicError("Stripe provider not enabled")
 
-        rate_key = f"{order.base_currency}->{provider.currency}".upper()
-        exchange_rate = Decimal(rate_table[rate_key])
+        amount_converted = convert_currency(
+            amount=payment.final_amount,
+            from_currency=order.base_currency,
+            to_currency=provider.currency,
+            exchange_config=exchange_config,
+        )
 
-        converted = (
-            Decimal(payment.base_amount) * exchange_rate
-        ).quantize(Decimal("1"), rounding=ROUND_HALF_UP)
+        rates = exchange_config.get("rates", {})
+        rate_from = Decimal(str(rates.get(order.base_currency, 1.0)))
+        rate_to = Decimal(str(rates.get(provider.currency, 1.0)))
+        
+        if rate_to == 0:
+             raise BizLogicError("Invalid target currency rate (0)")
+             
+        current_exchange_rate = rate_from / rate_to
 
-        payment.exchange_rate = exchange_rate
-        payment.amount = int(converted)
+        payment.exchange_rate = current_exchange_rate
+        payment.amount = int(amount_converted)
         payment.currency = provider.currency
-        payment.random_offset = 0
+        payment.random_offset = 0 # Stripe 不需要随机减免
 
         stripe_session = PaymentService.create_checkout_session(
             order=order,
@@ -446,8 +540,12 @@ class PaymentService:
             order_id=order.id,
             provider="wechat",
             channel="qr_static",
+            
             base_amount=order.base_amount,
+            adjustment_delta=order.adjustment_delta,
+            final_amount=order.final_amount,
             base_currency=order.base_currency,
+            
             amount=0,
             currency="CNY",
             random_offset=0,
@@ -469,8 +567,12 @@ class PaymentService:
             order_id=order.id,
             provider="alipay",
             channel="qr_static",
+            
             base_amount=order.base_amount,
+            adjustment_delta=order.adjustment_delta,
+            final_amount=order.final_amount,
             base_currency=order.base_currency,
+            
             amount=0,
             currency="CNY",
             random_offset=0,
@@ -492,8 +594,12 @@ class PaymentService:
             order_id=order.id,
             provider="stripe",
             channel="online_link",
+            
             base_amount=order.base_amount,
+            adjustment_delta=order.adjustment_delta,
+            final_amount=order.final_amount,
             base_currency=order.base_currency,
+            
             amount=0,
             currency="EUR",
             random_offset=0,
@@ -523,5 +629,4 @@ class PaymentService:
         id: int,
     ) -> VasPayment:
         stmt = select(VasPayment).where(VasPayment.id == id)
-        return (await db.execute(stmt)).scalar_one_or_none()
-
+        return (await db.execute(stmt)).scalar_one_or_none()

+ 74 - 1
app/services/slot_snapshot_service.py

@@ -1,11 +1,14 @@
 # app/services/slot_snapshot_service.py
 
 from sqlalchemy.ext.asyncio import AsyncSession
-from sqlalchemy import select
+from sqlalchemy import select, func, desc, and_
 from redis.asyncio import Redis
+from typing import List, Dict, Any
 from app.models.slot_snapshot import VasSlotSnapshot
 from app.schemas.slot_snapshot import SlotSnapshotCreate, SlotSnapshotOut
 from app.services.notification_service import NotificationService
+from app.models.slot_snapshot import VasSlotSnapshot
+from app.models.slot_refresh_status import VasSlotRefreshStatus
 
 
 class SlotSnapshotService:
@@ -61,3 +64,73 @@ class SlotSnapshotService:
         )
 
         return await db.scalar(stmt)
+    
+    async def get_slot_overview(db: AsyncSession, city: str) -> List[Dict[str, Any]]:
+        """
+        异步获取指定城市的最新 Slot 快照 dashboard 数据
+        修正逻辑:按 (Country, City, VisaType) 这一组业务主键去重,只取最新的一条。
+        """
+        
+        # 1. 子查询:找出该城市下,每个【业务类型】最新的一条记录 ID
+        # 也就是:在同一个城市、同一个国家、同一个签证类型下,只取 ID 最大的那条
+        subquery = (
+            select(func.max(VasSlotSnapshot.id).label("max_id"))
+            .where(VasSlotSnapshot.city == city)
+            .group_by(
+                VasSlotSnapshot.country, 
+                VasSlotSnapshot.city, 
+                VasSlotSnapshot.visa_type
+            )
+            .subquery()
+        )
+
+        # 2. 主查询
+        # Join 逻辑修改:
+        # - Inner Join 子查询:确保只拿到最新的 snapshot
+        # - Outer Join 状态表:不再仅依赖 routing_key (因为 snapshot 里可能是 null),
+        #   而是通过 country/city/visa_type 强关联,这样即使 routing_key 丢失也能匹配到状态。
+        stmt = (
+            select(VasSlotSnapshot, VasSlotRefreshStatus)
+            .join(subquery, VasSlotSnapshot.id == subquery.c.max_id)
+            .outerjoin(
+                VasSlotRefreshStatus, 
+                and_(
+                    VasSlotSnapshot.country == VasSlotRefreshStatus.country,
+                    VasSlotSnapshot.city == VasSlotRefreshStatus.city,
+                    VasSlotSnapshot.visa_type == VasSlotRefreshStatus.visa_type
+                )
+            )
+            .order_by(VasSlotSnapshot.country)
+        )
+
+        # 3. 执行查询
+        result = await db.execute(stmt)
+        rows = result.all()
+
+        # 4. 组装数据
+        dashboard_data = []
+        
+        for row in rows:
+            snap: VasSlotSnapshot = row[0]
+            status: VasSlotRefreshStatus = row[1]
+
+            item = {
+                "id": snap.id,
+                "country": snap.country,
+                "city": snap.city,
+                "visa_type": snap.visa_type,
+                "routing_key": snap.routing_key, # 即使是 null 也没关系,展示用
+                "availability_status": snap.availability_status,
+                "earliest_date": snap.earliest_date,
+                "snapshot_at": snap.snapshot_at,
+                "website": snap.website,
+                # 优先从 status 表取心跳,如果关联不上,就为 null
+                "last_check_at": status.last_success_at if status else None
+            }
+            
+            if snap.availability:
+                item["availability"] = snap.availability
+
+            dashboard_data.append(item)
+
+        return dashboard_data

+ 140 - 61
app/services/statistics_service.py

@@ -1,49 +1,99 @@
 # app/services/statistics_service.py
 
+import json
 from sqlalchemy.ext.asyncio import AsyncSession
 from sqlalchemy import func, desc, case, select
 from datetime import datetime, timedelta, date
-from typing import Dict, Any
+from typing import Dict, Any, Tuple
 
 from app.models.order import VasOrder
 from app.models.ticket import VasTicket
 from app.models.vas_task import VasTask
 from app.models.user import VasUser
 from app.models.product import VasProduct
-
+from app.models.configuration import Configuration
 
 # ======================
-# 汇率 & 货币符号
+# 货币符号 (仅用于展示文本,无需存库)
 # ======================
-
-EXCHANGE_RATES = {
-    "CNY": 1.0,
-    "USD": 7.25,
-    "EUR": 7.65,
-    "GBP": 9.10,
-    "HKD": 0.92,
-    "JPY": 0.048
-}
-
 CURRENCY_SYMBOLS = {
     "CNY": "¥", "USD": "$", "EUR": "€", "GBP": "£", "HKD": "HK$", "JPY": "¥"
 }
 
-
 class StatisticsService:
 
     # ======================
-    # 工具方法
+    # 辅助方法:汇率与时间
     # ======================
 
     @staticmethod
-    def _convert_to_cny(amount: any, currency: str) -> int:
-        """金额(分) → CNY(分)"""
+    async def _get_exchange_rates(db: AsyncSession) -> Dict[str, float]:
+        """
+        从数据库获取动态汇率配置
+        返回格式示例: {'CNY': 1.0, 'USD': 7.25, ...}
+        """
+        stmt = select(Configuration).where(Configuration.config_key == "EXCHANGE_RATES")
+        config_obj = (await db.execute(stmt)).scalar_one_or_none()
+        
+        # 默认兜底汇率
+        fallback_rates = {"CNY": 1.0}
+
+        if not config_obj:
+            return fallback_rates
+        
+        try:
+            val = config_obj.config_value
+            data = {}
+            if isinstance(val, str):
+                data = json.loads(val)
+            elif isinstance(val, dict):
+                data = val
+            
+            return data.get("rates", fallback_rates)
+        except Exception:
+            return fallback_rates
+
+    @staticmethod
+    def _convert_to_cny(amount: Any, currency: str, rates: Dict[str, float]) -> int:
+        """
+        金额(分) → CNY(分)
+        """
         if not amount:
             return 0
-        rate = EXCHANGE_RATES.get(currency, 1.0)
+        rate = float(rates.get(currency, 1.0))
         return int(float(amount) * rate)
 
+    @staticmethod
+    def _get_month_ranges() -> Tuple[datetime, datetime, datetime, datetime]:
+        """
+        获取时间范围用于环比计算
+        返回: (本月开始, 本月结束, 上月开始, 上月结束)
+        """
+        now = datetime.now()
+        
+        # 本月范围
+        this_month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
+        this_month_end = now  # 截止到当前时间
+
+        # 上月范围
+        # 上月结束 = 本月开始 - 1微秒
+        last_month_end_dt = this_month_start - timedelta(microseconds=1)
+        # 上月开始
+        last_month_start = last_month_end_dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
+        
+        # 为了公平比较(如果需要),可以将上月结束时间也限制在和本月同样的天数进度
+        # 但通常简易报表直接对比 上月全月总额 vs 本月至今总额,或者对比 上月至今
+        # 这里采用简单的 上月全月 vs 本月至今
+        
+        return this_month_start, this_month_end, last_month_start, last_month_end_dt
+
+    @staticmethod
+    def _calculate_growth(current: float, previous: float) -> float:
+        """计算增长率百分比"""
+        if previous == 0:
+            return 100.0 if current > 0 else 0.0
+        return ((current - previous) / previous) * 100.0
+
     # ======================
     # 核心接口
     # ======================
@@ -54,41 +104,66 @@ class StatisticsService:
         后台统计概览(Async 版)
         """
 
+        # 0. 准备基础数据
+        current_rates = await StatisticsService._get_exchange_rates(db)
+        tm_start, tm_end, lm_start, lm_end = StatisticsService._get_month_ranges()
+
         # --------------------------------------------------
-        # 1. 核心指标
+        # 1. 营收统计 (本月 vs 上月) - 用于计算 Trend
         # --------------------------------------------------
-
-        # 1.1 总营收(按币种分组)
-        revenue_stmt = (
-            select(
-                VasOrder.base_currency,
-                func.sum(VasOrder.base_amount)
+        
+        async def calculate_revenue(start_dt, end_dt):
+            stmt = (
+                select(VasOrder.base_currency, func.sum(VasOrder.base_amount))
+                .where(
+                    VasOrder.created_at >= start_dt,
+                    VasOrder.created_at <= end_dt,
+                    VasOrder.status.in_(["paid", "completed", "succeeded"])
+                )
+                .group_by(VasOrder.base_currency)
             )
-            .where(VasOrder.status.in_(["paid", "completed", "succeeded"]))
-            .group_by(VasOrder.base_currency)
-        )
+            rows = (await db.execute(stmt)).all()
+            total_cny = sum(
+                StatisticsService._convert_to_cny(amt, curr, current_rates)
+                for curr, amt in rows
+            )
+            return total_cny
 
-        revenue_rows = (await db.execute(revenue_stmt)).all()
+        revenue_this_month = await calculate_revenue(tm_start, tm_end)
+        revenue_last_month = await calculate_revenue(lm_start, lm_end)
+        
+        revenue_trend = StatisticsService._calculate_growth(revenue_this_month, revenue_last_month)
 
-        total_revenue_cny = sum(
-            StatisticsService._convert_to_cny(amount, currency)
-            for currency, amount in revenue_rows
-        )
+        # --------------------------------------------------
+        # 2. 订单量统计 (本月 vs 上月) - 用于计算 Trend
+        # --------------------------------------------------
 
-        # 1.2 活跃订单数
-        total_orders = (
-            await db.scalar(
+        async def calculate_orders(start_dt, end_dt):
+            count = await db.scalar(
                 select(func.count(VasOrder.id))
-                .where(VasOrder.status == "paid")
+                .where(
+                    VasOrder.created_at >= start_dt,
+                    VasOrder.created_at <= end_dt,
+                    VasOrder.status.in_(["paid", "completed", "succeeded"]) # 仅统计有效订单
+                )
             )
-        ) or 0
+            return count or 0
 
-        # 1.3 活跃用户数
+        orders_this_month = await calculate_orders(tm_start, tm_end)
+        orders_last_month = await calculate_orders(lm_start, lm_end)
+
+        orders_trend = StatisticsService._calculate_growth(orders_this_month, orders_last_month)
+
+        # --------------------------------------------------
+        # 3. 其他静态指标
+        # --------------------------------------------------
+
+        # 活跃用户数 (总累计)
         active_users = (
             await db.scalar(select(func.count(VasUser.id)))
         ) or 0
 
-        # 1.4 待处理工单
+        # 待处理工单
         pending_tickets = (
             await db.scalar(
                 select(func.count(VasTicket.id))
@@ -96,14 +171,13 @@ class StatisticsService:
             )
         ) or 0
 
-        # 1.5 任务成功率
+        # 任务成功率
         task_stmt = select(
             func.count(VasTask.id).label("total"),
             func.sum(
                 case((VasTask.status == "completed", 1), else_=0)
             ).label("success")
         )
-
         task_counts = (await db.execute(task_stmt)).first()
 
         success_rate_str = "0%"
@@ -112,10 +186,10 @@ class StatisticsService:
             success_rate_str = f"{rate:.1f}%"
 
         # --------------------------------------------------
-        # 2. 最近 7 天营收趋势
+        # 4. 图表数据:最近 7 天营收趋势
         # --------------------------------------------------
 
-        revenue_trend = []
+        revenue_trend_chart = []
         today = date.today()
 
         for i in range(6, -1, -1):
@@ -132,7 +206,7 @@ class StatisticsService:
                 .where(
                     VasOrder.created_at >= start_dt,
                     VasOrder.created_at <= end_dt,
-                    VasOrder.status.in_(["paid", "completed"])
+                    VasOrder.status.in_(["paid", "completed", "succeeded"])
                 )
                 .group_by(VasOrder.base_currency)
             )
@@ -143,17 +217,17 @@ class StatisticsService:
             daily_order_count = 0
 
             for curr, amt, cnt in daily_rows:
-                daily_amount_cny += StatisticsService._convert_to_cny(amt, curr)
+                daily_amount_cny += StatisticsService._convert_to_cny(amt, curr, current_rates)
                 daily_order_count += cnt
 
-            revenue_trend.append({
+            revenue_trend_chart.append({
                 "date": target_date.strftime("%m-%d"),
-                "amount": daily_amount_cny / 100.0,
+                "amount": daily_amount_cny / 100.0, # 转为元
                 "orders": daily_order_count
             })
 
         # --------------------------------------------------
-        # 3. 商品销量分布(Top 5)
+        # 5. 商品销量分布(Top 5)
         # --------------------------------------------------
 
         product_stmt = (
@@ -169,14 +243,13 @@ class StatisticsService:
         )
 
         product_rows = (await db.execute(product_stmt)).all()
-
         product_dist = [
             {"name": title, "value": count}
             for title, count in product_rows
         ]
 
         # --------------------------------------------------
-        # 4. 最新动态
+        # 6. 最新动态 (混合订单和工单)
         # --------------------------------------------------
 
         activities = []
@@ -190,6 +263,7 @@ class StatisticsService:
         recent_orders = (await db.execute(order_stmt)).scalars().all()
 
         for o in recent_orders:
+            # 动态仅作展示,这里使用原始币种即可,不需要转CNY
             symbol = CURRENCY_SYMBOLS.get(o.base_currency, o.base_currency)
             amt_display = f"{symbol}{o.base_amount / 100}"
 
@@ -221,14 +295,15 @@ class StatisticsService:
                 "type": "ticket"
             })
 
-        # 排序 + 时间人性化
+        # 按时间倒序排序
         activities.sort(key=lambda x: x["time"], reverse=True)
         activities = activities[:10]
 
-        now = datetime.now()
+        # 格式化时间显示
+        now_dt = datetime.now()
         for act in activities:
             dt = act["time"]
-            diff = now - dt
+            diff = now_dt - dt
             if diff.days > 0:
                 act["time"] = f"{diff.days}天前"
             elif diff.seconds > 3600:
@@ -239,18 +314,22 @@ class StatisticsService:
                 act["time"] = "刚刚"
 
         # --------------------------------------------------
-        # 返回结果
+        # 7. 组装返回结果
         # --------------------------------------------------
 
         return {
             "stats": {
-                "totalOrders": total_orders,
-                "totalRevenue": total_revenue_cny,  # CNY 分
+                "totalOrders": orders_this_month,       # 仅显示本月,为了匹配趋势语境
+                "totalOrdersTrend": orders_trend,       # 新增:订单增长率
+                
+                "totalRevenue": revenue_this_month,     # 仅显示本月营收 (分)
+                "totalRevenueTrend": revenue_trend,     # 新增:营收增长率
+                
                 "activeUsers": active_users,
                 "pendingTickets": pending_tickets,
                 "successRate": success_rate_str
             },
-            "revenue_trend": revenue_trend,
-            "product_dist": product_dist,
-            "recent_activities": activities
-        }
+            "revenue_trend": revenue_trend_chart,       # 图表数据
+            "product_dist": product_dist,               # 饼图数据
+            "recent_activities": activities             # 动态列表
+        }

+ 30 - 0
app/tasks/notification_task.py

@@ -9,6 +9,9 @@ from app.services.wechat_service import WechatService
 from app.services.email_authorizations_service import EmailAuthorizationService
 from app.utils.redis_utils import redis_qpop
 
+
+THROTTLE_EXPIRY = 1800 
+
 async def notification_consumer(session_factory, redis_client: Redis):
     """
     异步消费 Redis 队列 vas_notification_queue
@@ -56,11 +59,38 @@ async def notification_consumer(session_factory, redis_client: Redis):
 
             if "wechat" == channel:
                 api_token = "a8f79817-e18b-4739-8459-adb2ed5e2e32"
+                
                 if "payment_user_confirmed" == template_id:
                     status = await WechatService.push_payment_template(api_token, payload)
                     print(f"Wechat send status: {status}")
+                
                 if "slot_snapshot" == template_id:
+                    # 1. 提取标识字段
+                    country = payload.get("country", "unknown")
+                    city = payload.get("city", "unknown")
+                    visa_type = payload.get("visa_type", "unknown")
+                    earliest_date = payload.get("earliest_date", "N/A")
+                    
+                    # 2. 生成 Redis 频率限制 Key
+                    # 格式: throttle:slot_snapshot:USA:Beijing:B1
+                    throttle_key = f"throttle:slot_snapshot:{country}:{city}:{visa_type}"
+                    
+                    # 3. 检查是否存在记录(即是否在冷却期内)
+                    last_sent_val = await redis_client.get(throttle_key)
+                    
+                    # 4. 判断是否需要跳过
+                    # 如果记录存在,且 earliest_date 没有变化,则跳过推送
+                    if last_sent_val and last_sent_val.decode('utf-8') == str(earliest_date):
+                        print(f"⏭️  Skipped redundant Wechat notification for {country}-{city} (In Cooling Period)")
+                        continue
+                    
+                    # 5. 执行发送
                     status = await WechatService.push_slot_snapshot(api_token, payload)
+                    print(f"Wechat send status: {status}")
+                    
+                    # 6. 发送成功后更新 Redis 记录并设置过期时间
+                    # 存储当前的最早日期,下次如果日期变了,即便没过 30 分钟也会再次推送
+                    await redis_client.set(throttle_key, str(earliest_date), ex=THROTTLE_EXPIRY)
             print(f"✅ Notification sent: {message.get('notification_id')}")
 
         except Exception as e: