# app/services/payment_service.py
import json
import random
import time
import uuid
from datetime import datetime, timedelta
from decimal import ROUND_HALF_UP, Decimal, InvalidOperation
from typing import Any, Dict, List, Optional

import stripe
from redis.asyncio import Redis
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.biz_exception import BizLogicError, NotFoundError
from app.models.configuration import Configuration
from app.models.order import VasOrder
from app.models.payment import VasPayment
from app.models.payment_confirmation import VasPaymentConfirmation
from app.models.payment_event import VasPaymentEvent
from app.models.payment_provider import VasPaymentProvider
from app.models.payment_qr import VasPaymentQR
from app.models.product_routing import VasProductRouting
from app.models.ticket import VasTicket
from app.models.user import VasUser
from app.models.verification_token import VasVerificationToken
from app.schemas.payment import AdminUpdateStatusPayload, VasPaymentCreate
from app.schemas.payment_confirmation import (
    VasPaymentConfirmationCreate,
    VasPaymentConfirmationUpdate,
)
from app.services.notification_service import NotificationService
from app.services.webhook_service import WebhookService
from app.utils.pagination import paginate
from app.utils.search import apply_keyword_search_stmt


def convert_currency(
    amount: int,
    from_currency: str,
    to_currency: str,
    exchange_config: Dict[str, Any],
) -> Decimal:
    """Convert ``amount`` between two currencies using configured rates.

    The conversion is a cross rate through the configuration's base currency:
    ``target = amount * rate[from_currency] / rate[to_currency]``.

    Args:
        amount: Amount in the smallest currency unit (e.g. cents/fen).
        from_currency: Currency code of ``amount``.
        to_currency: Currency code to convert into.
        exchange_config: Mapping shaped like
            ``{"base": "CNY", "rates": {"CNY": 1.0, "USD": 7.25, ...}}``.

    Returns:
        The converted amount as a whole-number ``Decimal``, rounded half up.
        When both currency codes are equal, ``Decimal(amount)`` is returned
        without consulting the rates at all.

    Raises:
        BizLogicError: If either currency has no configured rate, or a
            configured rate is not a finite, strictly positive number.
    """
    if from_currency == to_currency:
        return Decimal(amount)

    # ``"rates": null`` in the stored JSON must behave like a missing table.
    rates = exchange_config.get("rates") or {}

    parsed: Dict[str, Decimal] = {}
    for currency in (from_currency, to_currency):
        if currency not in rates:
            raise BizLogicError(
                f"Exchange rate for {currency} not found in configuration"
            )
        raw_rate = rates[currency]
        try:
            # Go through ``str`` so floats do not drag in binary noise.
            rate = Decimal(str(raw_rate))
        except InvalidOperation as exc:
            raise BizLogicError(
                f"Invalid exchange rate for {currency} ({raw_rate})"
            ) from exc
        # A zero source rate would silently turn every amount into 0 and a
        # zero target rate would divide by zero; negative/NaN/inf are
        # meaningless. ``is_finite`` is checked first because ordering
        # comparisons against NaN raise.
        if not rate.is_finite() or rate <= 0:
            raise BizLogicError(
                f"Invalid exchange rate for {currency} ({raw_rate})"
            )
        parsed[currency] = rate

    converted = (Decimal(amount) * parsed[from_currency]) / parsed[to_currency]

    # Payment amounts are kept in the smallest unit, so round to an integer.
    return converted.quantize(Decimal("1"), rounding=ROUND_HALF_UP)
class PaymentService:
    """Service layer for creating, confirming and querying payments.

    Every method is a ``@staticmethod`` operating on the ``AsyncSession``
    passed in by the caller. Timestamps written by this class are naive UTC
    values from ``datetime.utcnow()``.
    """

    # Minutes a freshly created payment (and its Stripe session) stays valid.
    PAYMENT_TTL_MINUTES = 30
    # Providers paid through a static QR code.
    OFFLINE_PROVIDERS = ("wechat", "alipay")
    # Default page Stripe Checkout returns the user to.
    DEFAULT_RETURN_URL = "https://visafly.top/dashboard"

    # ------------------------------------------------------------------
    # Configuration helpers
    # ------------------------------------------------------------------
    @staticmethod
    async def _get_exchange_config(db: AsyncSession) -> Dict:
        """Read the ``EXCHANGE_RATES`` row from the Configuration table.

        ``config_value`` may arrive as a JSON string or as an already decoded
        dict; both are accepted.

        Raises:
            BizLogicError: If the row is missing, the JSON is malformed, or
                the value is not a JSON object / dict.
        """
        stmt = select(Configuration).where(
            Configuration.config_key == "EXCHANGE_RATES"
        )
        config_obj = (await db.execute(stmt)).scalar_one_or_none()
        if not config_obj:
            raise BizLogicError("System configuration 'EXCHANGE_RATES' is missing")

        raw = config_obj.config_value
        if isinstance(raw, str):
            try:
                raw = json.loads(raw)
            except json.JSONDecodeError as exc:
                raise BizLogicError(
                    "Invalid JSON format in 'EXCHANGE_RATES'"
                ) from exc
        # Valid JSON that is not an object (list, number, ...) is unusable
        # downstream, where ``.get("rates")`` is called on the result.
        if not isinstance(raw, dict):
            raise BizLogicError("Invalid type for 'EXCHANGE_RATES' configuration")
        return raw

    @staticmethod
    def _cross_rate(
        from_currency: str,
        to_currency: str,
        exchange_config: Dict,
    ) -> Decimal:
        """Return ``rate[from] / rate[to]`` for storing on the payment.

        A currency absent from the table is treated as rate 1.0 here.

        Raises:
            BizLogicError: If the target currency's rate is 0.
        """
        rates = exchange_config.get("rates", {})
        rate_from = Decimal(str(rates.get(from_currency, 1.0)))
        rate_to = Decimal(str(rates.get(to_currency, 1.0)))
        if rate_to == 0:
            raise BizLogicError("Invalid target currency rate (0)")
        return rate_from / rate_to

    @staticmethod
    def _stable_discount(order_id: Any, amount: Decimal) -> int:
        """Pick the pseudo-random discount for an offline payment.

        The cap is 1% of ``amount`` or 99 minor units, whichever is smaller.
        The generator is seeded with ``order_id`` so the same order always
        gets the same discount. Returns 0 when the cap is below 1.
        """
        max_discount = min(99, int(amount * Decimal("0.01")))
        if max_discount < 1:
            return 0
        return random.Random(order_id).randint(1, max_discount)

    # ------------------------------------------------------------------
    # Payment creation
    # ------------------------------------------------------------------
    @staticmethod
    async def create_payment(
        db: AsyncSession,
        payload: VasPaymentCreate,
        redis_client: Redis,
    ) -> VasPayment:
        """Create a payment for an order, or reuse its pending one.

        The order row is selected ``FOR UPDATE``. If a pending payment with
        the requested provider already exists it is returned unchanged; a
        pending payment with a different provider is marked ``expired`` and
        a new one is created and committed.

        Args:
            db: Active async session.
            payload: Carries ``order_id`` and ``provider``.
            redis_client: Unused here; kept for interface compatibility.

        Raises:
            NotFoundError: If the order does not exist.
            BizLogicError: If the provider is unsupported, or creation fails.
        """
        order_stmt = (
            select(VasOrder)
            .where(VasOrder.id == payload.order_id)
            .with_for_update()
        )
        order = (await db.execute(order_stmt)).scalar_one_or_none()
        if not order:
            raise NotFoundError("Order not found")

        # Reject unknown providers before mutating anything, so a bad request
        # cannot leave another pending payment flagged as expired in the
        # session.
        provider_name = payload.provider
        is_offline = provider_name in PaymentService.OFFLINE_PROVIDERS
        if not is_offline and provider_name != "stripe":
            raise BizLogicError("Unsupported provider")

        pending_stmt = select(VasPayment).where(
            VasPayment.order_id == order.id,
            VasPayment.status == "pending",
        )
        active_payment = (await db.execute(pending_stmt)).scalar_one_or_none()
        if active_payment:
            if active_payment.provider == provider_name:
                # Idempotent: same provider requested again.
                return active_payment
            active_payment.status = "expired"

        exchange_config = await PaymentService._get_exchange_config(db)

        if is_offline:
            payment = await PaymentService.create_offline_payment(
                db=db,
                order=order,
                provider_name=provider_name,
                exchange_config=exchange_config,
            )
        else:
            payment = await PaymentService.create_stripe_payment(
                db=db,
                order=order,
                exchange_config=exchange_config,
            )
        await db.commit()
        return payment

    @staticmethod
    async def create_offline_payment(
        db: AsyncSession,
        order: VasOrder,
        provider_name: str,
        exchange_config: Dict,
    ) -> VasPayment:
        """Build a WeChat/Alipay QR payment for ``order`` (flushed, not committed).

        The order's final amount is converted into the provider's currency
        and reduced by a per-order deterministic discount, which is stored
        in ``random_offset``.

        Raises:
            BizLogicError: If the provider is not enabled, no active QR code
                exists, or the currency conversion fails.
        """
        # Validate availability first so no payment row is flushed for a
        # request that cannot succeed.
        provider_stmt = select(VasPaymentProvider).where(
            VasPaymentProvider.enabled == 1,
            VasPaymentProvider.name == provider_name,
        )
        provider = (await db.execute(provider_stmt)).scalar_one_or_none()
        if not provider:
            raise BizLogicError("Payment provider not available")

        qr_stmt = select(VasPaymentQR).where(
            VasPaymentQR.provider == provider_name,
            VasPaymentQR.is_active == 1,
        )
        qrs = (await db.execute(qr_stmt)).scalars().all()
        if not qrs:
            raise BizLogicError("No payment QR available")

        if provider_name == "wechat":
            payment = await PaymentService._create_wechat_payment(db, order)
        else:
            payment = await PaymentService._create_alipay_payment(db, order)

        # QR selection is intentionally not seeded; only the discount is
        # stable per order.
        payment.qr_id = random.choice(qrs).id

        amount_converted = convert_currency(
            amount=payment.final_amount,
            from_currency=order.base_currency,
            to_currency=provider.currency,
            exchange_config=exchange_config,
        )
        exchange_rate = PaymentService._cross_rate(
            order.base_currency, provider.currency, exchange_config
        )
        discount = PaymentService._stable_discount(order.id, amount_converted)

        payment.exchange_rate = exchange_rate
        payment.amount = int(amount_converted) - discount
        payment.currency = provider.currency
        payment.random_offset = discount
        return payment

    @staticmethod
    async def create_stripe_payment(
        db: AsyncSession,
        order: VasOrder,
        exchange_config: Dict,
        success_url: str = DEFAULT_RETURN_URL,
        cancel_url: str = DEFAULT_RETURN_URL,
    ) -> VasPayment:
        """Build a Stripe payment and its Checkout session (flushed, not committed).

        Args:
            db: Active async session.
            order: Order being paid.
            exchange_config: Exchange-rate configuration mapping.
            success_url: Where Stripe redirects after a successful payment.
            cancel_url: Where Stripe redirects when the user cancels.

        Raises:
            BizLogicError: If the Stripe provider is not enabled or the
                currency conversion fails.
        """
        provider_stmt = select(VasPaymentProvider).where(
            VasPaymentProvider.enabled == 1,
            VasPaymentProvider.name == "stripe",
        )
        provider = (await db.execute(provider_stmt)).scalar_one_or_none()
        if not provider:
            raise BizLogicError("Stripe provider not enabled")

        payment = await PaymentService._create_stripe_payment(db, order)

        amount_converted = convert_currency(
            amount=payment.final_amount,
            from_currency=order.base_currency,
            to_currency=provider.currency,
            exchange_config=exchange_config,
        )
        payment.exchange_rate = PaymentService._cross_rate(
            order.base_currency, provider.currency, exchange_config
        )
        payment.amount = int(amount_converted)
        payment.currency = provider.currency
        payment.random_offset = 0  # Stripe payments get no random discount.

        # NOTE(review): this is a synchronous Stripe API call made from a
        # coroutine, so it blocks the event loop for the request's duration.
        stripe_session = PaymentService.create_checkout_session(
            order=order,
            payment=payment,
            success_url=success_url,
            cancel_url=cancel_url,
        )
        payment.payment_intent_id = stripe_session.id
        payment.payment_url = stripe_session.url
        return payment

    @staticmethod
    def create_checkout_session(
        order: VasOrder,
        payment: VasPayment,
        success_url: str,
        cancel_url: str,
    ):
        """Create a one-item, card-only Stripe Checkout session for ``payment``.

        The session expires ``PAYMENT_TTL_MINUTES`` after the local clock's
        current time and carries order/payment/user ids as metadata.

        Returns:
            The object returned by ``stripe.checkout.Session.create``.
        """
        # NOTE(review): Stripe documents a 30-minute minimum for expires_at;
        # this value sits on that boundary -- confirm it is not rejected
        # under clock skew or request latency.
        expires_at = int(time.time()) + PaymentService.PAYMENT_TTL_MINUTES * 60
        line_item = {
            "price_data": {
                "currency": payment.currency.lower(),
                "product_data": {
                    "name": f"Visa Service Order {order.id}",
                },
                "unit_amount": payment.amount,
            },
            "quantity": 1,
        }
        return stripe.checkout.Session.create(
            mode="payment",
            payment_method_types=["card"],
            line_items=[line_item],
            metadata={
                "order_id": order.id,
                "payment_id": payment.id,
                "user_id": order.user_id,
            },
            success_url=success_url,
            cancel_url=cancel_url,
            expires_at=expires_at,
        )

    @staticmethod
    async def _new_pending_payment(
        db: AsyncSession,
        order: VasOrder,
        provider: str,
        channel: str,
        currency: str,
    ) -> VasPayment:
        """Insert (flush) a pending payment that mirrors the order's amounts.

        ``amount``, ``random_offset`` and ``exchange_rate`` start at 0 and
        are filled in by the caller after currency conversion.
        """
        payment = VasPayment(
            order_id=order.id,
            provider=provider,
            channel=channel,
            base_amount=order.base_amount,
            adjustment_delta=order.adjustment_delta,
            final_amount=order.final_amount,
            base_currency=order.base_currency,
            amount=0,
            currency=currency,
            random_offset=0,
            exchange_rate=0,
            status="pending",
            expire_at=datetime.utcnow()
            + timedelta(minutes=PaymentService.PAYMENT_TTL_MINUTES),
        )
        db.add(payment)
        # Flush so the primary key is assigned before the caller uses it.
        await db.flush()
        return payment

    @staticmethod
    async def _create_wechat_payment(
        db: AsyncSession,
        order: VasOrder,
    ) -> VasPayment:
        """Flush a pending WeChat static-QR payment (placeholder currency CNY)."""
        return await PaymentService._new_pending_payment(
            db, order, provider="wechat", channel="qr_static", currency="CNY"
        )

    @staticmethod
    async def _create_alipay_payment(
        db: AsyncSession,
        order: VasOrder,
    ) -> VasPayment:
        """Flush a pending Alipay static-QR payment (placeholder currency CNY)."""
        return await PaymentService._new_pending_payment(
            db, order, provider="alipay", channel="qr_static", currency="CNY"
        )

    @staticmethod
    async def _create_stripe_payment(
        db: AsyncSession,
        order: VasOrder,
    ) -> VasPayment:
        """Flush a pending Stripe online-link payment (placeholder currency EUR)."""
        return await PaymentService._new_pending_payment(
            db, order, provider="stripe", channel="online_link", currency="EUR"
        )

    # ------------------------------------------------------------------
    # Confirmation flows
    # ------------------------------------------------------------------
    @staticmethod
    async def confirm_by_user(
        db: AsyncSession,
        payload: VasPaymentConfirmationCreate,
        current_user: VasUser,
        redis_client: Redis,
    ):
        """Handle the user clicking "I have paid".

        Ensures a confirmation record exists for (payment, user), creating a
        ``pending`` one if needed, then posts a ``payment_user_confirmed``
        notification through ``NotificationService.post_wechat``.

        Returns:
            The existing or newly created ``VasPaymentConfirmation``.

        Raises:
            NotFoundError: If ``payload.payment_id`` matches no payment.
        """
        # Look the payment up first: a confirmation row must not be written
        # for a payment that does not exist, and the notification needs its
        # fields.
        payment_stmt = select(VasPayment).where(
            VasPayment.id == payload.payment_id
        )
        payment = (await db.execute(payment_stmt)).scalar_one_or_none()
        if payment is None:
            raise NotFoundError("Payment not found")
        # Copy what the notification needs before any commit, so it does not
        # depend on ORM attributes staying loaded afterwards.
        order_id = payment.order_id
        provider = payment.provider
        user_email = current_user.email

        record_stmt = (
            select(VasPaymentConfirmation)
            .where(VasPaymentConfirmation.payment_id == payload.payment_id)
            .where(VasPaymentConfirmation.user_id == current_user.id)
        )
        record = (await db.execute(record_stmt)).scalar_one_or_none()
        if not record:
            record = VasPaymentConfirmation(
                payment_id=payload.payment_id,
                amount=payload.amount,
                currency=payload.currency,
                random_offset=payload.random_offset,
                user_id=current_user.id,
                status="pending",
                confirmed_at=payload.confirmed_at,
            )
            db.add(record)
            await db.commit()
            await db.refresh(record)

        formatted_time = payload.confirmed_at.strftime('%Y-%m-%d %H:%M') + " (UTC)"
        await NotificationService.post_wechat(
            redis_client=redis_client,
            template_id="payment_user_confirmed",
            payload={
                "order_id": order_id,
                "payment_id": payload.payment_id,
                "user_email": user_email,
                "amount": payload.amount,
                "currency": payload.currency,
                "token": "",
                "confirmed_at": formatted_time,
                "provider": provider,
            },
        )
        return record

    @staticmethod
    async def confirm_by_admin(
        db: AsyncSession,
        id: int,
        payload: VasPaymentConfirmationUpdate,
        current_user: VasUser,
    ):
        """Let an admin approve a user's payment confirmation record.

        Stamps the record with the admin and time, marks it ``confirmed``,
        then runs the shared payment-confirmation action for its payment.

        Args:
            db: Active async session.
            id: Primary key of the confirmation record.
            payload: Unused here; kept for interface compatibility.
            current_user: The admin performing the confirmation.

        Raises:
            NotFoundError: If the confirmation record does not exist.
            BizLogicError: Propagated from ``_confirm_payment_action``.
        """
        stmt = select(VasPaymentConfirmation).where(
            VasPaymentConfirmation.id == id
        )
        record = (await db.execute(stmt)).scalar_one_or_none()
        if not record:
            raise NotFoundError("Payment confirmation record not found")

        record.admin_id = current_user.id
        record.admin_confirmed_at = datetime.utcnow()
        record.status = 'confirmed'
        await PaymentService._confirm_payment_action(
            db, record.payment_id, 'confirmed by admin'
        )

        await db.commit()
        await db.refresh(record)
        return record

    @staticmethod
    async def admin_update_status(
        db: AsyncSession,
        payment_id: int,
        payload: AdminUpdateStatusPayload,
    ):
        """Force a payment's status as an administrator.

        ``payload.status == "succeeded"`` runs the full confirmation action
        with ``payload.remark``; any other status marks the payment
        ``failed``.

        Raises:
            BizLogicError: If the payment does not exist, or (on the
                succeeded path) it was already confirmed.
        """
        if payload.status == "succeeded":
            payment = await PaymentService._confirm_payment_action(
                db, payment_id, payload.remark
            )
        else:
            stmt = select(VasPayment).where(VasPayment.id == payment_id)
            payment = (await db.execute(stmt)).scalar_one_or_none()
            if not payment:
                raise BizLogicError("Payment not found")
            payment.status = 'failed'
            await db.commit()

        await db.refresh(payment)
        return payment

    @staticmethod
    async def list_payment_confirmation(
        db: AsyncSession,
        keyword: Optional[str] = None,
        page: int = 1,
        size: int = 20,
    ):
        """Return one page of confirmation records, newest id first.

        ``keyword`` is applied to the ``user_id`` field through
        ``apply_keyword_search_stmt``; the result is whatever ``paginate``
        returns.
        """
        query = apply_keyword_search_stmt(
            stmt=select(VasPaymentConfirmation),
            model=VasPaymentConfirmation,
            keyword=keyword,
            fields=["user_id"],
        )
        query = query.order_by(VasPaymentConfirmation.id.desc())
        return await paginate(db, query, page, size)

    @staticmethod
    async def confirm_payment(
        db: AsyncSession,
        payment_id: int,
        token: str,
    ):
        """Confirm a payment using a one-time verification token.

        The token must exist, be unused and unexpired. After the payment is
        confirmed the token is marked used and committed.

        Raises:
            BizLogicError: If the token is invalid or expired, or the
                confirmation action fails.
        """
        # NOTE(review): the lookup filters on token value and ``used`` only;
        # it is not scoped to ``payment_id`` -- confirm that is intended.
        stmt = select(VasVerificationToken).where(
            VasVerificationToken.token == token,
            VasVerificationToken.used == 0,
        )
        token_obj = (await db.execute(stmt)).scalar_one_or_none()
        if not token_obj:
            raise BizLogicError("Token invalid")
        if token_obj.expire_at < datetime.utcnow():
            raise BizLogicError("Token expired")

        payment = await PaymentService._confirm_payment_action(
            db, payment_id, 'confirmed by admin'
        )
        token_obj.used = 1
        await db.commit()
        return payment

    @staticmethod
    async def _confirm_payment_action(
        db: AsyncSession,
        payment_id: int,
        remark: str,
    ):
        """Mark a payment as paid and record the matching payment event.

        A ``VasPaymentEvent`` is committed before any checks, so duplicate
        confirmation attempts are recorded too. The payment becomes
        ``late_paid`` if confirmed after ``expire_at``, otherwise
        ``succeeded``. The order, when found, is set to ``paid`` and a
        webhook task is requested.

        Args:
            db: Active async session.
            payment_id: Primary key of the payment to confirm.
            remark: Stored as ``title`` in the payment's ``provider_payload``.

        Returns:
            The refreshed ``VasPayment``.

        Raises:
            BizLogicError: If the payment does not exist or was already
                confirmed.
        """
        pay_stmt = select(VasPayment).where(VasPayment.id == payment_id)
        payment = (await db.execute(pay_stmt)).scalar_one_or_none()
        if not payment:
            raise BizLogicError("Payment not found")

        event = VasPaymentEvent(
            provider=payment.provider,
            event_type="payment_received",
            title='confirm payment',
            content='confirm payment by admin',
            parsed_amount=payment.amount,
            parsed_currency=payment.currency,
            parsed_device='',
            status="received",
        )
        db.add(event)
        # Persist the raw event immediately so it survives a later failure.
        await db.commit()
        await db.refresh(event)

        event.matched_payment_id = payment.id
        event.matched_order_id = payment.order_id

        if payment.status in ("succeeded", "late_paid"):
            event.status = "duplicate"
            await db.commit()
            raise BizLogicError("Payment has been confirmed")

        now = datetime.utcnow()
        is_late = payment.expire_at and now > payment.expire_at
        payment.status = "late_paid" if is_late else "succeeded"
        payment.provider_payload = {
            "title": remark,
            "received_at": now.isoformat(),
        }

        order_stmt = select(VasOrder).where(VasOrder.id == payment.order_id)
        order = (await db.execute(order_stmt)).scalar_one_or_none()
        if order is not None:
            if order.status != "paid":
                order.status = "paid"
            # Never hand a missing order to the webhook service.
            # NOTE(review): presumably safe to request for an already paid
            # order given the helper's "if_not_exists" name -- confirm.
            await WebhookService._create_task_if_not_exists(db, order)

        event.status = "applied"
        await db.commit()
        await db.refresh(payment)
        return payment

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------
    @staticmethod
    async def list_by_order(
        db: AsyncSession,
        order_id: int,
    ) -> List[VasPayment]:
        """Return every payment attached to ``order_id`` (possibly empty)."""
        stmt = select(VasPayment).where(VasPayment.order_id == order_id)
        result = await db.execute(stmt)
        # Materialise as a real list to match the annotated return type.
        return list(result.scalars().all())

    @staticmethod
    async def get_by_id(
        db: AsyncSession,
        id: int,
    ) -> Optional[VasPayment]:
        """Return the payment with primary key ``id``, or ``None`` if absent."""
        stmt = select(VasPayment).where(VasPayment.id == id)
        return (await db.execute(stmt)).scalar_one_or_none()
|