vas_task_service.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. # app/services/task_service.py
  2. from datetime import datetime
  3. from typing import List, Optional
  4. from sqlalchemy.ext.asyncio import AsyncSession
  5. from sqlalchemy import select
  6. from app.utils.search import apply_keyword_search_stmt
  7. from app.utils.pagination import paginate
  8. from app.core.queue_manager import queue_manager
  9. from app.core.biz_exception import NotFoundError,BizLogicError
  10. from app.models.vas_task import VasTask
  11. from app.models.order import VasOrder
  12. from app.schemas.vas_task import VasTaskCreate, VasTaskUpdate
  13. class VasTaskService:
  14. @staticmethod
  15. async def create(db: AsyncSession, data: VasTaskCreate) -> VasTask:
  16. rec = VasTask(
  17. **data.dict(),
  18. status="pending",
  19. created_at=datetime.utcnow(),
  20. )
  21. db.add(rec)
  22. await db.commit()
  23. await db.refresh(rec)
  24. queue_manager.put(
  25. queue_name=rec.routing_key,
  26. task_id=task.id,
  27. priority=task.priority
  28. )
  29. return rec
  30. @staticmethod
  31. async def list_task(
  32. db: AsyncSession,
  33. status: Optional[str] = None,
  34. routing_key: Optional[str] = None,
  35. script_version: Optional[str] = None,
  36. keyword: Optional[str] = None,
  37. page: int = 0,
  38. size: int = 10,
  39. ):
  40. stmt = select(VasTask)
  41. if status:
  42. stmt = stmt.where(VasTask.status == status)
  43. if routing_key:
  44. stmt = stmt.where(VasTask.routing_key == routing_key)
  45. if script_version:
  46. stmt = stmt.where(VasTask.script_version == script_version)
  47. stmt = apply_keyword_search_stmt(
  48. stmt=stmt,
  49. model=VasTask,
  50. keyword=keyword,
  51. fields=["order_id", "routing_key", "user_inputs"],
  52. )
  53. stmt = stmt.order_by(
  54. VasTask.priority.desc(),
  55. VasTask.id.asc(),
  56. )
  57. return await paginate(db, stmt, page, size)
  58. @staticmethod
  59. async def update(
  60. db: AsyncSession,
  61. id: int,
  62. payload: VasTaskUpdate,
  63. ) -> VasTask:
  64. stmt = select(VasTask).where(VasTask.id == id)
  65. result = await db.execute(stmt)
  66. obj = result.scalar_one_or_none()
  67. if not obj:
  68. raise NotFoundError("Task not exist")
  69. data = payload.dict(exclude_unset=True)
  70. for key, value in data.items():
  71. setattr(obj, key, value)
  72. await db.commit()
  73. await db.refresh(obj)
  74. return obj
  75. @staticmethod
  76. async def get_active_task_by_order_id(
  77. db: AsyncSession,
  78. order_id: str,
  79. ) -> List[VasTask]:
  80. stmt = select(VasTask).where(
  81. VasTask.status == "pending",
  82. VasTask.order_id == order_id,
  83. )
  84. result = await db.execute(stmt)
  85. return result.scalars().all()
  86. @staticmethod
  87. async def return_to_queue(db: AsyncSession, id: int) -> VasTask:
  88. stmt = select(VasTask).where(VasTask.id == id)
  89. result = await db.execute(stmt)
  90. rec = result.scalar_one_or_none()
  91. if not rec:
  92. raise NotFoundError("Task not exist")
  93. if rec.status == "pending":
  94. raise BizLogicError("Task is in queue already")
  95. rec.status = "pending"
  96. if rec.status == "grabbed":
  97. rec.attempt_count = (rec.attempt_count or 0) + 1
  98. await db.commit()
  99. await db.refresh(rec)
  100. queue_manager.put(
  101. queue_name=rec.routing_key,
  102. task_id=rec.id,
  103. priority= max(0, rec.priority - rec.attempt_count),
  104. )
  105. return rec
  106. @staticmethod
  107. async def manual_confirm(db: AsyncSession, id: int) -> VasTask:
  108. stmt = select(VasTask).where(VasTask.id == id)
  109. result = await db.execute(stmt)
  110. task = result.scalar_one_or_none()
  111. if not task:
  112. raise NotFoundError("Task not exist")
  113. task.status = "completed"
  114. order_stmt = select(VasOrder).where(VasOrder.id == task.order_id)
  115. order_result = await db.execute(order_stmt)
  116. order = order_result.scalar_one_or_none()
  117. if not order:
  118. raise NotFoundError("Order not exist")
  119. order.status = "completed"
  120. await db.commit()
  121. await db.refresh(task)
  122. return task