# app/services/task_service.py
from datetime import datetime
from typing import List, Optional

from sqlalchemy.orm import Session

from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
from app.models.vas_task import VasTask
from app.schemas.vas_task import VasTaskCreate
class VasTaskService:
    """Database operations for ``VasTask`` rows.

    Every public method is a static method that receives the SQLAlchemy
    session as its first argument, so it can be called either on the class
    (``VasTaskService.create(db, data)``) or on an instance.
    """

    @staticmethod
    def create(db: Session, data: VasTaskCreate) -> VasTask:
        """Insert a new task with status ``'pending'`` and return it.

        Args:
            db: Session used for the insert; it is committed here.
            data: Payload whose ``dict()`` fields are passed to ``VasTask``
                as keyword arguments.

        Returns:
            The persisted task, refreshed from the database.
        """
        rec = VasTask(
            **data.dict(),
            status="pending",
            # utcnow() yields a naive datetime expressed in UTC.
            created_at=datetime.utcnow(),
        )
        db.add(rec)
        db.commit()
        db.refresh(rec)
        return rec

    @staticmethod
    def get_pending(
        db: Session,
        routing_key: Optional[str] = None,
        script_version: Optional[str] = None,
        limit: int = 50,
    ) -> List[VasTask]:
        """Return pending tasks, highest priority first, then oldest first.

        Args:
            db: Session used for the query.
            routing_key: When truthy, only tasks with this routing key are
                returned; ``None`` or ``""`` disables the filter.
            script_version: When truthy, only tasks with this script version
                are returned; ``None`` or ``""`` disables the filter.
            limit: Maximum number of rows to return.

        Returns:
            At most ``limit`` tasks whose status is ``"pending"``.
        """
        query = db.query(VasTask).filter(VasTask.status == "pending")

        if routing_key:
            query = query.filter(VasTask.routing_key == routing_key)
        if script_version:
            query = query.filter(VasTask.script_version == script_version)

        return (
            query
            .order_by(VasTask.priority.desc(), VasTask.created_at.asc())
            .limit(limit)
            .all()
        )

    @staticmethod
    def get_active_task_by_order_id(db: Session, order_id: str) -> List[VasTask]:
        """Return every pending task that belongs to ``order_id``.

        Args:
            db: Session used for the query.
            order_id: Order identifier to match against ``VasTask.order_id``.

        Returns:
            All tasks with status ``"pending"`` and the given order id
            (possibly an empty list).
        """
        # filter() takes SQL expressions; filter_by() accepts keyword
        # arguments only and raises TypeError on positional criteria.
        return (
            db.query(VasTask)
            .filter(
                VasTask.status == "pending",
                VasTask.order_id == order_id,
            )
            .all()
        )

    @staticmethod
    def return_to_queue(db: Session, id: int) -> VasTask:
        """Set the task's status back to ``'pending'`` and commit.

        The current status is not checked before it is overwritten.

        Args:
            db: Session used for the lookup and update.
            id: Primary key of the task.

        Returns:
            The updated task, refreshed from the database.

        Raises:
            NotFoundError: If no task with this id exists.
        """
        return VasTaskService._set_status(db, id, "pending")

    @staticmethod
    def manual_confirm(db: Session, id: int) -> VasTask:
        """Set the task's status to ``'completed'`` and commit.

        The current status is not checked before it is overwritten.

        Args:
            db: Session used for the lookup and update.
            id: Primary key of the task.

        Returns:
            The updated task, refreshed from the database.

        Raises:
            NotFoundError: If no task with this id exists.
        """
        return VasTaskService._set_status(db, id, "completed")

    @staticmethod
    def _set_status(db: Session, task_id: int, status: str) -> VasTask:
        """Load a task by id, assign ``status``, commit, and return it."""
        rec = db.query(VasTask).filter_by(id=task_id).first()
        if not rec:
            raise NotFoundError("Task not exist")
        rec.status = status
        db.commit()
        db.refresh(rec)
        return rec
|