# app/services/payment_service.py
import asyncio
import functools
import random
import time
import uuid
from datetime import datetime, timedelta
from decimal import Decimal, ROUND_HALF_UP
from typing import Any, Dict, List, Optional

import stripe
from redis.asyncio import Redis
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.utils.search import apply_keyword_search_stmt
from app.utils.pagination import paginate
from app.core.biz_exception import NotFoundError, BizLogicError
from app.models.user import VasUser
from app.models.order import VasOrder
from app.models.vas_task import VasTask
from app.models.payment import VasPayment
from app.models.ticket import VasTicket
from app.models.payment_event import VasPaymentEvent
from app.models.product_routing import VasProductRouting
from app.models.verification_token import VasVerificationToken
from app.models.payment_provider import VasPaymentProvider
from app.models.payment_qr import VasPaymentQR
from app.models.payment_confirmation import VasPaymentConfirmation
from app.schemas.payment import VasPaymentCreate
from app.schemas.payment_confirmation import VasPaymentConfirmationCreate, VasPaymentConfirmationUpdate
from app.services.notification_service import NotificationService


class PaymentService:
    """Create, confirm and query payments for orders.

    All methods are static and operate on the ``AsyncSession`` passed in by
    the caller.
    """

    # Lifetime of a freshly created payment; also used for the Stripe
    # Checkout Session expiry.
    PAYMENT_TTL = timedelta(minutes=30)
    # Lifetime of a task created when an order becomes paid.
    TASK_TTL = timedelta(days=60)

    # --------------------------------------------------
    # Create payment (single entry point)
    # --------------------------------------------------
    @staticmethod
    async def create_payment(
        db: AsyncSession,
        payload: VasPaymentCreate,
        rate_table: Dict,
        redis_client: Redis,
    ) -> VasPayment:
        """Create a payment for ``payload.order_id`` with ``payload.provider``.

        If the order already has a pending payment for the same provider,
        that payment is returned unchanged. A pending payment for a different
        provider is marked ``"failed"`` and replaced by a new one.

        Args:
            db: Session used for all queries; committed when a payment is created.
            payload: Carries ``order_id`` and ``provider``.
            rate_table: Mapping of ``"BASE->TARGET"`` keys to exchange rates.
            redis_client: Currently unused by this method.

        Returns:
            The existing or newly created payment.

        Raises:
            NotFoundError: The order does not exist.
            BizLogicError: The provider is unsupported, or payment creation failed.
        """
        # (1) Lock the order row (SELECT ... FOR UPDATE) to serialise
        # concurrent payment creation for the same order.
        order_stmt = (
            select(VasOrder)
            .where(VasOrder.id == payload.order_id)
            .with_for_update()
        )
        order = (await db.execute(order_stmt)).scalar_one_or_none()
        if not order:
            raise NotFoundError("Order not found")

        # Reject unknown providers before touching any existing payment, so
        # an invalid request never marks the current pending payment failed.
        if payload.provider not in ("wechat", "alipay", "stripe"):
            raise BizLogicError("Unsupported provider")

        # (2) Idempotency: reuse a pending payment for the same provider.
        # NOTE(review): a pending payment is reused even when its expire_at
        # has passed - confirm that this is intended.
        pending_stmt = select(VasPayment).where(
            VasPayment.order_id == order.id,
            VasPayment.status == "pending",
        )
        active_payment = (await db.execute(pending_stmt)).scalar_one_or_none()
        if active_payment:
            if active_payment.provider == payload.provider:
                return active_payment
            active_payment.status = "failed"

        # (3) Create the payment for the requested provider.
        if payload.provider == "stripe":
            payment = await PaymentService.create_stripe_payment(
                db=db,
                order=order,
                rate_table=rate_table,
            )
        else:
            payment = await PaymentService.create_offline_payment(
                db=db,
                order=order,
                provider_name=payload.provider,
                rate_table=rate_table,
            )

        await db.commit()
        return payment

    @staticmethod
    async def confirm_by_user(
        db: AsyncSession,
        payload: VasPaymentConfirmationCreate,
        current_user: VasUser,
        redis_client: Redis,
    ):
        """Record that the user clicked "I have paid".

        Returns the existing confirmation record for this
        ``(payment_id, user_id)`` pair if there is one; otherwise creates a
        new record with status ``"pending"``, commits and returns it.
        """
        # 1) Look for an existing confirmation record for this payment/user.
        result = await db.execute(
            select(VasPaymentConfirmation)
            .where(VasPaymentConfirmation.payment_id == payload.payment_id)
            .where(VasPaymentConfirmation.user_id == current_user.id)
        )
        record = result.scalar_one_or_none()

        if not record:
            # None yet: create one. It starts as "pending" and becomes
            # "confirmed" in confirm_by_admin.
            record = VasPaymentConfirmation(
                payment_id=payload.payment_id,
                amount=payload.amount,
                currency=payload.currency,
                random_offset=payload.random_offset,
                user_id=current_user.id,
                status="pending",
                confirmed_at=payload.confirmed_at,
            )
            db.add(record)
            await db.commit()
            await db.refresh(record)

        # 2) TODO: push an async notification to admins through
        # NotificationService.create (ntype/template "payment_user_confirmed",
        # channel "wechat"); redis_client is reserved for that call.

        return record

    @staticmethod
    async def confirm_by_admin(
        db: AsyncSession,
        id: int,
        payload: VasPaymentConfirmationUpdate,
        current_user: VasUser,
    ):
        """Have an admin confirm a user's payment confirmation record.

        Marks the record ``'confirmed'`` with the admin's id and timestamp,
        then applies the payment via ``_confirm_payment_action``.

        Raises:
            NotFoundError: No confirmation record with this id.
            BizLogicError: The linked payment is not pending.
        """
        # 1) Load the confirmation record.
        result = await db.execute(
            select(VasPaymentConfirmation)
            .where(VasPaymentConfirmation.id == id)
        )
        record = result.scalar_one_or_none()
        if not record:
            raise NotFoundError("Payment confirmation record not found")

        # 2) Stamp the admin confirmation. Nothing is committed until the
        # payment itself has been applied, so both succeed or fail together.
        record.admin_id = current_user.id
        record.admin_confirmed_at = datetime.utcnow()
        record.status = 'confirmed'

        await PaymentService._confirm_payment_action(db, record.payment_id)

        await db.commit()
        await db.refresh(record)

        return record

    @staticmethod
    async def list_payment_confirmation(
        db: AsyncSession,
        keyword: Optional[str] = None,
        page: int = 1,
        size: int = 20,
    ):
        """Return a page of confirmation records, newest id first.

        ``keyword`` is applied to the ``user_id`` field through
        ``apply_keyword_search_stmt``.
        """
        stmt = select(VasPaymentConfirmation)
        stmt = apply_keyword_search_stmt(
            stmt=stmt,
            model=VasPaymentConfirmation,
            keyword=keyword,
            fields=["user_id"],
        )
        stmt = stmt.order_by(VasPaymentConfirmation.id.desc())
        return await paginate(db, stmt, page, size)

    @staticmethod
    async def _create_task_if_not_exists(
        db: AsyncSession,
        order: VasOrder,
    ) -> List[VasTask]:
        """Add one pending task per active routing of the order's product.

        A routing is skipped when a task with the same order, routing key and
        script version already exists. The new tasks are added to the session
        but not committed here.

        Returns:
            The tasks created by this call (possibly empty).
        """
        routing_stmt = select(VasProductRouting).where(
            VasProductRouting.product_id == order.product_id,
            VasProductRouting.is_active == 1,
        )
        routings = (await db.execute(routing_stmt)).scalars().all()
        if not routings:
            return []

        now = datetime.utcnow()
        created_tasks: List[VasTask] = []

        for routing in routings:
            # LIMIT 1 + first(): pre-existing duplicate tasks must not abort
            # the confirmation with MultipleResultsFound.
            exists_stmt = (
                select(VasTask)
                .where(
                    VasTask.order_id == order.id,
                    VasTask.routing_key == routing.routing_key,
                    VasTask.script_version == routing.script_version,
                )
                .limit(1)
            )
            existing = (await db.execute(exists_stmt)).scalars().first()
            if existing:
                continue

            task = VasTask(
                order_id=order.id,
                routing_key=routing.routing_key,
                script_version=routing.script_version,
                priority=routing.priority,
                status="pending",
                user_inputs=order.user_inputs,
                config=routing.config,
                attempt_count=0,
                notify_count=0,
                expire_at=now + PaymentService.TASK_TTL,
                created_at=now,
            )
            db.add(task)
            created_tasks.append(task)

        return created_tasks

    @staticmethod
    async def confirm_payment(
        db: AsyncSession,
        payment_id: int,
        token: str,
    ):
        """Confirm a pending payment using a one-time verification token.

        Raises:
            BizLogicError: The token is unknown, already used or expired, or
                the payment is not pending.
        """
        # Validate the verification token.
        # NOTE(review): the token is not checked against payment_id here -
        # confirm whether a token should be bound to a specific payment.
        token_stmt = select(VasVerificationToken).where(
            VasVerificationToken.token == token,
            VasVerificationToken.used == 0,
        )
        token_obj = (await db.execute(token_stmt)).scalar_one_or_none()
        if not token_obj:
            raise BizLogicError("Token invalid")
        if token_obj.expire_at < datetime.utcnow():
            raise BizLogicError("Token expired")

        # Consume the token in the same transaction that applies the payment:
        # _confirm_payment_action commits both together, and if it raises,
        # nothing has been committed.
        token_obj.used = 1
        return await PaymentService._confirm_payment_action(db, payment_id)

    @staticmethod
    async def _confirm_payment_action(db: AsyncSession, payment_id: int) -> VasPayment:
        """Mark a pending payment as paid and propagate that to its order.

        In one transaction this records a ``VasPaymentEvent``, sets the
        payment to ``"late_paid"`` (if confirmed after ``expire_at``) or
        ``"succeeded"``, marks the order ``"paid"`` and creates its tasks,
        then commits.

        Only payments whose status is ``"pending"`` are matched; anything
        else (including already-paid payments) is reported as not found.

        Raises:
            BizLogicError: No pending payment with this id.
        """
        # ---------- Find the pending payment ----------
        pay_stmt = select(VasPayment).where(
            VasPayment.id == payment_id,
            VasPayment.status == "pending",
        )
        payment = (await db.execute(pay_stmt)).scalar_one_or_none()
        if not payment:
            raise BizLogicError("Payment not found")

        # ---------- Record the event ----------
        # Added to the session only; it is written by the single commit at
        # the end, so the event and the state change cannot diverge.
        event = VasPaymentEvent(
            provider=payment.provider,
            event_type="payment_received",
            title='confirm payment',
            content='confirm payment by admin',
            parsed_amount=payment.amount,
            parsed_currency=payment.currency,
            parsed_device='',
            status="received",
        )
        db.add(event)

        # ---------- Update the payment ----------
        now = datetime.utcnow()
        is_late = bool(payment.expire_at and now > payment.expire_at)
        payment.status = "late_paid" if is_late else "succeeded"
        payment.provider_payload = {
            "title": "confirm by admin",
            "received_at": now.isoformat(),
        }

        # ---------- Update the order ----------
        order_stmt = select(VasOrder).where(VasOrder.id == payment.order_id)
        order = (await db.execute(order_stmt)).scalar_one_or_none()
        if order and order.status != "paid":
            order.status = "paid"
            await PaymentService._create_task_if_not_exists(db, order)

        event.status = "applied"
        event.matched_payment_id = payment.id
        event.matched_order_id = payment.order_id

        await db.commit()
        await db.refresh(payment)
        return payment

    @staticmethod
    def _to_decimal(value: Any) -> Decimal:
        """Convert ``value`` to ``Decimal`` without binary-float noise."""
        if isinstance(value, float):
            # Decimal(0.1) is 0.1000000000000000055...; str() gives "0.1".
            return Decimal(str(value))
        return Decimal(value)

    @staticmethod
    def _lookup_exchange_rate(
        rate_table: Dict,
        base_currency: str,
        target_currency: str,
    ) -> Decimal:
        """Return the ``"BASE->TARGET"`` rate from ``rate_table``.

        Raises:
            BizLogicError: The table has no entry for this currency pair.
        """
        rate_key = f"{base_currency}->{target_currency}".upper()
        try:
            raw_rate = rate_table[rate_key]
        except KeyError:
            raise BizLogicError(f"Exchange rate not available: {rate_key}") from None
        return PaymentService._to_decimal(raw_rate)

    @staticmethod
    def _convert_amount(base_amount: Any, exchange_rate: Decimal) -> Decimal:
        """Multiply ``base_amount`` by the rate, rounded half-up to an integer."""
        product = PaymentService._to_decimal(base_amount) * exchange_rate
        return product.quantize(Decimal("1"), rounding=ROUND_HALF_UP)

    @staticmethod
    async def _get_enabled_provider(
        db: AsyncSession,
        provider_name: str,
    ) -> Optional[VasPaymentProvider]:
        """Return the enabled provider row named ``provider_name``, or None."""
        stmt = select(VasPaymentProvider).where(
            VasPaymentProvider.enabled == 1,
            VasPaymentProvider.name == provider_name,
        )
        return (await db.execute(stmt)).scalar_one_or_none()

    @staticmethod
    async def create_offline_payment(
        db: AsyncSession,
        order: VasOrder,
        provider_name: str,
        rate_table: Dict,
    ) -> VasPayment:
        """Create a static-QR payment (WeChat, otherwise Alipay) for ``order``.

        The charged amount is the converted amount minus a random discount of
        at most 1% (capped at 99), stored in ``random_offset``. The payment is
        flushed but not committed.

        Raises:
            BizLogicError: The provider is not enabled, no active QR code
                exists, or the exchange rate is missing.
        """
        # Validate everything first so that a failure leaves no payment row.
        provider = await PaymentService._get_enabled_provider(db, provider_name)
        if not provider:
            raise BizLogicError("Payment provider not available")

        qr_stmt = select(VasPaymentQR).where(
            VasPaymentQR.provider == provider_name,
            VasPaymentQR.is_active == 1,
        )
        qrs = (await db.execute(qr_stmt)).scalars().all()
        if not qrs:
            raise BizLogicError("No payment QR available")

        exchange_rate = PaymentService._lookup_exchange_rate(
            rate_table, order.base_currency, provider.currency
        )

        if provider_name == "wechat":
            payment = await PaymentService._create_wechat_payment(db, order)
        else:
            payment = await PaymentService._create_alipay_payment(db, order)

        qr = random.choice(qrs)
        payment.qr_id = qr.id

        converted = PaymentService._convert_amount(payment.base_amount, exchange_rate)

        # Random discount in [1, min(99, 1% of the amount)]; 0 when the
        # amount is too small for a discount of at least 1.
        max_discount = min(99, int(converted * Decimal("0.01")))
        discount = random.randint(1, max_discount) if max_discount >= 1 else 0

        payment.exchange_rate = exchange_rate
        payment.amount = int(converted) - discount
        payment.currency = provider.currency
        payment.random_offset = discount

        return payment

    @staticmethod
    async def create_stripe_payment(
        db: AsyncSession,
        order: VasOrder,
        rate_table: Dict,
    ) -> VasPayment:
        """Create a Stripe payment and its Checkout Session for ``order``.

        The payment is flushed but not committed. The Stripe request runs in
        the default executor so the blocking HTTP call does not stall the
        event loop.

        Raises:
            BizLogicError: Stripe is not enabled or the exchange rate is missing.
        """
        # Validate everything first so that a failure leaves no payment row.
        provider = await PaymentService._get_enabled_provider(db, "stripe")
        if not provider:
            raise BizLogicError("Stripe provider not enabled")

        exchange_rate = PaymentService._lookup_exchange_rate(
            rate_table, order.base_currency, provider.currency
        )

        payment = await PaymentService._create_stripe_payment(db, order)

        converted = PaymentService._convert_amount(payment.base_amount, exchange_rate)

        payment.exchange_rate = exchange_rate
        payment.amount = int(converted)
        payment.currency = provider.currency
        payment.random_offset = 0

        # Read the ORM attributes here, on the event-loop thread; only plain
        # values are handed to the worker thread.
        session_params = PaymentService._build_checkout_session_params(
            order=order,
            payment=payment,
            success_url="https://visafly.top/dashboard",
            cancel_url="https://visafly.top/dashboard",
        )
        loop = asyncio.get_running_loop()
        stripe_session = await loop.run_in_executor(
            None,
            functools.partial(stripe.checkout.Session.create, **session_params),
        )

        payment.payment_intent_id = stripe_session.id
        payment.payment_url = stripe_session.url

        return payment

    @staticmethod
    def _build_checkout_session_params(
        order: VasOrder,
        payment: VasPayment,
        success_url: str,
        cancel_url: str,
    ) -> Dict[str, Any]:
        """Build the keyword arguments for ``stripe.checkout.Session.create``."""
        # NOTE(review): expiry is exactly PAYMENT_TTL (30 minutes) from now;
        # verify this satisfies Stripe's minimum session lifetime.
        expires_at = int(time.time()) + int(PaymentService.PAYMENT_TTL.total_seconds())

        return {
            "mode": "payment",
            "payment_method_types": ["card"],
            "line_items": [
                {
                    "price_data": {
                        "currency": payment.currency.lower(),
                        "product_data": {
                            "name": f"Visa Service Order {order.id}",
                        },
                        # NOTE(review): Stripe reads unit_amount in the
                        # currency's smallest unit - confirm payment.amount
                        # is stored that way.
                        "unit_amount": payment.amount,
                    },
                    "quantity": 1,
                }
            ],
            "metadata": {
                "order_id": order.id,
                "payment_id": payment.id,
                "user_id": order.user_id,
            },
            "success_url": success_url,
            "cancel_url": cancel_url,
            "expires_at": expires_at,
        }

    @staticmethod
    def create_checkout_session(
        order: VasOrder,
        payment: VasPayment,
        success_url: str,
        cancel_url: str,
    ):
        """Create a Stripe Checkout Session for ``payment`` (blocking call).

        This performs a synchronous HTTP request; from async code, run it in
        an executor rather than calling it directly.
        """
        return stripe.checkout.Session.create(
            **PaymentService._build_checkout_session_params(
                order=order,
                payment=payment,
                success_url=success_url,
                cancel_url=cancel_url,
            )
        )

    @staticmethod
    async def _create_pending_payment(
        db: AsyncSession,
        order: VasOrder,
        *,
        provider: str,
        channel: str,
        currency: str,
    ) -> VasPayment:
        """Insert (flush, no commit) a pending payment with placeholder amounts.

        ``amount``, ``exchange_rate`` and ``random_offset`` start at 0; the
        callers in this class fill in the real values afterwards.
        """
        payment = VasPayment(
            order_id=order.id,
            provider=provider,
            channel=channel,
            base_amount=order.base_amount,
            base_currency=order.base_currency,
            amount=0,
            currency=currency,
            random_offset=0,
            exchange_rate=0,
            status="pending",
            expire_at=datetime.utcnow() + PaymentService.PAYMENT_TTL,
        )
        db.add(payment)
        # Flush so the database assigns payment.id before the caller uses it.
        await db.flush()
        return payment

    @staticmethod
    async def _create_wechat_payment(
        db: AsyncSession,
        order: VasOrder,
    ) -> VasPayment:
        """Insert a pending WeChat static-QR payment for ``order``."""
        return await PaymentService._create_pending_payment(
            db,
            order,
            provider="wechat",
            channel="qr_static",
            currency="CNY",
        )

    @staticmethod
    async def _create_alipay_payment(
        db: AsyncSession,
        order: VasOrder,
    ) -> VasPayment:
        """Insert a pending Alipay static-QR payment for ``order``."""
        return await PaymentService._create_pending_payment(
            db,
            order,
            provider="alipay",
            channel="qr_static",
            currency="CNY",
        )

    @staticmethod
    async def _create_stripe_payment(
        db: AsyncSession,
        order: VasOrder,
    ) -> VasPayment:
        """Insert a pending Stripe online-link payment for ``order``."""
        return await PaymentService._create_pending_payment(
            db,
            order,
            provider="stripe",
            channel="online_link",
            currency="EUR",
        )

    @staticmethod
    async def list_by_order(
        db: AsyncSession,
        order_id: int,
    ) -> List[VasPayment]:
        """Return every payment that belongs to ``order_id``."""
        stmt = select(VasPayment).where(
            VasPayment.order_id == order_id
        )
        result = await db.execute(stmt)
        return result.scalars().all()

    @staticmethod
    async def get_by_id(
        db: AsyncSession,
        id: int,
    ) -> Optional[VasPayment]:
        """Return the payment with this id, or ``None`` if there is none."""
        stmt = select(VasPayment).where(VasPayment.id == id)
        return (await db.execute(stmt)).scalar_one_or_none()