jerry 3 nedēļas atpakaļ
vecāks
revīzija
5cb4ff87e0

+ 63 - 17
app/api/router.py

@@ -53,7 +53,8 @@ from app.schemas.notification_outbox import NotificationOutboxCreate, Notificati
 from app.schemas.resource import FileUploadOut
 from app.schemas.statistics import VasStatisticsOverviewOut
 from app.schemas.llm import ParseUserInputsPayload, ParseUserInputsOut
-from app.schemas.account import AccountResponse, AccountCreate, LockRequest
+from app.schemas.account import AccountOut, AccountCreate, AccountUpdate, GetNextAccountPayload
+from app.schemas.proxy_pool import ProxyCreate, ProxyUpdate, ProxyOut, GetNextIpPayload
 from app.schemas.docker_remote import RemoteServerConfig, DockerStatusOut, DockerLogsRequest, DockerLogsOut, ConfigReadOut, ConfigReadRequest, ConfigUpdateRequest, LogReadRequest, LogReadOut, LogListOut, DockerContainerStatus, DockerActionRequest, ServerConfigItem, ServerListOut, RemoteActionRequest
 from app.schemas.order_event import VasOrderEventCreate, VasOrderEventOut
 from app.schemas.troov_session import TroovSessionCreate, TroovSessionUpdate, TroovSessionOut
@@ -94,6 +95,7 @@ from app.services.slot_refresh_status_service import SlotRefreshStatusService
 from app.services.account_service import AccountService
 from app.services.order_event_service import OrderEventService
 from app.services.troov_session_service import TroovSessionService
+from app.services.proxy_service import ProxyService
 
 # 公共路由
 public_router = APIRouter()
@@ -592,8 +594,8 @@ async def emails_get_max_uid(
     uid = await EmailsService.get_max_uid(db)
     return success(data=uid)
 
-@admin_required_router.get("/account/list_all", summary="分页查询账号", tags=["账号管理"], response_model=ApiResponse[PageResponse[AccountResponse]])
-async def account_next(
+@admin_required_router.get("/account/list_all", summary="分页查询账号", tags=["账号管理"], response_model=ApiResponse[PageResponse[AccountOut]])
+async def account_list(
     page: int = Query(0, description="第几页"),
     size: int = Query(10, description="分页大小"),
     keyword: str = Query("", description="查询条件"),
@@ -602,39 +604,83 @@ async def account_next(
     obj = await AccountService.list_all(db, page, size, keyword)
     return success(data=obj)
 
-@admin_required_router.get("/account/next", summary="获取下一个账号", tags=["账号管理"], response_model=ApiResponse[AccountResponse])
+@admin_required_router.post("/account/next", summary="获取下一个账号", tags=["账号管理"], response_model=ApiResponse[AccountOut])
 async def account_next(
-    pool_name: str, 
-    lock_duration: float = 60.0, 
+    payload: GetNextAccountPayload, 
     db: AsyncSession = Depends(get_db)
 ):
-    account = await AccountService.get_next_account(db, pool_name, lock_duration)
+    account = await AccountService.get_next_account(db, payload.pool_name, payload.account_cd)
     return success(data=account)
 
-@admin_required_router.post("/account/add", summary="新增一个账号", tags=["账号管理"], response_model=ApiResponse[AccountResponse])
+@admin_required_router.post("/account/add", summary="新增一个账号", tags=["账号管理"], response_model=ApiResponse[AccountOut])
 async def account_add(
     payload: AccountCreate,
     db: AsyncSession = Depends(get_db)
 ):
     account = await AccountService.add_account(db, payload)
     return success(data=account)
-   
-@admin_required_router.post("/account/lock", summary="锁定账号", tags=["账号管理"], response_model=ApiResponse)
-async def account_lock(
-    payload: LockRequest,
+
+@admin_required_router.delete("/account/delete", summary="删除账号", tags=["账号管理"], response_model=ApiResponse)
+async def account_remove(
+    account_id: int,
     db: AsyncSession = Depends(get_db)
 ):
-    await AccountService.manual_lock(db, payload)
+    await AccountService.remove_account(db, account_id)
     return success()
 
-@admin_required_router.post("/account/disable", summary="禁用账号", tags=["账号管理"], response_model=ApiResponse)
-async def account_disable(
-    payload: LockRequest,
+@admin_required_router.put("/account/update", summary="更新账号", tags=["账号管理"], response_model=ApiResponse[ProxyOut])
+async def account_update(
+    account_id: int,
+    payload: AccountUpdate,
+    db: AsyncSession = Depends(get_db)
+):
+    account = await AccountService.update_acccount(db, account_id, payload)
+    return success(data=account)
+   
+
+@admin_required_router.post("/proxy/create", summary="创建代理", tags=["代理管理"], response_model=ApiResponse[ProxyOut])
+async def proxy_create(
+    payload: ProxyCreate,
+    db: AsyncSession = Depends(get_db)
+):
+    proxy = await ProxyService.create_proxy(db, payload)
+    return success(data=proxy)
+
+@admin_required_router.put("/proxy/update", summary="更新代理", tags=["代理管理"], response_model=ApiResponse[ProxyOut])
+async def proxy_update(
+    proxy_id: int,
+    payload: ProxyUpdate,
+    db: AsyncSession = Depends(get_db)
+):
+    proxy = await ProxyService.update_proxy(db, proxy_id, payload)
+    return success(data=proxy)
+
+@admin_required_router.delete("/proxy/delete", summary="删除代理", tags=["代理管理"], response_model=ApiResponse)
+async def proxy_remove(
+    proxy_id: int,
     db: AsyncSession = Depends(get_db)
 ):
-    await AccountService.disable_account(db, payload)
+    await ProxyService.remove_proxy(db, proxy_id)
     return success()
 
+@admin_required_router.get("/proxy/list_all", summary="查询代理", tags=["代理管理"], response_model=ApiResponse[PageResponse[ProxyOut]])
+async def proxy_list(
+    page: int = Query(0, description="第几页"),
+    size: int = Query(10, description="分页大小"),
+    keyword: str = Query("", description="查询条件"),
+    db: AsyncSession = Depends(get_db)
+):
+    obj = await ProxyService.list_all(db, page, size, keyword)
+    return success(data=obj)
+
+@admin_required_router.post("/proxy/next-ip", summary="获取下一个ip", tags=["代理管理"], response_model=ApiResponse[ProxyOut])
+async def proxy_next_ip_get(
+    payload: GetNextIpPayload,
+    db: AsyncSession = Depends(get_db)
+):
+    proxy = await ProxyService.get_next_ip(db, payload.pools, payload.proxy_cd)
+    return success(data=proxy)
+
 @public_router.post("/resource/upload_file", summary="上传文件", tags=["文件管理"], response_model=ApiResponse[FileUploadOut])
 async def resource_upload_file(file: UploadFile = File(...)):
     result = await SeaweedFSService.upload(file)

+ 6 - 7
app/models/account.py

@@ -1,6 +1,6 @@
-from sqlalchemy import Column, Integer, String, Text, Float, JSON, TIMESTAMP, Enum
+from sqlalchemy import Column, Integer, String, Text, Float, JSON, TIMESTAMP, Enum, DateTime
 from sqlalchemy.sql import func
-from sqlalchemy.ext.declarative import declarative_base
+from datetime import datetime
 from app.core.database import Base
 
 
@@ -12,9 +12,8 @@ class Account(Base):
     username = Column(String(100), nullable=False)
     password = Column(String(255), nullable=True)
     extra_data = Column(JSON, nullable=True)
-    # 锁定截止时间戳 (0 表示未锁定,> time.time() 表示锁定中)
-    lock_until = Column(Float, default=0, index=True)
+    next_use_time = Column(DateTime, nullable=False, default=func.now(), comment="下次允许使用的时间")
+    status = Column(Enum('active','disable'), default="active", index=True)
     
-    # 状态: active, disabled, removed
-    status = Column(
-        Enum('active','disable'), default="active", index=True)
+    created_at = Column(DateTime, default=datetime.utcnow)
+    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

+ 20 - 0
app/models/proxy_pool.py

@@ -0,0 +1,20 @@
+from sqlalchemy import Column, Integer, String, DateTime
+from sqlalchemy.sql import func
+from datetime import datetime
+from app.core.database import Base
+
+class ProxyPool(Base):
+    __tablename__ = "proxy_pool"
+
+    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
+    pool_name = Column(String(32), nullable=False, comment="所属代理池")
+    proto = Column(String(10), nullable=False, default="http", comment="代理协议")
+    ip = Column(String(39), nullable=False, comment="代理IP")
+    port = Column(Integer, nullable=False, comment="端口")
+    username = Column(String(64), default=None, comment="代理用户名")
+    password = Column(String(64), default=None, comment="代理密码")
+    next_use_time = Column(DateTime, nullable=False, default=func.now(), comment="下次允许使用的时间")
+    status = Column(String(16), nullable=False, default="active", comment="active=可用, disable=禁用")
+    
+    created_at = Column(DateTime, default=datetime.utcnow)
+    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

+ 20 - 5
app/schemas/account.py

@@ -1,6 +1,7 @@
 import json
 from pydantic import BaseModel, field_validator
 from typing import Optional, Dict, Any, Literal
+from datetime import datetime
 
 class AccountBase(BaseModel):
     pool_name: str
@@ -8,7 +9,7 @@ class AccountBase(BaseModel):
     password: Optional[str] = None
     extra_data: Optional[Dict[str, Any]] = None
     status: Optional[Literal['active','disable']] = None
-    lock_until: float = 0
+    next_use_time: Optional[datetime] = None
     @field_validator("extra_data", mode="before")
     def normalize_json_field(cls, v):
         if v is None:
@@ -24,14 +25,28 @@ class AccountBase(BaseModel):
 class AccountCreate(AccountBase):
     pass
 
-class AccountResponse(AccountBase):
+class AccountUpdate(AccountBase):
+    """
+    更新代理 Payload
+    注意:因为是更新,前端可能只传部分字段,所以这里全部重新定义为 Optional
+    """
+    pool_name: Optional[str] = None
+    username: Optional[str] = None
+    password: Optional[str] = None
+    extra_data: Optional[Dict[str, Any]] = None
+    next_use_time: Optional[datetime] = None
+    status: Optional[str] = None
+
+class AccountOut(AccountBase):
     id: int
+    created_at: datetime
+    updated_at: datetime
     model_config = {
         "from_attributes": True
     }
 
-class LockRequest(BaseModel):
+class GetNextAccountPayload(BaseModel):
+    """获取下个IP的请求参数"""
     pool_name: str
-    username: str
-    duration: Optional[int] = None
+    account_cd: int
     

+ 51 - 0
app/schemas/proxy_pool.py

@@ -0,0 +1,51 @@
+from pydantic import BaseModel, Field
+from typing import Optional, List
+from datetime import datetime
+
+# ================= Base Schema =================
+class ProxyBase(BaseModel):
+    """代理基础共享属性,定义核心字段及校验规则"""
+    pool_name: str
+    proto: str
+    ip: str
+    port: int
+    username: Optional[str] = None
+    password: Optional[str] = None
+    next_use_time: Optional[datetime] = None
+    status: Optional[str] = None
+
+# ================= Request Schemas =================
+class ProxyCreate(ProxyBase):
+    """创建代理 Payload (直接继承 Base,复用所有必填规则)"""
+    pass
+
+class ProxyUpdate(ProxyBase):
+    """
+    更新代理 Payload
+    注意:因为是更新,前端可能只传部分字段,所以这里全部重新定义为 Optional
+    """
+    pool_name: Optional[str] = None
+    proto: Optional[str] = None
+    ip: Optional[str] = None
+    port: Optional[int] = None
+    username: Optional[str] = None
+    password: Optional[str] = None
+    next_use_time: Optional[datetime] = None
+    status: Optional[str] = None
+
+class GetNextIpPayload(BaseModel):
+    """获取下个IP的请求参数"""
+    pools: List[str]
+    proxy_cd: int
+
+
+# ================= Response Schemas =================
+class ProxyOut(ProxyBase):
+    """响应返回给前端的代理实体模型 (继承 Base,增加数据库自动生成的字段)"""
+    id: int
+    created_at: datetime
+    updated_at: datetime
+
+    model_config = {
+        "from_attributes": True
+    }

+ 56 - 76
app/services/account_service.py

@@ -2,15 +2,50 @@ import time
 from typing import Optional
 from sqlalchemy.ext.asyncio import AsyncSession
 from sqlalchemy import select, delete
-
+from datetime import datetime, timedelta
 from app.utils.search import apply_keyword_search_stmt
 from app.utils.pagination import paginate
 from app.core.biz_exception import NotFoundError
 from app.models.account import Account
-from app.schemas.account import AccountCreate, LockRequest
+from app.schemas.account import AccountCreate, AccountUpdate
 
 class AccountService:
     
+    @staticmethod
+    async def add_account(db: AsyncSession, payload: AccountCreate):
+        """
+        添加新账号到数据库
+        """
+        rec = Account(**payload.dict())
+        db.add(rec)
+        await db.commit()
+        await db.refresh(rec)
+        return rec
+
+    @staticmethod
+    async def update_account(db: AsyncSession, account_id: int, payload: AccountUpdate):
+        stmt = select(Account).where(Account.id == account_id)
+        rec = (await db.execute(stmt)).scalar_one_or_none()
+        if not rec:
+            raise NotFoundError("Account not exist")
+
+        for k, v in payload.dict(exclude_unset=True).items():
+            setattr(rec, k, v)
+
+        await db.commit()
+        await db.refresh(rec)
+        return rec
+    
+    @staticmethod
+    async def remove_account(db: AsyncSession, account_id: int):
+        stmt = select(Account).where(Account.id == account_id)
+        db_obj = (await db.execute(stmt)).scalar_one_or_none()
+        if not db_obj:
+            raise NotFoundError(f"Account not exist")
+        await db.delete(db_obj)
+        await db.commit()
+        return True
+    
     @staticmethod
     async def list_all(
         db: AsyncSession,
@@ -28,83 +63,28 @@ class AccountService:
         return await paginate(db, stmt, page, size)
     
     @staticmethod
-    async def add_account(db: AsyncSession, payload: AccountCreate):
-        """
-        添加新账号到数据库
-        """
-        # 检查是否存在
-        stmt = select(Account).where(
-            Account.pool_name == payload.pool_name,
-            Account.username == payload.username,
-        )
-        obj = (await db.execute(stmt)).scalar_one_or_none()
-        
-        if obj:
-            # 如果存在,更新密码,并重置为 active
-            obj.password = payload.password
-            obj.status = "active"
-            if payload.extra_data:
-                obj.extra_data = payload.extra_data
-            await db.commit()
-            await db.refresh(obj)
-            return obj
-        else:
-            new_acc = Account(
-                pool_name=payload.pool_name,
-                username=payload.username, 
-                password=payload.password,
-                extra_data=payload.extra_data
+    async def get_next_account(
+        db: AsyncSession, 
+        pool_name: str, 
+        account_cd: int
+    ) -> Account:
+        now = datetime.utcnow()
+        stmt = (
+            select(Account)
+            .where(
+                Account.pool_name == pool_name,
+                Account.status == 'active',
+                Account.next_use_time <= now
             )
-            db.add(new_acc)
-            await db.commit()
-            await db.refresh(new_acc)
-            return new_acc
-    
-    @staticmethod
-    async def get_next_account(db: AsyncSession, pool_name: str, lock_duration: float) -> Account:
-        now = time.time()        
-        stmt = select(Account).where(
-            Account.pool_name == pool_name,
-            Account.status == 'active',
-            Account.lock_until < now
-        ).order_by(
-            Account.lock_until.asc()
-        ).limit(1).with_for_update()
-        
-        obj = (await db.execute(stmt)).scalar_one_or_none()
-        
+            .order_by(Account.next_use_time.asc())
+            .limit(1)
+            .with_for_update(skip_locked=True)
+        )
+        result = await db.execute(stmt)
+        obj = result.scalar_one_or_none()
         if not obj:
             raise NotFoundError('Account not found')
-        
-        new_lock_time = now + lock_duration
-        obj.lock_until = new_lock_time
+        obj.next_use_time = now + timedelta(seconds=account_cd)
         await db.commit()
         await db.refresh(obj)
         return obj
-                
-    @staticmethod
-    async def manual_lock(db: AsyncSession, payload: LockRequest):        
-        stmt = select(Account).where(
-            Account.pool_name == payload.pool_name,
-            Account.username == payload.username,
-        )
-        obj = (await db.execute(stmt)).scalar_one_or_none()
-        
-        if not obj:
-            raise NotFoundError('Account not found')
-        
-        obj.lock_until = time.time() + payload.duration
-        await db.commit()
-
-    @staticmethod
-    async def disable_account(db: AsyncSession, payload: LockRequest):
-        stmt = select(Account).where(
-            Account.pool_name == payload.pool_name,
-            Account.username == payload.username,
-        )
-        obj = (await db.execute(stmt)).scalar_one_or_none()
-        
-        if not obj:
-            raise NotFoundError('Account not found')
-        obj.status = "disabled"
-        await db.commit()

+ 80 - 0
app/services/proxy_service.py

@@ -0,0 +1,80 @@
+import time
+from datetime import datetime, timedelta
+from typing import Optional
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy import select
+from typing import List, Dict
+from app.utils.search import apply_keyword_search_stmt
+from app.utils.pagination import paginate
+from app.core.biz_exception import NotFoundError
+from app.models.proxy_pool import ProxyPool
+from app.schemas.proxy_pool import ProxyCreate, ProxyUpdate, ProxyOut
+
+
+class ProxyService:
+    
+    @staticmethod
+    async def create_proxy(db: AsyncSession, payload: ProxyCreate):
+        rec = ProxyPool(**payload.dict())
+        db.add(rec)
+        await db.commit()
+        await db.refresh(rec)
+        return rec
+
+    @staticmethod
+    async def update_proxy(db: AsyncSession, proxy_id: int, payload: ProxyUpdate):
+        stmt = select(ProxyPool).where(ProxyPool.id == proxy_id)
+        rec = (await db.execute(stmt)).scalar_one_or_none()
+        if not rec:
+            raise NotFoundError("Proxy not exist")
+
+        for k, v in payload.dict(exclude_unset=True).items():
+            setattr(rec, k, v)
+
+        await db.commit()
+        await db.refresh(rec)
+        return rec
+
+    @staticmethod
+    async def remove_proxy(db: AsyncSession, proxy_id: int):
+        stmt = select(ProxyPool).where(ProxyPool.id == proxy_id)
+        db_obj = (await db.execute(stmt)).scalar_one_or_none()
+        if not db_obj:
+            raise NotFoundError(f"Proxy not exist")
+        await db.delete(db_obj)
+        await db.commit()
+        return True
+
+    @staticmethod
+    async def list_proxy(db: AsyncSession, page: int, size: int, keyword: str = None):
+        stmt = select(ProxyPool)
+        query = apply_keyword_search_stmt(
+            stmt=stmt,
+            model=ProxyPool,
+            keyword=keyword,
+            fields=["id", "pool_name", "ip", "username", "password"],
+        ).order_by(ProxyPool.created_at.desc())
+        return await paginate(db, query, page, size)
+
+    @staticmethod
+    async def get_next_ip(db: AsyncSession, pools: list[str], proxy_cd: int):
+        now = datetime.utcnow()
+        stmt = (
+            select(ProxyPool)
+            .where(
+                ProxyPool.status == 'active',
+                ProxyPool.pool_name.in_(pools),
+                ProxyPool.next_use_time <= now
+            )
+            .order_by(ProxyPool.next_use_time.asc())
+            .limit(1)
+            .with_for_update(skip_locked=True)
+        )
+        result = await db.execute(stmt)
+        obj = result.scalar_one_or_none()
+        if not obj:
+            raise NotFoundError('Proxy not found')
+        obj.next_use_time = now + timedelta(seconds=proxy_cd)
+        await db.commit()
+        await db.refresh(obj)
+        return obj