# app/services/task_service.py
from datetime import datetime
from typing import List, Optional

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

from app.utils.search import apply_keyword_search_stmt
from app.utils.pagination import paginate
from app.core.queue_manager import queue_manager
from app.core.biz_exception import NotFoundError, BizLogicError
from app.models.vas_task import VasTask
from app.models.order import VasOrder
from app.schemas.vas_task import VasTaskCreate, VasTaskUpdate


class VasTaskService:
    """Create, query and transition ``VasTask`` rows, and enqueue them for processing."""

    @staticmethod
    async def _get_task_or_raise(db: AsyncSession, task_id: int) -> VasTask:
        """Load the task with primary key ``task_id`` or raise ``NotFoundError``."""
        result = await db.execute(select(VasTask).where(VasTask.id == task_id))
        task = result.scalar_one_or_none()
        if task is None:
            raise NotFoundError("Task not exist")
        return task

    @staticmethod
    async def create(db: AsyncSession, data: VasTaskCreate) -> VasTask:
        """Persist a new task in ``pending`` status and put it on its queue.

        Args:
            db: Session used to insert and commit the row.
            data: Creation payload; its fields are passed straight to ``VasTask``.

        Returns:
            The refreshed ``VasTask`` instance.
        """
        # NOTE(review): ``datetime.utcnow()`` yields a naive timestamp; kept
        # as-is because the column's timezone handling is not visible here.
        rec = VasTask(
            **data.dict(),
            status="pending",
            created_at=datetime.utcnow(),
        )
        db.add(rec)
        await db.commit()
        await db.refresh(rec)

        # Enqueue after commit + refresh so the values come from the stored row.
        queue_manager.put(
            queue_name=rec.routing_key,
            task_id=rec.id,
            priority=rec.priority,
        )
        return rec

    @staticmethod
    async def list_task(
        db: AsyncSession,
        status: Optional[str] = None,
        routing_key: Optional[str] = None,
        script_version: Optional[str] = None,
        keyword: Optional[str] = None,
        page: int = 0,
        size: int = 10,
    ):
        """Return one page of tasks matching the given filters.

        Each of ``status``, ``routing_key`` and ``script_version`` is applied
        as an equality filter only when it is truthy. ``keyword`` is handed to
        ``apply_keyword_search_stmt`` over ``order_id``, ``routing_key`` and
        ``user_inputs``. Rows are ordered by priority (descending), then id
        (ascending).

        Returns:
            Whatever ``paginate`` returns for the built statement.
        """
        # NOTE(review): whether ``page`` is 0- or 1-based is decided by
        # ``paginate`` -- confirm the default of 0 matches it.
        stmt = select(VasTask)
        if status:
            stmt = stmt.where(VasTask.status == status)
        if routing_key:
            stmt = stmt.where(VasTask.routing_key == routing_key)
        if script_version:
            stmt = stmt.where(VasTask.script_version == script_version)

        stmt = apply_keyword_search_stmt(
            stmt=stmt,
            model=VasTask,
            keyword=keyword,
            fields=["order_id", "routing_key", "user_inputs"],
        )
        stmt = stmt.order_by(
            VasTask.priority.desc(),
            VasTask.id.asc(),
        )
        return await paginate(db, stmt, page, size)

    @staticmethod
    async def update(
        db: AsyncSession,
        id: int,
        payload: VasTaskUpdate,
    ) -> VasTask:
        """Apply the explicitly-set fields of ``payload`` to task ``id``.

        Raises:
            NotFoundError: If no task has the given id.
        """
        obj = await VasTaskService._get_task_or_raise(db, id)

        # ``exclude_unset`` keeps fields the caller did not send untouched.
        for key, value in payload.dict(exclude_unset=True).items():
            setattr(obj, key, value)

        await db.commit()
        await db.refresh(obj)
        return obj

    @staticmethod
    async def get_active_task_by_order_id(
        db: AsyncSession,
        order_id: str,
    ) -> List[VasTask]:
        """Return the tasks of ``order_id`` whose status is ``pending``."""
        stmt = select(VasTask).where(
            VasTask.status == "pending",
            VasTask.order_id == order_id,
        )
        result = await db.execute(stmt)
        return result.scalars().all()

    @staticmethod
    async def return_to_queue(db: AsyncSession, id: int) -> VasTask:
        """Reset task ``id`` to ``pending`` and put it back on its queue.

        When the task was ``grabbed``, its ``attempt_count`` is incremented.
        The queue priority is the task priority minus the attempt count,
        floored at 0.

        Raises:
            NotFoundError: If no task has the given id.
            BizLogicError: If the task is already ``pending``.
        """
        rec = await VasTaskService._get_task_or_raise(db, id)
        if rec.status == "pending":
            raise BizLogicError("Task is in queue already")

        # Capture the previous status BEFORE overwriting it; testing after the
        # assignment would never see "grabbed" and never count the attempt.
        was_grabbed = rec.status == "grabbed"
        rec.status = "pending"
        if was_grabbed:
            rec.attempt_count = (rec.attempt_count or 0) + 1

        await db.commit()
        await db.refresh(rec)

        # ``attempt_count`` may still be None for tasks that were never grabbed.
        attempts = rec.attempt_count or 0
        queue_manager.put(
            queue_name=rec.routing_key,
            task_id=rec.id,
            priority=max(0, rec.priority - attempts),
        )
        return rec

    @staticmethod
    async def manual_confirm(db: AsyncSession, id: int) -> VasTask:
        """Mark task ``id`` and its order as ``completed``.

        Raises:
            NotFoundError: If the task, or the order it references, is missing.
        """
        task = await VasTaskService._get_task_or_raise(db, id)

        order_result = await db.execute(
            select(VasOrder).where(VasOrder.id == task.order_id)
        )
        order = order_result.scalar_one_or_none()
        if order is None:
            raise NotFoundError("Order not exist")

        # Mutate only once both rows are known to exist, so a failed lookup
        # leaves no pending change in the caller's session.
        task.status = "completed"
        order.status = "completed"

        await db.commit()
        await db.refresh(task)
        return task