| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335 |
- # app/services/statistics_service.py
- import json
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy import func, desc, case, select
- from datetime import datetime, timedelta, date
- from typing import Dict, Any, Tuple
- from app.models.order import VasOrder
- from app.models.ticket import VasTicket
- from app.models.vas_task import VasTask
- from app.models.user import VasUser
- from app.models.product import VasProduct
- from app.models.configuration import Configuration
- # ======================
- # 货币符号 (仅用于展示文本,无需存库)
- # ======================
- CURRENCY_SYMBOLS = {
- "CNY": "¥", "USD": "$", "EUR": "€", "GBP": "£", "HKD": "HK$", "JPY": "¥"
- }
- class StatisticsService:
- # ======================
- # 辅助方法:汇率与时间
- # ======================
- @staticmethod
- async def _get_exchange_rates(db: AsyncSession) -> Dict[str, float]:
- """
- 从数据库获取动态汇率配置
- 返回格式示例: {'CNY': 1.0, 'USD': 7.25, ...}
- """
- stmt = select(Configuration).where(Configuration.config_key == "EXCHANGE_RATES")
- config_obj = (await db.execute(stmt)).scalar_one_or_none()
-
- # 默认兜底汇率
- fallback_rates = {"CNY": 1.0}
- if not config_obj:
- return fallback_rates
-
- try:
- val = config_obj.config_value
- data = {}
- if isinstance(val, str):
- data = json.loads(val)
- elif isinstance(val, dict):
- data = val
-
- return data.get("rates", fallback_rates)
- except Exception:
- return fallback_rates
- @staticmethod
- def _convert_to_cny(amount: Any, currency: str, rates: Dict[str, float]) -> int:
- """
- 金额(分) → CNY(分)
- """
- if not amount:
- return 0
- rate = float(rates.get(currency, 1.0))
- return int(float(amount) * rate)
- @staticmethod
- def _get_month_ranges() -> Tuple[datetime, datetime, datetime, datetime]:
- """
- 获取时间范围用于环比计算
- 返回: (本月开始, 本月结束, 上月开始, 上月结束)
- """
- now = datetime.now()
-
- # 本月范围
- this_month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
- this_month_end = now # 截止到当前时间
- # 上月范围
- # 上月结束 = 本月开始 - 1微秒
- last_month_end_dt = this_month_start - timedelta(microseconds=1)
- # 上月开始
- last_month_start = last_month_end_dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
-
- # 为了公平比较(如果需要),可以将上月结束时间也限制在和本月同样的天数进度
- # 但通常简易报表直接对比 上月全月总额 vs 本月至今总额,或者对比 上月至今
- # 这里采用简单的 上月全月 vs 本月至今
-
- return this_month_start, this_month_end, last_month_start, last_month_end_dt
- @staticmethod
- def _calculate_growth(current: float, previous: float) -> float:
- """计算增长率百分比"""
- if previous == 0:
- return 100.0 if current > 0 else 0.0
- return ((current - previous) / previous) * 100.0
- # ======================
- # 核心接口
- # ======================
- @staticmethod
- async def overview(db: AsyncSession) -> Dict[str, Any]:
- """
- 后台统计概览(Async 版)
- """
- # 0. 准备基础数据
- current_rates = await StatisticsService._get_exchange_rates(db)
- tm_start, tm_end, lm_start, lm_end = StatisticsService._get_month_ranges()
- # --------------------------------------------------
- # 1. 营收统计 (本月 vs 上月) - 用于计算 Trend
- # --------------------------------------------------
-
- async def calculate_revenue(start_dt, end_dt):
- stmt = (
- select(VasOrder.base_currency, func.sum(VasOrder.base_amount))
- .where(
- VasOrder.created_at >= start_dt,
- VasOrder.created_at <= end_dt,
- VasOrder.status.in_(["paid", "completed", "succeeded"])
- )
- .group_by(VasOrder.base_currency)
- )
- rows = (await db.execute(stmt)).all()
- total_cny = sum(
- StatisticsService._convert_to_cny(amt, curr, current_rates)
- for curr, amt in rows
- )
- return total_cny
- revenue_this_month = await calculate_revenue(tm_start, tm_end)
- revenue_last_month = await calculate_revenue(lm_start, lm_end)
-
- revenue_trend = StatisticsService._calculate_growth(revenue_this_month, revenue_last_month)
- # --------------------------------------------------
- # 2. 订单量统计 (本月 vs 上月) - 用于计算 Trend
- # --------------------------------------------------
- async def calculate_orders(start_dt, end_dt):
- count = await db.scalar(
- select(func.count(VasOrder.id))
- .where(
- VasOrder.created_at >= start_dt,
- VasOrder.created_at <= end_dt,
- VasOrder.status.in_(["paid", "completed", "succeeded"]) # 仅统计有效订单
- )
- )
- return count or 0
- orders_this_month = await calculate_orders(tm_start, tm_end)
- orders_last_month = await calculate_orders(lm_start, lm_end)
- orders_trend = StatisticsService._calculate_growth(orders_this_month, orders_last_month)
- # --------------------------------------------------
- # 3. 其他静态指标
- # --------------------------------------------------
- # 活跃用户数 (总累计)
- active_users = (
- await db.scalar(select(func.count(VasUser.id)))
- ) or 0
- # 待处理工单
- pending_tickets = (
- await db.scalar(
- select(func.count(VasTicket.id))
- .where(VasTicket.status.in_(["pending", "info_required"]))
- )
- ) or 0
- # 任务成功率
- task_stmt = select(
- func.count(VasTask.id).label("total"),
- func.sum(
- case((VasTask.status == "completed", 1), else_=0)
- ).label("success")
- )
- task_counts = (await db.execute(task_stmt)).first()
- success_rate_str = "0%"
- if task_counts and task_counts.total > 0:
- rate = (task_counts.success / task_counts.total) * 100
- success_rate_str = f"{rate:.1f}%"
- # --------------------------------------------------
- # 4. 图表数据:最近 7 天营收趋势
- # --------------------------------------------------
- revenue_trend_chart = []
- today = date.today()
- for i in range(6, -1, -1):
- target_date = today - timedelta(days=i)
- start_dt = datetime.combine(target_date, datetime.min.time())
- end_dt = datetime.combine(target_date, datetime.max.time())
- daily_stmt = (
- select(
- VasOrder.base_currency,
- func.sum(VasOrder.base_amount).label("amount"),
- func.count(VasOrder.id).label("orders")
- )
- .where(
- VasOrder.created_at >= start_dt,
- VasOrder.created_at <= end_dt,
- VasOrder.status.in_(["paid", "completed", "succeeded"])
- )
- .group_by(VasOrder.base_currency)
- )
- daily_rows = (await db.execute(daily_stmt)).all()
- daily_amount_cny = 0
- daily_order_count = 0
- for curr, amt, cnt in daily_rows:
- daily_amount_cny += StatisticsService._convert_to_cny(amt, curr, current_rates)
- daily_order_count += cnt
- revenue_trend_chart.append({
- "date": target_date.strftime("%m-%d"),
- "amount": daily_amount_cny / 100.0, # 转为元
- "orders": daily_order_count
- })
- # --------------------------------------------------
- # 5. 商品销量分布(Top 5)
- # --------------------------------------------------
- product_stmt = (
- select(
- VasProduct.title,
- func.count(VasOrder.id).label("count")
- )
- .join(VasOrder, VasOrder.product_id == VasProduct.id)
- .where(VasOrder.status.in_(["paid", "completed", "succeeded"]))
- .group_by(VasProduct.title)
- .order_by(desc("count"))
- .limit(5)
- )
- product_rows = (await db.execute(product_stmt)).all()
- product_dist = [
- {"name": title, "value": count}
- for title, count in product_rows
- ]
- # --------------------------------------------------
- # 6. 最新动态 (混合订单和工单)
- # --------------------------------------------------
- activities = []
- # 最近订单
- order_stmt = (
- select(VasOrder)
- .order_by(desc(VasOrder.created_at))
- .limit(5)
- )
- recent_orders = (await db.execute(order_stmt)).scalars().all()
- for o in recent_orders:
- # 动态仅作展示,这里使用原始币种即可,不需要转CNY
- symbol = CURRENCY_SYMBOLS.get(o.base_currency, o.base_currency)
- amt_display = f"{symbol}{o.base_amount / 100}"
- activities.append({
- "id": f"order_{o.id}",
- "text": f"用户下单: {o.product_name or '未知商品'} ({amt_display})",
- "time": o.created_at,
- "type": "order" if o.status == "pending" else "money"
- })
- # 最近工单
- ticket_stmt = (
- select(VasTicket)
- .order_by(desc(VasTicket.created_at))
- .limit(5)
- )
- recent_tickets = (await db.execute(ticket_stmt)).scalars().all()
- for t in recent_tickets:
- reason_preview = (
- t.reason[:20] + "..."
- if t.reason and len(t.reason) > 20
- else t.reason
- )
- activities.append({
- "id": f"ticket_{t.id}",
- "text": f"新工单 #{t.id}: {reason_preview}",
- "time": t.created_at,
- "type": "ticket"
- })
- # 按时间倒序排序
- activities.sort(key=lambda x: x["time"], reverse=True)
- activities = activities[:10]
- # 格式化时间显示
- now_dt = datetime.now()
- for act in activities:
- dt = act["time"]
- diff = now_dt - dt
- if diff.days > 0:
- act["time"] = f"{diff.days}天前"
- elif diff.seconds > 3600:
- act["time"] = f"{diff.seconds // 3600}小时前"
- elif diff.seconds > 60:
- act["time"] = f"{diff.seconds // 60}分钟前"
- else:
- act["time"] = "刚刚"
- # --------------------------------------------------
- # 7. 组装返回结果
- # --------------------------------------------------
- return {
- "stats": {
- "totalOrders": orders_this_month, # 仅显示本月,为了匹配趋势语境
- "totalOrdersTrend": orders_trend, # 新增:订单增长率
-
- "totalRevenue": revenue_this_month, # 仅显示本月营收 (分)
- "totalRevenueTrend": revenue_trend, # 新增:营收增长率
-
- "activeUsers": active_users,
- "pendingTickets": pending_tickets,
- "successRate": success_rate_str
- },
- "revenue_trend": revenue_trend_chart, # 图表数据
- "product_dist": product_dist, # 饼图数据
- "recent_activities": activities # 动态列表
- }
|