| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980 |
- import time
- from datetime import datetime, timedelta
- from typing import Optional
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy import select
- from typing import List, Dict
- from app.utils.search import apply_keyword_search_stmt
- from app.utils.pagination import paginate
- from app.core.biz_exception import NotFoundError
- from app.models.proxy_pool import ProxyPool
- from app.schemas.proxy_pool import ProxyCreate, ProxyUpdate, ProxyOut
- class ProxyService:
-
- @staticmethod
- async def create_proxy(db: AsyncSession, payload: ProxyCreate):
- rec = ProxyPool(**payload.dict())
- db.add(rec)
- await db.commit()
- await db.refresh(rec)
- return rec
- @staticmethod
- async def update_proxy(db: AsyncSession, proxy_id: int, payload: ProxyUpdate):
- stmt = select(ProxyPool).where(ProxyPool.id == proxy_id)
- rec = (await db.execute(stmt)).scalar_one_or_none()
- if not rec:
- raise NotFoundError("Proxy not exist")
- for k, v in payload.dict(exclude_unset=True).items():
- setattr(rec, k, v)
- await db.commit()
- await db.refresh(rec)
- return rec
- @staticmethod
- async def remove_proxy(db: AsyncSession, proxy_id: int):
- stmt = select(ProxyPool).where(ProxyPool.id == proxy_id)
- db_obj = (await db.execute(stmt)).scalar_one_or_none()
- if not db_obj:
- raise NotFoundError(f"Proxy not exist")
- await db.delete(db_obj)
- await db.commit()
- return True
- @staticmethod
- async def list_proxy(db: AsyncSession, page: int, size: int, keyword: str = None):
- stmt = select(ProxyPool)
- query = apply_keyword_search_stmt(
- stmt=stmt,
- model=ProxyPool,
- keyword=keyword,
- fields=["id", "pool_name", "ip", "username", "password"],
- ).order_by(ProxyPool.created_at.desc())
- return await paginate(db, query, page, size)
- @staticmethod
- async def get_next_ip(db: AsyncSession, pools: list[str], proxy_cd: int):
- now = datetime.utcnow()
- stmt = (
- select(ProxyPool)
- .where(
- ProxyPool.status == 'active',
- ProxyPool.pool_name.in_(pools),
- ProxyPool.next_use_time <= now
- )
- .order_by(ProxyPool.next_use_time.asc())
- .limit(1)
- .with_for_update(skip_locked=True)
- )
- result = await db.execute(stmt)
- obj = result.scalar_one_or_none()
- if not obj:
- raise NotFoundError('Proxy not found')
- obj.next_use_time = now + timedelta(seconds=proxy_cd)
- await db.commit()
- await db.refresh(obj)
- return obj
|