# app/services/task_service.py from sqlalchemy.orm import Session from typing import List from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError from app.models.vas_task import VasTask from app.schemas.vas_task import VasTaskCreate from datetime import datetime class VasTaskService: def create(db: Session, data: VasTaskCreate): rec = VasTask(**data.dict(), status='pending', created_at=datetime.utcnow()) db.add(rec) db.commit() db.refresh(rec) return rec def get_pending( db: Session, routing_key: str = None, script_version: str = None, limit: int = 50, ): query = db.query(VasTask).filter( VasTask.status == "pending", ) if routing_key: query = query.filter(VasTask.routing_key == routing_key) if script_version: query = query.filter(VasTask.script_version == script_version) return ( query .order_by( VasTask.priority.desc(), VasTask.created_at.asc() ) .limit(limit) .all() ) def get_active_task_by_order_id(db: Session, order_id:str): recs = db.query(VasTask).filter_by( VasTask.status == "pending", VasTask.order_id==order_id, ).all() return recs def return_to_queue(db: Session, id:int): rec = db.query(VasTask).filter_by(id=id).first() if not rec: raise NotFoundError("Task not exist") rec.status = 'pending' db.commit() db.refresh(rec) return rec def manual_confirm(db: Session, id:int): rec = db.query(VasTask).filter_by(id=id).first() if not rec: raise NotFoundError("Task not exist") rec.status = 'completed' db.commit() db.refresh(rec) return rec