from datetime import datetime
from typing import List, Optional

from redis.asyncio import Redis
from sqlalchemy import select, case
from sqlalchemy.ext.asyncio import AsyncSession

from app.utils.search import apply_keyword_search_stmt
from app.utils.pagination import paginate
from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
from app.models.user import VasUser
from app.models.order import VasOrder
from app.models.vas_task import VasTask
from app.models.payment import VasPayment
from app.models.ticket import VasTicket
from app.models.ticket_message import VasTicketMessage
from app.schemas.ticket import VasTicketCreate
from app.services.notification_service import NotificationService
from app.services.webhook_service import WebhookService


class TicketService:
    """Create, update, and list support tickets and their messages."""

    @staticmethod
    async def create(
        db: AsyncSession,
        data: VasTicketCreate,
        current_user: VasUser,
        redis_client: Redis,
    ) -> VasTicket:
        """Persist a new ticket for ``current_user`` and email a confirmation.

        The ticket is stored with status ``"pending"`` and committed before
        the ``ticket_created`` email notification is posted.

        Args:
            db: Session used to persist the ticket and post the notification.
            data: Ticket fields supplied by the caller.
            current_user: Owner of the ticket; also the email receiver.
            redis_client: Not used by this method; kept for interface
                compatibility.

        Returns:
            The refreshed, committed ticket.
        """
        # One timestamp so created_at and updated_at are exactly equal on a
        # freshly created ticket.
        now = datetime.utcnow()
        ticket = VasTicket(
            **data.dict(),
            user_id=current_user.id,
            status="pending",
            created_at=now,
            updated_at=now,
        )
        db.add(ticket)
        await db.commit()
        await db.refresh(ticket)

        formatted_time = ticket.created_at.strftime('%Y-%m-%d %H:%M') + " (UTC)"
        await NotificationService.post_message(
            db=db,
            channel="email",
            payload={
                "template_id": "ticket_created",
                "receiver": current_user.email,
                "payload": {
                    "app_name": "TextSkin",
                    "username": current_user.email,
                    "ticket_id": ticket.id,
                    "ticket_type": ticket.type,
                    "ticket_url": "https://text.skin/dashboard",
                    "created_at": formatted_time,
                },
            },
        )
        return ticket

    @staticmethod
    async def update_status(
        db: AsyncSession,
        ticket_id: int,
        status: str,
        comment: str,
        admin_id: str,
    ) -> VasTicket:
        """Set a ticket's status and record the admin comment as a message.

        The ticket row is selected ``FOR UPDATE``. When ``status`` is
        ``"resolved"`` or ``"rejected"`` the matching handler runs before the
        single commit at the end.

        Args:
            db: Session used for all reads and writes.
            ticket_id: Primary key of the ticket to update.
            status: New status value; stored as given.
            comment: Admin comment; stored on the ticket and as a message.
            admin_id: Identifier of the acting admin (stored as a string).

        Returns:
            The refreshed ticket after commit.

        Raises:
            NotFoundError: If no ticket with ``ticket_id`` exists.
        """
        result = await db.execute(
            select(VasTicket)
            .where(VasTicket.id == ticket_id)
            .with_for_update()
        )
        ticket = result.scalar_one_or_none()
        if not ticket:
            raise NotFoundError("Ticket not exist")

        # Share one timestamp between the ticket update and the admin message.
        now = datetime.utcnow()
        ticket.status = status
        ticket.admin_comment = comment
        ticket.updated_at = now

        db.add(
            VasTicketMessage(
                ticket_id=ticket.id,
                sender_type="admin",
                sender_id=str(admin_id),
                content=comment,
                created_at=now,
            )
        )

        if status == "resolved":
            await TicketService._handle_resolution(db, ticket)
        elif status == "rejected":
            await TicketService._handle_rejection(db, ticket)

        await db.commit()
        await db.refresh(ticket)
        return ticket

    # =========================
    # Ticket resolution logic
    # =========================
    @staticmethod
    async def _handle_resolution(
        db: AsyncSession,
        ticket: VasTicket,
    ) -> None:
        """Apply order-side effects of resolving ``ticket``.

        Does nothing when the ticket has no order or the order is missing.
        For ``"refund"`` tickets the order is closed, its paid payments are
        marked refunded, and its tasks are cancelled. Nothing is committed
        here; the caller commits.
        """
        if not ticket.order_id:
            return

        result = await db.execute(
            select(VasOrder)
            .where(VasOrder.id == ticket.order_id)
            .with_for_update()
        )
        order = result.scalar_one_or_none()
        if not order:
            return

        # ---------- Refund ----------
        if ticket.type == "refund":
            order.status = "closed"

            pay_res = await db.execute(
                select(VasPayment).where(
                    VasPayment.order_id == order.id,
                    VasPayment.status.in_(["succeeded", "late_paid"]),
                )
            )
            # Iterate instead of scalar_one_or_none(): that call raises
            # MultipleResultsFound when an order has more than one paid
            # payment, which would abort the whole resolution.
            # NOTE(review): assumes every paid payment of a refunded order
            # should be marked refunded -- confirm against the refund flow.
            refunded_at = datetime.utcnow()
            for payment in pay_res.scalars().all():
                payment.status = "refunded"
                payment.refunded_at = refunded_at
                payment.refund_reason = ticket.reason

            task_res = await db.execute(
                select(VasTask).where(
                    VasTask.order_id == order.id,
                    VasTask.status.in_(
                        ["pending", "grabbed", "running", "pause", "completed"]
                    ),
                )
            )
            for task in task_res.scalars().all():
                task.status = "cancelled"

    # =========================
    # Ticket rejection logic
    # =========================
    @staticmethod
    async def _handle_rejection(
        db: AsyncSession,
        ticket: VasTicket,
    ) -> None:
        """Handle order-side effects of rejecting ``ticket``.

        Currently this only takes a row lock on the ticket's order (if the
        ticket has one); no order fields are modified.
        """
        if not ticket.order_id:
            return

        # Lock the order row for the remainder of the caller's transaction.
        await db.execute(
            select(VasOrder)
            .where(VasOrder.id == ticket.order_id)
            .with_for_update()
        )
        # NOTE: reverting a "refund_pending" order back to "paid" for rejected
        # refund tickets was previously sketched here but is disabled, so the
        # order is left unchanged.

    @staticmethod
    async def add_message(
        db: AsyncSession,
        ticket_id: int,
        sender_type: str,  # "user" | "admin" | "system"
        sender_id: Optional[str] = None,
        content: str = "",
        attachments: Optional[dict] = None,
    ) -> VasTicketMessage:
        """Append a message to a ticket and bump the ticket's ``updated_at``.

        Args:
            db: Session used for the lookup and the insert.
            ticket_id: Primary key of the ticket receiving the message.
            sender_type: One of ``"user"``, ``"admin"`` or ``"system"``.
            sender_id: Optional identifier of the sender.
            content: Message body.
            attachments: Optional attachment payload stored with the message.

        Returns:
            The refreshed, committed message.

        Raises:
            NotFoundError: If no ticket with ``ticket_id`` exists.
        """
        ticket = await db.get(VasTicket, ticket_id)
        if not ticket:
            raise NotFoundError("Ticket not exist")

        # Same instant for the message and the ticket's last-update marker.
        now = datetime.utcnow()
        message = VasTicketMessage(
            ticket_id=ticket_id,
            sender_type=sender_type,
            sender_id=sender_id,
            content=content,
            attachments=attachments,
            created_at=now,
        )
        db.add(message)
        ticket.updated_at = now

        await db.commit()
        await db.refresh(message)
        return message

    @staticmethod
    async def list_messages(
        db: AsyncSession,
        ticket_id: int,
        page: int = 1,
        size: int = 20,
    ):
        """Return one page of a ticket's messages, newest first.

        Raises:
            NotFoundError: If no ticket with ``ticket_id`` exists.
        """
        # Verify that the ticket exists. Compare against None explicitly so a
        # falsy-but-valid id (0) is not mistaken for "not found".
        found_id = await db.scalar(
            select(VasTicket.id).where(VasTicket.id == ticket_id)
        )
        if found_id is None:
            raise NotFoundError("Ticket not exist")

        stmt = (
            select(VasTicketMessage)
            .where(VasTicketMessage.ticket_id == ticket_id)
            .order_by(VasTicketMessage.created_at.desc())
        )
        return await paginate(db, stmt, page, size)

    @staticmethod
    async def list_by_user(
        db: AsyncSession,
        user_id: str,
        page: int = 1,
        size: int = 20,
        keyword: Optional[str] = None,
    ):
        """Return one page of ``user_id``'s tickets, highest id first.

        ``keyword`` is passed to ``apply_keyword_search_stmt`` over the
        ``order_id``, ``user_id``, ``reason`` and ``admin_comment`` fields.
        """
        stmt = select(VasTicket).where(VasTicket.user_id == user_id)
        stmt = apply_keyword_search_stmt(
            stmt=stmt,
            model=VasTicket,
            keyword=keyword,
            fields=["order_id", "user_id", "reason", "admin_comment"],
        )
        stmt = stmt.order_by(VasTicket.id.desc())
        return await paginate(db, stmt, page, size)

    @staticmethod
    async def list_all(
        db: AsyncSession,
        page: int = 1,
        size: int = 20,
        keyword: Optional[str] = None,
    ):
        """Return one page of all tickets, unresolved ones first.

        ``keyword`` is passed to ``apply_keyword_search_stmt`` over the
        ``order_id``, ``user_id``, ``reason`` and ``admin_comment`` fields.
        """
        stmt = select(VasTicket)

        # Keyword search
        stmt = apply_keyword_search_stmt(
            stmt=stmt,
            model=VasTicket,
            keyword=keyword,
            fields=["order_id", "user_id", "reason", "admin_comment"],
        )

        # Ordering: open tickets ('pending', 'info_required') sort as 0 and
        # everything else (e.g. 'resolved', 'rejected') as 1; ties are broken
        # by descending id.
        stmt = stmt.order_by(
            case(
                (VasTicket.status.in_(["pending", "info_required"]), 0),
                else_=1,
            ),
            VasTicket.id.desc(),
        )
        return await paginate(db, stmt, page, size)