webhook_service.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. import re
  2. import json
  3. from datetime import datetime, timedelta
  4. from typing import List, Optional
  5. from decimal import Decimal
  6. from sqlalchemy.ext.asyncio import AsyncSession
  7. from sqlalchemy import select
  8. from app.core.biz_exception import NotFoundError, BizLogicError
  9. from app.models.order import VasOrder
  10. from app.models.vas_task import VasTask
  11. from app.models.product_routing import VasProductRouting
  12. from app.models.payment_event import VasPaymentEvent
  13. from app.models.payment import VasPayment
  14. from app.models.payment_qr import VasPaymentQR
  15. from app.schemas.webhook import SMSHelperWebhookPayload, PaymentWebhookOut
  16. class WebhookService:
  17. # =========================================================
  18. # 内部方法:创建 Task(幂等)
  19. # =========================================================
  20. @staticmethod
  21. async def _create_task_if_not_exists(
  22. db: AsyncSession,
  23. order: VasOrder,
  24. ) -> List[VasTask]:
  25. # 1. 查询路由配置
  26. stmt = select(VasProductRouting).where(
  27. VasProductRouting.product_id == order.product_id,
  28. VasProductRouting.is_active == 1,
  29. )
  30. result = await db.execute(stmt)
  31. routings = result.scalars().all()
  32. if not routings:
  33. return []
  34. created_tasks: List[VasTask] = []
  35. # --- 别名邮箱生成逻辑开始 ---
  36. # 提取原始邮箱用户名
  37. user_inputs = order.user_inputs or {}
  38. original_email = str(user_inputs.get("email", "user")).strip()
  39. base_user = original_email.split('@')[0] if '@' in original_email else original_email
  40. if not base_user:
  41. base_user = "user"
  42. # 计算稳定的域名索引 (基于原始用户名)
  43. DOMAINS = [
  44. "gmail-app.com",
  45. "outlooksearch.com",
  46. "hotmails.vip",
  47. "gmail365.cc",
  48. "ymails.top",
  49. "teamymail.cfd"
  50. ]
  51. h = 0
  52. for ch in base_user:
  53. h = (31 * h + ord(ch)) & 0xFFFFFFFF
  54. if h & 0x80000000: # 模拟 Java 有符号 32 位 int
  55. h = -((~h + 1) & 0xFFFFFFFF)
  56. domain_index = abs(h) % len(DOMAINS)
  57. selected_domain = DOMAINS[domain_index]
  58. # 提取 Order ID 后两位作为唯一后缀
  59. order_id_str = str(order.id)
  60. suffix = order_id_str[-2:] if len(order_id_str) >= 2 else order_id_str
  61. # 组合最终别名
  62. final_alias_email = f"{base_user}{suffix}@{selected_domain}"
  63. # --- 别名邮箱生成逻辑结束 ---
  64. for routing in routings:
  65. # 幂等检查
  66. exists_stmt = select(VasTask).where(
  67. VasTask.order_id == order.id,
  68. VasTask.routing_key == routing.routing_key,
  69. VasTask.status.in_(["pending", "grabbed", "running", "completed"]),
  70. )
  71. exists_result = await db.execute(exists_stmt)
  72. exists = exists_result.scalar_one_or_none()
  73. if exists:
  74. continue
  75. # 构造任务配置,合并路由默认配置与动态生成的 alias_email
  76. # 如果 routing.config 已经有内容,我们覆盖其中的关键字段
  77. task_config = (routing.config or {}).copy()
  78. task_config.update({
  79. "alias_email": final_alias_email,
  80. "include_today": True,
  81. "include_tomorrow": True,
  82. "time_filter": ["AM", "PM"],
  83. "exclude_dates": [],
  84. "allowed_weekdays": [1, 2, 3, 4, 5],
  85. "select": "random"
  86. })
  87. task = VasTask(
  88. order_id=order.id,
  89. routing_key=routing.routing_key,
  90. script_version=routing.script_version,
  91. priority=routing.priority,
  92. status="pending",
  93. user_inputs=order.user_inputs,
  94. config=task_config, # 使用新生成的配置
  95. attempt_count=0,
  96. notify_count=0,
  97. expire_at=datetime.utcnow() + timedelta(days=60),
  98. created_at=datetime.utcnow(),
  99. )
  100. db.add(task)
  101. await db.flush()
  102. await db.refresh(task)
  103. created_tasks.append(task)
  104. return created_tasks
  105. # =========================================================
  106. # SMSHelper 微信 / 支付宝 收款 webhook
  107. # =========================================================
  108. @staticmethod
  109. async def smshelper_payment_webhook(
  110. db: AsyncSession,
  111. payload: SMSHelperWebhookPayload,
  112. ) -> Optional[PaymentWebhookOut]:
  113. title = payload.title
  114. content = payload.content
  115. if "微信" in title:
  116. provider = "wechat"
  117. elif "支付宝" in title:
  118. provider = "alipay"
  119. else:
  120. raise BizLogicError("Unknown payment provider")
  121. device_match = re.search(r"【(.+?)】", content)
  122. device_id = device_match.group(1) if device_match else None
  123. amount_match = re.search(r"¥([\d.]+)", content)
  124. if not amount_match:
  125. raise BizLogicError("Amount not found in webhook content")
  126. amount_yuan = Decimal(amount_match.group(1))
  127. amount_cent = int(amount_yuan * 100)
  128. # 修正点:实例化 Event 时字段名需匹配模型
  129. event = VasPaymentEvent(
  130. provider=provider,
  131. event_type="payment_received",
  132. title=title,
  133. content=content,
  134. parsed_amount=amount_cent,
  135. parsed_currency="CNY",
  136. parsed_device=device_id,
  137. raw_payload=payload.dict(), # 对应模型 raw_payload
  138. status="received",
  139. )
  140. db.add(event)
  141. await db.commit()
  142. await db.refresh(event)
  143. # ---------- 查找 QR ----------
  144. qr_stmt = select(VasPaymentQR).where(
  145. VasPaymentQR.provider == provider,
  146. VasPaymentQR.device == device_id,
  147. VasPaymentQR.is_active == 1,
  148. )
  149. qr_result = await db.execute(qr_stmt)
  150. payment_qr = qr_result.scalar_one_or_none()
  151. if not payment_qr:
  152. event.status = "failed"
  153. event.error_message = "QR not found"
  154. await db.commit()
  155. raise BizLogicError("QR not found")
  156. # ---------- 查找 payment ----------
  157. pay_stmt = (
  158. select(VasPayment)
  159. .where(
  160. VasPayment.provider == provider,
  161. VasPayment.amount == amount_cent,
  162. VasPayment.qr_id == payment_qr.id,
  163. VasPayment.status == "pending",
  164. )
  165. .order_by(VasPayment.created_at.desc())
  166. )
  167. pay_result = await db.execute(pay_stmt)
  168. payment = pay_result.scalar_one_or_none()
  169. if not payment:
  170. event.status = "failed"
  171. event.error_message = "No matching pending payment"
  172. await db.commit()
  173. raise BizLogicError("Payment not found")
  174. if payment.status in ("succeeded", "late_paid"):
  175. event.status = "duplicate"
  176. event.matched_payment_id = payment.id
  177. event.matched_order_id = payment.order_id
  178. await db.commit()
  179. return None
  180. now = datetime.utcnow()
  181. payment.status = "late_paid" if payment.expire_at and now > payment.expire_at else "succeeded"
  182. payment.provider_payload = {
  183. "title": title,
  184. "content": content,
  185. "device_id": device_id,
  186. "received_at": now.isoformat(),
  187. }
  188. order_stmt = select(VasOrder).where(VasOrder.id == payment.order_id)
  189. order_result = await db.execute(order_stmt)
  190. order = order_result.scalar_one_or_none()
  191. if order and order.status != "paid":
  192. order.status = "paid"
  193. await WebhookService._create_task_if_not_exists(db, order)
  194. event.status = "applied"
  195. event.matched_payment_id = payment.id
  196. event.matched_order_id = payment.order_id
  197. await db.commit()
  198. await db.refresh(payment)
  199. return PaymentWebhookOut(
  200. status=True,
  201. order_id=order.id,
  202. user_id=order.user_id,
  203. payment_id=payment.id,
  204. notify=True,
  205. )
  206. # =========================================================
  207. # Stripe webhook
  208. # =========================================================
  209. @staticmethod
  210. async def stripe_payment_webhook(
  211. db: AsyncSession,
  212. event: dict,
  213. ) -> Optional[PaymentWebhookOut]:
  214. event_id = event["id"]
  215. event_type = event["type"]
  216. data = event["data"]["object"]
  217. existed_stmt = select(VasPaymentEvent).where(
  218. VasPaymentEvent.provider == "stripe",
  219. VasPaymentEvent.event_id == event_id,
  220. )
  221. existed_result = await db.execute(existed_stmt)
  222. if existed_result.scalar_one_or_none():
  223. return None
  224. if event_type != "checkout.session.completed":
  225. db.add(
  226. VasPaymentEvent(
  227. provider="stripe",
  228. event_id=event_id,
  229. event_type=event_type,
  230. raw_payload=event, # 修正:payload -> raw_payload
  231. created_at=datetime.utcnow(),
  232. status="ignored" # 补充状态,虽有默认值但明确更好
  233. )
  234. )
  235. await db.commit()
  236. return None
  237. metadata = data.get("metadata", {})
  238. payment_id = metadata.get("payment_id")
  239. order_id = metadata.get("order_id")
  240. if not payment_id or not order_id:
  241. raise BizLogicError("Missing payment_id or order_id in metadata")
  242. pay_stmt = select(VasPayment).where(VasPayment.id == int(payment_id))
  243. pay_result = await db.execute(pay_stmt)
  244. payment = pay_result.scalar_one_or_none()
  245. if not payment:
  246. raise NotFoundError("Payment not found")
  247. if payment.status == "succeeded":
  248. db.add(
  249. VasPaymentEvent(
  250. provider="stripe",
  251. event_id=event_id,
  252. event_type=event_type,
  253. matched_payment_id=payment.id, # 修正:payment_id -> matched_payment_id
  254. raw_payload=event, # 补充:记录 raw_payload
  255. status="duplicate", # 补充:记录状态
  256. created_at=datetime.utcnow(),
  257. )
  258. )
  259. await db.commit()
  260. return None
  261. paid_amount = data["amount_total"]
  262. paid_currency = data["currency"].upper()
  263. if paid_amount != payment.amount or paid_currency != payment.currency:
  264. raise BizLogicError(
  265. f"Amount mismatch, expected {payment.amount} {payment.currency}, "
  266. f"got {paid_amount} {paid_currency}"
  267. )
  268. now = datetime.utcnow()
  269. payment.status = "late_paid" if payment.expire_at and now > payment.expire_at else "succeeded"
  270. payment.provider_payload = event
  271. payment.updated_at = now
  272. order_stmt = select(VasOrder).where(VasOrder.id == order_id)
  273. order_result = await db.execute(order_stmt)
  274. order = order_result.scalar_one_or_none()
  275. if order and order.status != "paid":
  276. order.status = "paid"
  277. order.updated_at = now
  278. await WebhookService._create_task_if_not_exists(db, order)
  279. db.add(
  280. VasPaymentEvent(
  281. provider="stripe",
  282. event_id=event_id,
  283. event_type=event_type,
  284. matched_payment_id=payment.id, # 修正:payment_id -> matched_payment_id
  285. matched_order_id=order_id, # 修正:order_id -> matched_order_id
  286. raw_payload=event, # 修正:payload -> raw_payload
  287. status="applied", # 明确状态
  288. created_at=now,
  289. )
  290. )
  291. await db.commit()
  292. await db.refresh(payment)
  293. return PaymentWebhookOut(
  294. status=True,
  295. order_id=order.id,
  296. user_id=order.user_id,
  297. payment_id=payment.id,
  298. notify=True,
  299. )