import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional

from sqlalchemy import func, select, text
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.biz_exception import NotFoundError
from app.models.proxy_pool import ProxyPool
from app.schemas.proxy_pool import ProxyCreate, ProxyOut, ProxyUpdate
from app.utils.pagination import paginate
from app.utils.search import apply_keyword_search_stmt


class ProxyService:
    """Async helpers for creating, editing, listing and rotating ProxyPool rows."""

    @staticmethod
    async def create_proxy(db: AsyncSession, payload: ProxyCreate):
        """Insert a new ProxyPool row built from ``payload`` and return it.

        Args:
            db: Session used to add and commit the row.
            payload: Source of the column values (``payload.dict()``).

        Returns:
            The ProxyPool instance, refreshed after the commit.
        """
        rec = ProxyPool(**payload.dict())
        db.add(rec)
        await db.commit()
        # Reload so attributes populated by the database are available.
        await db.refresh(rec)
        return rec

    @staticmethod
    async def update_proxy(db: AsyncSession, proxy_id: int, payload: ProxyUpdate):
        """Apply the explicitly-set fields of ``payload`` to one proxy row.

        Args:
            db: Session used to load and commit the row.
            proxy_id: Primary key of the row to update.
            payload: Partial update; fields left unset are not touched.

        Returns:
            The updated ProxyPool instance, refreshed after the commit.

        Raises:
            NotFoundError: If no row has the given id.
        """
        stmt = select(ProxyPool).where(ProxyPool.id == proxy_id)
        rec = (await db.execute(stmt)).scalar_one_or_none()
        if rec is None:
            raise NotFoundError("Proxy not exist")

        # exclude_unset keeps omitted fields from overwriting stored values.
        for field_name, value in payload.dict(exclude_unset=True).items():
            setattr(rec, field_name, value)

        await db.commit()
        await db.refresh(rec)
        return rec

    @staticmethod
    async def remove_proxy(db: AsyncSession, proxy_id: int):
        """Delete one proxy row by id.

        Args:
            db: Session used to load, delete and commit.
            proxy_id: Primary key of the row to delete.

        Returns:
            True once the deletion has been committed.

        Raises:
            NotFoundError: If no row has the given id.
        """
        stmt = select(ProxyPool).where(ProxyPool.id == proxy_id)
        db_obj = (await db.execute(stmt)).scalar_one_or_none()
        if db_obj is None:
            raise NotFoundError("Proxy not exist")

        await db.delete(db_obj)
        await db.commit()
        return True

    @staticmethod
    async def list_proxy(
        db: AsyncSession,
        page: int,
        size: int,
        keyword: Optional[str] = None,
    ):
        """Return one page of proxies, newest first, optionally keyword-filtered.

        Args:
            db: Session handed to ``paginate``.
            page: Page selector forwarded to ``paginate``.
            size: Page size forwarded to ``paginate``.
            keyword: Search term forwarded to ``apply_keyword_search_stmt``.

        Returns:
            Whatever ``paginate`` returns for the built statement.
        """
        # NOTE(review): how a None/empty keyword is treated is decided inside
        # apply_keyword_search_stmt, which is not visible here - confirm.
        query = apply_keyword_search_stmt(
            stmt=select(ProxyPool),
            model=ProxyPool,
            keyword=keyword,
            fields=["id", "pool_name", "ip", "username", "password"],
        ).order_by(ProxyPool.created_at.desc())
        return await paginate(db, query, page, size)

    @staticmethod
    async def get_next_ip(db: AsyncSession, pools: list[str], proxy_cd: int):
        """Claim the next usable proxy from ``pools`` and push back its next use.

        Picks the active row with the earliest ``next_use_time`` that is not
        in the future, then moves ``next_use_time`` forward by ``proxy_cd``
        seconds and commits.

        Args:
            db: Session used to select, update and commit.
            pools: Pool names a candidate row's ``pool_name`` must be in.
            proxy_cd: Cooldown in seconds; must be convertible with ``int()``.

        Returns:
            The claimed ProxyPool instance, refreshed after the commit.

        Raises:
            NotFoundError: If no matching, unlocked row exists.
            ValueError: If ``proxy_cd`` cannot be converted to an integer.
        """
        # The cooldown is spliced into raw SQL below, so force it to a plain
        # integer first; this also fails fast before any row is locked.
        cooldown_seconds = int(proxy_cd)

        # NOTE(review): utc_timestamp() and the INTERVAL syntax look
        # MySQL-specific - confirm the target dialect.
        stmt = (
            select(ProxyPool)
            .where(
                ProxyPool.status == 'active',
                ProxyPool.pool_name.in_(pools),
                ProxyPool.next_use_time <= func.utc_timestamp(),
            )
            .order_by(ProxyPool.next_use_time.asc())
            .limit(1)
            # Skip rows locked by other transactions instead of waiting.
            .with_for_update(skip_locked=True)
        )
        result = await db.execute(stmt)
        obj = result.scalar_one_or_none()
        if obj is None:
            raise NotFoundError('Proxy not found')

        # Computed by the database so the cooldown uses the same clock as the
        # eligibility filter above.
        obj.next_use_time = func.utc_timestamp() + text(
            f"INTERVAL {cooldown_seconds} SECOND"
        )
        await db.commit()
        await db.refresh(obj)
        return obj