| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262 |
- from datetime import datetime
- from typing import List, Optional
- from redis.asyncio import Redis
- from sqlalchemy import select
- from sqlalchemy.ext.asyncio import AsyncSession
- from app.utils.search import apply_keyword_search_stmt
- from app.utils.pagination import paginate
- from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
- from app.models.user import VasUser
- from app.models.order import VasOrder
- from app.models.vas_task import VasTask
- from app.models.payment import VasPayment
- from app.models.ticket import VasTicket
- from app.models.ticket_message import VasTicketMessage
- from app.schemas.ticket import VasTicketCreate
- from app.services.notification_service import NotificationService
- from app.services.webhook_service import WebhookService
- class TicketService:
- @staticmethod
- async def create(
- db: AsyncSession,
- data: VasTicketCreate,
- current_user: VasUser,
- redis_client: Redis
- ):
- ticket = VasTicket(
- **data.dict(),
- user_id=current_user.id,
- status="pending",
- created_at=datetime.utcnow(),
- updated_at=datetime.utcnow(),
- )
- db.add(ticket)
- await db.commit()
- await db.refresh(ticket)
-
- formatted_time = ticket.created_at.strftime('%Y-%m-%d %H:%M') + " (UTC)"
- await NotificationService.post_email(
- redis_client=redis_client,
- receiver=current_user.email,
- template_id="ticket_created",
- payload={
- "app_name": "Visafly",
- "username": current_user.email,
- "ticket_id": ticket.id,
- "ticket_type": ticket.type,
- "ticket_url": "http://45.137.220.138:3000/dashboard",
- "created_at": formatted_time
- },
- )
- return ticket
- @staticmethod
- async def update_status(
- db: AsyncSession,
- ticket_id: int,
- status: str,
- comment: str,
- admin_id: str,
- ) -> VasTicket:
- result = await db.execute(
- select(VasTicket)
- .where(VasTicket.id == ticket_id)
- .with_for_update()
- )
- ticket = result.scalar_one_or_none()
- if not ticket:
- raise NotFoundError("Ticket not exist")
- ticket.status = status
- ticket.admin_comment = comment
- ticket.updated_at = datetime.utcnow()
- db.add(
- VasTicketMessage(
- ticket_id=ticket.id,
- sender_type="admin",
- sender_id=str(admin_id),
- content=comment,
- created_at=datetime.utcnow(),
- )
- )
- if status == "resolved":
- await TicketService._handle_resolution(db, ticket)
- elif status == "rejected":
- await TicketService._handle_rejection(db, ticket)
- await db.commit()
- await db.refresh(ticket)
- return ticket
- # =========================
- # 工单解决逻辑
- # =========================
- @staticmethod
- async def _handle_resolution(
- db: AsyncSession,
- ticket: VasTicket,
- ) -> None:
- if not ticket.order_id:
- return
- result = await db.execute(
- select(VasOrder)
- .where(VasOrder.id == ticket.order_id)
- .with_for_update()
- )
- order = result.scalar_one_or_none()
- if not order:
- return
- # ---------- 退款 ----------
- if ticket.type == "refund":
- order.status = "closed"
- pay_res = await db.execute(
- select(VasPayment).where(
- VasPayment.order_id == order.id,
- VasPayment.status.in_(["succeeded", "late_paid"]),
- )
- )
- payment = pay_res.scalar_one_or_none()
- if payment:
- payment.status = "refunded"
- payment.refunded_at = datetime.utcnow()
- payment.refund_reason = ticket.reason
- task_res = await db.execute(
- select(VasTask).where(
- VasTask.order_id == order.id,
- VasTask.status.in_(["pending", "grabbed", "running", "completed"]),
- )
- )
- for task in task_res.scalars().all():
- task.status = "cancelled"
- # =========================
- # 工单拒绝逻辑
- # =========================
- @staticmethod
- async def _handle_rejection(
- db: AsyncSession,
- ticket: VasTicket,
- ) -> None:
- if not ticket.order_id:
- return
- result = await db.execute(
- select(VasOrder)
- .where(VasOrder.id == ticket.order_id)
- .with_for_update()
- )
- order = result.scalar_one_or_none()
- if not order:
- return
- # if ticket.type == "refund" and order.status == "refund_pending":
- # order.status = "paid"
- @staticmethod
- async def add_message(
- db: AsyncSession,
- ticket_id: int,
- sender_type: str, # "user" | "admin" | "system"
- sender_id: Optional[str] = None,
- content: str = "",
- attachments: Optional[dict] = None,
- ):
- ticket = await db.get(VasTicket, ticket_id)
- if not ticket:
- raise NotFoundError("Ticket not exist")
- message = VasTicketMessage(
- ticket_id=ticket_id,
- sender_type=sender_type,
- sender_id=sender_id,
- content=content,
- attachments=attachments,
- created_at=datetime.utcnow(),
- )
- db.add(message)
- ticket.updated_at = datetime.utcnow()
- await db.commit()
- await db.refresh(message)
- return message
- @staticmethod
- async def list_messages(
- db: AsyncSession,
- ticket_id: int,
- page: int = 1,
- size: int = 20,
- ):
- # 校验 ticket 是否存在
- exists = await db.scalar(
- select(VasTicket.id).where(VasTicket.id == ticket_id)
- )
- if not exists:
- raise NotFoundError("Ticket not exist")
- stmt = (
- select(VasTicketMessage)
- .where(VasTicketMessage.ticket_id == ticket_id)
- .order_by(VasTicketMessage.created_at.desc())
- )
- return await paginate(db, stmt, page, size)
- @staticmethod
- async def list_by_user(
- db: AsyncSession,
- user_id: str,
- page: int = 1,
- size: int = 20,
- keyword: Optional[str] = None,
- ):
- stmt = select(VasTicket).where(VasTicket.user_id == user_id)
- stmt = apply_keyword_search_stmt(
- stmt=stmt,
- model=VasTicket,
- keyword=keyword,
- fields=["order_id", "user_id", "reason", "admin_comment"],
- )
- stmt = stmt.order_by(VasTicket.id.desc())
- return await paginate(db, stmt, page, size)
- @staticmethod
- async def list_all(
- db: AsyncSession,
- page: int = 1,
- size: int = 20,
- keyword: Optional[str] = None,
- ):
- stmt = select(VasTicket)
- stmt = apply_keyword_search_stmt(
- stmt=stmt,
- model=VasTicket,
- keyword=keyword,
- fields=["order_id", "user_id", "reason", "admin_comment"],
- )
- stmt = stmt.order_by(VasTicket.id.desc())
- return await paginate(db, stmt, page, size)
|