# app/services/statistics_service.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import func, desc, case, select
from datetime import datetime, timedelta, date
from typing import Dict, Any

from app.models.order import VasOrder
from app.models.ticket import VasTicket
from app.models.vas_task import VasTask
from app.models.user import VasUser
from app.models.product import VasProduct

# ======================
# Exchange rates & currency symbols
# ======================
# Multipliers applied to an amount in the given currency to express it in CNY.
EXCHANGE_RATES = {
    "CNY": 1.0,
    "USD": 7.25,
    "EUR": 7.65,
    "GBP": 9.10,
    "HKD": 0.92,
    "JPY": 0.048,
}

# Display prefix per currency code, used when rendering order amounts.
CURRENCY_SYMBOLS = {
    "CNY": "¥",
    "USD": "$",
    "EUR": "€",
    "GBP": "£",
    "HKD": "HK$",
    "JPY": "¥",
}

# Order statuses whose amounts are counted as revenue / sales.
_REVENUE_STATUSES = ("paid", "completed", "succeeded")


class StatisticsService:
    """Aggregated statistics for the admin dashboard."""

    # ======================
    # Helpers
    # ======================
    @staticmethod
    def _convert_to_cny(amount: Any, currency: str) -> int:
        """Convert an amount in minor units of ``currency`` to CNY minor units.

        Args:
            amount: Numeric amount (anything ``float()`` accepts). Falsy
                values (``None``, ``0``) yield ``0``.
            currency: Currency code looked up in ``EXCHANGE_RATES``. Unknown
                codes fall back to a rate of ``1.0``.

        Returns:
            The converted amount, truncated toward zero.
        """
        if not amount:
            return 0
        rate = EXCHANGE_RATES.get(currency, 1.0)
        return int(float(amount) * rate)

    @staticmethod
    def _humanize_age(moment: datetime, now: Any = None) -> str:
        """Render how long ago ``moment`` happened as a short label.

        Args:
            moment: The timestamp to describe.
            now: Reference time. Defaults to the current time expressed in
                ``moment``'s own tzinfo, so naive and aware timestamps are
                both handled without raising ``TypeError``.

        Returns:
            "刚刚" for anything under a minute (including timestamps in the
            future), otherwise whole minutes, hours or days.
        """
        if now is None:
            now = datetime.now(moment.tzinfo)
        # total_seconds() covers the whole delta and keeps its sign, unlike
        # timedelta.seconds, which is only the 0..86399 remainder.
        elapsed = (now - moment).total_seconds()
        if elapsed < 60:
            return "刚刚"
        if elapsed < 3600:
            return f"{int(elapsed // 60)}分钟前"
        if elapsed < 86400:
            return f"{int(elapsed // 3600)}小时前"
        return f"{int(elapsed // 86400)}天前"

    # ======================
    # Main entry point
    # ======================
    @staticmethod
    async def overview(db: AsyncSession) -> Dict[str, Any]:
        """Build the admin statistics overview.

        Args:
            db: Async SQLAlchemy session used for all queries.

        Returns:
            A dict with four keys:

            * ``stats``: ``totalOrders``, ``totalRevenue`` (CNY minor units),
              ``activeUsers``, ``pendingTickets`` and ``successRate``
              (a string such as ``"97.5%"``).
            * ``revenue_trend``: one entry per day for the last 7 days
              (oldest first) with ``date`` (``MM-DD``), ``amount``
              (CNY minor units divided by 100) and ``orders``.
            * ``product_dist``: up to 5 ``{"name", "value"}`` entries,
              ordered by number of revenue-status orders.
            * ``recent_activities``: up to 10 recent orders/tickets, newest
              first, with ``time`` rendered as a relative label.
        """
        # --------------------------------------------------
        # 1. Headline metrics
        # --------------------------------------------------

        # 1.1 Total revenue, summed per currency and converted to CNY.
        revenue_stmt = (
            select(
                VasOrder.base_currency,
                func.sum(VasOrder.base_amount),
            )
            .where(VasOrder.status.in_(_REVENUE_STATUSES))
            .group_by(VasOrder.base_currency)
        )
        revenue_rows = (await db.execute(revenue_stmt)).all()
        total_revenue_cny = sum(
            StatisticsService._convert_to_cny(amount, currency)
            for currency, amount in revenue_rows
        )

        # 1.2 Orders whose status is anything other than "closed".
        total_orders = (
            await db.scalar(
                select(func.count(VasOrder.id))
                .where(VasOrder.status != "closed")
            )
        ) or 0

        # 1.3 User count (all rows; no activity filter is applied).
        active_users = (
            await db.scalar(select(func.count(VasUser.id)))
        ) or 0

        # 1.4 Tickets still waiting on action.
        pending_tickets = (
            await db.scalar(
                select(func.count(VasTicket.id))
                .where(VasTicket.status.in_(["pending", "info_required"]))
            )
        ) or 0

        # 1.5 Task success rate: completed tasks / all tasks.
        task_stmt = select(
            func.count(VasTask.id).label("total"),
            func.sum(
                case((VasTask.status == "completed", 1), else_=0)
            ).label("success"),
        )
        task_counts = (await db.execute(task_stmt)).first()
        success_rate_str = "0%"
        if task_counts and task_counts.total:
            rate = ((task_counts.success or 0) / task_counts.total) * 100
            success_rate_str = f"{rate:.1f}%"

        # --------------------------------------------------
        # 2. Revenue trend for the last 7 days
        # --------------------------------------------------
        revenue_trend = []
        today = date.today()
        for days_back in range(6, -1, -1):
            target_date = today - timedelta(days=days_back)
            day_start = datetime.combine(target_date, datetime.min.time())
            # Half-open interval [day_start, next_day_start): does not depend
            # on the sub-second precision of the stored timestamps.
            next_day_start = day_start + timedelta(days=1)

            daily_stmt = (
                select(
                    VasOrder.base_currency,
                    func.sum(VasOrder.base_amount).label("amount"),
                    func.count(VasOrder.id).label("orders"),
                )
                .where(
                    VasOrder.created_at >= day_start,
                    VasOrder.created_at < next_day_start,
                    VasOrder.status.in_(_REVENUE_STATUSES),
                )
                .group_by(VasOrder.base_currency)
            )
            daily_rows = (await db.execute(daily_stmt)).all()

            daily_amount_cny = 0
            daily_order_count = 0
            for curr, amt, cnt in daily_rows:
                daily_amount_cny += StatisticsService._convert_to_cny(amt, curr)
                daily_order_count += cnt

            revenue_trend.append({
                "date": target_date.strftime("%m-%d"),
                "amount": daily_amount_cny / 100.0,
                "orders": daily_order_count,
            })

        # --------------------------------------------------
        # 3. Product sales distribution (top 5)
        # --------------------------------------------------
        product_stmt = (
            select(
                VasProduct.title,
                func.count(VasOrder.id).label("count"),
            )
            .join(VasOrder, VasOrder.product_id == VasProduct.id)
            .where(VasOrder.status.in_(_REVENUE_STATUSES))
            .group_by(VasProduct.title)
            .order_by(desc("count"))
            .limit(5)
        )
        product_rows = (await db.execute(product_stmt)).all()
        product_dist = [
            {"name": title, "value": count}
            for title, count in product_rows
        ]

        # --------------------------------------------------
        # 4. Recent activity feed
        # --------------------------------------------------
        activities = []

        # Latest orders
        order_stmt = (
            select(VasOrder)
            .order_by(desc(VasOrder.created_at))
            .limit(5)
        )
        recent_orders = (await db.execute(order_stmt)).scalars().all()
        for o in recent_orders:
            symbol = CURRENCY_SYMBOLS.get(o.base_currency, o.base_currency)
            # A NULL amount would otherwise raise TypeError and fail the
            # whole overview; show it as zero instead.
            base_amount = o.base_amount if o.base_amount is not None else 0
            amt_display = f"{symbol}{base_amount / 100}"
            activities.append({
                "id": f"order_{o.id}",
                "text": f"用户下单: {o.product_name or '未知商品'} ({amt_display})",
                "time": o.created_at,
                "type": "order" if o.status == "pending" else "money",
            })

        # Latest tickets
        ticket_stmt = (
            select(VasTicket)
            .order_by(desc(VasTicket.created_at))
            .limit(5)
        )
        recent_tickets = (await db.execute(ticket_stmt)).scalars().all()
        for t in recent_tickets:
            # Avoid rendering the literal text "None" for a missing reason.
            reason = t.reason or ""
            reason_preview = reason[:20] + "..." if len(reason) > 20 else reason
            activities.append({
                "id": f"ticket_{t.id}",
                "text": f"新工单 #{t.id}: {reason_preview}",
                "time": t.created_at,
                "type": "ticket",
            })

        # Merge both feeds newest-first, keep 10, then replace each timestamp
        # with a relative label.
        # NOTE(review): assumes created_at is never NULL on orders/tickets.
        activities.sort(key=lambda x: x["time"], reverse=True)
        activities = activities[:10]
        for act in activities:
            act["time"] = StatisticsService._humanize_age(act["time"])

        # --------------------------------------------------
        # Result
        # --------------------------------------------------
        return {
            "stats": {
                "totalOrders": total_orders,
                "totalRevenue": total_revenue_cny,  # CNY minor units
                "activeUsers": active_users,
                "pendingTickets": pending_tickets,
                "successRate": success_rate_str,
            },
            "revenue_trend": revenue_trend,
            "product_dist": product_dist,
            "recent_activities": activities,
        }