# app/services/payment_provider.py
"""Service-layer helpers for creating, updating, deleting and listing payment providers."""
from typing import List, Optional

from sqlalchemy.orm import Session

from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
from app.models.payment_provider import VasPaymentProvider
from app.schemas.payment_provider import VasPaymentProviderCreate, VasPaymentProviderUpdate

# Fields that identify a provider; ``update`` silently ignores changes to them.
_IMMUTABLE_FIELDS = ("name", "channel", "currency")


def _commit_or_rollback(db: Session) -> None:
    """Commit the session; on any failure roll back and re-raise.

    Without the rollback a failed commit leaves the session in a state where
    it cannot be reused until the caller rolls back explicitly.
    """
    try:
        db.commit()
    except Exception:
        db.rollback()
        raise


class PaymentProviderSerivce:
    """Namespace of stateless operations on ``VasPaymentProvider`` rows.

    All methods are static: they take the SQLAlchemy session explicitly and
    can be called either on the class or on an instance.
    """

    @staticmethod
    def create(
        db: Session,
        data: VasPaymentProviderCreate
    ) -> VasPaymentProvider:
        """Insert a new payment provider and return the persisted row.

        Args:
            db: Session used for the lookup and the insert.
            data: Values for the new provider. ``enabled`` falls back to 1
                when it is ``None``.

        Raises:
            BizLogicError: A provider with the same (name, channel, currency)
                already exists.
        """
        # Prevent duplicate registration of the same (name, channel, currency).
        duplicate = (
            db.query(VasPaymentProvider)
            .filter(
                VasPaymentProvider.name == data.name,
                VasPaymentProvider.channel == data.channel,
                VasPaymentProvider.currency == data.currency,
            )
            .first()
        )
        if duplicate:
            raise BizLogicError("Payment provider already exists")

        provider = VasPaymentProvider(
            name=data.name,
            channel=data.channel,
            currency=data.currency,
            icon=data.icon,
            enabled=data.enabled if data.enabled is not None else 1,
            config=data.config,
        )
        db.add(provider)
        _commit_or_rollback(db)
        db.refresh(provider)
        return provider

    @staticmethod
    def update(
        db: Session,
        provider_id: int,
        data: VasPaymentProviderUpdate
    ) -> VasPaymentProvider:
        """Apply the explicitly-set fields of ``data`` to an existing provider.

        Changes to ``name``, ``channel`` and ``currency`` are dropped rather
        than rejected.

        Args:
            db: Session used for the lookup and the update.
            provider_id: Primary key of the provider to modify.
            data: Partial update; only fields that were set are applied.

        Raises:
            BizLogicError: No provider has the given primary key.
        """
        provider = db.query(VasPaymentProvider).get(provider_id)
        if provider is None:
            # NOTE(review): ``delete`` raises NotFoundError for the same
            # situation; the type is kept here so existing callers that
            # catch BizLogicError keep working.
            raise BizLogicError("Payment provider not found")

        changes = data.dict(exclude_unset=True)
        for field, value in changes.items():
            # For safety, the identifying triple must never be modified.
            if field in _IMMUTABLE_FIELDS:
                continue
            setattr(provider, field, value)

        _commit_or_rollback(db)
        db.refresh(provider)
        return provider

    @staticmethod
    def delete(db: Session, id: int) -> None:
        """Delete the provider with primary key ``id``.

        Raises:
            NotFoundError: No provider has the given primary key.
        """
        provider = db.query(VasPaymentProvider).filter_by(id=id).first()
        if provider is None:
            raise NotFoundError("Provider not exist")
        db.delete(provider)
        _commit_or_rollback(db)

    @staticmethod
    def list_all(db: Session) -> List[VasPaymentProvider]:
        """Return every provider, enabled or not."""
        return db.query(VasPaymentProvider).all()

    @staticmethod
    def list_enabled(
        db: Session,
        currency: Optional[str] = None
    ) -> List[VasPaymentProvider]:
        """Return providers whose ``enabled`` is 1.

        Args:
            db: Session used for the query.
            currency: When truthy, restrict the result to this currency.
        """
        query = db.query(VasPaymentProvider).filter(
            VasPaymentProvider.enabled == 1
        )
        if currency:
            query = query.filter(VasPaymentProvider.currency == currency)
        return query.all()

    @staticmethod
    def get_by_name(
        db: Session,
        name: str
    ) -> Optional[VasPaymentProvider]:
        """Return the first enabled provider with the given name, or ``None``.

        NOTE(review): when ``name`` is empty or ``None`` the name filter is
        skipped and the first enabled provider of any name is returned.
        That looks unintended, but it is preserved because callers may rely
        on it -- confirm before tightening.
        """
        query = db.query(VasPaymentProvider).filter(
            VasPaymentProvider.enabled == 1
        )
        if name:
            query = query.filter(VasPaymentProvider.name == name)
        return query.first()


# Correctly spelled alias; the original (misspelled) name stays for existing imports.
PaymentProviderService = PaymentProviderSerivce