vas_task_service.py 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. # app/services/task_service.py
  2. from sqlalchemy.orm import Session
  3. from typing import List
  4. from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
  5. from app.models.vas_task import VasTask
  6. from app.schemas.vas_task import VasTaskCreate
  7. from datetime import datetime
  8. class VasTaskService:
  @staticmethod
  def create(db: Session, data: VasTaskCreate) -> VasTask:
      """Persist a new task in the ``pending`` state and return it.

      Args:
          db: Open SQLAlchemy session used for the insert.
          data: Creation payload; its ``dict()`` is expanded into the
              ``VasTask`` constructor as keyword arguments.

      Returns:
          The newly stored ``VasTask``, refreshed from the database after
          the commit.

      Raises:
          TypeError: If the payload itself carries a ``status`` or
              ``created_at`` key, because those keywords are also passed
              explicitly below.
      """
      # NOTE(review): datetime.utcnow() yields a naive UTC timestamp and is
      # deprecated since Python 3.12 -- confirm whether the column can take
      # a timezone-aware value before switching.
      rec = VasTask(
          **data.dict(),
          status='pending',
          created_at=datetime.utcnow(),
      )
      db.add(rec)
      db.commit()
      # Reload the row so database-populated columns are available on the
      # returned object.
      db.refresh(rec)
      return rec
  @staticmethod
  def get_pending(
      db: Session,
      routing_key: str = None,
      script_version: str = None,
      limit: int = 50,
  ) -> List[VasTask]:
      """Return pending tasks, highest priority first, then oldest first.

      Args:
          db: Open SQLAlchemy session used for the query.
          routing_key: When truthy, only tasks with this ``routing_key``
              are returned; ``None`` or an empty string disables the filter.
          script_version: When truthy, only tasks with this
              ``script_version`` are returned; ``None`` or an empty string
              disables the filter.
          limit: Maximum number of rows to return.

      Returns:
          A list of ``VasTask`` rows whose status is ``"pending"``, ordered
          by ``priority`` descending and ``created_at`` ascending.
      """
      query = db.query(VasTask).filter(
          VasTask.status == "pending",
      )
      # Optional narrowing filters are applied only when a value was given.
      if routing_key:
          query = query.filter(VasTask.routing_key == routing_key)
      if script_version:
          query = query.filter(VasTask.script_version == script_version)
      return (
          query
          .order_by(
              VasTask.priority.desc(),
              VasTask.created_at.asc(),
          )
          .limit(limit)
          .all()
      )
  @staticmethod
  def get_active_task_by_order_id(db: Session, order_id: str) -> List[VasTask]:
      """Return every pending task that belongs to ``order_id``.

      Only tasks whose status is ``"pending"`` are matched by this query.

      Args:
          db: Open SQLAlchemy session used for the query.
          order_id: Order identifier compared against ``VasTask.order_id``.

      Returns:
          A list of matching ``VasTask`` rows (empty when none match).
      """
      # ``filter`` (not ``filter_by``) is required here: the criteria are
      # SQL expressions passed positionally, whereas ``filter_by`` accepts
      # keyword arguments only and raises TypeError on positional ones.
      return (
          db.query(VasTask)
          .filter(
              VasTask.status == "pending",
              VasTask.order_id == order_id,
          )
          .all()
      )
  @staticmethod
  def return_to_queue(db: Session, id: int) -> VasTask:
      """Set the task with the given id back to ``pending`` and return it.

      The status is overwritten unconditionally; the task's current status
      is not inspected.

      Args:
          db: Open SQLAlchemy session used for the lookup and update.
          id: Primary-key value of the task (name kept for caller
              compatibility even though it shadows the builtin).

      Returns:
          The updated ``VasTask``, refreshed from the database after the
          commit.

      Raises:
          NotFoundError: If no task with this id exists.
      """
      rec = db.query(VasTask).filter_by(id=id).first()
      if not rec:
          raise NotFoundError("Task not exist")
      rec.status = 'pending'
      db.commit()
      db.refresh(rec)
      return rec
  @staticmethod
  def manual_confirm(db: Session, id: int) -> VasTask:
      """Mark the task with the given id as ``completed`` and return it.

      The status is overwritten unconditionally; the task's current status
      is not inspected.

      Args:
          db: Open SQLAlchemy session used for the lookup and update.
          id: Primary-key value of the task (name kept for caller
              compatibility even though it shadows the builtin).

      Returns:
          The updated ``VasTask``, refreshed from the database after the
          commit.

      Raises:
          NotFoundError: If no task with this id exists.
      """
      rec = db.query(VasTask).filter_by(id=id).first()
      if not rec:
          raise NotFoundError("Task not exist")
      rec.status = 'completed'
      db.commit()
      db.refresh(rec)
      return rec