webhook_service.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. import re
  2. import json
  3. from datetime import datetime, timedelta
  4. from sqlalchemy.orm import Session
  5. from typing import List, Optional
  6. from decimal import Decimal, ROUND_HALF_UP
  7. from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
  8. from app.models.order import VasOrder
  9. from app.models.vas_task import VasTask
  10. from app.models.product import VasProduct
  11. from app.models.product_routing import VasProductRouting
  12. from app.models.payment_event import VasPaymentEvent
  13. from app.models.payment import VasPayment
  14. from app.models.payment_qr import VasPaymentQR
  15. from app.schemas.webhook import SMSHelperWebhookPayload, PaymentWebhookOut
  16. class WebhookService:
  @staticmethod
  def _create_task_if_not_exists(
      db: Session,
      order: VasOrder
  ):
      """
      Queue one VasTask per active routing of the order's product.

      A task is only created when no VasTask already exists for the same
      (order id, routing key, script version) triple, so calling this
      repeatedly for one order does not queue the same work twice.

      New tasks are added to the session but NOT committed here; the
      caller owns the transaction.

      Returns the list of VasTask objects created by this call (empty
      when the product has no active routing or every task already
      exists).
      """
      # ---------- 1. Active routings configured for this product ----------
      routing_query = db.query(VasProductRouting).filter(
          VasProductRouting.product_id == order.product_id,
          VasProductRouting.is_active == 1
      )
      active_routings = routing_query.all()

      new_tasks = []
      for route in active_routings:
          # ---------- 2. Idempotency check ----------
          already_queued = db.query(VasTask).filter(
              VasTask.order_id == order.id,
              VasTask.routing_key == route.routing_key,
              VasTask.script_version == route.script_version,
          ).first()

          if not already_queued:
              # ---------- 3. Create the task ----------
              new_task = VasTask(
                  order_id=order.id,
                  routing_key=route.routing_key,
                  script_version=route.script_version,
                  priority=10,
                  status="pending",
                  user_inputs=order.user_inputs,
                  config=route.config,
                  attempt_count=0,
                  notify_count=0,
                  # Tasks expire 60 days after they are queued.
                  expire_at=datetime.utcnow() + timedelta(days=60),
                  created_at=datetime.utcnow(),
              )
              db.add(new_task)
              new_tasks.append(new_task)

      return new_tasks
  @staticmethod
  def smshelper_payment_webhook(db: Session, payload: SMSHelperWebhookPayload):
      """
      Handle a payment notification forwarded by the SMS-helper webhook.

      Example webhook payload:
          title=【微信】微信支付
          content=【SM-E5260】个人收款码到账¥0.01

      Steps:
        1. Derive the provider from the title and the device / amount
           from the content.
        2. Persist a VasPaymentEvent (status "received") and commit it, so
           the raw notification is kept even when matching fails later.
        3. Find the active QR code for (provider, device), then the most
           recently created pending payment with the same amount on it.
        4. Mark the payment "succeeded" (or "late_paid" when it arrives
           after ``expire_at``), mark the order "paid", queue its tasks,
           and mark the event "applied".

      Returns:
          PaymentWebhookOut on success; None if the matched payment turns
          out to be already settled.

      Raises:
          BizLogicError: unknown provider, unparseable amount, no active
              QR for the device, or no matching pending payment.
          NotFoundError: the matched payment points at a missing order.
      """
      # Function-scope import: the module-level decimal import does not
      # bring InvalidOperation into scope.
      from decimal import InvalidOperation

      title = payload.title
      content = payload.content

      # ---------- 1. Parse provider / device / amount ----------
      if "微信" in title:
          provider = "wechat"
      elif "支付宝" in title:
          provider = "alipay"
      else:
          # Previously `provider` was left unbound here and the function
          # crashed with UnboundLocalError a few lines below.
          raise BizLogicError("Unsupported payment provider in webhook title")

      device_match = re.search(r"【(.+?)】", content)
      device_id = device_match.group(1) if device_match else None

      amount_match = re.search(r"¥([\d.]+)", content)
      if not amount_match:
          raise BizLogicError("Amount not found in webhook content")
      try:
          # The pattern also matches text such as "1.2.3" or ".", which
          # Decimal rejects; report that as a business error.
          amount_yuan = Decimal(amount_match.group(1))
      except InvalidOperation as exc:
          raise BizLogicError("Amount not found in webhook content") from exc
      # Decimal arithmetic keeps the yuan -> cent conversion exact.
      amount_cent = int(amount_yuan * 100)

      # ---------- 2. Record the raw event before any matching ----------
      event = VasPaymentEvent(
          provider=provider,
          event_type="payment_received",
          title=title,
          content=content,
          parsed_amount=amount_cent,
          parsed_currency="CNY",
          parsed_device=device_id,
          raw_payload=payload.dict(),
          status="received"
      )
      db.add(event)
      db.commit()
      db.refresh(event)

      # ---------- 3. Match QR code, payment and order ----------
      payment_qr = (
          db.query(VasPaymentQR)
          .filter(
              VasPaymentQR.provider == provider,
              VasPaymentQR.device == device_id,
              VasPaymentQR.is_active == 1
          )
          .first()
      )
      if not payment_qr:
          event.status = "failed"
          event.error_message = "QR not found"
          db.commit()
          raise BizLogicError("QR not found")

      # The notification carries no payment reference, so the match is by
      # provider + amount + QR; the newest pending payment wins.
      payment = (
          db.query(VasPayment)
          .filter(
              VasPayment.provider == provider,
              VasPayment.amount == amount_cent,
              VasPayment.qr_id == payment_qr.id,
              VasPayment.status == "pending"
          )
          .order_by(VasPayment.created_at.desc())
          .first()
      )
      if not payment:
          event.status = "failed"
          event.error_message = "No matching pending payment"
          db.commit()
          raise BizLogicError("Payment not found")

      # Defensive guard: the query above only selects pending payments,
      # so this branch is not expected to trigger.
      if payment.status in ("succeeded", "late_paid"):
          event.status = "duplicate"
          event.matched_payment_id = payment.id
          event.matched_order_id = payment.order_id
          db.commit()
          return None

      # Resolve the order BEFORE mutating the payment: the response needs
      # order.id / order.user_id, and a missing order used to surface as
      # an AttributeError after the commit.
      order = db.query(VasOrder).filter(VasOrder.id == payment.order_id).first()
      if not order:
          event.status = "failed"
          event.error_message = "Order not found"
          event.matched_payment_id = payment.id
          db.commit()
          raise NotFoundError("Order not found")

      # ---------- 4. Apply the payment ----------
      now = datetime.utcnow()
      if payment.expire_at and now > payment.expire_at:
          payment.status = "late_paid"
      else:
          payment.status = "succeeded"

      # ---------- Store the original notification on the payment ----------
      payment.provider_payload = {
          "title": title,
          "content": content,
          "device_id": device_id,
          "received_at": now.isoformat(),
      }

      if order.status != "paid":
          order.status = "paid"
          WebhookService._create_task_if_not_exists(db, order)

      event.status = "applied"
      event.matched_payment_id = payment.id
      event.matched_order_id = payment.order_id
      db.commit()
      db.refresh(payment)

      return PaymentWebhookOut(
          status=True,
          order_id=order.id,
          user_id=order.user_id,
          payment_id=payment.id,
          notify=True
      )
  @staticmethod
  def stripe_payment_webhook(db: Session, event):
      """
      Stripe webhook handler.

      ``event`` is indexed like a mapping with "id", "type" and
      "data" -> "object" entries. Only "checkout.session.completed" is
      applied to a payment; every other event type is just recorded.

      Returns:
          PaymentWebhookOut when the payment was applied; None when the
          event was already recorded, is of an unhandled type, or targets
          a payment that is already settled.

      Raises:
          BizLogicError: metadata is missing or malformed, or the paid
              amount / currency differs from the stored payment.
          NotFoundError: the referenced payment or order does not exist.
      """
      event_id = event["id"]
      event_type = event["type"]
      data = event["data"]["object"]

      # NOTE(review): the SMS handler in this class populates
      # VasPaymentEvent with raw_payload / matched_payment_id /
      # matched_order_id, while this handler uses payload / payment_id /
      # order_id — confirm both sets of columns exist on the model.

      # ---------- 1. Idempotency (event level) ----------
      existed_event = (
          db.query(VasPaymentEvent)
          .filter(VasPaymentEvent.provider == "stripe")
          .filter(VasPaymentEvent.event_id == event_id)
          .first()
      )
      if existed_event:
          return None

      # ---------- 2. Only handle the event type we care about ----------
      if event_type != "checkout.session.completed":
          db.add(
              VasPaymentEvent(
                  provider="stripe",
                  event_id=event_id,
                  event_type=event_type,
                  payload=event,
                  created_at=datetime.utcnow(),
              )
          )
          db.commit()
          return None

      # ---------- 3. Parse metadata ----------
      # `or {}` also covers an explicit null metadata value.
      metadata = data.get("metadata") or {}
      payment_id = metadata.get("payment_id")
      order_id = metadata.get("order_id")
      if not payment_id or not order_id:
          raise BizLogicError("Missing payment_id or order_id in metadata")
      try:
          payment_pk = int(payment_id)
      except (TypeError, ValueError) as exc:
          # A non-numeric id used to escape as a bare ValueError.
          raise BizLogicError("Invalid payment_id in metadata") from exc

      # ---------- 4. Look up the payment (business-level idempotency) ----------
      payment = (
          db.query(VasPayment)
          .filter(VasPayment.id == payment_pk)
          .first()
      )
      if not payment:
          raise NotFoundError("Payment not found")

      # "late_paid" is a settled state as well (the SMS handler treats it
      # the same way); without it a late payment would be re-processed.
      if payment.status in ("succeeded", "late_paid"):
          # Already processed: record the event and stop.
          db.add(
              VasPaymentEvent(
                  provider="stripe",
                  event_id=event_id,
                  event_type=event_type,
                  payload=event,
                  payment_id=payment.id,
                  created_at=datetime.utcnow(),
              )
          )
          db.commit()
          return None

      # ---------- 5. Amount check ----------
      # .get() turns a missing field into the mismatch error below instead
      # of a KeyError / AttributeError.
      paid_amount = data.get("amount_total")  # unit: cent
      paid_currency = (data.get("currency") or "").upper()
      if paid_amount != payment.amount or paid_currency != payment.currency:
          raise BizLogicError(f"Amount mismatch, expected {payment.amount} {payment.currency}, got {paid_amount} {paid_currency}")

      # ---------- 6. Resolve the order before mutating anything ----------
      # The response needs order.id / order.user_id; a missing order used
      # to surface as an AttributeError after the commit.
      # NOTE(review): order_id comes from metadata and is not compared
      # with payment.order_id — confirm they are guaranteed to agree.
      order = db.query(VasOrder).filter(VasOrder.id == order_id).first()
      if not order:
          raise NotFoundError("Order not found")

      # ---------- 7. Settle the payment (late or on time) ----------
      now = datetime.utcnow()
      if payment.expire_at and now > payment.expire_at:
          payment.status = "late_paid"
      else:
          payment.status = "succeeded"
      payment.provider_payload = event
      payment.updated_at = now

      # ---------- 8. Update the order ----------
      if order.status != "paid":
          order.status = "paid"
          order.updated_at = now
          WebhookService._create_task_if_not_exists(db, order)

      # ---------- 9. Record the payment event ----------
      db.add(
          VasPaymentEvent(
              provider="stripe",
              event_id=event_id,
              event_type=event_type,
              payment_id=payment.id,
              order_id=order_id,
              payload=event,
              created_at=now,
          )
      )
      db.commit()
      db.refresh(payment)

      return PaymentWebhookOut(
          status=True,
          order_id=order.id,
          user_id=order.user_id,
          payment_id=payment.id,
          notify=True
      )