webhook_service.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. import re
  2. import json
  3. from datetime import datetime, timedelta
  4. from typing import List, Optional
  5. from decimal import Decimal
  6. from sqlalchemy.ext.asyncio import AsyncSession
  7. from sqlalchemy import select
  8. from app.core.queue_manager import queue_manager
  9. from app.core.biz_exception import NotFoundError, BizLogicError
  10. from app.models.order import VasOrder
  11. from app.models.vas_task import VasTask
  12. from app.models.product_routing import VasProductRouting
  13. from app.models.payment_event import VasPaymentEvent
  14. from app.models.payment import VasPayment
  15. from app.models.payment_qr import VasPaymentQR
  16. from app.schemas.webhook import SMSHelperWebhookPayload, PaymentWebhookOut
  17. class WebhookService:
  18. # =========================================================
  19. # 内部方法:创建 Task(幂等)
  20. # =========================================================
  21. @staticmethod
  22. async def _create_task_if_not_exists(
  23. db: AsyncSession,
  24. order: VasOrder,
  25. ) -> List[VasTask]:
  26. stmt = select(VasProductRouting).where(
  27. VasProductRouting.product_id == order.product_id,
  28. VasProductRouting.is_active == 1,
  29. )
  30. result = await db.execute(stmt)
  31. routings = result.scalars().all()
  32. print(f'routings = {routings}')
  33. if not routings:
  34. return []
  35. created_tasks: List[VasTask] = []
  36. for routing in routings:
  37. exists_stmt = select(VasTask).where(
  38. VasTask.order_id == order.id,
  39. VasTask.routing_key == routing.routing_key,
  40. VasTask.status.in_(["pending", "grabbed", "running", "completed"]),
  41. )
  42. exists_result = await db.execute(exists_stmt)
  43. exists = exists_result.scalar_one_or_none()
  44. if exists:
  45. continue
  46. task = VasTask(
  47. order_id=order.id,
  48. routing_key=routing.routing_key,
  49. script_version=routing.script_version,
  50. priority=routing.priority,
  51. status="pending",
  52. user_inputs=order.user_inputs,
  53. config=routing.config,
  54. attempt_count=0,
  55. notify_count=0,
  56. expire_at=datetime.utcnow() + timedelta(days=60),
  57. created_at=datetime.utcnow(),
  58. )
  59. db.add(task)
  60. await db.flush()
  61. await db.refresh(task)
  62. queue_manager.put(
  63. queue_name=routing.routing_key,
  64. task_id=task.id,
  65. priority=task.priority
  66. )
  67. created_tasks.append(task)
  68. return created_tasks
  69. # =========================================================
  70. # SMSHelper 微信 / 支付宝 收款 webhook
  71. # =========================================================
  72. @staticmethod
  73. async def smshelper_payment_webhook(
  74. db: AsyncSession,
  75. payload: SMSHelperWebhookPayload,
  76. ) -> Optional[PaymentWebhookOut]:
  77. title = payload.title
  78. content = payload.content
  79. if "微信" in title:
  80. provider = "wechat"
  81. elif "支付宝" in title:
  82. provider = "alipay"
  83. else:
  84. raise BizLogicError("Unknown payment provider")
  85. device_match = re.search(r"【(.+?)】", content)
  86. device_id = device_match.group(1) if device_match else None
  87. amount_match = re.search(r"¥([\d.]+)", content)
  88. if not amount_match:
  89. raise BizLogicError("Amount not found in webhook content")
  90. amount_yuan = Decimal(amount_match.group(1))
  91. amount_cent = int(amount_yuan * 100)
  92. event = VasPaymentEvent(
  93. provider=provider,
  94. event_type="payment_received",
  95. title=title,
  96. content=content,
  97. parsed_amount=amount_cent,
  98. parsed_currency="CNY",
  99. parsed_device=device_id,
  100. raw_payload=payload.dict(),
  101. status="received",
  102. )
  103. db.add(event)
  104. await db.commit()
  105. await db.refresh(event)
  106. # ---------- 查找 QR ----------
  107. qr_stmt = select(VasPaymentQR).where(
  108. VasPaymentQR.provider == provider,
  109. VasPaymentQR.device == device_id,
  110. VasPaymentQR.is_active == 1,
  111. )
  112. qr_result = await db.execute(qr_stmt)
  113. payment_qr = qr_result.scalar_one_or_none()
  114. if not payment_qr:
  115. event.status = "failed"
  116. event.error_message = "QR not found"
  117. await db.commit()
  118. raise BizLogicError("QR not found")
  119. # ---------- 查找 payment ----------
  120. pay_stmt = (
  121. select(VasPayment)
  122. .where(
  123. VasPayment.provider == provider,
  124. VasPayment.amount == amount_cent,
  125. VasPayment.qr_id == payment_qr.id,
  126. VasPayment.status == "pending",
  127. )
  128. .order_by(VasPayment.created_at.desc())
  129. )
  130. pay_result = await db.execute(pay_stmt)
  131. payment = pay_result.scalar_one_or_none()
  132. if not payment:
  133. event.status = "failed"
  134. event.error_message = "No matching pending payment"
  135. await db.commit()
  136. raise BizLogicError("Payment not found")
  137. if payment.status in ("succeeded", "late_paid"):
  138. event.status = "duplicate"
  139. event.matched_payment_id = payment.id
  140. event.matched_order_id = payment.order_id
  141. await db.commit()
  142. return None
  143. now = datetime.utcnow()
  144. payment.status = "late_paid" if payment.expire_at and now > payment.expire_at else "succeeded"
  145. payment.provider_payload = {
  146. "title": title,
  147. "content": content,
  148. "device_id": device_id,
  149. "received_at": now.isoformat(),
  150. }
  151. order_stmt = select(VasOrder).where(VasOrder.id == payment.order_id)
  152. order_result = await db.execute(order_stmt)
  153. order = order_result.scalar_one_or_none()
  154. if order and order.status != "paid":
  155. order.status = "paid"
  156. await WebhookService._create_task_if_not_exists(db, order)
  157. event.status = "applied"
  158. event.matched_payment_id = payment.id
  159. event.matched_order_id = payment.order_id
  160. await db.commit()
  161. await db.refresh(payment)
  162. return PaymentWebhookOut(
  163. status=True,
  164. order_id=order.id,
  165. user_id=order.user_id,
  166. payment_id=payment.id,
  167. notify=True,
  168. )
  169. # =========================================================
  170. # Stripe webhook
  171. # =========================================================
  172. @staticmethod
  173. async def stripe_payment_webhook(
  174. db: AsyncSession,
  175. event: dict,
  176. ) -> Optional[PaymentWebhookOut]:
  177. event_id = event["id"]
  178. event_type = event["type"]
  179. data = event["data"]["object"]
  180. existed_stmt = select(VasPaymentEvent).where(
  181. VasPaymentEvent.provider == "stripe",
  182. VasPaymentEvent.event_id == event_id,
  183. )
  184. existed_result = await db.execute(existed_stmt)
  185. if existed_result.scalar_one_or_none():
  186. return None
  187. if event_type != "checkout.session.completed":
  188. db.add(
  189. VasPaymentEvent(
  190. provider="stripe",
  191. event_id=event_id,
  192. event_type=event_type,
  193. payload=event,
  194. created_at=datetime.utcnow(),
  195. )
  196. )
  197. await db.commit()
  198. return None
  199. metadata = data.get("metadata", {})
  200. payment_id = metadata.get("payment_id")
  201. order_id = metadata.get("order_id")
  202. if not payment_id or not order_id:
  203. raise BizLogicError("Missing payment_id or order_id in metadata")
  204. pay_stmt = select(VasPayment).where(VasPayment.id == int(payment_id))
  205. pay_result = await db.execute(pay_stmt)
  206. payment = pay_result.scalar_one_or_none()
  207. if not payment:
  208. raise NotFoundError("Payment not found")
  209. if payment.status == "succeeded":
  210. db.add(
  211. VasPaymentEvent(
  212. provider="stripe",
  213. event_id=event_id,
  214. event_type=event_type,
  215. payment_id=payment.id,
  216. created_at=datetime.utcnow(),
  217. )
  218. )
  219. await db.commit()
  220. return None
  221. paid_amount = data["amount_total"]
  222. paid_currency = data["currency"].upper()
  223. if paid_amount != payment.amount or paid_currency != payment.currency:
  224. raise BizLogicError(
  225. f"Amount mismatch, expected {payment.amount} {payment.currency}, "
  226. f"got {paid_amount} {paid_currency}"
  227. )
  228. now = datetime.utcnow()
  229. payment.status = "late_paid" if payment.expire_at and now > payment.expire_at else "succeeded"
  230. payment.provider_payload = event
  231. payment.updated_at = now
  232. order_stmt = select(VasOrder).where(VasOrder.id == order_id)
  233. order_result = await db.execute(order_stmt)
  234. order = order_result.scalar_one_or_none()
  235. if order and order.status != "paid":
  236. order.status = "paid"
  237. order.updated_at = now
  238. await WebhookService._create_task_if_not_exists(db, order)
  239. db.add(
  240. VasPaymentEvent(
  241. provider="stripe",
  242. event_id=event_id,
  243. event_type=event_type,
  244. payment_id=payment.id,
  245. order_id=order_id,
  246. payload=event,
  247. created_at=now,
  248. )
  249. )
  250. await db.commit()
  251. await db.refresh(payment)
  252. return PaymentWebhookOut(
  253. status=True,
  254. order_id=order.id,
  255. user_id=order.user_id,
  256. payment_id=payment.id,
  257. notify=True,
  258. )