# app/services/statistics_service.py
import json
from datetime import datetime, timedelta, date
from decimal import Decimal, ROUND_HALF_UP
from typing import Dict, Any, Tuple

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import func, desc, case, select

from app.models.order import VasOrder
from app.models.ticket import VasTicket
from app.models.vas_task import VasTask
from app.models.user import VasUser
from app.models.product import VasProduct
from app.models.configuration import Configuration

# ======================
# Currency symbols (display text only; never persisted)
# ======================
CURRENCY_SYMBOLS = {
    "CNY": "¥",
    "USD": "$",
    "EUR": "€",
    "GBP": "£",
    "HKD": "HK$",
    "JPY": "¥",
}

# Order statuses that count as effective (paid) orders in every statistic below.
_EFFECTIVE_ORDER_STATUSES = ("paid", "completed", "succeeded")


class StatisticsService:
    """Aggregated statistics for the admin dashboard overview."""

    # ======================
    # Helpers: exchange rates and time ranges
    # ======================

    @staticmethod
    async def _get_exchange_rates(db: AsyncSession) -> Dict[str, float]:
        """Load the exchange-rate table from the ``EXCHANGE_RATES`` config row.

        The config value may be a JSON string or an already-decoded dict; in
        both cases the rates are read from its ``"rates"`` key.

        Returns:
            A mapping such as ``{'CNY': 1.0, 'USD': 7.25, ...}``. Falls back to
            ``{"CNY": 1.0}`` when the row is missing, the value cannot be
            parsed, or ``"rates"`` is not a mapping.
        """
        fallback_rates = {"CNY": 1.0}

        stmt = select(Configuration).where(
            Configuration.config_key == "EXCHANGE_RATES"
        )
        config_obj = (await db.execute(stmt)).scalar_one_or_none()
        if not config_obj:
            return fallback_rates

        data = config_obj.config_value
        if isinstance(data, str):
            try:
                data = json.loads(data)
            except ValueError:
                # json.JSONDecodeError is a ValueError: malformed config text.
                return fallback_rates

        if not isinstance(data, dict):
            return fallback_rates

        rates = data.get("rates", fallback_rates)
        # A non-mapping "rates" entry would break every later rates.get() call.
        return rates if isinstance(rates, dict) else fallback_rates

    @staticmethod
    def _convert_to_cny(amount: Any, currency: str, rates: Dict[str, float]) -> int:
        """Convert an amount in minor units (cents) to CNY minor units.

        Args:
            amount: Amount in cents; falsy values (None, 0) yield 0.
            currency: Currency code looked up in ``rates``; a code that is not
                present is converted with a rate of 1.0.
            rates: Mapping of currency code to its CNY rate.

        Returns:
            The converted amount, rounded half-up to a whole cent.
        """
        if not amount:
            return 0

        # float() keeps the original parsing rules for the rate; str() of a
        # float is its shortest repr, so Decimal sees "1.15", not 1.149999...
        rate = Decimal(str(float(rates.get(currency, 1.0))))
        if isinstance(amount, (int, Decimal)):
            value = Decimal(amount)
        else:
            value = Decimal(str(float(amount)))

        # Decimal math plus explicit rounding avoids losing a cent to binary
        # float error (e.g. int(100 * 1.15) == 114).
        return int((value * rate).to_integral_value(rounding=ROUND_HALF_UP))

    @staticmethod
    def _get_month_ranges() -> Tuple[datetime, datetime, datetime, datetime]:
        """Return the time ranges used for month-over-month comparison.

        Returns:
            ``(this_month_start, this_month_end, last_month_start,
            last_month_end)``, where ``this_month_end`` is the current time.
        """
        now = datetime.now()

        # Current month: from the first day at midnight up to now.
        this_month_start = now.replace(
            day=1, hour=0, minute=0, second=0, microsecond=0
        )
        this_month_end = now

        # Previous month ends one microsecond before the current month starts.
        last_month_end_dt = this_month_start - timedelta(microseconds=1)
        last_month_start = last_month_end_dt.replace(
            day=1, hour=0, minute=0, second=0, microsecond=0
        )

        # For a like-for-like comparison the previous month could be cut off at
        # the same day-of-month progress; this simple report instead compares
        # the full previous month against the current month to date.
        return this_month_start, this_month_end, last_month_start, last_month_end_dt

    @staticmethod
    def _calculate_growth(current: float, previous: float) -> float:
        """Return the growth from ``previous`` to ``current`` as a percentage.

        With a zero baseline the result is 100.0 when ``current`` is positive
        and 0.0 otherwise.
        """
        if previous == 0:
            return 100.0 if current > 0 else 0.0
        return ((current - previous) / previous) * 100.0

    @staticmethod
    def _format_relative_time(now_dt: datetime, dt: datetime) -> str:
        """Render ``dt`` as a coarse "time ago" label relative to ``now_dt``.

        Timestamps less than a minute old, or in the future (clock skew),
        render as "just now".
        """
        # total_seconds() is signed; timedelta.days/.seconds are normalised so
        # that a slightly negative delta has days == -1 and seconds near 86400.
        elapsed = int((now_dt - dt).total_seconds())
        if elapsed < 60:
            return "刚刚"
        if elapsed < 3600:
            return f"{elapsed // 60}分钟前"
        if elapsed < 86400:
            return f"{elapsed // 3600}小时前"
        return f"{elapsed // 86400}天前"

    @staticmethod
    async def _revenue_cny(
        db: AsyncSession,
        start_dt: datetime,
        end_dt: datetime,
        rates: Dict[str, float],
    ) -> int:
        """Sum effective-order revenue in [start_dt, end_dt] as CNY cents."""
        stmt = (
            select(VasOrder.base_currency, func.sum(VasOrder.base_amount))
            .where(
                VasOrder.created_at >= start_dt,
                VasOrder.created_at <= end_dt,
                VasOrder.status.in_(_EFFECTIVE_ORDER_STATUSES),
            )
            .group_by(VasOrder.base_currency)
        )
        rows = (await db.execute(stmt)).all()
        return sum(
            StatisticsService._convert_to_cny(amt, curr, rates)
            for curr, amt in rows
        )

    @staticmethod
    async def _effective_order_count(
        db: AsyncSession, start_dt: datetime, end_dt: datetime
    ) -> int:
        """Count effective orders created in [start_dt, end_dt]."""
        count = await db.scalar(
            select(func.count(VasOrder.id)).where(
                VasOrder.created_at >= start_dt,
                VasOrder.created_at <= end_dt,
                VasOrder.status.in_(_EFFECTIVE_ORDER_STATUSES),
            )
        )
        return count or 0

    # ======================
    # Core API
    # ======================

    @staticmethod
    async def overview(db: AsyncSession) -> Dict[str, Any]:
        """Build the admin statistics overview (async).

        Returns:
            A dict with the keys ``stats`` (headline numbers and trends),
            ``revenue_trend`` (last 7 days), ``product_dist`` (top 5 products
            by order count), ``recent_activities`` (latest orders and tickets)
            and ``active_tasks_dist`` (unfinished tasks per routing key).
        """
        # 0. Base data
        current_rates = await StatisticsService._get_exchange_rates(db)
        tm_start, tm_end, lm_start, lm_end = StatisticsService._get_month_ranges()

        # --------------------------------------------------
        # 1. Revenue (this month vs last month) - drives the trend
        # --------------------------------------------------
        revenue_this_month = await StatisticsService._revenue_cny(
            db, tm_start, tm_end, current_rates
        )
        revenue_last_month = await StatisticsService._revenue_cny(
            db, lm_start, lm_end, current_rates
        )
        revenue_trend = StatisticsService._calculate_growth(
            revenue_this_month, revenue_last_month
        )

        # --------------------------------------------------
        # 2. Order volume (this month vs last month) - drives the trend
        # --------------------------------------------------
        orders_this_month = await StatisticsService._effective_order_count(
            db, tm_start, tm_end
        )
        orders_last_month = await StatisticsService._effective_order_count(
            db, lm_start, lm_end
        )
        orders_trend = StatisticsService._calculate_growth(
            orders_this_month, orders_last_month
        )

        # --------------------------------------------------
        # 3. Other point-in-time metrics
        # --------------------------------------------------
        # User count (all-time total)
        active_users = (await db.scalar(select(func.count(VasUser.id)))) or 0

        # Tickets awaiting handling
        pending_tickets = (
            await db.scalar(
                select(func.count(VasTicket.id)).where(
                    VasTicket.status.in_(["pending", "info_required"])
                )
            )
        ) or 0

        # Task success rate
        task_stmt = select(
            func.count(VasTask.id).label("total"),
            func.sum(case((VasTask.status == "completed", 1), else_=0)).label(
                "success"
            ),
        )
        task_counts = (await db.execute(task_stmt)).first()

        success_rate_str = "0%"
        if task_counts and task_counts.total > 0:
            rate = (task_counts.success / task_counts.total) * 100
            success_rate_str = f"{rate:.1f}%"

        # --------------------------------------------------
        # 4. Chart data: revenue trend for the last 7 days
        # --------------------------------------------------
        revenue_trend_chart = []
        today = date.today()

        for i in range(6, -1, -1):
            target_date = today - timedelta(days=i)
            start_dt = datetime.combine(target_date, datetime.min.time())
            end_dt = datetime.combine(target_date, datetime.max.time())

            daily_stmt = (
                select(
                    VasOrder.base_currency,
                    func.sum(VasOrder.base_amount).label("amount"),
                    func.count(VasOrder.id).label("orders"),
                )
                .where(
                    VasOrder.created_at >= start_dt,
                    VasOrder.created_at <= end_dt,
                    VasOrder.status.in_(_EFFECTIVE_ORDER_STATUSES),
                )
                .group_by(VasOrder.base_currency)
            )
            daily_rows = (await db.execute(daily_stmt)).all()

            daily_amount_cny = 0
            daily_order_count = 0
            for curr, amt, cnt in daily_rows:
                daily_amount_cny += StatisticsService._convert_to_cny(
                    amt, curr, current_rates
                )
                daily_order_count += cnt

            revenue_trend_chart.append({
                "date": target_date.strftime("%m-%d"),
                "amount": daily_amount_cny / 100.0,  # cents -> yuan
                "orders": daily_order_count,
            })

        # --------------------------------------------------
        # 5. Product sales distribution (top 5)
        # --------------------------------------------------
        product_stmt = (
            select(
                VasProduct.title,
                func.count(VasOrder.id).label("count"),
            )
            .join(VasOrder, VasOrder.product_id == VasProduct.id)
            .where(VasOrder.status.in_(_EFFECTIVE_ORDER_STATUSES))
            .group_by(VasProduct.title)
            .order_by(desc("count"))
            .limit(5)
        )
        product_rows = (await db.execute(product_stmt)).all()
        product_dist = [
            {"name": title, "value": count} for title, count in product_rows
        ]

        # --------------------------------------------------
        # 6. Recent activity (orders and tickets mixed)
        # --------------------------------------------------
        activities = []

        # Latest orders
        order_stmt = select(VasOrder).order_by(desc(VasOrder.created_at)).limit(5)
        recent_orders = (await db.execute(order_stmt)).scalars().all()

        for o in recent_orders:
            # Display only: keep the order's own currency, no CNY conversion.
            # NULL currency/amount fall back to ""/0 instead of "None"/TypeError.
            symbol = CURRENCY_SYMBOLS.get(o.base_currency, o.base_currency) or ""
            amt_display = f"{symbol}{(o.base_amount or 0) / 100}"

            activities.append({
                "id": f"order_{o.id}",
                "text": f"用户下单: {o.product_name or '未知商品'} ({amt_display})",
                "time": o.created_at,
                "type": "order" if o.status == "pending" else "money",
            })

        # Latest tickets
        ticket_stmt = select(VasTicket).order_by(desc(VasTicket.created_at)).limit(5)
        recent_tickets = (await db.execute(ticket_stmt)).scalars().all()

        for t in recent_tickets:
            # A NULL reason renders as empty text rather than the word "None".
            reason = t.reason or ""
            reason_preview = reason[:20] + "..." if len(reason) > 20 else reason
            activities.append({
                "id": f"ticket_{t.id}",
                "text": f"新工单 #{t.id}: {reason_preview}",
                "time": t.created_at,
                "type": "ticket",
            })

        # Newest first, capped at 10 entries.
        # NOTE(review): assumes created_at is always set and is a naive
        # datetime comparable with datetime.now() - confirm against the models.
        activities.sort(key=lambda x: x["time"], reverse=True)
        activities = activities[:10]

        # Replace each timestamp with a human-readable relative label.
        now_dt = datetime.now()
        for act in activities:
            act["time"] = StatisticsService._format_relative_time(
                now_dt, act["time"]
            )

        # Unfinished tasks grouped by routing key, largest group first.
        active_task_stmt = (
            select(
                VasTask.routing_key,
                func.count(VasTask.id).label("count"),
            )
            .where(VasTask.status.in_(["pending", "grabbed", "running"]))
            .group_by(VasTask.routing_key)
            .order_by(desc("count"))
        )
        active_task_rows = (await db.execute(active_task_stmt)).all()

        # Shaped as a list of {"routing_key", "count"} items.
        active_tasks_dist_data = [
            {"routing_key": r_key, "count": count}
            for r_key, count in active_task_rows
        ]

        # --------------------------------------------------
        # 7. Assemble the response
        # --------------------------------------------------
        return {
            "stats": {
                # Current month only, to match the trend context.
                "totalOrders": orders_this_month,
                "totalOrdersTrend": orders_trend,  # order growth rate
                "totalRevenue": revenue_this_month,  # current month, in cents
                "totalRevenueTrend": revenue_trend,  # revenue growth rate
                "activeUsers": active_users,
                "pendingTickets": pending_tickets,
                "successRate": success_rate_str,
            },
            "revenue_trend": revenue_trend_chart,  # chart data
            "product_dist": product_dist,  # pie chart data
            "recent_activities": activities,  # activity feed
            "active_tasks_dist": active_tasks_dist_data,
        }