webhook_service.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. import json
  2. from sqlalchemy.orm import Session
  3. from typing import List, Optional
  4. from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
  5. from app.models.order import VasOrder
  6. from app.models.vas_task import VasTask
  7. from app.models.product import VasProduct
  8. from app.models.product_routing import VasProductRouting
  9. from app.models.payment_event import VasPaymentEvent
  10. from app.models.payment import VasPayment
  11. from app.models.payment_qr import VasPaymentQR
  12. from app.schemas.webhook import SMSHelperWebhookPayload, PaymentWebhookOut
  13. class WebhookService:
  14. def create_task_if_not_exists(
  15. db: Session,
  16. order: VasOrder,
  17. routing_key: str,
  18. script_version: str,
  19. config: dict,
  20. ):
  21. existing = (
  22. db.query(VasTask)
  23. .filter(
  24. VasTask.order_id == order.id,
  25. VasTask.routing_key == routing_key,
  26. VasTask.script_version == script_version,
  27. )
  28. .first()
  29. )
  30. if existing:
  31. return existing # 幂等命中,直接返回
  32. task = VasTask(
  33. order_id=order.id,
  34. routing_key=routing_key,
  35. script_version=script_version,
  36. config=config,
  37. user_inputs=order.user_inputs,
  38. status="pending",
  39. priority=10,
  40. expire_at=None,
  41. )
  42. db.add(task)
  43. return task
  44. @staticmethod
  45. def smshelper_payment_webhook(db: Session, payload: SMSHelperWebhookPayload):
  46. """
  47. webhook payload 示例:
  48. title=【微信】微信支付
  49. content=【SM-E5260】个人收款码到账¥0.01
  50. """
  51. title = payload.title
  52. content = payload.content
  53. webhook_out = PaymentWebhookOut()
  54. if "微信" in title:
  55. provider = "wechat"
  56. elif "支付宝" in title:
  57. provider = "alipay"
  58. device_match = re.search(r"【(.+?)】", content)
  59. device_id = device_match.group(1) if device_match else None
  60. amount_match = re.search(r"¥([\d.]+)", content)
  61. if not amount_match:
  62. raise BizLogicError("Amount not found in webhook content")
  63. amount_yuan = Decimal(amount_match.group(1))
  64. amount_cent = int(amount_yuan * 100)
  65. event = VasPaymentEvent(
  66. provider=provider,
  67. event_type="payment_received",
  68. title=title,
  69. content=content,
  70. parsed_amount=amount_cent,
  71. parsed_currency="CNY",
  72. parsed_device=device_id,
  73. raw_payload=payload.dict(),
  74. status="received"
  75. )
  76. db.add(event)
  77. db.commit()
  78. db.refresh(event)
  79. payment_qr = (
  80. db.query(VasPaymentQR)
  81. .filter(
  82. VasPaymentQR.provider == provider,
  83. VasPaymentQR.device == device_id,
  84. VasPaymentQR.is_active == 1
  85. )
  86. .first()
  87. )
  88. if not payment_qr:
  89. event.status = "failed"
  90. event.error_message = "QR not found"
  91. db.commit()
  92. raise BizLogicError("QR not found")
  93. payment = (
  94. db.query(VasPayment)
  95. .filter(
  96. VasPayment.provider == provider,
  97. VasPayment.amount == amount_cent,
  98. VasPayment.qr_id == payment_qr.id,
  99. VasPayment.status == "pending"
  100. )
  101. .order_by(VasPayment.created_at.desc())
  102. .first()
  103. )
  104. if not payment:
  105. event.status = "failed"
  106. event.error_message = "No matching pending payment"
  107. db.commit()
  108. raise BizLogicError("Payment not found")
  109. webhook_out.payment_id = payment.id
  110. if payment.status in ("succeeded", "late_paid"):
  111. event.status = "duplicate"
  112. event.matched_payment_id = payment.id
  113. event.matched_order_id = payment.order_id
  114. db.commit()
  115. webhook_out.status = True
  116. webhook_out.notify = False
  117. return webhook_out
  118. now = datetime.utcnow()
  119. if payment.expire_at and now > payment.expire_at:
  120. payment.status = "late_paid"
  121. else:
  122. payment.status = "succeeded"
  123. _create_task_if_not_exists(db, order)
  124. # ---------- 写入原始 payload ----------
  125. payment.provider_payload = {
  126. "title": title,
  127. "content": content,
  128. "device_id": device_id,
  129. "received_at": now.isoformat(),
  130. }
  131. order = db.query(VasOrder).filter(VasOrder.id == payment.order_id).first()
  132. if order and order.status != "paid":
  133. order.status = "paid"
  134. event.status = "applied"
  135. event.matched_payment_id = payment.id
  136. event.matched_order_id = payment.order_id
  137. db.commit()
  138. db.refresh(payment)
  139. webhook_out.status = True
  140. webhook_out.order_id = order.id
  141. webhook_out.user_id = order.user_id
  142. webhook_out.notify = True
  143. return webhook_out
  144. @staticmethod
  145. def stripe_payment_webhook(db: Session, event):
  146. """
  147. Stripe webhook handler
  148. """
  149. event_id = event["id"]
  150. event_type = event["type"]
  151. data = event["data"]["object"]
  152. webhook_out = PaymentWebhookOut()
  153. # ---------- 1. 幂等(事件级) ----------
  154. existed_event = (
  155. db.query(VasPaymentEvent)
  156. .filter(VasPaymentEvent.provider == "stripe")
  157. .filter(VasPaymentEvent.event_id == event_id)
  158. .first()
  159. )
  160. if existed_event:
  161. webhook_out.status = True
  162. webhook_out.notify = False
  163. return webhook_out
  164. # ---------- 2. 只处理关心的事件 ----------
  165. if event_type != "checkout.session.completed":
  166. db.add(
  167. VasPaymentEvent(
  168. provider="stripe",
  169. event_id=event_id,
  170. event_type=event_type,
  171. payload=event,
  172. created_at=datetime.utcnow(),
  173. )
  174. )
  175. db.commit()
  176. webhook_out.status = True
  177. webhook_out.notify = False
  178. return webhook_out
  179. # ---------- 3. 解析 metadata ----------
  180. metadata = data.get("metadata", {})
  181. payment_id = metadata.get("payment_id")
  182. order_id = metadata.get("order_id")
  183. if not payment_id or not order_id:
  184. raise BizLogicError("Missing payment_id or order_id in metadata")
  185. # ---------- 4. 查找 payment(业务级幂等) ----------
  186. payment = (
  187. db.query(VasPayment)
  188. .filter(VasPayment.id == int(payment_id))
  189. .first()
  190. )
  191. if not payment:
  192. raise NotFoundError("Payment not found")
  193. if payment.status == "succeeded":
  194. # 已处理过
  195. db.add(
  196. VasPaymentEvent(
  197. provider="stripe",
  198. event_id=event_id,
  199. event_type=event_type,
  200. payload=event,
  201. payment_id=payment.id,
  202. created_at=datetime.utcnow(),
  203. )
  204. )
  205. db.commit()
  206. webhook_out.status = True
  207. webhook_out.notify = False
  208. return webhook_out
  209. # ---------- 5. 金额校验 ----------
  210. paid_amount = data["amount_total"] # 单位:cent
  211. paid_currency = data["currency"].upper()
  212. if paid_amount != payment.amount or paid_currency != payment.currency:
  213. raise BizLogicError(f"Amount mismatch, expected {payment.amount} {payment.currency}, got {paid_amount} {paid_currency}")
  214. # ---------- 6. 判断是否超时 ----------
  215. now = datetime.utcnow()
  216. if payment.expire_at and now > payment.expire_at:
  217. payment.status = "late_paid"
  218. else:
  219. payment.status = "succeeded"
  220. _create_task_if_not_exists(db, order)
  221. payment.provider_payload = event
  222. payment.updated_at = now
  223. # ---------- 7. 更新 order ----------
  224. order = db.query(VasOrder).filter(VasOrder.id == order_id).first()
  225. if order and order.status != "paid":
  226. order.status = "paid"
  227. order.updated_at = now
  228. # ---------- 8. 写 payment_event ----------
  229. db.add(
  230. VasPaymentEvent(
  231. provider="stripe",
  232. event_id=event_id,
  233. event_type=event_type,
  234. payment_id=payment.id,
  235. order_id=order_id,
  236. payload=event,
  237. created_at=now,
  238. )
  239. )
  240. db.commit()
  241. webhook_out.status = True
  242. webhook_out.order_id = order.id
  243. webhook_out.user_id = order.user_id
  244. webhook_out.notify = True
  245. return webhook_out