vas_task_service.py 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. # app/services/task_service.py
  2. from sqlalchemy.orm import Session
  3. from typing import List
  4. from app.utils.search import apply_keyword_search
  5. from app.utils.pagination import paginate
  6. from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
  7. from app.models.vas_task import VasTask
  8. from app.schemas.vas_task import VasTaskCreate, VasTaskUpdate
  9. from datetime import datetime
  10. class VasTaskService:
  11. def create(db: Session, data: VasTaskCreate):
  12. rec = VasTask(**data.dict(), status='pending', created_at=datetime.utcnow())
  13. db.add(rec)
  14. db.commit()
  15. db.refresh(rec)
  16. return rec
  17. def list_task(
  18. db: Session,
  19. status: str = None,
  20. routing_key: str = None,
  21. script_version: str = None,
  22. keyword: str = None,
  23. page: int = 0,
  24. size: int = 10,
  25. ):
  26. query = db.query(VasTask)
  27. if status:
  28. query = query.filter(VasTask.status == status)
  29. if routing_key:
  30. query = query.filter(VasTask.routing_key == routing_key)
  31. if script_version:
  32. query = query.filter(VasTask.script_version == script_version)
  33. query = apply_keyword_search(
  34. query=query,
  35. model=VasTask,
  36. keyword=keyword,
  37. fields=["order_id", "routing_key", "user_inputs"]
  38. )
  39. query = query.order_by(
  40. VasTask.priority.desc(),
  41. VasTask.id.asc()
  42. )
  43. return paginate(query, page, size)
  44. def update(db: Session, id: int, payload: VasTaskUpdate):
  45. obj = db.query(VasTask).filter(VasTask.id == id).first()
  46. if not obj:
  47. raise NotFoundError("Task not exist")
  48. data = payload.dict(exclude_unset=True) # ⭐ 关键
  49. for key, value in data.items():
  50. setattr(obj, key, value)
  51. db.commit()
  52. db.refresh(obj)
  53. return obj
  54. def get_active_task_by_order_id(db: Session, order_id:str):
  55. recs = db.query(VasTask).filter(
  56. VasTask.status == "pending",
  57. VasTask.order_id == order_id
  58. ).all()
  59. return recs
  60. def return_to_queue(db: Session, id:int):
  61. rec = db.query(VasTask).filter_by(id=id).first()
  62. if not rec:
  63. raise NotFoundError("Task not exist")
  64. rec.status = 'pending'
  65. db.commit()
  66. db.refresh(rec)
  67. return rec
  68. def manual_confirm(db: Session, id:int):
  69. rec = db.query(VasTask).filter_by(id=id).first()
  70. if not rec:
  71. raise NotFoundError("Task not exist")
  72. rec.status = 'completed'
  73. db.commit()
  74. db.refresh(rec)
  75. return rec