statistics_service.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. # app/services/statistics_service.py
  2. from sqlalchemy.ext.asyncio import AsyncSession
  3. from sqlalchemy import func, desc, case, select
  4. from datetime import datetime, timedelta, date
  5. from typing import Dict, Any
  6. from app.models.order import VasOrder
  7. from app.models.ticket import VasTicket
  8. from app.models.vas_task import VasTask
  9. from app.models.user import VasUser
  10. from app.models.product import VasProduct
  11. # ======================
  12. # 汇率 & 货币符号
  13. # ======================
  14. EXCHANGE_RATES = {
  15. "CNY": 1.0,
  16. "USD": 7.25,
  17. "EUR": 7.65,
  18. "GBP": 9.10,
  19. "HKD": 0.92,
  20. "JPY": 0.048
  21. }
  22. CURRENCY_SYMBOLS = {
  23. "CNY": "¥", "USD": "$", "EUR": "€", "GBP": "£", "HKD": "HK$", "JPY": "¥"
  24. }
  25. class StatisticsService:
  26. # ======================
  27. # 工具方法
  28. # ======================
  29. @staticmethod
  30. def _convert_to_cny(amount: any, currency: str) -> int:
  31. """金额(分) → CNY(分)"""
  32. if not amount:
  33. return 0
  34. rate = EXCHANGE_RATES.get(currency, 1.0)
  35. return int(float(amount) * rate)
  36. # ======================
  37. # 核心接口
  38. # ======================
  39. @staticmethod
  40. async def overview(db: AsyncSession) -> Dict[str, Any]:
  41. """
  42. 后台统计概览(Async 版)
  43. """
  44. # --------------------------------------------------
  45. # 1. 核心指标
  46. # --------------------------------------------------
  47. # 1.1 总营收(按币种分组)
  48. revenue_stmt = (
  49. select(
  50. VasOrder.base_currency,
  51. func.sum(VasOrder.base_amount)
  52. )
  53. .where(VasOrder.status.in_(["paid", "completed", "succeeded"]))
  54. .group_by(VasOrder.base_currency)
  55. )
  56. revenue_rows = (await db.execute(revenue_stmt)).all()
  57. total_revenue_cny = sum(
  58. StatisticsService._convert_to_cny(amount, currency)
  59. for currency, amount in revenue_rows
  60. )
  61. # 1.2 活跃订单数
  62. total_orders = (
  63. await db.scalar(
  64. select(func.count(VasOrder.id))
  65. .where(VasOrder.status != "closed")
  66. )
  67. ) or 0
  68. # 1.3 活跃用户数
  69. active_users = (
  70. await db.scalar(select(func.count(VasUser.id)))
  71. ) or 0
  72. # 1.4 待处理工单
  73. pending_tickets = (
  74. await db.scalar(
  75. select(func.count(VasTicket.id))
  76. .where(VasTicket.status.in_(["pending", "info_required"]))
  77. )
  78. ) or 0
  79. # 1.5 任务成功率
  80. task_stmt = select(
  81. func.count(VasTask.id).label("total"),
  82. func.sum(
  83. case((VasTask.status == "completed", 1), else_=0)
  84. ).label("success")
  85. )
  86. task_counts = (await db.execute(task_stmt)).first()
  87. success_rate_str = "0%"
  88. if task_counts and task_counts.total > 0:
  89. rate = (task_counts.success / task_counts.total) * 100
  90. success_rate_str = f"{rate:.1f}%"
  91. # --------------------------------------------------
  92. # 2. 最近 7 天营收趋势
  93. # --------------------------------------------------
  94. revenue_trend = []
  95. today = date.today()
  96. for i in range(6, -1, -1):
  97. target_date = today - timedelta(days=i)
  98. start_dt = datetime.combine(target_date, datetime.min.time())
  99. end_dt = datetime.combine(target_date, datetime.max.time())
  100. daily_stmt = (
  101. select(
  102. VasOrder.base_currency,
  103. func.sum(VasOrder.base_amount).label("amount"),
  104. func.count(VasOrder.id).label("orders")
  105. )
  106. .where(
  107. VasOrder.created_at >= start_dt,
  108. VasOrder.created_at <= end_dt,
  109. VasOrder.status.in_(["paid", "completed", "succeeded"])
  110. )
  111. .group_by(VasOrder.base_currency)
  112. )
  113. daily_rows = (await db.execute(daily_stmt)).all()
  114. daily_amount_cny = 0
  115. daily_order_count = 0
  116. for curr, amt, cnt in daily_rows:
  117. daily_amount_cny += StatisticsService._convert_to_cny(amt, curr)
  118. daily_order_count += cnt
  119. revenue_trend.append({
  120. "date": target_date.strftime("%m-%d"),
  121. "amount": daily_amount_cny / 100.0,
  122. "orders": daily_order_count
  123. })
  124. # --------------------------------------------------
  125. # 3. 商品销量分布(Top 5)
  126. # --------------------------------------------------
  127. product_stmt = (
  128. select(
  129. VasProduct.title,
  130. func.count(VasOrder.id).label("count")
  131. )
  132. .join(VasOrder, VasOrder.product_id == VasProduct.id)
  133. .where(VasOrder.status.in_(["paid", "completed", "succeeded"]))
  134. .group_by(VasProduct.title)
  135. .order_by(desc("count"))
  136. .limit(5)
  137. )
  138. product_rows = (await db.execute(product_stmt)).all()
  139. product_dist = [
  140. {"name": title, "value": count}
  141. for title, count in product_rows
  142. ]
  143. # --------------------------------------------------
  144. # 4. 最新动态
  145. # --------------------------------------------------
  146. activities = []
  147. # 最近订单
  148. order_stmt = (
  149. select(VasOrder)
  150. .order_by(desc(VasOrder.created_at))
  151. .limit(5)
  152. )
  153. recent_orders = (await db.execute(order_stmt)).scalars().all()
  154. for o in recent_orders:
  155. symbol = CURRENCY_SYMBOLS.get(o.base_currency, o.base_currency)
  156. amt_display = f"{symbol}{o.base_amount / 100}"
  157. activities.append({
  158. "id": f"order_{o.id}",
  159. "text": f"用户下单: {o.product_name or '未知商品'} ({amt_display})",
  160. "time": o.created_at,
  161. "type": "order" if o.status == "pending" else "money"
  162. })
  163. # 最近工单
  164. ticket_stmt = (
  165. select(VasTicket)
  166. .order_by(desc(VasTicket.created_at))
  167. .limit(5)
  168. )
  169. recent_tickets = (await db.execute(ticket_stmt)).scalars().all()
  170. for t in recent_tickets:
  171. reason_preview = (
  172. t.reason[:20] + "..."
  173. if t.reason and len(t.reason) > 20
  174. else t.reason
  175. )
  176. activities.append({
  177. "id": f"ticket_{t.id}",
  178. "text": f"新工单 #{t.id}: {reason_preview}",
  179. "time": t.created_at,
  180. "type": "ticket"
  181. })
  182. # 排序 + 时间人性化
  183. activities.sort(key=lambda x: x["time"], reverse=True)
  184. activities = activities[:10]
  185. now = datetime.now()
  186. for act in activities:
  187. dt = act["time"]
  188. diff = now - dt
  189. if diff.days > 0:
  190. act["time"] = f"{diff.days}天前"
  191. elif diff.seconds > 3600:
  192. act["time"] = f"{diff.seconds // 3600}小时前"
  193. elif diff.seconds > 60:
  194. act["time"] = f"{diff.seconds // 60}分钟前"
  195. else:
  196. act["time"] = "刚刚"
  197. # --------------------------------------------------
  198. # 返回结果
  199. # --------------------------------------------------
  200. return {
  201. "stats": {
  202. "totalOrders": total_orders,
  203. "totalRevenue": total_revenue_cny, # CNY 分
  204. "activeUsers": active_users,
  205. "pendingTickets": pending_tickets,
  206. "successRate": success_rate_str
  207. },
  208. "revenue_trend": revenue_trend,
  209. "product_dist": product_dist,
  210. "recent_activities": activities
  211. }