statistics_service.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. # app/services/statistics_service.py
import json
from datetime import datetime, timedelta, date
from decimal import Decimal, ROUND_HALF_UP
from typing import Dict, Any, Tuple

from sqlalchemy import func, desc, case, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.configuration import Configuration
from app.models.order import VasOrder
from app.models.product import VasProduct
from app.models.ticket import VasTicket
from app.models.user import VasUser
from app.models.vas_task import VasTask
  13. # ======================
  14. # 货币符号 (仅用于展示文本,无需存库)
  15. # ======================
  16. CURRENCY_SYMBOLS = {
  17. "CNY": "¥", "USD": "$", "EUR": "€", "GBP": "£", "HKD": "HK$", "JPY": "¥"
  18. }
  19. class StatisticsService:
  20. # ======================
  21. # 辅助方法:汇率与时间
  22. # ======================
  23. @staticmethod
  24. async def _get_exchange_rates(db: AsyncSession) -> Dict[str, float]:
  25. """
  26. 从数据库获取动态汇率配置
  27. 返回格式示例: {'CNY': 1.0, 'USD': 7.25, ...}
  28. """
  29. stmt = select(Configuration).where(Configuration.config_key == "EXCHANGE_RATES")
  30. config_obj = (await db.execute(stmt)).scalar_one_or_none()
  31. # 默认兜底汇率
  32. fallback_rates = {"CNY": 1.0}
  33. if not config_obj:
  34. return fallback_rates
  35. try:
  36. val = config_obj.config_value
  37. data = {}
  38. if isinstance(val, str):
  39. data = json.loads(val)
  40. elif isinstance(val, dict):
  41. data = val
  42. return data.get("rates", fallback_rates)
  43. except Exception:
  44. return fallback_rates
  45. @staticmethod
  46. def _convert_to_cny(amount: Any, currency: str, rates: Dict[str, float]) -> int:
  47. """
  48. 金额(分) → CNY(分)
  49. """
  50. if not amount:
  51. return 0
  52. rate = float(rates.get(currency, 1.0))
  53. return int(float(amount) * rate)
  54. @staticmethod
  55. def _get_month_ranges() -> Tuple[datetime, datetime, datetime, datetime]:
  56. """
  57. 获取时间范围用于环比计算
  58. 返回: (本月开始, 本月结束, 上月开始, 上月结束)
  59. """
  60. now = datetime.now()
  61. # 本月范围
  62. this_month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
  63. this_month_end = now # 截止到当前时间
  64. # 上月范围
  65. # 上月结束 = 本月开始 - 1微秒
  66. last_month_end_dt = this_month_start - timedelta(microseconds=1)
  67. # 上月开始
  68. last_month_start = last_month_end_dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
  69. # 为了公平比较(如果需要),可以将上月结束时间也限制在和本月同样的天数进度
  70. # 但通常简易报表直接对比 上月全月总额 vs 本月至今总额,或者对比 上月至今
  71. # 这里采用简单的 上月全月 vs 本月至今
  72. return this_month_start, this_month_end, last_month_start, last_month_end_dt
  73. @staticmethod
  74. def _calculate_growth(current: float, previous: float) -> float:
  75. """计算增长率百分比"""
  76. if previous == 0:
  77. return 100.0 if current > 0 else 0.0
  78. return ((current - previous) / previous) * 100.0
  79. # ======================
  80. # 核心接口
  81. # ======================
  82. @staticmethod
  83. async def overview(db: AsyncSession) -> Dict[str, Any]:
  84. """
  85. 后台统计概览(Async 版)
  86. """
  87. # 0. 准备基础数据
  88. current_rates = await StatisticsService._get_exchange_rates(db)
  89. tm_start, tm_end, lm_start, lm_end = StatisticsService._get_month_ranges()
  90. # --------------------------------------------------
  91. # 1. 营收统计 (本月 vs 上月) - 用于计算 Trend
  92. # --------------------------------------------------
  93. async def calculate_revenue(start_dt, end_dt):
  94. stmt = (
  95. select(VasOrder.base_currency, func.sum(VasOrder.base_amount))
  96. .where(
  97. VasOrder.created_at >= start_dt,
  98. VasOrder.created_at <= end_dt,
  99. VasOrder.status.in_(["paid", "completed", "succeeded"])
  100. )
  101. .group_by(VasOrder.base_currency)
  102. )
  103. rows = (await db.execute(stmt)).all()
  104. total_cny = sum(
  105. StatisticsService._convert_to_cny(amt, curr, current_rates)
  106. for curr, amt in rows
  107. )
  108. return total_cny
  109. revenue_this_month = await calculate_revenue(tm_start, tm_end)
  110. revenue_last_month = await calculate_revenue(lm_start, lm_end)
  111. revenue_trend = StatisticsService._calculate_growth(revenue_this_month, revenue_last_month)
  112. # --------------------------------------------------
  113. # 2. 订单量统计 (本月 vs 上月) - 用于计算 Trend
  114. # --------------------------------------------------
  115. async def calculate_orders(start_dt, end_dt):
  116. count = await db.scalar(
  117. select(func.count(VasOrder.id))
  118. .where(
  119. VasOrder.created_at >= start_dt,
  120. VasOrder.created_at <= end_dt,
  121. VasOrder.status.in_(["paid", "completed", "succeeded"]) # 仅统计有效订单
  122. )
  123. )
  124. return count or 0
  125. orders_this_month = await calculate_orders(tm_start, tm_end)
  126. orders_last_month = await calculate_orders(lm_start, lm_end)
  127. orders_trend = StatisticsService._calculate_growth(orders_this_month, orders_last_month)
  128. # --------------------------------------------------
  129. # 3. 其他静态指标
  130. # --------------------------------------------------
  131. # 活跃用户数 (总累计)
  132. active_users = (
  133. await db.scalar(select(func.count(VasUser.id)))
  134. ) or 0
  135. # 待处理工单
  136. pending_tickets = (
  137. await db.scalar(
  138. select(func.count(VasTicket.id))
  139. .where(VasTicket.status.in_(["pending", "info_required"]))
  140. )
  141. ) or 0
  142. # 任务成功率
  143. task_stmt = select(
  144. func.count(VasTask.id).label("total"),
  145. func.sum(
  146. case((VasTask.status == "completed", 1), else_=0)
  147. ).label("success")
  148. )
  149. task_counts = (await db.execute(task_stmt)).first()
  150. success_rate_str = "0%"
  151. if task_counts and task_counts.total > 0:
  152. rate = (task_counts.success / task_counts.total) * 100
  153. success_rate_str = f"{rate:.1f}%"
  154. # --------------------------------------------------
  155. # 4. 图表数据:最近 7 天营收趋势
  156. # --------------------------------------------------
  157. revenue_trend_chart = []
  158. today = date.today()
  159. for i in range(6, -1, -1):
  160. target_date = today - timedelta(days=i)
  161. start_dt = datetime.combine(target_date, datetime.min.time())
  162. end_dt = datetime.combine(target_date, datetime.max.time())
  163. daily_stmt = (
  164. select(
  165. VasOrder.base_currency,
  166. func.sum(VasOrder.base_amount).label("amount"),
  167. func.count(VasOrder.id).label("orders")
  168. )
  169. .where(
  170. VasOrder.created_at >= start_dt,
  171. VasOrder.created_at <= end_dt,
  172. VasOrder.status.in_(["paid", "completed", "succeeded"])
  173. )
  174. .group_by(VasOrder.base_currency)
  175. )
  176. daily_rows = (await db.execute(daily_stmt)).all()
  177. daily_amount_cny = 0
  178. daily_order_count = 0
  179. for curr, amt, cnt in daily_rows:
  180. daily_amount_cny += StatisticsService._convert_to_cny(amt, curr, current_rates)
  181. daily_order_count += cnt
  182. revenue_trend_chart.append({
  183. "date": target_date.strftime("%m-%d"),
  184. "amount": daily_amount_cny / 100.0, # 转为元
  185. "orders": daily_order_count
  186. })
  187. # --------------------------------------------------
  188. # 5. 商品销量分布(Top 5)
  189. # --------------------------------------------------
  190. product_stmt = (
  191. select(
  192. VasProduct.title,
  193. func.count(VasOrder.id).label("count")
  194. )
  195. .join(VasOrder, VasOrder.product_id == VasProduct.id)
  196. .where(VasOrder.status.in_(["paid", "completed", "succeeded"]))
  197. .group_by(VasProduct.title)
  198. .order_by(desc("count"))
  199. .limit(5)
  200. )
  201. product_rows = (await db.execute(product_stmt)).all()
  202. product_dist = [
  203. {"name": title, "value": count}
  204. for title, count in product_rows
  205. ]
  206. # --------------------------------------------------
  207. # 6. 最新动态 (混合订单和工单)
  208. # --------------------------------------------------
  209. activities = []
  210. # 最近订单
  211. order_stmt = (
  212. select(VasOrder)
  213. .order_by(desc(VasOrder.created_at))
  214. .limit(5)
  215. )
  216. recent_orders = (await db.execute(order_stmt)).scalars().all()
  217. for o in recent_orders:
  218. # 动态仅作展示,这里使用原始币种即可,不需要转CNY
  219. symbol = CURRENCY_SYMBOLS.get(o.base_currency, o.base_currency)
  220. amt_display = f"{symbol}{o.base_amount / 100}"
  221. activities.append({
  222. "id": f"order_{o.id}",
  223. "text": f"用户下单: {o.product_name or '未知商品'} ({amt_display})",
  224. "time": o.created_at,
  225. "type": "order" if o.status == "pending" else "money"
  226. })
  227. # 最近工单
  228. ticket_stmt = (
  229. select(VasTicket)
  230. .order_by(desc(VasTicket.created_at))
  231. .limit(5)
  232. )
  233. recent_tickets = (await db.execute(ticket_stmt)).scalars().all()
  234. for t in recent_tickets:
  235. reason_preview = (
  236. t.reason[:20] + "..."
  237. if t.reason and len(t.reason) > 20
  238. else t.reason
  239. )
  240. activities.append({
  241. "id": f"ticket_{t.id}",
  242. "text": f"新工单 #{t.id}: {reason_preview}",
  243. "time": t.created_at,
  244. "type": "ticket"
  245. })
  246. # 按时间倒序排序
  247. activities.sort(key=lambda x: x["time"], reverse=True)
  248. activities = activities[:10]
  249. # 格式化时间显示
  250. now_dt = datetime.now()
  251. for act in activities:
  252. dt = act["time"]
  253. diff = now_dt - dt
  254. if diff.days > 0:
  255. act["time"] = f"{diff.days}天前"
  256. elif diff.seconds > 3600:
  257. act["time"] = f"{diff.seconds // 3600}小时前"
  258. elif diff.seconds > 60:
  259. act["time"] = f"{diff.seconds // 60}分钟前"
  260. else:
  261. act["time"] = "刚刚"
  262. # --------------------------------------------------
  263. # 7. 组装返回结果
  264. # --------------------------------------------------
  265. return {
  266. "stats": {
  267. "totalOrders": orders_this_month, # 仅显示本月,为了匹配趋势语境
  268. "totalOrdersTrend": orders_trend, # 新增:订单增长率
  269. "totalRevenue": revenue_this_month, # 仅显示本月营收 (分)
  270. "totalRevenueTrend": revenue_trend, # 新增:营收增长率
  271. "activeUsers": active_users,
  272. "pendingTickets": pending_tickets,
  273. "successRate": success_rate_str
  274. },
  275. "revenue_trend": revenue_trend_chart, # 图表数据
  276. "product_dist": product_dist, # 饼图数据
  277. "recent_activities": activities # 动态列表
  278. }