| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256 |
- # app/services/statistics_service.py
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy import func, desc, case, select
- from datetime import datetime, timedelta, date
- from typing import Dict, Any
- from app.models.order import VasOrder
- from app.models.ticket import VasTicket
- from app.models.vas_task import VasTask
- from app.models.user import VasUser
- from app.models.product import VasProduct
# ======================
# Exchange rates & currency symbols
# ======================

# Hard-coded multipliers keyed by ISO currency code: an amount in the given
# currency is multiplied by its entry to express it in CNY (CNY itself is 1.0).
EXCHANGE_RATES = {
    "CNY": 1.0,
    "USD": 7.25,
    "EUR": 7.65,
    "GBP": 9.10,
    "HKD": 0.92,
    "JPY": 0.048,
}

# Display prefix for each currency code. CNY and JPY share the same glyph.
CURRENCY_SYMBOLS = {
    "CNY": "¥",
    "USD": "$",
    "EUR": "€",
    "GBP": "£",
    "HKD": "HK$",
    "JPY": "¥",
}
class StatisticsService:
    """Read-only aggregate statistics for the admin dashboard.

    Every method is static; the caller supplies the ``AsyncSession`` and the
    service only issues SELECT statements through it.
    """

    # Order statuses that count towards revenue and sales figures.
    _PAID_STATUSES = ("paid", "completed", "succeeded")

    # ======================
    # Utilities
    # ======================
    @staticmethod
    def _convert_to_cny(amount: Any, currency: str) -> int:
        """Convert an amount in minor units (fen/cents) into CNY fen.

        Args:
            amount: Amount in the minor unit of ``currency``. Any falsy value
                (``None``, ``0``) yields ``0``; this also covers a SQL
                ``SUM`` that returned NULL.
            currency: Currency code looked up in ``EXCHANGE_RATES``. Unknown
                codes fall back to a rate of ``1.0``.

        Returns:
            The converted amount, truncated towards zero.
        """
        if not amount:
            return 0
        rate = EXCHANGE_RATES.get(currency, 1.0)
        return int(float(amount) * rate)

    @staticmethod
    def _humanize_elapsed(elapsed: timedelta) -> str:
        """Render an elapsed duration as a short relative-time label.

        Works on ``total_seconds()`` rather than the ``days``/``seconds``
        fields: for a negative timedelta (timestamp in the future, e.g. clock
        skew) those fields are normalised to ``days=-1`` plus a large positive
        ``seconds`` and would be reported as many hours ago.

        Args:
            elapsed: ``now - event_time``.

        Returns:
            "刚刚" below one minute (including negative durations), otherwise
            whole minutes, hours or days followed by the matching suffix.
        """
        seconds = int(elapsed.total_seconds())
        if seconds < 60:
            return "刚刚"
        if seconds < 3600:
            return f"{seconds // 60}分钟前"
        if seconds < 86400:
            return f"{seconds // 3600}小时前"
        return f"{seconds // 86400}天前"

    # ======================
    # Query helpers
    # ======================
    @staticmethod
    async def _total_revenue_cny(db: "AsyncSession") -> int:
        """Sum paid-order amounts per currency and return the total in CNY fen."""
        stmt = (
            select(
                VasOrder.base_currency,
                func.sum(VasOrder.base_amount),
            )
            .where(VasOrder.status.in_(StatisticsService._PAID_STATUSES))
            .group_by(VasOrder.base_currency)
        )
        rows = (await db.execute(stmt)).all()
        return sum(
            StatisticsService._convert_to_cny(amount, currency)
            for currency, amount in rows
        )

    @staticmethod
    async def _task_success_rate(db: "AsyncSession") -> str:
        """Return the share of tasks with status "completed" as e.g. "87.5%"."""
        stmt = select(
            func.count(VasTask.id).label("total"),
            func.sum(
                case((VasTask.status == "completed", 1), else_=0)
            ).label("success"),
        )
        row = (await db.execute(stmt)).first()
        # No tasks at all: avoid dividing by zero and report a flat "0%".
        if not row or not row.total:
            return "0%"
        rate = (row.success or 0) / row.total * 100
        return f"{rate:.1f}%"

    @staticmethod
    async def _revenue_trend(db: "AsyncSession", days: int = 7) -> list:
        """Return per-day paid revenue and order counts, oldest day first.

        One query is issued per day. Each entry has ``date`` ("MM-DD"),
        ``amount`` (CNY fen divided by 100.0) and ``orders``.
        """
        trend = []
        today = date.today()
        for offset in range(days - 1, -1, -1):
            day = today - timedelta(days=offset)
            day_start = datetime.combine(day, datetime.min.time())
            # Half-open window [day_start, next_day_start): no reliance on
            # the column's sub-second precision at the end of the day.
            next_day_start = day_start + timedelta(days=1)

            stmt = (
                select(
                    VasOrder.base_currency,
                    func.sum(VasOrder.base_amount).label("amount"),
                    func.count(VasOrder.id).label("orders"),
                )
                .where(
                    VasOrder.created_at >= day_start,
                    VasOrder.created_at < next_day_start,
                    VasOrder.status.in_(StatisticsService._PAID_STATUSES),
                )
                .group_by(VasOrder.base_currency)
            )
            rows = (await db.execute(stmt)).all()

            amount_cny = 0
            order_count = 0
            for currency, amount, count in rows:
                amount_cny += StatisticsService._convert_to_cny(amount, currency)
                order_count += count

            trend.append({
                "date": day.strftime("%m-%d"),
                "amount": amount_cny / 100.0,
                "orders": order_count,
            })
        return trend

    @staticmethod
    async def _product_distribution(db: "AsyncSession") -> list:
        """Return the five product titles with the most paid orders."""
        stmt = (
            select(
                VasProduct.title,
                func.count(VasOrder.id).label("count"),
            )
            .join(VasOrder, VasOrder.product_id == VasProduct.id)
            .where(VasOrder.status.in_(StatisticsService._PAID_STATUSES))
            .group_by(VasProduct.title)
            .order_by(desc("count"))
            .limit(5)
        )
        rows = (await db.execute(stmt)).all()
        return [{"name": title, "value": count} for title, count in rows]

    @staticmethod
    async def _recent_activities(db: "AsyncSession") -> list:
        """Merge the five newest orders and tickets into one activity feed.

        Returns at most ten entries, newest first, each with ``id``, ``text``,
        ``time`` (relative-time label) and ``type``.
        """
        activities = []

        # Latest orders
        order_stmt = (
            select(VasOrder)
            .order_by(desc(VasOrder.created_at))
            .limit(5)
        )
        for order in (await db.execute(order_stmt)).scalars().all():
            # Fall back to the raw currency code when no symbol is known.
            symbol = CURRENCY_SYMBOLS.get(
                order.base_currency, order.base_currency
            ) or ""
            # Amounts are stored in minor units; show major units, 2 decimals.
            amount_display = f"{symbol}{(order.base_amount or 0) / 100:.2f}"
            activities.append({
                "id": f"order_{order.id}",
                "text": f"用户下单: {order.product_name or '未知商品'} ({amount_display})",
                "time": order.created_at,
                "type": "order" if order.status == "pending" else "money",
            })

        # Latest tickets
        ticket_stmt = (
            select(VasTicket)
            .order_by(desc(VasTicket.created_at))
            .limit(5)
        )
        for ticket in (await db.execute(ticket_stmt)).scalars().all():
            # A missing reason becomes an empty preview, not the text "None".
            reason = ticket.reason or ""
            preview = reason[:20] + "..." if len(reason) > 20 else reason
            activities.append({
                "id": f"ticket_{ticket.id}",
                "text": f"新工单 #{ticket.id}: {preview}",
                "time": ticket.created_at,
                "type": "ticket",
            })

        # Newest first, then replace timestamps with relative-time labels.
        activities.sort(key=lambda item: item["time"], reverse=True)
        activities = activities[:10]
        for item in activities:
            happened_at = item["time"]
            # Build "now" with the timestamp's own tzinfo (None -> naive local
            # time) so aware and naive values can both be subtracted.
            now = datetime.now(tz=happened_at.tzinfo)
            item["time"] = StatisticsService._humanize_elapsed(now - happened_at)
        return activities

    # ======================
    # Public API
    # ======================
    @staticmethod
    async def overview(db: "AsyncSession") -> Dict[str, Any]:
        """Build the admin dashboard overview (async).

        Args:
            db: Session used for all queries; they are awaited sequentially.

        Returns:
            A dict with:
              * ``stats``: ``totalOrders`` (orders whose status is not
                "closed"), ``totalRevenue`` (paid revenue in CNY fen),
                ``activeUsers`` (count of all ``VasUser`` rows),
                ``pendingTickets`` (tickets in "pending"/"info_required"),
                ``successRate`` (percentage string).
              * ``revenue_trend``: last 7 days, oldest first.
              * ``product_dist``: top 5 products by paid order count.
              * ``recent_activities``: up to 10 latest orders/tickets.
        """
        # 1. Headline metrics
        total_revenue_cny = await StatisticsService._total_revenue_cny(db)

        total_orders = (
            await db.scalar(
                select(func.count(VasOrder.id))
                .where(VasOrder.status != "closed")
            )
        ) or 0

        # NOTE: counts every user row; no activity filter is applied.
        active_users = (
            await db.scalar(select(func.count(VasUser.id)))
        ) or 0

        pending_tickets = (
            await db.scalar(
                select(func.count(VasTicket.id))
                .where(VasTicket.status.in_(["pending", "info_required"]))
            )
        ) or 0

        success_rate_str = await StatisticsService._task_success_rate(db)

        # 2. Revenue trend for the last 7 days
        revenue_trend = await StatisticsService._revenue_trend(db)

        # 3. Product sales distribution (top 5)
        product_dist = await StatisticsService._product_distribution(db)

        # 4. Latest activity feed
        activities = await StatisticsService._recent_activities(db)

        return {
            "stats": {
                "totalOrders": total_orders,
                "totalRevenue": total_revenue_cny,  # CNY fen
                "activeUsers": active_users,
                "pendingTickets": pending_tickets,
                "successRate": success_rate_str,
            },
            "revenue_trend": revenue_trend,
            "product_dist": product_dist,
            "recent_activities": activities,
        }
|