vas_task_service.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. # app/services/task_service.py
  2. from datetime import datetime
  3. from typing import List, Optional
  4. from sqlalchemy.ext.asyncio import AsyncSession
  5. from sqlalchemy import select
  6. from app.utils.search import apply_keyword_search_stmt
  7. from app.utils.pagination import paginate
  8. from app.core.queue_manager import queue_manager
  9. from app.core.biz_exception import NotFoundError,BizLogicError
  10. from app.models.vas_task import VasTask
  11. from app.models.order import VasOrder
  12. from app.schemas.vas_task import VasTaskCreate, VasTaskUpdate
  13. from app.services.task_handlers import task_processor
  14. class VasTaskService:
  15. @staticmethod
  16. async def create(db: AsyncSession, data: VasTaskCreate) -> VasTask:
  17. rec = VasTask(
  18. **data.dict(),
  19. status="pending",
  20. created_at=datetime.utcnow(),
  21. )
  22. db.add(rec)
  23. await db.commit()
  24. await db.refresh(rec)
  25. queue_manager.put(
  26. queue_name=rec.routing_key,
  27. task_id=task.id,
  28. priority=task.priority
  29. )
  30. return rec
  31. @staticmethod
  32. async def list_task(
  33. db: AsyncSession,
  34. status: Optional[str] = None,
  35. routing_key: Optional[str] = None,
  36. script_version: Optional[str] = None,
  37. keyword: Optional[str] = None,
  38. page: int = 0,
  39. size: int = 10,
  40. ):
  41. stmt = select(VasTask)
  42. if status:
  43. stmt = stmt.where(VasTask.status == status)
  44. if routing_key:
  45. stmt = stmt.where(VasTask.routing_key == routing_key)
  46. if script_version:
  47. stmt = stmt.where(VasTask.script_version == script_version)
  48. stmt = apply_keyword_search_stmt(
  49. stmt=stmt,
  50. model=VasTask,
  51. keyword=keyword,
  52. fields=["order_id", "routing_key", "user_inputs"],
  53. )
  54. stmt = stmt.order_by(
  55. VasTask.priority.desc(),
  56. VasTask.id.asc(),
  57. )
  58. return await paginate(db, stmt, page, size)
  59. @staticmethod
  60. async def update(
  61. db: AsyncSession,
  62. id: int,
  63. payload: VasTaskUpdate,
  64. ) -> VasTask:
  65. stmt = select(VasTask).where(VasTask.id == id)
  66. result = await db.execute(stmt)
  67. obj = result.scalar_one_or_none()
  68. if not obj:
  69. raise NotFoundError("Task not exist")
  70. data = payload.dict(exclude_unset=True)
  71. for key, value in data.items():
  72. setattr(obj, key, value)
  73. await db.commit()
  74. await db.refresh(obj)
  75. return obj
  76. @staticmethod
  77. async def get_active_task_by_order_id(
  78. db: AsyncSession,
  79. order_id: str,
  80. ) -> List[VasTask]:
  81. stmt = select(VasTask).where(
  82. VasTask.status == "pending",
  83. VasTask.order_id == order_id,
  84. )
  85. result = await db.execute(stmt)
  86. return result.scalars().all()
  87. @staticmethod
  88. async def return_to_queue(db: AsyncSession, id: int) -> VasTask:
  89. stmt = select(VasTask).where(VasTask.id == id)
  90. result = await db.execute(stmt)
  91. rec = result.scalar_one_or_none()
  92. if not rec:
  93. raise NotFoundError("Task not exist")
  94. if rec.status == "pending":
  95. raise BizLogicError("Task is in queue already")
  96. rec.status = "pending"
  97. if rec.status == "grabbed":
  98. rec.attempt_count = (rec.attempt_count or 0) + 1
  99. await db.commit()
  100. await db.refresh(rec)
  101. queue_manager.put(
  102. queue_name=rec.routing_key,
  103. task_id=rec.id,
  104. priority= max(0, rec.priority - rec.attempt_count),
  105. )
  106. return rec
  107. @staticmethod
  108. async def manual_confirm(db: AsyncSession, id: int) -> VasTask:
  109. stmt = select(VasTask).where(VasTask.id == id)
  110. result = await db.execute(stmt)
  111. task = result.scalar_one_or_none()
  112. if not task:
  113. raise NotFoundError("Task not exist")
  114. task.status = "completed"
  115. order_stmt = select(VasOrder).where(VasOrder.id == task.order_id)
  116. order_result = await db.execute(order_stmt)
  117. order = order_result.scalar_one_or_none()
  118. if not order:
  119. raise NotFoundError("Order not exist")
  120. order.status = "completed"
  121. await task_processor.execute(task.routing_key, db, task, order)
  122. await db.commit()
  123. await db.refresh(task)
  124. return task