import json from sqlalchemy.orm import Session from typing import List, Optional from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError from app.models.task import Task from app.schemas.task import TaskCreate, TaskUpdate class TaskService: @staticmethod def create(db: Session, obj_in: TaskCreate) -> Task: db_obj = Task( command=obj_in.command, args=obj_in.args, status=obj_in.status or 0, ) db.add(db_obj) db.commit() db.refresh(db_obj) return db_obj @staticmethod def get_by_id(db: Session, task_id: int) -> Optional[Task]: obj = db.query(Task).filter(Task.id == task_id).first() if not obj: raise NotFoundError("Task not exist") return obj @staticmethod def update(db: Session, task_id: int, obj_in: TaskUpdate) -> Optional[Task]: db_obj = db.query(Task).filter(Task.id == task_id).first() if not db_obj: raise NotFoundError("Task not exist") if obj_in.result is not None: db_obj.result = obj_in.result if obj_in.status is not None: db_obj.status = obj_in.status db.add(db_obj) db.commit() db.refresh(db_obj) return db_obj @staticmethod def get_pending(db: Session, command: str, page: int, size: int) -> List[Task]: offset = page * size return ( db.query(Task) .filter(Task.command == command, Task.status == 0) .order_by(Task.create_at.asc()) .offset(offset) .limit(size) .all() )