# app/services/ticket_service.py from datetime import datetime from redis.asyncio import Redis from typing import List from sqlalchemy.orm import Session from app.utils.search import apply_keyword_search from app.utils.pagination import paginate from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError from app.models.user import VasUser from app.models.ticket import VasTicket from app.models.ticket_message import VasTicketMessage from app.schemas.ticket import VasTicketCreate from app.services.notification_service import NotificationService class TicketService: @staticmethod def create(db: Session, data: VasTicketCreate, current_user: VasUser, redis_client: Redis): rec = VasTicket(**data.dict(), status='pending', created_at=datetime.utcnow()) rec.user_id = current_user.id db.add(rec) db.commit() db.refresh(rec) print(f"📧 send ticket created notification email") NotificationService.create( redis_client=redis_client, ntype="ticket created", user_id=current_user.id, channels=["email"], template_id="ticket_created", payload={ "ticket_id": rec.id, "order_id": rec.order_id } ) return rec @staticmethod def update_status(db: Session, ticket_id, status, comment, admin_id): ticket = db.query(VasTicket).filter_by(id=ticket_id).first() if not ticket: raise NotFoundError("Ticket not exist") ticket.status = status ticket.admin_comment = comment db.add( VasTicketMessage( ticket_id=ticket_id, sender_type="admin", sender_id=admin_id, content=comment ) ) db.commit() db.refresh(ticket) return ticket @staticmethod def add_message( db: Session, ticket_id: int, sender_type: str, # "user" | "admin" | "system" sender_id: str = None, content: str = "", attachments: dict = None ): # 1️⃣ 校验工单是否存在 ticket = db.query(VasTicket).filter( VasTicket.id == ticket_id ).first() if not ticket: raise NotFoundError("Ticket not exist") # 2️⃣ 创建消息 message = VasTicketMessage( ticket_id=ticket_id, sender_type=sender_type, sender_id=sender_id, content=content, attachments=attachments, created_at=datetime.utcnow() ) # 3️⃣ 写入数据库 db.add(message) # 4️⃣ 更新工单更新时间(非常重要) ticket.updated_at = datetime.utcnow() db.commit() db.refresh(message) return message @staticmethod def list_messages( db: Session, ticket_id: int, page: int = 1, size: int = 20 ): # 1️⃣ 校验 ticket 是否存在 exists = db.query(VasTicket.id).filter( VasTicket.id == ticket_id ).first() if not exists: raise NotFoundError("Ticket not exist") # 2️⃣ 查询消息(按时间正序) query = ( db.query(VasTicketMessage) .filter(VasTicketMessage.ticket_id == ticket_id) .order_by(VasTicketMessage.created_at.desc()) ) return paginate(query, page, size) @staticmethod def list_by_user(db: Session, user_id: str, page: int=0, size: int=10, keyword: str=None): query = db.query(VasTicket).filter_by(user_id=user_id) query = apply_keyword_search( query=query, model=VasTicket, keyword=keyword, fields=["order_id", "user_id", "reason", "admin_comment"] ) query = query.order_by( VasTicket.id.desc() ) return paginate(query, page, size) @staticmethod def list_all(db: Session, page: int=0, size: int=10, keyword: str=None): query = db.query(VasTicket) query = apply_keyword_search( query=query, model=VasTicket, keyword=keyword, fields=["order_id", "user_id", "reason", "admin_comment"] ) query = query.order_by( VasTicket.id.desc() ) return paginate(query, page, size)