import time from typing import Optional from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, delete from datetime import datetime, timedelta from app.utils.search import apply_keyword_search_stmt from app.utils.pagination import paginate from app.core.biz_exception import NotFoundError from app.models.account import Account from app.schemas.account import AccountCreate, AccountUpdate class AccountService: @staticmethod async def add_account(db: AsyncSession, payload: AccountCreate): """ 添加新账号到数据库 """ rec = Account(**payload.dict()) db.add(rec) await db.commit() await db.refresh(rec) return rec @staticmethod async def update_account(db: AsyncSession, account_id: int, payload: AccountUpdate): stmt = select(Account).where(Account.id == account_id) rec = (await db.execute(stmt)).scalar_one_or_none() if not rec: raise NotFoundError("Account not exist") for k, v in payload.dict(exclude_unset=True).items(): setattr(rec, k, v) await db.commit() await db.refresh(rec) return rec @staticmethod async def remove_account(db: AsyncSession, account_id: int): stmt = select(Account).where(Account.id == account_id) db_obj = (await db.execute(stmt)).scalar_one_or_none() if not db_obj: raise NotFoundError(f"Account not exist") await db.delete(db_obj) await db.commit() return True @staticmethod async def list_all( db: AsyncSession, page: int = 0, size: int = 10, keyword: Optional[str] = None ): stmt = select(Account) stmt = apply_keyword_search_stmt( stmt=stmt, model=Account, keyword=keyword, fields=["id", "pool_name", "username", "password", "extra_data", "status"], ).order_by(Account.id.desc()) return await paginate(db, stmt, page, size) @staticmethod async def get_next_account( db: AsyncSession, pool_name: str, account_cd: int ) -> Account: now = datetime.utcnow() stmt = ( select(Account) .where( Account.pool_name == pool_name, Account.status == 'active', Account.next_use_time <= now ) .order_by(Account.next_use_time.asc()) .limit(1) .with_for_update(skip_locked=True) ) result = await db.execute(stmt) obj = result.scalar_one_or_none() if not obj: raise NotFoundError('Account not found') obj.next_use_time = now + timedelta(seconds=account_cd) await db.commit() await db.refresh(obj) return obj