proxy_service.py 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. import time
  2. from datetime import datetime, timedelta
  3. from typing import Optional
  4. from sqlalchemy.ext.asyncio import AsyncSession
  5. from sqlalchemy import select, func, text
  6. from typing import List, Dict
  7. from app.utils.search import apply_keyword_search_stmt
  8. from app.utils.pagination import paginate
  9. from app.core.biz_exception import NotFoundError
  10. from app.models.proxy_pool import ProxyPool
  11. from app.schemas.proxy_pool import ProxyCreate, ProxyUpdate, ProxyOut
  12. class ProxyService:
  @staticmethod
  async def create_proxy(db: AsyncSession, payload: ProxyCreate):
      """Persist a new proxy record built from ``payload`` and return it.

      Args:
          db: Async session used to add and commit the new row.
          payload: Creation schema; its ``dict()`` output is passed as
              keyword arguments to the ``ProxyPool`` constructor.

      Returns:
          The ``ProxyPool`` instance after commit and refresh.
      """
      field_values = payload.dict()
      proxy = ProxyPool(**field_values)
      db.add(proxy)
      await db.commit()
      # Reload the instance after the commit before handing it back.
      await db.refresh(proxy)
      return proxy
  @staticmethod
  async def update_proxy(db: AsyncSession, proxy_id: int, payload: ProxyUpdate):
      """Apply the explicitly-set fields of ``payload`` to an existing proxy.

      Args:
          db: Async session used for the lookup and the commit.
          proxy_id: Primary key of the proxy to modify.
          payload: Update schema; only fields returned by
              ``dict(exclude_unset=True)`` are written to the row.

      Returns:
          The ``ProxyPool`` instance after commit and refresh.

      Raises:
          NotFoundError: If no proxy with ``proxy_id`` exists.
      """
      lookup = select(ProxyPool).where(ProxyPool.id == proxy_id)
      result = await db.execute(lookup)
      proxy = result.scalar_one_or_none()
      if not proxy:
          raise NotFoundError("Proxy not exist")

      changes = payload.dict(exclude_unset=True)
      for field_name, new_value in changes.items():
          setattr(proxy, field_name, new_value)

      await db.commit()
      await db.refresh(proxy)
      return proxy
  @staticmethod
  async def remove_proxy(db: AsyncSession, proxy_id: int):
      """Delete the proxy identified by ``proxy_id``.

      Args:
          db: Async session used for the lookup, delete and commit.
          proxy_id: Primary key of the proxy to delete.

      Returns:
          ``True`` once the delete has been committed.

      Raises:
          NotFoundError: If no proxy with ``proxy_id`` exists.
      """
      stmt = select(ProxyPool).where(ProxyPool.id == proxy_id)
      db_obj = (await db.execute(stmt)).scalar_one_or_none()
      # scalar_one_or_none() yields None when no row matched.
      if db_obj is None:
          # Plain literal: the message has no placeholders, so no f-string.
          raise NotFoundError("Proxy not exist")
      await db.delete(db_obj)
      await db.commit()
      return True
  @staticmethod
  async def list_proxy(
      db: AsyncSession,
      page: int,
      size: int,
      keyword: Optional[str] = None,
  ):
      """Return one page of proxies, newest first, optionally keyword-filtered.

      Args:
          db: Async session the paginated query runs on.
          page: Page number, passed through to ``paginate``.
          size: Page size, passed through to ``paginate``.
          keyword: Search term handed to ``apply_keyword_search_stmt``;
              defaults to ``None``.

      Returns:
          Whatever ``paginate`` returns for the filtered, ordered statement.
      """
      # NOTE(review): "password" is one of the keyword-searchable columns, so
      # a search can match on stored credentials -- confirm this is intended.
      searchable_fields = ["id", "pool_name", "ip", "username", "password"]
      filtered = apply_keyword_search_stmt(
          stmt=select(ProxyPool),
          model=ProxyPool,
          keyword=keyword,
          fields=searchable_fields,
      )
      ordered = filtered.order_by(ProxyPool.created_at.desc())
      return await paginate(db, ordered, page, size)
  @staticmethod
  async def get_next_ip(db: AsyncSession, pools: list[str], proxy_cd: int):
      """Claim the next due proxy from ``pools`` and put it on cooldown.

      Picks the active proxy with the earliest ``next_use_time`` that is not
      later than the database's UTC timestamp, then moves its
      ``next_use_time`` ``proxy_cd`` seconds past that timestamp.

      Args:
          db: Async session used for the locking select and the commit.
          pools: Pool names a candidate proxy's ``pool_name`` must be in.
          proxy_cd: Cooldown in seconds added to the proxy's next use time.

      Returns:
          The claimed ``ProxyPool`` instance after commit and refresh.

      Raises:
          NotFoundError: If no matching proxy is currently available.
          TypeError, ValueError: If ``proxy_cd`` cannot be converted to int.
      """
      # The cooldown is spliced into a raw SQL fragment below, so force it to
      # a plain integer first; anything else must never reach the SQL text.
      cooldown_seconds = int(proxy_cd)

      # NOTE(review): utc_timestamp() and the INTERVAL ... SECOND syntax look
      # MySQL-specific -- confirm the target database.
      stmt = (
          select(ProxyPool)
          .where(
              ProxyPool.status == 'active',
              ProxyPool.pool_name.in_(pools),
              ProxyPool.next_use_time <= func.utc_timestamp(),
          )
          .order_by(ProxyPool.next_use_time.asc())
          .limit(1)
          # Row lock with SKIP LOCKED requested on the selected row.
          .with_for_update(skip_locked=True)
      )
      result = await db.execute(stmt)
      obj = result.scalar_one_or_none()
      if obj is None:
          raise NotFoundError('Proxy not found')

      # Assigned as a SQL expression rather than a Python value, hence the
      # refresh after commit to load the stored timestamp.
      obj.next_use_time = func.utc_timestamp() + text(
          f"INTERVAL {cooldown_seconds} SECOND"
      )
      await db.commit()
      await db.refresh(obj)
      return obj