# Admin dashboard statistics service (monetary figures are normalised to CNY).
from datetime import datetime, timedelta, date
from decimal import Decimal
from typing import Dict, Any, Optional

from sqlalchemy import func, desc, and_, case
from sqlalchemy.orm import Session

from app.models.order import VasOrder
from app.models.product import VasProduct
from app.models.ticket import VasTicket
from app.models.user import VasUser
from app.models.vas_task import VasTask
# Static exchange-rate table (base currency: CNY).
# Each value is how many CNY one unit of the keyed currency is worth.
# In production, prefer loading live rates from the database or a cache.
EXCHANGE_RATES: Dict[str, float] = {
    "CNY": 1.0,
    "USD": 7.25,
    "EUR": 7.65,
    "GBP": 9.10,
    "HKD": 0.92,
    "JPY": 0.048,
}
# Display prefix for each supported currency code.
# Note that CNY and JPY both map to "¥".
CURRENCY_SYMBOLS: Dict[str, str] = {
    "CNY": "¥",
    "USD": "$",
    "EUR": "€",
    "GBP": "£",
    "HKD": "HK$",
    "JPY": "¥",
}
class StatisticsService:
    """Read-only aggregation queries backing the admin dashboard overview.

    Monetary sums are read per currency and normalised to CNY minor units
    (fen) through the module-level ``EXCHANGE_RATES`` table.
    """

    # Order statuses that count towards revenue and sales figures.
    _PAID_STATUSES = ('paid', 'completed', 'succeeded')

    @staticmethod
    def _convert_to_cny(
        amount: Any,
        currency: str,
        *,
        rates: Optional[Dict[str, float]] = None,
    ) -> int:
        """Convert an amount in minor units of ``currency`` to CNY fen.

        Args:
            amount: Amount in minor units (int, Decimal, float or numeric
                string). Falsy values (``None``, ``0``) yield ``0``.
            currency: Currency code used to look up the rate.
            rates: Optional rate table overriding the module-level
                ``EXCHANGE_RATES`` (e.g. live rates from a cache).

        Returns:
            The converted amount in fen, truncated toward zero.
        """
        if not amount:
            return 0

        table = EXCHANGE_RATES if rates is None else rates
        rate = table.get(currency, 1.0)  # unknown currencies are treated as 1:1

        # Decimal(str(...)) keeps the decimal literals exact. Multiplying
        # through binary floats can land just below an integer
        # (0.57 * 100 -> 56.99999999999999) and int() would then lose a fen.
        return int(Decimal(str(amount)) * Decimal(str(rate)))

    @staticmethod
    def _format_relative_time(moment: datetime, now: Optional[datetime] = None) -> str:
        """Render ``moment`` as a coarse "N days/hours/minutes ago" label.

        Args:
            moment: The timestamp to describe.
            now: Reference time. Defaults to the current time in the same
                timezone as ``moment`` (naive local time when ``moment`` is
                naive), so aware and naive values are never mixed.

        Returns:
            One of "N天前", "N小时前", "N分钟前" or "刚刚". Timestamps in the
            future (e.g. clock skew) are reported as "刚刚".
        """
        if now is None:
            now = datetime.now(moment.tzinfo)

        elapsed = int((now - moment).total_seconds())
        if elapsed >= 86400:
            return f"{elapsed // 86400}天前"
        if elapsed >= 3600:
            return f"{elapsed // 3600}小时前"
        if elapsed >= 60:
            return f"{elapsed // 60}分钟前"
        return "刚刚"

    @staticmethod
    def _total_revenue_cny(db: "Session") -> int:
        """Sum paid-order revenue per currency and return the total in CNY fen."""
        revenue_groups = db.query(
            VasOrder.base_currency,
            func.sum(VasOrder.base_amount),
        ).filter(
            VasOrder.status.in_(StatisticsService._PAID_STATUSES)
        ).group_by(VasOrder.base_currency).all()

        return sum(
            StatisticsService._convert_to_cny(amount, currency)
            for currency, amount in revenue_groups
        )

    @staticmethod
    def _task_success_rate(db: "Session") -> str:
        """Return the share of tasks with status 'completed', e.g. "87.5%"."""
        task_counts = db.query(
            func.count(VasTask.id).label('total'),
            func.sum(case((VasTask.status == 'completed', 1), else_=0)).label('success'),
        ).first()

        if not task_counts or not task_counts.total:
            return "0%"
        rate = ((task_counts.success or 0) / task_counts.total) * 100
        return f"{rate:.1f}%"

    @staticmethod
    def _revenue_trend(db: "Session", days: int = 7) -> list:
        """Build one {date, amount, orders} entry per day, oldest first.

        ``amount`` is in CNY yuan (float); ``orders`` is the paid-order count.
        """
        trend = []
        today = date.today()

        for offset in range(days - 1, -1, -1):
            target_date = today - timedelta(days=offset)
            # Half-open [00:00, next day 00:00) window covers the whole day.
            day_start = datetime.combine(target_date, datetime.min.time())
            day_end = day_start + timedelta(days=1)

            daily_groups = db.query(
                VasOrder.base_currency,
                func.sum(VasOrder.base_amount).label('amount'),
                func.count(VasOrder.id).label('orders'),
            ).filter(
                VasOrder.created_at >= day_start,
                VasOrder.created_at < day_end,
                VasOrder.status.in_(StatisticsService._PAID_STATUSES),
            ).group_by(VasOrder.base_currency).all()

            daily_amount_cny = 0
            daily_order_count = 0
            for currency, amount, count in daily_groups:
                daily_amount_cny += StatisticsService._convert_to_cny(amount, currency)
                daily_order_count += count

            trend.append({
                "date": target_date.strftime("%m-%d"),
                "amount": float(daily_amount_cny) / 100.0,  # fen -> yuan
                "orders": daily_order_count,
            })

        return trend

    @staticmethod
    def _product_distribution(db: "Session", limit: int = 5) -> list:
        """Return the top ``limit`` product titles by paid-order count."""
        product_stats = (
            db.query(
                VasProduct.title,
                func.count(VasOrder.id).label('count'),
            )
            .join(VasOrder, VasOrder.product_id == VasProduct.id)
            .filter(VasOrder.status.in_(StatisticsService._PAID_STATUSES))
            .group_by(VasProduct.title)
            .order_by(desc('count'))
            .limit(limit)
            .all()
        )
        return [{"name": row.title, "value": row.count} for row in product_stats]

    @staticmethod
    def _recent_activities(db: "Session", limit: int = 10, per_source: int = 5) -> list:
        """Merge the newest orders and tickets into one newest-first feed."""
        activities = []

        recent_orders = (
            db.query(VasOrder).order_by(desc(VasOrder.created_at)).limit(per_source).all()
        )
        for order in recent_orders:
            symbol = CURRENCY_SYMBOLS.get(order.base_currency, order.base_currency)
            # A missing amount is displayed as zero instead of raising.
            amount = order.base_amount if order.base_amount is not None else 0
            amt_display = f"{symbol}{amount / 100}"

            activities.append({
                "id": f"order_{order.id}",
                "text": f"用户下单: {order.product_name or '未知商品'} ({amt_display})",
                "time": order.created_at,
                "type": "order" if order.status == 'pending' else "money",
            })

        recent_tickets = (
            db.query(VasTicket).order_by(desc(VasTicket.created_at)).limit(per_source).all()
        )
        for ticket in recent_tickets:
            reason = ticket.reason or ""  # tolerate tickets without a reason
            reason_preview = reason[:20] + "..." if len(reason) > 20 else reason
            activities.append({
                "id": f"ticket_{ticket.id}",
                "text": f"新工单 #{ticket.id}: {reason_preview}",
                "time": ticket.created_at,
                "type": "ticket",
            })

        # Newest first. Entries without a timestamp sort last rather than
        # breaking the comparison (None is not orderable against datetime).
        activities.sort(
            key=lambda item: (item['time'] is not None, item['time'] or datetime.min),
            reverse=True,
        )
        activities = activities[:limit]

        for activity in activities:
            stamp = activity['time']
            # Non-datetime values are passed through untouched.
            if isinstance(stamp, datetime):
                activity['time'] = StatisticsService._format_relative_time(stamp)

        return activities

    @staticmethod
    def overview(db: "Session") -> Dict[str, Any]:
        """Collect the admin dashboard overview (revenue normalised to CNY).

        Args:
            db: Active SQLAlchemy session used for all queries.

        Returns:
            A dict with four keys:
            ``stats`` (headline counters; ``totalRevenue`` is in CNY fen),
            ``revenue_trend`` (last 7 days, amounts in CNY yuan),
            ``product_dist`` (top 5 products by paid orders) and
            ``recent_activities`` (up to 10 newest order/ticket events).
        """
        # Orders in any status except 'closed'.
        total_orders = (
            db.query(func.count(VasOrder.id))
            .filter(VasOrder.status != 'closed')
            .scalar()
            or 0
        )

        # Reported as "active" users, but this counts every user row.
        active_users = db.query(func.count(VasUser.id)).scalar() or 0

        pending_tickets = (
            db.query(func.count(VasTicket.id))
            .filter(VasTicket.status.in_(['pending', 'info_required']))
            .scalar()
            or 0
        )

        return {
            "stats": {
                "totalOrders": total_orders,
                "totalRevenue": StatisticsService._total_revenue_cny(db),  # unit: fen (CNY)
                "activeUsers": active_users,
                "pendingTickets": pending_tickets,
                "successRate": StatisticsService._task_success_rate(db),
            },
            "revenue_trend": StatisticsService._revenue_trend(db),
            "product_dist": StatisticsService._product_distribution(db),
            "recent_activities": StatisticsService._recent_activities(db),
        }