ticket_service.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. from datetime import datetime
  2. from typing import List, Optional
  3. from redis.asyncio import Redis
  4. from sqlalchemy import select
  5. from sqlalchemy.ext.asyncio import AsyncSession
  6. from app.utils.search import apply_keyword_search_stmt
  7. from app.utils.pagination import paginate
  8. from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
  9. from app.models.user import VasUser
  10. from app.models.order import VasOrder
  11. from app.models.vas_task import VasTask
  12. from app.models.payment import VasPayment
  13. from app.models.ticket import VasTicket
  14. from app.models.ticket_message import VasTicketMessage
  15. from app.schemas.ticket import VasTicketCreate
  16. from app.services.notification_service import NotificationService
  17. from app.services.webhook_service import WebhookService
  class TicketService:
      """Service-layer operations for support tickets and their message threads.

      All methods are static coroutines that receive the ``AsyncSession`` to work
      on. Methods that modify data commit the session themselves before
      returning.
      """

      # Values embedded in the "ticket_created" notification e-mail.
      APP_NAME = "Visafly"
      TICKET_DASHBOARD_URL = "http://45.137.220.138:3000/dashboard"

      # Ticket columns matched by the keyword filter of the listing methods.
      _SEARCH_FIELDS = ("order_id", "user_id", "reason", "admin_comment")

      @staticmethod
      async def create(
          db: AsyncSession,
          data: VasTicketCreate,
          current_user: VasUser,
          redis_client: Redis,
      ) -> VasTicket:
          """Create a ticket for ``current_user`` and send a confirmation e-mail.

          Args:
              db: Session used to persist the ticket; committed by this method.
              data: Validated creation payload; its fields are passed to the
                  ``VasTicket`` constructor unchanged.
              current_user: Owner of the ticket and receiver of the e-mail.
              redis_client: Redis client handed to
                  ``NotificationService.post_email``.

          Returns:
              The persisted ticket, refreshed from the database.
          """
          # One timestamp so created_at and updated_at are identical on creation.
          now = datetime.utcnow()
          ticket = VasTicket(
              **data.dict(),
              user_id=current_user.id,
              status="pending",
              created_at=now,
              updated_at=now,
          )
          db.add(ticket)
          await db.commit()
          await db.refresh(ticket)

          formatted_time = ticket.created_at.strftime("%Y-%m-%d %H:%M") + " (UTC)"
          # NOTE(review): the ticket is already committed at this point, so an
          # exception raised by post_email propagates to the caller even though
          # the ticket exists.
          await NotificationService.post_email(
              redis_client=redis_client,
              receiver=current_user.email,
              template_id="ticket_created",
              payload={
                  "app_name": TicketService.APP_NAME,
                  "username": current_user.email,
                  "ticket_id": ticket.id,
                  "ticket_type": ticket.type,
                  "ticket_url": TicketService.TICKET_DASHBOARD_URL,
                  "created_at": formatted_time,
              },
          )
          return ticket

      @staticmethod
      async def update_status(
          db: AsyncSession,
          ticket_id: int,
          status: str,
          comment: str,
          admin_id: str,
      ) -> VasTicket:
          """Set a ticket's status and record the admin comment as a message.

          The ticket row is selected ``FOR UPDATE``. When the new status is
          ``"resolved"`` or ``"rejected"`` the matching order-side handler runs
          in the same transaction before the commit.

          Args:
              db: Session to work on; committed by this method.
              ticket_id: Primary key of the ticket to update.
              status: New status value, stored as given.
              comment: Admin comment; stored on the ticket and as a message.
              admin_id: Identifier of the acting admin (stringified for the
                  message's ``sender_id``).

          Returns:
              The updated ticket, refreshed from the database.

          Raises:
              NotFoundError: If no ticket with ``ticket_id`` exists.
          """
          result = await db.execute(
              select(VasTicket)
              .where(VasTicket.id == ticket_id)
              .with_for_update()
          )
          ticket = result.scalar_one_or_none()
          if not ticket:
              raise NotFoundError("Ticket not exist")

          # One timestamp so the ticket update and its message line up exactly.
          now = datetime.utcnow()
          ticket.status = status
          ticket.admin_comment = comment
          ticket.updated_at = now
          db.add(
              VasTicketMessage(
                  ticket_id=ticket.id,
                  sender_type="admin",
                  sender_id=str(admin_id),
                  content=comment,
                  created_at=now,
              )
          )

          if status == "resolved":
              await TicketService._handle_resolution(db, ticket)
          elif status == "rejected":
              await TicketService._handle_rejection(db, ticket)

          await db.commit()
          await db.refresh(ticket)
          return ticket

      # =========================
      # Ticket resolution logic
      # =========================
      @staticmethod
      async def _handle_resolution(
          db: AsyncSession,
          ticket: VasTicket,
      ) -> None:
          """Apply order-side effects of resolving ``ticket``.

          Does nothing when the ticket has no order or the order row is missing.
          For ``"refund"`` tickets the order is closed, its successful payments
          are marked refunded and its tasks are cancelled. Nothing is committed
          here; the caller owns the transaction.
          """
          if not ticket.order_id:
              return

          result = await db.execute(
              select(VasOrder)
              .where(VasOrder.id == ticket.order_id)
              .with_for_update()
          )
          order = result.scalar_one_or_none()
          if not order:
              return

          if ticket.type != "refund":
              return

          # ---------- Refund ----------
          order.status = "closed"

          pay_res = await db.execute(
              select(VasPayment).where(
                  VasPayment.order_id == order.id,
                  VasPayment.status.in_(["succeeded", "late_paid"]),
              )
          )
          refunded_at = datetime.utcnow()
          # Iterate instead of scalar_one_or_none(): an order with more than one
          # successful payment would otherwise raise MultipleResultsFound and
          # make the ticket impossible to resolve.
          for payment in pay_res.scalars().all():
              payment.status = "refunded"
              payment.refunded_at = refunded_at
              payment.refund_reason = ticket.reason

          task_res = await db.execute(
              select(VasTask).where(
                  VasTask.order_id == order.id,
                  VasTask.status.in_(["pending", "grabbed", "running", "completed"]),
              )
          )
          for task in task_res.scalars().all():
              task.status = "cancelled"

      # =========================
      # Ticket rejection logic
      # =========================
      @staticmethod
      async def _handle_rejection(
          db: AsyncSession,
          ticket: VasTicket,
      ) -> None:
          """Order-side hook for a rejected ticket.

          Currently this only selects the linked order ``FOR UPDATE`` and returns
          without changing anything; the status rollback below is disabled.
          """
          if not ticket.order_id:
              return

          result = await db.execute(
              select(VasOrder)
              .where(VasOrder.id == ticket.order_id)
              .with_for_update()
          )
          order = result.scalar_one_or_none()
          if not order:
              return

          # Disabled: restore the order status when a refund request is rejected.
          # if ticket.type == "refund" and order.status == "refund_pending":
          #     order.status = "paid"

      @staticmethod
      async def add_message(
          db: AsyncSession,
          ticket_id: int,
          sender_type: str,  # "user" | "admin" | "system"
          sender_id: Optional[str] = None,
          content: str = "",
          attachments: Optional[dict] = None,
      ) -> VasTicketMessage:
          """Append a message to a ticket and bump the ticket's ``updated_at``.

          Args:
              db: Session to work on; committed by this method.
              ticket_id: Primary key of the ticket receiving the message.
              sender_type: One of ``"user"``, ``"admin"`` or ``"system"``.
              sender_id: Identifier of the sender, if any.
              content: Message text.
              attachments: Optional attachment data stored with the message.

          Returns:
              The persisted message, refreshed from the database.

          Raises:
              NotFoundError: If no ticket with ``ticket_id`` exists.
          """
          ticket = await db.get(VasTicket, ticket_id)
          if not ticket:
              raise NotFoundError("Ticket not exist")

          # One timestamp shared by the message and the ticket it touches.
          now = datetime.utcnow()
          message = VasTicketMessage(
              ticket_id=ticket_id,
              sender_type=sender_type,
              sender_id=sender_id,
              content=content,
              attachments=attachments,
              created_at=now,
          )
          db.add(message)
          ticket.updated_at = now

          await db.commit()
          await db.refresh(message)
          return message

      @staticmethod
      async def list_messages(
          db: AsyncSession,
          ticket_id: int,
          page: int = 1,
          size: int = 20,
      ):
          """Return one page of a ticket's messages, newest first.

          Raises:
              NotFoundError: If no ticket with ``ticket_id`` exists.
          """
          # Verify the ticket exists. Compare against None explicitly so a
          # falsy id value is not mistaken for "no row".
          found_id = await db.scalar(
              select(VasTicket.id).where(VasTicket.id == ticket_id)
          )
          if found_id is None:
              raise NotFoundError("Ticket not exist")

          stmt = (
              select(VasTicketMessage)
              .where(VasTicketMessage.ticket_id == ticket_id)
              .order_by(VasTicketMessage.created_at.desc())
          )
          return await paginate(db, stmt, page, size)

      @staticmethod
      async def _search_and_paginate(
          db: AsyncSession,
          stmt,
          page: int,
          size: int,
          keyword: Optional[str],
      ):
          """Apply the keyword filter and id-descending order, then paginate."""
          stmt = apply_keyword_search_stmt(
              stmt=stmt,
              model=VasTicket,
              keyword=keyword,
              # Pass a fresh list, the type the helper has always received.
              fields=list(TicketService._SEARCH_FIELDS),
          )
          stmt = stmt.order_by(VasTicket.id.desc())
          return await paginate(db, stmt, page, size)

      @staticmethod
      async def list_by_user(
          db: AsyncSession,
          user_id: str,
          page: int = 1,
          size: int = 20,
          keyword: Optional[str] = None,
      ):
          """Return one page of the tickets owned by ``user_id``, newest id first.

          ``keyword`` is forwarded to ``apply_keyword_search_stmt``.
          """
          stmt = select(VasTicket).where(VasTicket.user_id == user_id)
          return await TicketService._search_and_paginate(
              db, stmt, page, size, keyword
          )

      @staticmethod
      async def list_all(
          db: AsyncSession,
          page: int = 1,
          size: int = 20,
          keyword: Optional[str] = None,
      ):
          """Return one page of all tickets, newest id first.

          ``keyword`` is forwarded to ``apply_keyword_search_stmt``.
          """
          stmt = select(VasTicket)
          return await TicketService._search_and_paginate(
              db, stmt, page, size, keyword
          )