from datetime import datetime, timedelta, date
from decimal import Decimal
from typing import Dict, Any

from sqlalchemy import func, desc, and_, case
from sqlalchemy.orm import Session

from app.models.order import VasOrder
from app.models.product import VasProduct
from app.models.ticket import VasTicket
from app.models.user import VasUser
from app.models.vas_task import VasTask

# Static exchange-rate table (base currency: CNY).
# In production these rates should be loaded from a database or cache
# so that they track the live market.
EXCHANGE_RATES = {
    "CNY": 1.0,
    "USD": 7.25,
    "EUR": 7.65,
    "GBP": 9.10,
    "HKD": 0.92,
    "JPY": 0.048,
}

CURRENCY_SYMBOLS = {
    "CNY": "¥",
    "USD": "$",
    "EUR": "€",
    "GBP": "£",
    "HKD": "HK$",
    "JPY": "¥",
}

# Order statuses that are counted as realised revenue / sales.
_PAID_STATUSES = ['paid', 'completed', 'succeeded']


class StatisticsService:
    """Read-only aggregations that back the admin dashboard overview."""

    @staticmethod
    def _convert_to_cny(amount: Any, currency: str) -> int:
        """Convert an amount in minor units (cents) to CNY minor units (fen).

        Args:
            amount: Amount in the source currency's minor units. May be an
                ``int``, ``Decimal``, ``float``, numeric string or ``None``.
            currency: Currency code used to look up ``EXCHANGE_RATES``.
                Unknown codes are converted at 1:1.

        Returns:
            The converted amount in fen, truncated toward zero. Falsy
            amounts (``None``, ``0``) yield ``0``.
        """
        if not amount:
            return 0

        rate = EXCHANGE_RATES.get(currency, 1.0)  # unknown currency -> 1:1

        # Multiply in Decimal rather than float: binary floats can produce
        # products such as 28.999999999999996, which int() would truncate
        # one minor unit too low. str() keeps the rate's written value.
        if isinstance(amount, (int, Decimal)):
            value = Decimal(amount)
        else:
            value = Decimal(str(amount))
        return int(value * Decimal(str(rate)))

    @staticmethod
    def _format_relative_time(moment: datetime) -> str:
        """Render ``moment`` as a coarse "time ago" label.

        "Now" is taken in the same timezone as ``moment`` when it is
        timezone-aware, otherwise as naive local time, so the subtraction
        never mixes naive and aware datetimes.
        NOTE(review): naive timestamps are compared against local
        ``datetime.now()``; confirm the database stores local time.
        """
        if moment.utcoffset() is not None:
            now = datetime.now(moment.tzinfo)
        else:
            now = datetime.now()

        # total_seconds() is signed, unlike timedelta.seconds which wraps
        # to ~86399 for a slightly-future timestamp.
        elapsed = int((now - moment).total_seconds())
        if elapsed < 60:  # also covers future timestamps (clock skew)
            return "刚刚"
        if elapsed < 3600:
            return f"{elapsed // 60}分钟前"
        if elapsed < 86400:
            return f"{elapsed // 3600}小时前"
        return f"{elapsed // 86400}天前"

    @staticmethod
    def _revenue_trend(db: Session, days: int = 7) -> list:
        """Return per-day paid revenue (in CNY yuan) and order counts.

        Covers the last ``days`` calendar days ending today, oldest first.
        """
        today = date.today()
        trend = []
        for offset in range(days - 1, -1, -1):
            target_date = today - timedelta(days=offset)
            day_start = datetime.combine(target_date, datetime.min.time())
            # Half-open window [day_start, next_day_start) avoids relying
            # on the column's sub-second precision at the day boundary.
            next_day_start = day_start + timedelta(days=1)

            # Group by currency so each group can be converted separately.
            daily_groups = db.query(
                VasOrder.base_currency,
                func.sum(VasOrder.base_amount).label('amount'),
                func.count(VasOrder.id).label('orders'),
            ).filter(
                VasOrder.created_at >= day_start,
                VasOrder.created_at < next_day_start,
                VasOrder.status.in_(_PAID_STATUSES),
            ).group_by(VasOrder.base_currency).all()

            daily_amount_cny = 0
            daily_order_count = 0
            for currency, amount, order_count in daily_groups:
                daily_amount_cny += StatisticsService._convert_to_cny(
                    amount, currency
                )
                daily_order_count += order_count

            trend.append({
                "date": target_date.strftime("%m-%d"),
                "amount": float(daily_amount_cny) / 100.0,  # fen -> yuan
                "orders": daily_order_count,
            })
        return trend

    @staticmethod
    def _recent_activities(
        db: Session, per_source: int = 5, limit: int = 10
    ) -> list:
        """Return the newest order and ticket events, newest first.

        Fetches up to ``per_source`` orders and tickets, merges them,
        keeps the ``limit`` most recent, and replaces each datetime
        ``time`` value with a relative-time label.
        """
        activities = []

        # Order events.
        recent_orders = (
            db.query(VasOrder)
            .order_by(desc(VasOrder.created_at))
            .limit(per_source)
            .all()
        )
        for order in recent_orders:
            symbol = CURRENCY_SYMBOLS.get(
                order.base_currency, order.base_currency
            ) or ""
            # Amounts are stored in minor units; always show two decimals
            # regardless of whether the column yields int or Decimal.
            amt_display = f"{symbol}{(order.base_amount or 0) / 100:.2f}"
            activities.append({
                "id": f"order_{order.id}",
                "text": (
                    f"用户下单: {order.product_name or '未知商品'} "
                    f"({amt_display})"
                ),
                "time": order.created_at,
                "type": "order" if order.status == 'pending' else "money",
            })

        # Ticket events.
        recent_tickets = (
            db.query(VasTicket)
            .order_by(desc(VasTicket.created_at))
            .limit(per_source)
            .all()
        )
        for ticket in recent_tickets:
            reason = ticket.reason or ""  # tolerate a NULL reason
            if len(reason) > 20:
                reason_preview = reason[:20] + "..."
            else:
                reason_preview = reason
            activities.append({
                "id": f"ticket_{ticket.id}",
                "text": f"新工单 #{ticket.id}: {reason_preview}",
                "time": ticket.created_at,
                "type": "ticket",
            })

        # Newest first; entries without a timestamp sort last instead of
        # raising when None is compared against a datetime.
        activities.sort(
            key=lambda act: (act['time'] is not None, act['time']),
            reverse=True,
        )
        activities = activities[:limit]

        for act in activities:
            moment = act['time']
            if isinstance(moment, datetime):
                act['time'] = StatisticsService._format_relative_time(moment)
        return activities

    @staticmethod
    def overview(db: Session) -> Dict[str, Any]:
        """Build the admin dashboard overview (money normalised to CNY).

        Args:
            db: Active SQLAlchemy session used for all queries.

        Returns:
            A dict with four keys:
            ``stats`` (headline counters; ``totalRevenue`` is in fen),
            ``revenue_trend`` (last 7 days, amounts in yuan),
            ``product_dist`` (top 5 products by paid order count) and
            ``recent_activities`` (up to 10 newest order/ticket events).
        """
        # --- 1. Headline metric cards ---

        # 1.1 Total revenue: sum per currency, then convert each to CNY.
        revenue_groups = db.query(
            VasOrder.base_currency,
            func.sum(VasOrder.base_amount),
        ).filter(
            VasOrder.status.in_(_PAID_STATUSES)
        ).group_by(VasOrder.base_currency).all()

        total_revenue_cny = sum(
            StatisticsService._convert_to_cny(amount, currency)
            for currency, amount in revenue_groups
        )

        # 1.2 Orders whose status is not 'closed'.
        total_orders = (
            db.query(func.count(VasOrder.id))
            .filter(VasOrder.status != 'closed')
            .scalar()
        ) or 0

        # 1.3 User count.
        # NOTE(review): this counts every VasUser row; no activity filter
        # is applied despite the "activeUsers" key — confirm intent.
        active_users = db.query(func.count(VasUser.id)).scalar() or 0

        # 1.4 Tickets awaiting handling.
        pending_tickets = (
            db.query(func.count(VasTicket.id))
            .filter(VasTicket.status.in_(['pending', 'info_required']))
            .scalar()
        ) or 0

        # 1.5 Task success rate.
        task_counts = db.query(
            func.count(VasTask.id).label('total'),
            func.sum(
                case((VasTask.status == 'completed', 1), else_=0)
            ).label('success'),
        ).first()

        success_rate_str = "0%"
        if task_counts and task_counts.total > 0:
            succeeded = task_counts.success or 0  # SUM may yield NULL
            rate = (succeeded / task_counts.total) * 100
            success_rate_str = f"{rate:.1f}%"

        # --- 2. Revenue trend (last 7 days) ---
        revenue_trend = StatisticsService._revenue_trend(db, days=7)

        # --- 3. Product sales distribution ---
        order_count = func.count(VasOrder.id).label('order_count')
        product_stats = (
            db.query(VasProduct.title, order_count)
            .join(VasOrder, VasOrder.product_id == VasProduct.id)
            .filter(VasOrder.status.in_(_PAID_STATUSES))
            .group_by(VasProduct.title)
            .order_by(desc(order_count))
            .limit(5)
            .all()
        )
        # Unpack positionally: attribute access to a column labelled
        # "count" collides with the row object's tuple-style count() method.
        product_dist = [
            {"name": title, "value": sold} for title, sold in product_stats
        ]

        # --- 4. Latest activity feed ---
        activities = StatisticsService._recent_activities(
            db, per_source=5, limit=10
        )

        return {
            "stats": {
                "totalOrders": total_orders,
                "totalRevenue": total_revenue_cny,  # unit: fen (CNY)
                "activeUsers": active_users,
                "pendingTickets": pending_tickets,
                "successRate": success_rate_str,
            },
            "revenue_trend": revenue_trend,
            "product_dist": product_dist,
            "recent_activities": activities,
        }