# app/services/payment_service.py
import json
import random
import time
import uuid
from datetime import datetime, timedelta
from decimal import Decimal, InvalidOperation, ROUND_HALF_UP
from typing import Dict, List, Optional, Any

import stripe
from redis.asyncio import Redis
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.utils.search import apply_keyword_search_stmt
from app.utils.pagination import paginate
from app.core.biz_exception import NotFoundError, BizLogicError
from app.models.user import VasUser
from app.models.order import VasOrder
from app.models.payment import VasPayment
from app.models.ticket import VasTicket
from app.models.payment_event import VasPaymentEvent
from app.models.product_routing import VasProductRouting
from app.models.verification_token import VasVerificationToken
from app.models.payment_provider import VasPaymentProvider
from app.models.payment_qr import VasPaymentQR
from app.models.payment_confirmation import VasPaymentConfirmation
from app.models.configuration import Configuration
from app.schemas.payment import VasPaymentCreate, AdminUpdateStatusPayload
from app.schemas.payment_confirmation import VasPaymentConfirmationCreate, VasPaymentConfirmationUpdate
from app.services.notification_service import NotificationService
from app.services.webhook_service import WebhookService


# Providers that are paid by scanning a static QR code.
_OFFLINE_PROVIDERS = ("wechat", "alipay")
_STRIPE_PROVIDER = "stripe"

# Lifetime of a pending payment (and of the Stripe checkout session).
_PAYMENT_TTL_MINUTES = 30


def _parse_rate(rates: Dict[str, Any], currency: str) -> Decimal:
    """Return the configured rate for *currency* as a finite, non-negative Decimal."""
    raw = rates[currency]
    try:
        rate = Decimal(str(raw))
    except InvalidOperation as exc:
        raise BizLogicError(f"Invalid exchange rate for {currency} ({raw})") from exc
    # is_finite() is checked first: ordering comparisons against NaN would raise.
    if not rate.is_finite() or rate < 0:
        raise BizLogicError(f"Invalid exchange rate for {currency} ({raw})")
    return rate


def convert_currency(
    amount: int,
    from_currency: str,
    to_currency: str,
    exchange_config: Dict[str, Any],
) -> Decimal:
    """
    Convert an amount between currencies using the configured rates.

    Formula: Target = Source * (Rate_From / Rate_To)

    Args:
        amount: Amount in the smallest currency unit (e.g. cents).
        from_currency: Currency code of ``amount``.
        to_currency: Currency code to convert into.
        exchange_config: Mapping shaped like
            ``{"base": "CNY", "rates": {"CNY": 1.0, "USD": 7.25, ...}}``.

    Returns:
        The converted amount, rounded half-up to a whole unit. When both
        currencies are equal the amount is returned unchanged and the rate
        table is not consulted.

    Raises:
        BizLogicError: If either currency is missing from the rate table, a
            rate is not a valid non-negative number, or the target rate is 0.
    """
    if from_currency == to_currency:
        return Decimal(amount)

    # ``or {}`` also covers a config where "rates" is present but null.
    rates = exchange_config.get("rates") or {}

    # Both currencies must be supported.
    if from_currency not in rates:
        raise BizLogicError(f"Exchange rate for {from_currency} not found in configuration")
    if to_currency not in rates:
        raise BizLogicError(f"Exchange rate for {to_currency} not found in configuration")

    # Rates are expressed relative to the base currency.
    from_rate = _parse_rate(rates, from_currency)
    to_rate = _parse_rate(rates, to_currency)

    if to_rate == 0:
        raise BizLogicError(f"Invalid exchange rate for {to_currency} (0)")

    # Cross-rate conversion.
    converted = (Decimal(amount) * from_rate) / to_rate

    # Round half-up to an integer: amounts are kept in the smallest unit.
    return converted.quantize(Decimal("1"), rounding=ROUND_HALF_UP)


class PaymentService:
    """Static helpers for creating, confirming and querying payments."""

    @staticmethod
    async def _get_exchange_config(db: AsyncSession) -> Dict:
        """
        Load the exchange-rate configuration from the Configuration table.

        Reads the row whose ``config_key`` is ``"EXCHANGE_RATES"``. The stored
        value may be a JSON string or an already-decoded dict.

        Raises:
            BizLogicError: If the row is missing, the JSON is malformed, or
                the value is not (or does not decode to) a dict.
        """
        stmt = select(Configuration).where(Configuration.config_key == "EXCHANGE_RATES")
        config_obj = (await db.execute(stmt)).scalar_one_or_none()

        if not config_obj:
            raise BizLogicError("System configuration 'EXCHANGE_RATES' is missing")

        val = config_obj.config_value
        if isinstance(val, str):
            try:
                val = json.loads(val)
            except json.JSONDecodeError as exc:
                raise BizLogicError("Invalid JSON format in 'EXCHANGE_RATES'") from exc

        # Valid JSON may still decode to a list/number; callers need a dict.
        if not isinstance(val, dict):
            raise BizLogicError("Invalid type for 'EXCHANGE_RATES' configuration")
        return val

    @staticmethod
    def _cross_rate(
        exchange_config: Dict,
        from_currency: str,
        to_currency: str,
    ) -> Decimal:
        """Return Rate_From / Rate_To; a currency missing from the table counts as 1.0."""
        rates = exchange_config.get("rates") or {}
        rate_from = Decimal(str(rates.get(from_currency, 1.0)))
        rate_to = Decimal(str(rates.get(to_currency, 1.0)))
        if rate_to == 0:
            raise BizLogicError("Invalid target currency rate (0)")
        return rate_from / rate_to

    @staticmethod
    async def create_payment(
        db: AsyncSession,
        payload: VasPaymentCreate,
        redis_client: Redis
    ) -> VasPayment:
        """
        Main entry point for creating a payment for an order.

        Steps: lock the order row, reuse an existing pending payment for the
        same provider (idempotency) or mark a pending payment of another
        provider as expired, load the exchange rates, then create the
        provider-specific payment and commit.

        Args:
            db: Async database session.
            payload: Carries ``order_id`` and ``provider``.
            redis_client: Not used by this method.

        Raises:
            NotFoundError: If the order does not exist.
            BizLogicError: If the provider is not supported, or creation fails.
        """
        # 1. Lock the order row to serialize concurrent requests.
        stmt = (
            select(VasOrder)
            .where(VasOrder.id == payload.order_id)
            .with_for_update()
        )
        order = (await db.execute(stmt)).scalar_one_or_none()
        if not order:
            raise NotFoundError("Order not found")

        # Reject unknown providers before touching any existing payment.
        if payload.provider not in _OFFLINE_PROVIDERS and payload.provider != _STRIPE_PROVIDER:
            raise BizLogicError("Unsupported provider")

        # 2. Idempotency: is there already a pending payment for this order?
        stmt = select(VasPayment).where(
            VasPayment.order_id == order.id,
            VasPayment.status == "pending",
        )
        active_payment = (await db.execute(stmt)).scalar_one_or_none()

        if active_payment:
            if active_payment.provider == payload.provider:
                return active_payment
            # The user switched provider: retire the old pending payment.
            active_payment.status = "expired"

        # 3. Load the current exchange-rate configuration.
        exchange_config = await PaymentService._get_exchange_config(db)

        # 4. Create the payment for the requested provider.
        if payload.provider == _STRIPE_PROVIDER:
            payment = await PaymentService.create_stripe_payment(
                db=db,
                order=order,
                exchange_config=exchange_config,
            )
        else:
            payment = await PaymentService.create_offline_payment(
                db=db,
                order=order,
                provider_name=payload.provider,
                exchange_config=exchange_config,
            )

        await db.commit()
        return payment

    @staticmethod
    async def confirm_by_user(
        db: AsyncSession,
        payload: VasPaymentConfirmationCreate,
        current_user: VasUser
    ):
        """
        Handle the user clicking "I have paid".

        Ensures a confirmation record exists for (payment, user) and posts a
        "payment_user_confirmed" notification on the "wechat" channel.

        Returns:
            The existing or newly created VasPaymentConfirmation.

        Raises:
            NotFoundError: If the referenced payment does not exist.
        """
        # Resolve the payment first so a bad payment_id fails before any
        # confirmation row is written.
        stmt = select(VasPayment).where(VasPayment.id == payload.payment_id)
        payment = (await db.execute(stmt)).scalar_one_or_none()
        if not payment:
            raise NotFoundError("Payment not found")

        # Capture the values needed for the notification before the commit
        # below, so they do not depend on post-commit attribute state.
        order_id = payment.order_id
        provider = payment.provider
        user_email = current_user.email

        # 1. Look for an existing confirmation by this user for this payment.
        result = await db.execute(
            select(VasPaymentConfirmation)
            .where(VasPaymentConfirmation.payment_id == payload.payment_id)
            .where(VasPaymentConfirmation.user_id == current_user.id)
        )
        record = result.scalar_one_or_none()

        if not record:
            # None yet: create one in "pending" state.
            record = VasPaymentConfirmation(
                payment_id=payload.payment_id,
                amount=payload.amount,
                currency=payload.currency,
                random_offset=payload.random_offset,
                user_id=current_user.id,
                status="pending",
                confirmed_at=payload.confirmed_at
            )
            db.add(record)
            await db.commit()
            await db.refresh(record)

        formatted_time = payload.confirmed_at.strftime('%Y-%m-%d %H:%M') + " (UTC)"

        # 2. Notify the administrators.
        await NotificationService.post_message(
            db=db,
            channel="wechat",
            payload={
                "template_id": "payment_user_confirmed",
                "payload": {
                    "order_id": order_id,
                    "payment_id": payload.payment_id,
                    "user_email": user_email,
                    "amount": payload.amount,
                    "currency": payload.currency,
                    "token": "",
                    "confirmed_at": formatted_time,
                    "provider": provider
                }
            },
        )

        return record

    @staticmethod
    async def confirm_by_admin(
        db: AsyncSession,
        id: int,
        payload: VasPaymentConfirmationUpdate,
        current_user: VasUser
    ):
        """
        Confirm a user's payment as an administrator.

        Marks the confirmation record as confirmed by ``current_user`` and
        then confirms the underlying payment.

        Args:
            db: Async database session.
            id: Primary key of the VasPaymentConfirmation record.
            payload: Not used by this method.
            current_user: The acting administrator.

        Raises:
            NotFoundError: If the confirmation record does not exist.
            BizLogicError: Propagated from the payment confirmation step.
        """
        # 1. Load the confirmation record.
        result = await db.execute(
            select(VasPaymentConfirmation)
            .where(VasPaymentConfirmation.id == id)
        )
        record = result.scalar_one_or_none()

        if not record:
            raise NotFoundError("Payment confirmation record not found")

        # 2. Record who confirmed it and when.
        record.admin_id = current_user.id
        record.admin_confirmed_at = datetime.utcnow()
        record.status = 'confirmed'

        await PaymentService._confirm_payment_action(db, record.payment_id, 'confirmed by admin')

        await db.commit()
        await db.refresh(record)
        return record

    @staticmethod
    async def admin_update_status(
        db: AsyncSession,
        payment_id: int,
        payload: AdminUpdateStatusPayload
    ):
        """
        Force a payment status as an administrator.

        ``payload.status == "succeeded"`` runs the normal confirmation flow
        with ``payload.remark``; any other status marks the payment as
        ``'failed'``.

        Raises:
            BizLogicError: If the payment does not exist (or, on the succeeded
                path, was already confirmed).
        """
        if payload.status == "succeeded":
            payment = await PaymentService._confirm_payment_action(db, payment_id, payload.remark)
        else:
            pay_stmt = select(VasPayment).where(VasPayment.id == payment_id)
            payment = (await db.execute(pay_stmt)).scalar_one_or_none()
            if not payment:
                raise BizLogicError("Payment not found")
            payment.status = 'failed'
            await db.commit()

        await db.refresh(payment)
        return payment

    @staticmethod
    async def list_payment_confirmation(
        db: AsyncSession,
        keyword: Optional[str] = None,
        page: int = 1,
        size: int = 20,
    ):
        """Return a page of payment confirmations, newest id first, with keyword search on ``user_id``."""
        stmt = select(VasPaymentConfirmation)

        stmt = apply_keyword_search_stmt(
            stmt=stmt,
            model=VasPaymentConfirmation,
            keyword=keyword,
            fields=["user_id"],
        )

        stmt = stmt.order_by(VasPaymentConfirmation.id.desc())
        return await paginate(db, stmt, page, size)

    @staticmethod
    async def confirm_payment(
        db: AsyncSession,
        payment_id: int,
        token: str
    ):
        """
        Confirm a payment using a one-time verification token.

        Raises:
            BizLogicError: If the token is unknown/used ("Token invalid"),
                expired ("Token expired"), or the confirmation itself fails.
        """
        # NOTE(review): the token lookup is not scoped to payment_id here;
        # confirm whether a token should be bound to a specific payment.
        stmt = select(VasVerificationToken).where(
            VasVerificationToken.token == token,
            VasVerificationToken.used == 0,
        )
        token_obj = (await db.execute(stmt)).scalar_one_or_none()
        if not token_obj:
            raise BizLogicError("Token invalid")

        if token_obj.expire_at < datetime.utcnow():
            raise BizLogicError("Token expired")

        payment = await PaymentService._confirm_payment_action(db, payment_id, 'confirmed by admin')

        # Consume the token only after the payment was confirmed.
        token_obj.used = 1
        await db.commit()
        return payment

    @staticmethod
    async def _confirm_payment_action(db: AsyncSession, payment_id: int, remark: str):
        """
        Mark a payment as received and propagate the result to its order.

        A VasPaymentEvent is written (and committed) for every attempt. If the
        payment is already "succeeded"/"late_paid" the event is flagged
        "duplicate" and an error is raised. Otherwise the payment becomes
        "late_paid" when confirmed after ``expire_at`` or "succeeded"
        otherwise, the order is set to "paid", and a webhook task is created
        for the order.

        Raises:
            BizLogicError: If the payment is missing or already confirmed.
        """
        # ---------- Look up the payment ----------
        pay_stmt = select(VasPayment).where(VasPayment.id == payment_id)
        payment = (await db.execute(pay_stmt)).scalar_one_or_none()

        if not payment:
            raise BizLogicError("Payment not found")

        # Persist the audit event first so duplicate attempts are recorded too.
        event = VasPaymentEvent(
            provider=payment.provider,
            event_type="payment_received",
            title='confirm payment',
            content='confirm payment by admin',
            parsed_amount=payment.amount,
            parsed_currency=payment.currency,
            parsed_device='',
            status="received",
        )
        db.add(event)
        await db.commit()
        await db.refresh(event)

        if payment.status in ("succeeded", "late_paid"):
            event.status = "duplicate"
            event.matched_payment_id = payment.id
            event.matched_order_id = payment.order_id
            await db.commit()
            raise BizLogicError("Payment has been confirmed")

        now = datetime.utcnow()
        payment.status = "late_paid" if payment.expire_at and now > payment.expire_at else "succeeded"
        payment.provider_payload = {
            "title": remark,
            "received_at": now.isoformat(),
        }

        order_stmt = select(VasOrder).where(VasOrder.id == payment.order_id)
        order = (await db.execute(order_stmt)).scalar_one_or_none()

        if order and order.status != "paid":
            order.status = "paid"
            await WebhookService._create_task_if_not_exists(db, order)

        event.status = "applied"
        event.matched_payment_id = payment.id
        event.matched_order_id = payment.order_id
        await db.commit()
        await db.refresh(payment)
        return payment

    @staticmethod
    async def create_offline_payment(
        db: AsyncSession,
        order: VasOrder,
        provider_name: str,
        exchange_config: Dict,
    ) -> VasPayment:
        """
        Create a QR-based (WeChat / Alipay) payment for an order.

        The order amount is converted into the provider's currency and reduced
        by a small discount that is deterministic per order; the discount is
        stored in ``random_offset``.

        Args:
            db: Async database session.
            order: The order being paid.
            provider_name: ``"wechat"`` selects WeChat; any other value is
                treated as Alipay.
            exchange_config: Exchange-rate configuration.

        Raises:
            BizLogicError: If the provider is disabled, no active QR code
                exists, or the currency conversion fails.
        """
        # Validate provider and QR availability before inserting the payment
        # row, so a failure does not leave a flushed row behind.
        stmt = select(VasPaymentProvider).where(
            VasPaymentProvider.enabled == 1,
            VasPaymentProvider.name == provider_name,
        )
        provider = (await db.execute(stmt)).scalar_one_or_none()
        if not provider:
            raise BizLogicError("Payment provider not available")

        stmt = select(VasPaymentQR).where(
            VasPaymentQR.provider == provider_name,
            VasPaymentQR.is_active == 1,
        )
        qrs = (await db.execute(stmt)).scalars().all()
        if not qrs:
            raise BizLogicError("No payment QR available")

        payment = (
            await PaymentService._create_wechat_payment(db, order)
            if provider_name == "wechat"
            else await PaymentService._create_alipay_payment(db, order)
        )

        # QR selection is not seeded; only the discount below is stable.
        qr = random.choice(qrs)
        payment.qr_id = qr.id

        # --- Currency conversion ---
        amount_converted = convert_currency(
            amount=payment.final_amount,
            from_currency=order.base_currency,
            to_currency=provider.currency,
            exchange_config=exchange_config,
        )
        current_exchange_rate = PaymentService._cross_rate(
            exchange_config, order.base_currency, provider.currency
        )

        # --- Stable discount ---
        # Upper bound: 1% of the amount or 99 minor units, whichever is smaller.
        max_discount = min(99, int(amount_converted * Decimal("0.01")))

        if max_discount >= 1:
            # Seeding with order.id gives the same discount for the same order
            # and the same max_discount on every call.
            rng = random.Random(order.id)
            discount = rng.randint(1, max_discount)
        else:
            discount = 0

        payment.exchange_rate = current_exchange_rate
        payment.amount = int(amount_converted) - discount
        payment.currency = provider.currency
        payment.random_offset = discount

        return payment

    @staticmethod
    async def create_stripe_payment(
        db: AsyncSession,
        order: VasOrder,
        exchange_config: Dict,
    ) -> VasPayment:
        """
        Create a Stripe payment and its Checkout Session for an order.

        Raises:
            BizLogicError: If the Stripe provider is not enabled or the
                currency conversion fails.
        """
        # Check the provider before inserting the payment row.
        stmt = select(VasPaymentProvider).where(
            VasPaymentProvider.enabled == 1,
            VasPaymentProvider.name == "stripe",
        )
        provider = (await db.execute(stmt)).scalar_one_or_none()
        if not provider:
            raise BizLogicError("Stripe provider not enabled")

        payment = await PaymentService._create_stripe_payment(db, order)

        amount_converted = convert_currency(
            amount=payment.final_amount,
            from_currency=order.base_currency,
            to_currency=provider.currency,
            exchange_config=exchange_config,
        )
        current_exchange_rate = PaymentService._cross_rate(
            exchange_config, order.base_currency, provider.currency
        )

        payment.exchange_rate = current_exchange_rate
        payment.amount = int(amount_converted)
        payment.currency = provider.currency
        payment.random_offset = 0  # Stripe payments get no discount.

        # NOTE(review): this is a synchronous Stripe API call inside a
        # coroutine; it blocks the event loop for the duration of the request.
        stripe_session = PaymentService.create_checkout_session(
            order=order,
            payment=payment,
            success_url="https://text.skin/dashboard",
            cancel_url="https://text.skin/dashboard",
        )

        payment.payment_intent_id = stripe_session.id
        payment.payment_url = stripe_session.url

        return payment

    @staticmethod
    def create_checkout_session(
        order: VasOrder,
        payment: VasPayment,
        success_url: str,
        cancel_url: str,
    ):
        """
        Create a card-only Stripe Checkout Session for ``payment``.

        The session charges ``payment.amount`` in ``payment.currency`` as a
        single line item, carries the order, payment and user ids as metadata,
        and expires 30 minutes from now.
        """
        expires_at = int(time.time()) + _PAYMENT_TTL_MINUTES * 60

        return stripe.checkout.Session.create(
            mode="payment",
            payment_method_types=["card"],
            line_items=[
                {
                    "price_data": {
                        "currency": payment.currency.lower(),
                        "product_data": {
                            "name": f"Visa Service Order {order.id}",
                        },
                        "unit_amount": payment.amount,
                    },
                    "quantity": 1,
                }
            ],
            metadata={
                "order_id": order.id,
                "payment_id": payment.id,
                "user_id": order.user_id,
            },
            success_url=success_url,
            cancel_url=cancel_url,
            expires_at=expires_at,
        )

    @staticmethod
    async def _new_pending_payment(
        db: AsyncSession,
        order: VasOrder,
        provider: str,
        channel: str,
        currency: str,
    ) -> VasPayment:
        """Insert and flush a pending payment copied from the order's amounts; expires in 30 minutes."""
        payment = VasPayment(
            order_id=order.id,
            provider=provider,
            channel=channel,
            base_amount=order.base_amount,
            adjustment_delta=order.adjustment_delta,
            final_amount=order.final_amount,
            base_currency=order.base_currency,
            # amount / currency / exchange_rate are placeholders that the
            # public create_*_payment methods overwrite after conversion.
            amount=0,
            currency=currency,
            random_offset=0,
            exchange_rate=0,
            status="pending",
            expire_at=datetime.utcnow() + timedelta(minutes=_PAYMENT_TTL_MINUTES),
        )
        db.add(payment)
        # Flush so the row gets its primary key without committing.
        await db.flush()
        return payment

    @staticmethod
    async def _create_wechat_payment(
        db: AsyncSession,
        order: VasOrder,
    ) -> VasPayment:
        """Create a pending WeChat static-QR payment (placeholder currency CNY)."""
        return await PaymentService._new_pending_payment(
            db, order, provider="wechat", channel="qr_static", currency="CNY"
        )

    @staticmethod
    async def _create_alipay_payment(
        db: AsyncSession,
        order: VasOrder,
    ) -> VasPayment:
        """Create a pending Alipay static-QR payment (placeholder currency CNY)."""
        return await PaymentService._new_pending_payment(
            db, order, provider="alipay", channel="qr_static", currency="CNY"
        )

    @staticmethod
    async def _create_stripe_payment(
        db: AsyncSession,
        order: VasOrder,
    ) -> VasPayment:
        """Create a pending Stripe online-link payment (placeholder currency EUR)."""
        return await PaymentService._new_pending_payment(
            db, order, provider="stripe", channel="online_link", currency="EUR"
        )

    @staticmethod
    async def list_by_order(
        db: AsyncSession,
        order_id: int,
    ) -> List[VasPayment]:
        """Return every payment that belongs to ``order_id``."""
        stmt = select(VasPayment).where(
            VasPayment.order_id == order_id
        )
        result = await db.execute(stmt)
        # scalars().all() yields a Sequence; materialize the annotated list.
        return list(result.scalars().all())

    @staticmethod
    async def get_by_id(
        db: AsyncSession,
        id: int,
    ) -> VasPayment:
        """
        Return the payment with primary key ``id``.

        Raises:
            NotFoundError: If no such payment exists.
        """
        stmt = select(VasPayment).where(VasPayment.id == id)
        obj = (await db.execute(stmt)).scalar_one_or_none()
        if not obj:
            raise NotFoundError(message="Payment not found")
        return obj