payment_service.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637
  1. # app/services/payment_service.py
import asyncio
import functools
import json
import random
import time
import uuid
from datetime import datetime, timedelta
from decimal import Decimal, InvalidOperation, ROUND_HALF_UP
from typing import Dict, List, Optional, Any

import stripe
from redis.asyncio import Redis
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.utils.search import apply_keyword_search_stmt
from app.utils.pagination import paginate
from app.core.biz_exception import NotFoundError, BizLogicError
from app.models.user import VasUser
from app.models.order import VasOrder
from app.models.payment import VasPayment
from app.models.ticket import VasTicket
from app.models.payment_event import VasPaymentEvent
from app.models.product_routing import VasProductRouting
from app.models.verification_token import VasVerificationToken
from app.models.payment_provider import VasPaymentProvider
from app.models.payment_qr import VasPaymentQR
from app.models.payment_confirmation import VasPaymentConfirmation
from app.models.configuration import Configuration
from app.schemas.payment import VasPaymentCreate, AdminUpdateStatusPayload
from app.schemas.payment_confirmation import VasPaymentConfirmationCreate, VasPaymentConfirmationUpdate
from app.services.notification_service import NotificationService
from app.services.webhook_service import WebhookService
  31. def convert_currency(
  32. amount: int,
  33. from_currency: str,
  34. to_currency: str,
  35. exchange_config: Dict[str, Any],
  36. ) -> Decimal:
  37. """
  38. 根据配置进行货币转换。
  39. amount: 最小单位(如分/cents)
  40. exchange_config: { "base": "CNY", "rates": { "CNY": 1.0, "USD": 7.25, ... } }
  41. 计算公式: Target = Source * (Rate_From / Rate_To)
  42. """
  43. if from_currency == to_currency:
  44. return Decimal(amount)
  45. rates = exchange_config.get("rates", {})
  46. # 校验货币是否支持
  47. if from_currency not in rates:
  48. raise BizLogicError(f"Exchange rate for {from_currency} not found in configuration")
  49. if to_currency not in rates:
  50. raise BizLogicError(f"Exchange rate for {to_currency} not found in configuration")
  51. # 获取相对于 Base 的汇率
  52. from_rate = Decimal(str(rates[from_currency]))
  53. to_rate = Decimal(str(rates[to_currency]))
  54. if to_rate == 0:
  55. raise BizLogicError(f"Invalid exchange rate for {to_currency} (0)")
  56. # 交叉汇率计算
  57. converted = (Decimal(amount) * from_rate) / to_rate
  58. # 四舍五入保留整数(支付系统通常使用最小单位)
  59. return converted.quantize(Decimal("1"), rounding=ROUND_HALF_UP)
  60. class PaymentService:
  61. @staticmethod
  62. async def _get_exchange_config(db: AsyncSession) -> Dict:
  63. """
  64. 内部辅助方法:从 Configuration 表读取汇率配置
  65. """
  66. stmt = select(Configuration).where(Configuration.config_key == "EXCHANGE_RATES")
  67. config_obj = (await db.execute(stmt)).scalar_one_or_none()
  68. if not config_obj:
  69. raise BizLogicError("System configuration 'EXCHANGE_RATES' is missing")
  70. # 处理 config_value,可能是字符串也可能是已转换的字典(取决于ORM配置)
  71. val = config_obj.config_value
  72. if isinstance(val, str):
  73. try:
  74. return json.loads(val)
  75. except json.JSONDecodeError:
  76. raise BizLogicError("Invalid JSON format in 'EXCHANGE_RATES'")
  77. elif isinstance(val, dict):
  78. return val
  79. else:
  80. raise BizLogicError("Invalid type for 'EXCHANGE_RATES' configuration")
  81. @staticmethod
  82. async def create_payment(
  83. db: AsyncSession,
  84. payload: VasPaymentCreate,
  85. redis_client: Redis
  86. ) -> VasPayment:
  87. """
  88. 创建支付单的主入口
  89. """
  90. # ① 锁住订单(防并发)
  91. stmt = (
  92. select(VasOrder)
  93. .where(VasOrder.id == payload.order_id)
  94. .with_for_update()
  95. )
  96. order = (await db.execute(stmt)).scalar_one_or_none()
  97. if not order:
  98. raise NotFoundError("Order not found")
  99. # ② 是否已有 pending payment(幂等)
  100. stmt = select(VasPayment).where(
  101. VasPayment.order_id == order.id,
  102. VasPayment.status == "pending",
  103. )
  104. active_payment = (await db.execute(stmt)).scalar_one_or_none()
  105. if active_payment:
  106. if active_payment.provider == payload.provider:
  107. return active_payment
  108. else:
  109. active_payment.status = "expired"
  110. # ③ 获取最新的汇率配置
  111. exchange_config = await PaymentService._get_exchange_config(db)
  112. # ④ 根据 provider 创建
  113. if payload.provider in ("wechat", "alipay"):
  114. payment = await PaymentService.create_offline_payment(
  115. db=db,
  116. order=order,
  117. provider_name=payload.provider,
  118. exchange_config=exchange_config,
  119. )
  120. await db.commit()
  121. return payment
  122. if payload.provider == "stripe":
  123. payment = await PaymentService.create_stripe_payment(
  124. db=db,
  125. order=order,
  126. exchange_config=exchange_config,
  127. )
  128. await db.commit()
  129. return payment
  130. raise BizLogicError("Unsupported provider")
  131. @staticmethod
  132. async def confirm_by_user(
  133. db: AsyncSession,
  134. payload: VasPaymentConfirmationCreate,
  135. current_user: VasUser
  136. ):
  137. """
  138. 用户点击“我已支付”
  139. """
  140. # 1️⃣ 查询是否存在对应 payment 确认记录
  141. result = await db.execute(
  142. select(VasPaymentConfirmation)
  143. .where(VasPaymentConfirmation.payment_id == payload.payment_id)
  144. .where(VasPaymentConfirmation.user_id == current_user.id)
  145. )
  146. record = result.scalar_one_or_none()
  147. if not record:
  148. # 没有则创建一条 pending -> confirmed 记录
  149. record = VasPaymentConfirmation(
  150. payment_id=payload.payment_id,
  151. amount=payload.amount,
  152. currency=payload.currency,
  153. random_offset=payload.random_offset,
  154. user_id=current_user.id,
  155. status="pending",
  156. confirmed_at=payload.confirmed_at
  157. )
  158. db.add(record)
  159. await db.commit()
  160. await db.refresh(record)
  161. stmt = select(VasPayment).where(
  162. VasPayment.id == payload.payment_id
  163. )
  164. payment = (await db.execute(stmt)).scalar_one_or_none()
  165. formatted_time = payload.confirmed_at.strftime('%Y-%m-%d %H:%M') + " (UTC)"
  166. # 2️⃣ 推送异步通知给管理员
  167. await NotificationService.post_message(
  168. db=db,
  169. channel="wechat",
  170. payload={
  171. "template_id": "payment_user_confirmed",
  172. "payload": {
  173. "order_id": payment.order_id,
  174. "payment_id": payload.payment_id,
  175. "user_email": current_user.email,
  176. "amount": payload.amount,
  177. "currency": payload.currency,
  178. "token": "",
  179. "confirmed_at": formatted_time,
  180. "provider": payment.provider
  181. }
  182. },
  183. )
  184. return record
  185. @staticmethod
  186. async def confirm_by_admin(
  187. db: AsyncSession,
  188. id: int,
  189. payload: VasPaymentConfirmationUpdate,
  190. current_user: VasUser
  191. ):
  192. """
  193. 管理员确认用户的支付
  194. """
  195. # 1️⃣ 查询对应确认记录
  196. result = await db.execute(
  197. select(VasPaymentConfirmation)
  198. .where(VasPaymentConfirmation.id == id)
  199. )
  200. record = result.scalar_one_or_none()
  201. if not record:
  202. raise NotFoundError("Payment confirmation record not found")
  203. # 3️⃣ 更新管理员确认状态
  204. record.admin_id = current_user.id
  205. record.admin_confirmed_at = datetime.utcnow()
  206. record.status = 'confirmed'
  207. await PaymentService._confirm_payment_action(db, record.payment_id, 'confirmed by admin')
  208. await db.commit()
  209. await db.refresh(record)
  210. return record
  211. @staticmethod
  212. async def admin_update_status(
  213. db: AsyncSession,
  214. payment_id: int,
  215. payload: AdminUpdateStatusPayload
  216. ):
  217. """
  218. 管理员强制更新支付状态
  219. """
  220. if payload.status == "succeeded":
  221. payment = await PaymentService._confirm_payment_action(db, payment_id, payload.remark)
  222. else:
  223. pay_stmt = (
  224. select(VasPayment)
  225. .where(VasPayment.id == payment_id)
  226. .order_by(VasPayment.created_at.desc())
  227. )
  228. pay_result = await db.execute(pay_stmt)
  229. payment = pay_result.scalar_one_or_none()
  230. if not payment:
  231. raise BizLogicError("Payment not found")
  232. payment.status = 'failed'
  233. await db.commit()
  234. await db.refresh(payment)
  235. await db.refresh(payment)
  236. return payment
  237. @staticmethod
  238. async def list_payment_confirmation(
  239. db: AsyncSession,
  240. keyword: Optional[str] = None,
  241. page: int = 1,
  242. size: int = 20,
  243. ):
  244. stmt = select(VasPaymentConfirmation)
  245. stmt = apply_keyword_search_stmt(
  246. stmt=stmt,
  247. model=VasPaymentConfirmation,
  248. keyword=keyword,
  249. fields=["user_id"],
  250. )
  251. stmt = stmt.order_by(VasPaymentConfirmation.id.desc())
  252. return await paginate(db, stmt, page, size)
  253. @staticmethod
  254. async def confirm_payment(
  255. db: AsyncSession,
  256. payment_id: int,
  257. token: str
  258. ):
  259. # 校验验证码
  260. stmt = select(VasVerificationToken).where(
  261. VasVerificationToken.token == token,
  262. VasVerificationToken.used == 0,
  263. )
  264. token_obj = (await db.execute(stmt)).scalar_one_or_none()
  265. if not token_obj:
  266. raise BizLogicError("Token invalid")
  267. if token_obj.expire_at < datetime.utcnow():
  268. raise BizLogicError("Token expired")
  269. payment = await PaymentService._confirm_payment_action(db, payment_id, 'confirmed by admin')
  270. token_obj.used = 1
  271. await db.commit()
  272. return payment
  273. @staticmethod
  274. async def _confirm_payment_action(db: AsyncSession, payment_id: int, remark:str):
  275. # ---------- 查找 payment ----------
  276. pay_stmt = (
  277. select(VasPayment)
  278. .where(VasPayment.id == payment_id)
  279. .order_by(VasPayment.created_at.desc())
  280. )
  281. pay_result = await db.execute(pay_stmt)
  282. payment = pay_result.scalar_one_or_none()
  283. if not payment:
  284. raise BizLogicError("Payment not found")
  285. event = VasPaymentEvent(
  286. provider=payment.provider,
  287. event_type="payment_received",
  288. title='confirm payment',
  289. content='confirm payment by admin',
  290. parsed_amount=payment.amount,
  291. parsed_currency=payment.currency,
  292. parsed_device='',
  293. status="received",
  294. )
  295. db.add(event)
  296. await db.commit()
  297. await db.refresh(event)
  298. if payment.status in ("succeeded", "late_paid"):
  299. event.status = "duplicate"
  300. event.matched_payment_id = payment.id
  301. event.matched_order_id = payment.order_id
  302. await db.commit()
  303. raise BizLogicError("Payment has been confirmed")
  304. now = datetime.utcnow()
  305. payment.status = "late_paid" if payment.expire_at and now > payment.expire_at else "succeeded"
  306. payment.provider_payload = {
  307. "title": remark,
  308. "received_at": now.isoformat(),
  309. }
  310. order_stmt = select(VasOrder).where(VasOrder.id == payment.order_id)
  311. order_result = await db.execute(order_stmt)
  312. order = order_result.scalar_one_or_none()
  313. if order and order.status != "paid":
  314. order.status = "paid"
  315. await WebhookService._create_task_if_not_exists(db, order)
  316. event.status = "applied"
  317. event.matched_payment_id = payment.id
  318. event.matched_order_id = payment.order_id
  319. await db.commit()
  320. await db.refresh(payment)
  321. return payment
  322. @staticmethod
  323. async def create_offline_payment(
  324. db: AsyncSession,
  325. order: VasOrder,
  326. provider_name: str,
  327. exchange_config: Dict,
  328. ) -> VasPayment:
  329. payment = (
  330. await PaymentService._create_wechat_payment(db, order)
  331. if provider_name == "wechat"
  332. else await PaymentService._create_alipay_payment(db, order)
  333. )
  334. stmt = select(VasPaymentProvider).where(
  335. VasPaymentProvider.enabled == 1,
  336. VasPaymentProvider.name == provider_name,
  337. )
  338. provider = (await db.execute(stmt)).scalar_one_or_none()
  339. if not provider:
  340. raise BizLogicError("Payment provider not available")
  341. stmt = select(VasPaymentQR).where(
  342. VasPaymentQR.provider == provider_name,
  343. VasPaymentQR.is_active == 1,
  344. )
  345. qrs = (await db.execute(stmt)).scalars().all()
  346. if not qrs:
  347. raise BizLogicError("No payment QR available")
  348. # 选择QR码也需要稳定吗?如果是,也可以用 rng.choice(qrs)
  349. # 这里暂时只改 random_offset
  350. qr = random.choice(qrs)
  351. payment.qr_id = qr.id
  352. # --- 汇率计算 ---
  353. amount_converted = convert_currency(
  354. amount=payment.final_amount,
  355. from_currency=order.base_currency,
  356. to_currency=provider.currency,
  357. exchange_config=exchange_config,
  358. )
  359. rates = exchange_config.get("rates", {})
  360. rate_from = Decimal(str(rates.get(order.base_currency, 1.0)))
  361. rate_to = Decimal(str(rates.get(provider.currency, 1.0)))
  362. if rate_to == 0:
  363. raise BizLogicError("Invalid target currency rate (0)")
  364. current_exchange_rate = rate_from / rate_to
  365. # --- 稳定的随机立减逻辑 ---
  366. # 规则:最大减免为金额的 1% 或 99分(取小值)
  367. max_discount = min(99, int(amount_converted * Decimal("0.01")))
  368. if max_discount >= 1:
  369. # 关键修改:使用 order.id 作为种子
  370. # 这确保了同一个订单号,无论计算多少次,得到的 discount 是一样的
  371. rng = random.Random(order.id)
  372. discount = rng.randint(1, max_discount)
  373. else:
  374. discount = 0
  375. payment.exchange_rate = current_exchange_rate
  376. payment.amount = int(amount_converted) - discount
  377. payment.currency = provider.currency
  378. payment.random_offset = discount
  379. return payment
  380. @staticmethod
  381. async def create_stripe_payment(
  382. db: AsyncSession,
  383. order: VasOrder,
  384. exchange_config: Dict,
  385. ) -> VasPayment:
  386. payment = await PaymentService._create_stripe_payment(db, order)
  387. stmt = select(VasPaymentProvider).where(
  388. VasPaymentProvider.enabled == 1,
  389. VasPaymentProvider.name == "stripe",
  390. )
  391. provider = (await db.execute(stmt)).scalar_one_or_none()
  392. if not provider:
  393. raise BizLogicError("Stripe provider not enabled")
  394. amount_converted = convert_currency(
  395. amount=payment.final_amount,
  396. from_currency=order.base_currency,
  397. to_currency=provider.currency,
  398. exchange_config=exchange_config,
  399. )
  400. rates = exchange_config.get("rates", {})
  401. rate_from = Decimal(str(rates.get(order.base_currency, 1.0)))
  402. rate_to = Decimal(str(rates.get(provider.currency, 1.0)))
  403. if rate_to == 0:
  404. raise BizLogicError("Invalid target currency rate (0)")
  405. current_exchange_rate = rate_from / rate_to
  406. payment.exchange_rate = current_exchange_rate
  407. payment.amount = int(amount_converted)
  408. payment.currency = provider.currency
  409. payment.random_offset = 0 # Stripe 不需要随机减免
  410. stripe_session = PaymentService.create_checkout_session(
  411. order=order,
  412. payment=payment,
  413. success_url="https://text.skin/dashboard",
  414. cancel_url="https://text.skin/dashboard",
  415. )
  416. payment.payment_intent_id = stripe_session.id
  417. payment.payment_url = stripe_session.url
  418. return payment
  419. @staticmethod
  420. def create_checkout_session(
  421. order: VasOrder,
  422. payment: VasPayment,
  423. success_url: str,
  424. cancel_url: str,
  425. ):
  426. expires_at = int(time.time()) + 30 * 60
  427. return stripe.checkout.Session.create(
  428. mode="payment",
  429. payment_method_types=["card"],
  430. line_items=[
  431. {
  432. "price_data": {
  433. "currency": payment.currency.lower(),
  434. "product_data": {
  435. "name": f"Visa Service Order {order.id}",
  436. },
  437. "unit_amount": payment.amount,
  438. },
  439. "quantity": 1,
  440. }
  441. ],
  442. metadata={
  443. "order_id": order.id,
  444. "payment_id": payment.id,
  445. "user_id": order.user_id,
  446. },
  447. success_url=success_url,
  448. cancel_url=cancel_url,
  449. expires_at=expires_at,
  450. )
  451. @staticmethod
  452. async def _create_wechat_payment(
  453. db: AsyncSession,
  454. order: VasOrder,
  455. ) -> VasPayment:
  456. payment = VasPayment(
  457. order_id=order.id,
  458. provider="wechat",
  459. channel="qr_static",
  460. base_amount=order.base_amount,
  461. adjustment_delta=order.adjustment_delta,
  462. final_amount=order.final_amount,
  463. base_currency=order.base_currency,
  464. amount=0,
  465. currency="CNY",
  466. random_offset=0,
  467. exchange_rate=0,
  468. status="pending",
  469. expire_at=datetime.utcnow() + timedelta(minutes=30),
  470. )
  471. db.add(payment)
  472. await db.flush()
  473. return payment
  474. @staticmethod
  475. async def _create_alipay_payment(
  476. db: AsyncSession,
  477. order: VasOrder,
  478. ) -> VasPayment:
  479. payment = VasPayment(
  480. order_id=order.id,
  481. provider="alipay",
  482. channel="qr_static",
  483. base_amount=order.base_amount,
  484. adjustment_delta=order.adjustment_delta,
  485. final_amount=order.final_amount,
  486. base_currency=order.base_currency,
  487. amount=0,
  488. currency="CNY",
  489. random_offset=0,
  490. exchange_rate=0,
  491. status="pending",
  492. expire_at=datetime.utcnow() + timedelta(minutes=30),
  493. )
  494. db.add(payment)
  495. await db.flush()
  496. return payment
  497. @staticmethod
  498. async def _create_stripe_payment(
  499. db: AsyncSession,
  500. order: VasOrder,
  501. ) -> VasPayment:
  502. payment = VasPayment(
  503. order_id=order.id,
  504. provider="stripe",
  505. channel="online_link",
  506. base_amount=order.base_amount,
  507. adjustment_delta=order.adjustment_delta,
  508. final_amount=order.final_amount,
  509. base_currency=order.base_currency,
  510. amount=0,
  511. currency="EUR",
  512. random_offset=0,
  513. exchange_rate=0,
  514. status="pending",
  515. expire_at=datetime.utcnow() + timedelta(minutes=30),
  516. )
  517. db.add(payment)
  518. await db.flush()
  519. return payment
  520. @staticmethod
  521. async def list_by_order(
  522. db: AsyncSession,
  523. order_id: int,
  524. ) -> List[VasPayment]:
  525. stmt = select(VasPayment).where(
  526. VasPayment.order_id == order_id
  527. )
  528. result = await db.execute(stmt)
  529. return result.scalars().all()
  530. @staticmethod
  531. async def get_by_id(
  532. db: AsyncSession,
  533. id: int,
  534. ) -> VasPayment:
  535. stmt = select(VasPayment).where(VasPayment.id == id)
  536. obj = (await db.execute(stmt)).scalar_one_or_none()
  537. if not obj:
  538. raise NotFoundError(message="Payment not found")
  539. return obj