# app/services/payment_service.py
"""Payment creation for VAS orders: static-QR (WeChat / Alipay) and Stripe Checkout."""
import random
import time
from datetime import datetime, timedelta
from decimal import Decimal, ROUND_HALF_UP
from typing import Dict, List, Tuple

import stripe
from sqlalchemy.orm import Session

from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
from app.models.order import VasOrder
from app.models.product import VasProduct
from app.models.payment import VasPayment
from app.models.payment_provider import VasPaymentProvider
from app.models.payment_qr import VasPaymentQR
from app.schemas.payment import VasPaymentCreate


class PaymentService:
    """Stateless helpers that create and list ``VasPayment`` rows for an order."""

    # Providers settled through a static QR code.
    _OFFLINE_PROVIDERS = ("wechat", "alipay")
    _STRIPE_PROVIDER = "stripe"
    # Lifetime of a pending payment (also used for the Stripe session expiry).
    _PAYMENT_TTL = timedelta(minutes=30)

    @staticmethod
    def create_payment(db: Session, payload: VasPaymentCreate, rate_table: Dict) -> VasPayment:
        """Create (or idempotently return) the pending payment for an order.

        Args:
            db: Session used for all queries; committed on success and rolled
                back on failure.
            payload: Carries ``order_id`` and the requested ``provider``.
            rate_table: Maps ``"<BASE>-><TARGET>"`` (upper-case) to an
                exchange rate.

        Returns:
            The existing pending payment when it already uses the requested
            provider, otherwise a newly created one.

        Raises:
            BizLogicError: Unsupported provider, provider not enabled, no QR
                code available, or no exchange rate for the currency pair.
        """
        # 1) Lock the order row so concurrent requests cannot both create a payment.
        order = (
            db.query(VasOrder)
            .filter(VasOrder.id == payload.order_id)
            .with_for_update()
            .one()
        )

        # 2) Is there already an in-progress payment for this order?
        active_payment = (
            db.query(VasPayment)
            .filter(
                VasPayment.order_id == order.id,
                VasPayment.status == "pending",
            )
            .first()
        )
        if active_payment and active_payment.provider == payload.provider:
            # Idempotency: hand back the existing payment instead of failing.
            return active_payment

        # Validate BEFORE mutating anything, so an unsupported provider cannot
        # leave the previous payment marked as failed in the session.
        is_stripe = payload.provider == PaymentService._STRIPE_PROVIDER
        if not is_stripe and payload.provider not in PaymentService._OFFLINE_PROVIDERS:
            raise BizLogicError("Unsupported provider")

        if active_payment:
            # The user switched provider: retire the old pending payment.
            active_payment.status = "failed"
            db.add(active_payment)

        try:
            if is_stripe:
                payment = PaymentService.create_stripe_payment(
                    db=db, order=order, rate_table=rate_table
                )
            else:
                payment = PaymentService.create_offline_payment(
                    db=db,
                    order=order,
                    provider_name=payload.provider,
                    rate_table=rate_table,
                )
            db.commit()
        except Exception:
            # This method owns the commit, so it also owns the rollback: drop
            # the half-built payment and release the row lock, then re-raise.
            db.rollback()
            raise
        return payment

    @staticmethod
    def create_offline_payment(db, order, provider_name: str, rate_table: dict):
        """Create a pending static-QR payment (WeChat or Alipay) for ``order``.

        The payable amount is the order's base amount converted to the
        provider currency, minus a random discount of 1..min(99, 1% of the
        converted amount) stored in ``random_offset``.
        NOTE(review): the random discount presumably makes concurrent
        static-QR transfers distinguishable by amount - confirm.

        The row is flushed but not committed; the caller commits.

        Raises:
            BizLogicError: Unknown provider name, provider not enabled, no QR
                code configured, or no exchange rate for the currency pair.
        """
        if provider_name not in PaymentService._OFFLINE_PROVIDERS:
            # Previously any name other than "wechat" silently produced an
            # "alipay" payment row.
            raise BizLogicError("Unsupported provider")

        # Run every check that can fail before inserting the payment row.
        provider = PaymentService._get_enabled_provider(db, provider_name)
        qrs = db.query(VasPaymentQR).filter(VasPaymentQR.provider == provider_name).all()
        if not qrs:
            raise BizLogicError("No payment QR available")
        qr = random.choice(qrs)

        exchange_rate, converted = PaymentService._quote(
            order.base_amount, order.base_currency, provider.currency, rate_table
        )

        max_discount = min(99, int(converted * Decimal("0.01")))
        discount = random.randint(1, max_discount) if max_discount >= 1 else 0

        if provider_name == "wechat":
            payment = PaymentService._create_wechat_payment(db, order)
        else:
            payment = PaymentService._create_alipay_payment(db, order)

        payment.qr_id = qr.id
        payment.exchange_rate = exchange_rate
        payment.amount = int(converted) - discount
        payment.currency = provider.currency
        payment.random_offset = discount
        return payment

    @staticmethod
    def create_stripe_payment(
        db,
        order,
        rate_table: dict,
        success_url: str = "https://yourdomain.com/pay/success",
        cancel_url: str = "https://yourdomain.com/pay/cancel",
    ):
        """Create a pending Stripe payment and its Checkout Session.

        Args:
            db: Session; the payment row is flushed but not committed.
            order: Order being paid.
            rate_table: Maps ``"<BASE>-><TARGET>"`` to an exchange rate.
            success_url: Redirect target after a successful checkout.
            cancel_url: Redirect target when the checkout is cancelled.

        Raises:
            BizLogicError: Stripe provider not enabled, or no exchange rate
                for the currency pair.
        """
        provider = PaymentService._get_enabled_provider(db, PaymentService._STRIPE_PROVIDER)
        exchange_rate, converted = PaymentService._quote(
            order.base_amount, order.base_currency, provider.currency, rate_table
        )

        payment = PaymentService._create_stripe_payment(db, order)
        payment.exchange_rate = exchange_rate
        payment.amount = int(converted)
        payment.currency = provider.currency
        payment.random_offset = 0

        stripe_session = PaymentService.create_checkout_session(
            order=order,
            payment=payment,
            success_url=success_url,
            cancel_url=cancel_url,
        )
        payment.payment_intent_id = stripe_session.id
        payment.payment_url = stripe_session.url
        return payment

    @staticmethod
    def create_checkout_session(
        order,
        payment,
        success_url: str,
        cancel_url: str,
    ):
        """Create a Stripe Checkout Session for ``payment``.

        ``order.base_amount`` and ``payment.amount`` are expressed in cents.
        ``{CHECKOUT_SESSION_ID}`` in the success URL is sent to Stripe as a
        literal placeholder.
        """
        # Stripe-specific expiry as an epoch timestamp.
        # NOTE(review): Stripe documents a 30-minute minimum for expires_at;
        # with zero margin a slow request might be rejected - confirm whether
        # a small buffer is needed.
        expires_at = int(time.time()) + int(PaymentService._PAYMENT_TTL.total_seconds())

        session = stripe.checkout.Session.create(
            mode="payment",
            payment_method_types=["card"],
            line_items=[
                {
                    "price_data": {
                        "currency": payment.currency.lower(),
                        "product_data": {
                            "name": f"Visa Service Order {order.id}",
                        },
                        "unit_amount": payment.amount,
                    },
                    "quantity": 1,
                }
            ],
            metadata={
                "order_id": order.id,
                "payment_id": payment.id,
                "user_id": order.user_id,
            },
            success_url=success_url + "?session_id={CHECKOUT_SESSION_ID}",
            cancel_url=cancel_url,
            expires_at=expires_at,
        )
        return session

    @staticmethod
    def _get_enabled_provider(db, provider_name: str):
        """Return the enabled provider row named ``provider_name`` or raise BizLogicError."""
        provider = (
            db.query(VasPaymentProvider)
            .filter(
                VasPaymentProvider.enabled == 1,
                VasPaymentProvider.name == provider_name,
            )
            .first()
        )
        if provider is None:
            raise BizLogicError(f"Payment provider not available: {provider_name}")
        return provider

    @staticmethod
    def _quote(base_amount, base_currency, target_currency, rate_table: dict) -> Tuple[Decimal, Decimal]:
        """Return ``(exchange_rate, converted)`` for the currency pair.

        ``converted`` is ``base_amount * exchange_rate`` rounded half-up to a
        whole unit. Raises BizLogicError when the pair is not in ``rate_table``.
        """
        rate_key = f"{base_currency}->{target_currency}".upper()
        try:
            raw_rate = rate_table[rate_key]
        except KeyError:
            raise BizLogicError(f"Exchange rate not available: {rate_key}") from None

        # Decimal(float) keeps the binary representation error (7.8 becomes
        # 7.79999...); going through str() keeps the intended decimal value.
        if isinstance(raw_rate, float):
            exchange_rate = Decimal(str(raw_rate))
        else:
            exchange_rate = Decimal(raw_rate)

        converted = (Decimal(base_amount) * exchange_rate).quantize(
            Decimal("1"), rounding=ROUND_HALF_UP
        )
        return exchange_rate, converted

    @staticmethod
    def _new_pending_payment(db: Session, order: VasOrder, provider: str, channel: str, currency: str):
        """Insert and flush a pending payment with placeholder amount and rate.

        The caller overwrites amount, currency and exchange rate afterwards.
        """
        payment = VasPayment(
            order_id=order.id,
            provider=provider,
            channel=channel,
            base_amount=order.base_amount,
            base_currency=order.base_currency,
            amount=0,
            currency=currency,
            random_offset=0,
            exchange_rate=0,
            status="pending",
            # Naive UTC timestamp, kept as in the original code.
            expire_at=datetime.utcnow() + PaymentService._PAYMENT_TTL,
        )
        db.add(payment)
        db.flush()  # flush the INSERT now; payment.id is read later for Stripe metadata
        return payment

    @staticmethod
    def _create_wechat_payment(db: Session, order: VasOrder):
        """Insert a pending WeChat static-QR payment for ``order``."""
        return PaymentService._new_pending_payment(db, order, "wechat", "qr_static", "CNY")

    @staticmethod
    def _create_alipay_payment(db: Session, order: VasOrder):
        """Insert a pending Alipay static-QR payment for ``order``."""
        return PaymentService._new_pending_payment(db, order, "alipay", "qr_static", "CNY")

    @staticmethod
    def _create_stripe_payment(db: Session, order: VasOrder):
        """Insert a pending Stripe online-link payment for ``order``."""
        return PaymentService._new_pending_payment(db, order, "stripe", "online_link", "EUR")

    @staticmethod
    def list_by_order(db: Session, order_id: str) -> List[VasPayment]:
        """Return every payment row whose ``order_id`` equals ``order_id``."""
        return db.query(VasPayment).filter(VasPayment.order_id == order_id).all()