ticket_service.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. from datetime import datetime
  2. from typing import List, Optional
  3. from redis.asyncio import Redis
  4. from sqlalchemy import select
  5. from sqlalchemy.ext.asyncio import AsyncSession
  6. from app.utils.search import apply_keyword_search_stmt
  7. from app.utils.pagination import paginate
  8. from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
  9. from app.models.user import VasUser
  10. from app.models.order import VasOrder
  11. from app.models.vas_task import VasTask
  12. from app.models.payment import VasPayment
  13. from app.models.ticket import VasTicket
  14. from app.models.ticket_message import VasTicketMessage
  15. from app.schemas.ticket import VasTicketCreate
  16. from app.services.notification_service import NotificationService
  17. from app.services.webhook_service import WebhookService
  18. class TicketService:
  19. @staticmethod
  20. async def create(
  21. db: AsyncSession,
  22. data: VasTicketCreate,
  23. current_user: VasUser,
  24. redis_client: Redis
  25. ):
  26. ticket = VasTicket(
  27. **data.dict(),
  28. user_id=current_user.id,
  29. status="pending",
  30. created_at=datetime.utcnow(),
  31. updated_at=datetime.utcnow(),
  32. )
  33. db.add(ticket)
  34. await db.commit()
  35. await db.refresh(ticket)
  36. formatted_time = ticket.created_at.strftime('%Y-%m-%d %H:%M') + " (UTC)"
  37. await NotificationService.post_email(
  38. redis_client=redis_client,
  39. receiver=current_user.email,
  40. template_id="ticket_created",
  41. payload={
  42. "app_name": "Visafly",
  43. "username": current_user.email,
  44. "ticket_id": ticket.id,
  45. "ticket_type": ticket.type,
  46. "ticket_url": "http://45.137.220.138:3000/dashboard",
  47. "created_at": formatted_time
  48. },
  49. )
  50. return ticket
  51. @staticmethod
  52. async def update_status(
  53. db: AsyncSession,
  54. ticket_id: int,
  55. status: str,
  56. comment: str,
  57. admin_id: str,
  58. ) -> VasTicket:
  59. # 🔒 锁住 ticket,防并发管理员操作
  60. result = await db.execute(
  61. select(VasTicket)
  62. .where(VasTicket.id == ticket_id)
  63. .with_for_update()
  64. )
  65. ticket = result.scalar_one_or_none()
  66. if not ticket:
  67. raise NotFoundError("Ticket not exist")
  68. ticket.status = status
  69. ticket.admin_comment = comment
  70. ticket.updated_at = datetime.utcnow()
  71. db.add(
  72. VasTicketMessage(
  73. ticket_id=ticket.id,
  74. sender_type="admin",
  75. sender_id=str(admin_id),
  76. content=comment,
  77. created_at=datetime.utcnow(),
  78. )
  79. )
  80. if status == "resolved":
  81. await TicketService._handle_resolution(db, ticket)
  82. elif status == "rejected":
  83. await TicketService._handle_rejection(db, ticket)
  84. await db.commit()
  85. await db.refresh(ticket)
  86. return ticket
  87. # =========================
  88. # 工单解决逻辑
  89. # =========================
  90. @staticmethod
  91. async def _handle_resolution(
  92. db: AsyncSession,
  93. ticket: VasTicket,
  94. ) -> None:
  95. if not ticket.order_id:
  96. return
  97. result = await db.execute(
  98. select(VasOrder)
  99. .where(VasOrder.id == ticket.order_id)
  100. .with_for_update()
  101. )
  102. order = result.scalar_one_or_none()
  103. if not order:
  104. return
  105. # ---------- 退款 ----------
  106. if ticket.type == "refund":
  107. order.status = "closed"
  108. pay_res = await db.execute(
  109. select(VasPayment).where(
  110. VasPayment.order_id == order.id,
  111. VasPayment.status.in_(["succeeded", "late_paid"]),
  112. )
  113. )
  114. payment = pay_res.scalar_one_or_none()
  115. if payment:
  116. payment.status = "refunded"
  117. payment.refunded_at = datetime.utcnow()
  118. payment.refund_reason = ticket.reason
  119. task_res = await db.execute(
  120. select(VasTask).where(
  121. VasTask.order_id == order.id,
  122. VasTask.status.in_(["pending", "grabbed", "running"]),
  123. )
  124. )
  125. for task in task_res.scalars().all():
  126. task.status = "cancelled"
  127. # ---------- 变更请求 ----------
  128. elif ticket.type == "change_request":
  129. # 1️⃣ 取消旧任务
  130. task_res = await db.execute(
  131. select(VasTask).where(
  132. VasTask.order_id == order.id,
  133. VasTask.status.in_(["pending", "grabbed", "running"]),
  134. )
  135. )
  136. for task in task_res.scalars().all():
  137. task.status = "cancelled"
  138. await WebhookService._create_task_if_not_exists(db, order)
  139. # =========================
  140. # 工单拒绝逻辑
  141. # =========================
  142. @staticmethod
  143. async def _handle_rejection(
  144. db: AsyncSession,
  145. ticket: VasTicket,
  146. ) -> None:
  147. if not ticket.order_id:
  148. return
  149. result = await db.execute(
  150. select(VasOrder)
  151. .where(VasOrder.id == ticket.order_id)
  152. .with_for_update()
  153. )
  154. order = result.scalar_one_or_none()
  155. if not order:
  156. return
  157. if ticket.type == "refund" and order.status == "refund_pending":
  158. order.status = "paid"
  159. @staticmethod
  160. async def add_message(
  161. db: AsyncSession,
  162. ticket_id: int,
  163. sender_type: str, # "user" | "admin" | "system"
  164. sender_id: Optional[str] = None,
  165. content: str = "",
  166. attachments: Optional[dict] = None,
  167. ):
  168. ticket = await db.get(VasTicket, ticket_id)
  169. if not ticket:
  170. raise NotFoundError("Ticket not exist")
  171. message = VasTicketMessage(
  172. ticket_id=ticket_id,
  173. sender_type=sender_type,
  174. sender_id=sender_id,
  175. content=content,
  176. attachments=attachments,
  177. created_at=datetime.utcnow(),
  178. )
  179. db.add(message)
  180. ticket.updated_at = datetime.utcnow()
  181. await db.commit()
  182. await db.refresh(message)
  183. return message
  184. @staticmethod
  185. async def list_messages(
  186. db: AsyncSession,
  187. ticket_id: int,
  188. page: int = 1,
  189. size: int = 20,
  190. ):
  191. # 校验 ticket 是否存在
  192. exists = await db.scalar(
  193. select(VasTicket.id).where(VasTicket.id == ticket_id)
  194. )
  195. if not exists:
  196. raise NotFoundError("Ticket not exist")
  197. stmt = (
  198. select(VasTicketMessage)
  199. .where(VasTicketMessage.ticket_id == ticket_id)
  200. .order_by(VasTicketMessage.created_at.desc())
  201. )
  202. return await paginate(db, stmt, page, size)
  203. @staticmethod
  204. async def list_by_user(
  205. db: AsyncSession,
  206. user_id: str,
  207. page: int = 1,
  208. size: int = 20,
  209. keyword: Optional[str] = None,
  210. ):
  211. stmt = select(VasTicket).where(VasTicket.user_id == user_id)
  212. stmt = apply_keyword_search_stmt(
  213. stmt=stmt,
  214. model=VasTicket,
  215. keyword=keyword,
  216. fields=["order_id", "user_id", "reason", "admin_comment"],
  217. )
  218. stmt = stmt.order_by(VasTicket.id.desc())
  219. return await paginate(db, stmt, page, size)
  220. @staticmethod
  221. async def list_all(
  222. db: AsyncSession,
  223. page: int = 1,
  224. size: int = 20,
  225. keyword: Optional[str] = None,
  226. ):
  227. stmt = select(VasTicket)
  228. stmt = apply_keyword_search_stmt(
  229. stmt=stmt,
  230. model=VasTicket,
  231. keyword=keyword,
  232. fields=["order_id", "user_id", "reason", "admin_comment"],
  233. )
  234. stmt = stmt.order_by(VasTicket.id.desc())
  235. return await paginate(db, stmt, page, size)