| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236 |
# app/services/payment_service.py
import random
import time
from datetime import datetime, timedelta, timezone
from decimal import Decimal, ROUND_HALF_UP
from typing import Dict, List

import stripe
from sqlalchemy.orm import Session

from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
from app.models.order import VasOrder
from app.models.payment import VasPayment
from app.models.payment_provider import VasPaymentProvider
from app.models.payment_qr import VasPaymentQR
from app.models.product import VasProduct
from app.schemas.payment import VasPaymentCreate
class PaymentService:
    """Create and look up ``VasPayment`` rows for orders.

    Two flows are supported: static-QR payments ("wechat" / "alipay") and
    Stripe Checkout ("stripe"). All methods are static; the caller supplies
    the SQLAlchemy session and the exchange-rate table.
    """

    # Lifetime of a pending payment; the Stripe Checkout session uses the
    # same value so both expire together.
    PAYMENT_TTL_MINUTES = 30
    # Upper bound for the random amount offset applied to static-QR payments.
    MAX_RANDOM_OFFSET = 99
    OFFLINE_PROVIDERS = ("wechat", "alipay")
    # Placeholder redirect targets used when the caller does not pass its own.
    DEFAULT_SUCCESS_URL = "https://yourdomain.com/pay/success"
    DEFAULT_CANCEL_URL = "https://yourdomain.com/pay/cancel"

    @staticmethod
    def create_payment(db: Session, payload: VasPaymentCreate, rate_table: Dict):
        """Create a payment for ``payload.order_id`` using ``payload.provider``.

        If the order already has a pending payment with the same provider,
        that payment is returned unchanged (idempotent, no commit). A pending
        payment with a different provider is marked ``failed`` and replaced.

        Args:
            db: Session used for all queries; committed once a new payment
                has been created.
            payload: Request data; ``order_id`` and ``provider`` are read.
            rate_table: Mapping of ``"BASE->TARGET"`` (upper case) to a rate.

        Returns:
            The existing or newly created ``VasPayment``.

        Raises:
            BizLogicError: The provider is unsupported or unavailable, no QR
                code exists, or no exchange rate is available.
        """
        # Lock the order row so concurrent requests cannot create payments
        # for the same order at the same time.
        # NOTE(review): ``.one()`` raises SQLAlchemy's own error when the order
        # does not exist; NotFoundError is imported but unused - confirm which
        # error callers expect.
        order = (
            db.query(VasOrder)
            .filter(VasOrder.id == payload.order_id)
            .with_for_update()
            .one()
        )

        # Is there already an in-progress payment for this order?
        # NOTE(review): ``expire_at`` is not checked here; presumably another
        # process moves expired payments out of "pending" - verify.
        active_payment = (
            db.query(VasPayment)
            .filter(
                VasPayment.order_id == order.id,
                VasPayment.status == "pending",
            )
            .first()
        )
        if active_payment and active_payment.provider == payload.provider:
            # Return the existing payment instead of failing (idempotency).
            return active_payment

        # Reject unknown providers before touching the previous payment, so a
        # bad request does not leave a "failed" mutation in the session.
        is_offline = payload.provider in PaymentService.OFFLINE_PROVIDERS
        if not is_offline and payload.provider != "stripe":
            raise BizLogicError("Unsupported provider")

        if active_payment:
            # The user switched provider: retire the previous attempt.
            active_payment.status = "failed"
            db.add(active_payment)

        if is_offline:
            payment = PaymentService.create_offline_payment(
                db=db,
                order=order,
                provider_name=payload.provider,
                rate_table=rate_table,
            )
        else:
            payment = PaymentService.create_stripe_payment(
                db=db, order=order, rate_table=rate_table
            )
        db.commit()
        return payment

    @staticmethod
    def create_offline_payment(db, order, provider_name: str, rate_table: dict):
        """Create a pending static-QR payment ("wechat" or "alipay").

        The provider, QR codes and exchange rate are all resolved before the
        payment row is added, so a failed precondition leaves no flushed row.
        The caller is responsible for committing.

        Args:
            db: SQLAlchemy session.
            order: Order being paid; ``id``, ``base_amount`` and
                ``base_currency`` are read.
            provider_name: ``"wechat"`` or ``"alipay"``.
            rate_table: Mapping of ``"BASE->TARGET"`` to a rate.

        Returns:
            The flushed ``VasPayment`` with QR, amount, currency, exchange
            rate and random offset filled in.

        Raises:
            BizLogicError: Unknown or disabled provider, no QR code, or no
                exchange rate.
        """
        factories = {
            "wechat": PaymentService._create_wechat_payment,
            "alipay": PaymentService._create_alipay_payment,
        }
        factory = factories.get(provider_name)
        if factory is None:
            raise BizLogicError("Unsupported provider")

        provider = PaymentService._get_enabled_provider(db, provider_name)
        qrs = (
            db.query(VasPaymentQR)
            .filter(VasPaymentQR.provider == provider_name)
            .all()
        )
        if not qrs:
            raise BizLogicError("No payment QR available")
        exchange_rate = PaymentService._lookup_rate(
            rate_table, order.base_currency, provider.currency
        )

        payment = factory(db, order)
        payment.qr_id = random.choice(qrs).id

        converted = PaymentService._convert_amount(
            payment.base_amount, exchange_rate
        )
        # Subtract a random offset of at most 1% of the amount, capped at
        # MAX_RANDOM_OFFSET.
        # NOTE(review): presumably this makes amounts distinguishable when
        # reconciling static-QR transfers - confirm.
        max_discount = min(PaymentService.MAX_RANDOM_OFFSET, converted // 100)
        discount = random.randint(1, max_discount) if max_discount >= 1 else 0

        payment.exchange_rate = exchange_rate
        payment.amount = converted - discount
        payment.currency = provider.currency
        payment.random_offset = discount
        return payment

    @staticmethod
    def create_stripe_payment(
        db,
        order,
        rate_table: dict,
        success_url: str = DEFAULT_SUCCESS_URL,
        cancel_url: str = DEFAULT_CANCEL_URL,
    ):
        """Create a pending Stripe payment and its Checkout session.

        The caller is responsible for committing.

        Args:
            db: SQLAlchemy session.
            order: Order being paid.
            rate_table: Mapping of ``"BASE->TARGET"`` to a rate.
            success_url: Redirect target after a successful checkout.
            cancel_url: Redirect target when the checkout is cancelled.

        Returns:
            The flushed ``VasPayment`` with amount, currency, exchange rate,
            ``payment_intent_id`` (the Checkout session id) and
            ``payment_url`` filled in.

        Raises:
            BizLogicError: Stripe provider disabled/missing, or no exchange
                rate.
        """
        # Resolve everything that can fail locally before adding the row.
        provider = PaymentService._get_enabled_provider(db, "stripe")
        exchange_rate = PaymentService._lookup_rate(
            rate_table, order.base_currency, provider.currency
        )

        payment = PaymentService._create_stripe_payment(db, order)
        payment.exchange_rate = exchange_rate
        payment.amount = PaymentService._convert_amount(
            payment.base_amount, exchange_rate
        )
        payment.currency = provider.currency
        payment.random_offset = 0

        stripe_session = PaymentService.create_checkout_session(
            order=order,
            payment=payment,
            success_url=success_url,
            cancel_url=cancel_url,
        )
        payment.payment_intent_id = stripe_session.id
        payment.payment_url = stripe_session.url
        return payment

    @staticmethod
    def create_checkout_session(
        order,
        payment,
        success_url: str,
        cancel_url: str,
    ):
        """Create a Stripe Checkout session for ``payment``.

        ``order.base_amount`` and ``payment.amount`` are expressed in cents.

        Args:
            order: Order being paid; ``id`` and ``user_id`` go into metadata.
            payment: Payment whose ``amount`` and ``currency`` are charged.
            success_url: Base success URL; a ``session_id`` query parameter
                with Stripe's ``{CHECKOUT_SESSION_ID}`` placeholder is added.
            cancel_url: URL Stripe redirects to on cancellation.

        Returns:
            The object returned by ``stripe.checkout.Session.create``.
        """
        # Stripe-specific expiry, expressed as epoch seconds.
        # NOTE(review): Stripe documents a minimum session lifetime; confirm
        # that exactly PAYMENT_TTL_MINUTES is not rejected under clock skew.
        expires_at = int(time.time()) + PaymentService.PAYMENT_TTL_MINUTES * 60
        session = stripe.checkout.Session.create(
            mode="payment",
            payment_method_types=["card"],
            line_items=[
                {
                    "price_data": {
                        "currency": payment.currency.lower(),
                        "product_data": {
                            "name": f"Visa Service Order {order.id}",
                        },
                        "unit_amount": payment.amount,
                    },
                    "quantity": 1,
                }
            ],
            metadata={
                "order_id": order.id,
                "payment_id": payment.id,
                "user_id": order.user_id,
            },
            success_url=PaymentService._with_session_id(success_url),
            cancel_url=cancel_url,
            expires_at=expires_at,
        )
        return session

    @staticmethod
    def _with_session_id(success_url: str) -> str:
        """Append the Checkout session-id placeholder as a query parameter."""
        # Use "&" when the URL already carries a query string.
        separator = "&" if "?" in success_url else "?"
        return f"{success_url}{separator}session_id={{CHECKOUT_SESSION_ID}}"

    @staticmethod
    def _get_enabled_provider(db, provider_name: str):
        """Return the enabled ``VasPaymentProvider`` row named ``provider_name``.

        Raises:
            BizLogicError: No enabled provider with that name exists.
        """
        provider = (
            db.query(VasPaymentProvider)
            .filter(
                VasPaymentProvider.enabled == 1,
                VasPaymentProvider.name == provider_name,
            )
            .first()
        )
        if provider is None:
            raise BizLogicError("Payment provider not available")
        return provider

    @staticmethod
    def _lookup_rate(rate_table, base_currency, target_currency) -> Decimal:
        """Return the ``BASE->TARGET`` rate from ``rate_table`` as a Decimal.

        Falls back to 1 when the key is missing and both currencies are the
        same.

        Raises:
            BizLogicError: The rate is missing for two different currencies.
        """
        base = str(base_currency).upper()
        target = str(target_currency).upper()
        rate_key = f"{base}->{target}"
        raw_rate = rate_table.get(rate_key)
        if raw_rate is not None:
            # Go through str() so a float such as 7.8 becomes Decimal("7.8")
            # rather than its full binary expansion.
            return Decimal(str(raw_rate))
        if base == target:
            return Decimal(1)
        raise BizLogicError(f"Exchange rate not available: {rate_key}")

    @staticmethod
    def _convert_amount(base_amount, exchange_rate: Decimal) -> int:
        """Convert ``base_amount`` at ``exchange_rate``, rounded half-up to an int."""
        converted = (Decimal(base_amount) * exchange_rate).quantize(
            Decimal("1"), rounding=ROUND_HALF_UP
        )
        return int(converted)

    @staticmethod
    def _new_pending_payment(
        db: Session, order: VasOrder, provider: str, channel: str, currency: str
    ):
        """Add and flush a pending ``VasPayment`` with placeholder amounts."""
        # Naive UTC timestamp: the same value the deprecated
        # datetime.utcnow() produced.
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        payment = VasPayment(
            order_id=order.id,
            provider=provider,
            channel=channel,
            base_amount=order.base_amount,
            base_currency=order.base_currency,
            amount=0,
            currency=currency,
            random_offset=0,
            exchange_rate=0,
            status="pending",
            expire_at=now_utc
            + timedelta(minutes=PaymentService.PAYMENT_TTL_MINUTES),
        )
        db.add(payment)
        # Flush so the row exists (and payment.id can be read) before commit.
        db.flush()
        return payment

    @staticmethod
    def _create_wechat_payment(db: Session, order: VasOrder):
        """Add and flush a pending WeChat static-QR payment (currency CNY)."""
        return PaymentService._new_pending_payment(
            db, order, provider="wechat", channel="qr_static", currency="CNY"
        )

    @staticmethod
    def _create_alipay_payment(db: Session, order: VasOrder):
        """Add and flush a pending Alipay static-QR payment (currency CNY)."""
        return PaymentService._new_pending_payment(
            db, order, provider="alipay", channel="qr_static", currency="CNY"
        )

    @staticmethod
    def _create_stripe_payment(db: Session, order: VasOrder):
        """Add and flush a pending Stripe online-link payment (currency EUR)."""
        return PaymentService._new_pending_payment(
            db, order, provider="stripe", channel="online_link", currency="EUR"
        )

    @staticmethod
    def list_by_order(db: Session, order_id: str):
        """Return every ``VasPayment`` whose ``order_id`` equals ``order_id``."""
        return (
            db.query(VasPayment)
            .filter(VasPayment.order_id == order_id)
            .all()
        )
|