# ticket_service.py
from datetime import datetime, timedelta
from typing import List, Optional

from redis.asyncio import Redis
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
from app.models.order import VasOrder
from app.models.payment import VasPayment
from app.models.ticket import VasTicket
from app.models.ticket_message import VasTicketMessage
from app.models.user import VasUser
from app.models.vas_task import VasTask
from app.schemas.ticket import VasTicketCreate
from app.services.notification_service import NotificationService
from app.utils.pagination import paginate
from app.utils.search import apply_keyword_search_stmt
  17. class TicketService:
  18. @staticmethod
  19. async def create(
  20. db: AsyncSession,
  21. data: VasTicketCreate,
  22. current_user: VasUser,
  23. redis_client: Redis
  24. ):
  25. ticket = VasTicket(
  26. **data.dict(),
  27. user_id=current_user.id,
  28. status="pending",
  29. created_at=datetime.utcnow(),
  30. updated_at=datetime.utcnow(),
  31. )
  32. db.add(ticket)
  33. await db.commit()
  34. await db.refresh(ticket)
  35. await NotificationService.create(
  36. redis_client=redis_client,
  37. ntype="ticket created",
  38. user_id=current_user.id,
  39. channels=["email"],
  40. template_id="ticket_created",
  41. payload={
  42. "ticket_id": ticket.id,
  43. },
  44. )
  45. return ticket
  46. @staticmethod
  47. async def update_status(
  48. db: AsyncSession,
  49. ticket_id: int,
  50. status: str,
  51. comment: str,
  52. admin_id: str,
  53. ) -> VasTicket:
  54. # 🔒 锁住 ticket,防并发管理员操作
  55. result = await db.execute(
  56. select(VasTicket)
  57. .where(VasTicket.id == ticket_id)
  58. .with_for_update()
  59. )
  60. ticket = result.scalar_one_or_none()
  61. if not ticket:
  62. raise NotFoundError("Ticket not exist")
  63. ticket.status = status
  64. ticket.admin_comment = comment
  65. ticket.updated_at = datetime.utcnow()
  66. db.add(
  67. VasTicketMessage(
  68. ticket_id=ticket.id,
  69. sender_type="admin",
  70. sender_id=str(admin_id),
  71. content=comment,
  72. created_at=datetime.utcnow(),
  73. )
  74. )
  75. if status == "resolved":
  76. await TicketService._handle_resolution(db, ticket)
  77. elif status == "rejected":
  78. await TicketService._handle_rejection(db, ticket)
  79. await db.commit()
  80. await db.refresh(ticket)
  81. return ticket
  82. # =========================
  83. # 工单解决逻辑
  84. # =========================
  85. @staticmethod
  86. async def _handle_resolution(
  87. db: AsyncSession,
  88. ticket: VasTicket,
  89. ) -> None:
  90. if not ticket.order_id:
  91. return
  92. result = await db.execute(
  93. select(VasOrder)
  94. .where(VasOrder.id == ticket.order_id)
  95. .with_for_update()
  96. )
  97. order = result.scalar_one_or_none()
  98. if not order:
  99. return
  100. # ---------- 退款 ----------
  101. if ticket.type == "refund":
  102. order.status = "closed"
  103. pay_res = await db.execute(
  104. select(VasPayment).where(
  105. VasPayment.order_id == order.id,
  106. VasPayment.status.in_(["succeeded", "late_paid"]),
  107. )
  108. )
  109. payment = pay_res.scalar_one_or_none()
  110. if payment:
  111. payment.status = "refunded"
  112. payment.refunded_at = datetime.utcnow()
  113. payment.refund_reason = ticket.reason
  114. task_res = await db.execute(
  115. select(VasTask).where(
  116. VasTask.order_id == order.id,
  117. VasTask.status.in_(["pending", "grabbed", "running"]),
  118. )
  119. )
  120. for task in task_res.scalars().all():
  121. task.status = "cancelled"
  122. # ---------- 变更请求 ----------
  123. elif ticket.type == "change_request":
  124. # 1️⃣ 取消旧任务
  125. task_res = await db.execute(
  126. select(VasTask).where(
  127. VasTask.order_id == order.id,
  128. VasTask.status.in_(["pending", "grabbed", "running"]),
  129. )
  130. )
  131. for task in task_res.scalars().all():
  132. task.status = "cancelled"
  133. # 2️⃣ 重新生成任务(幂等)
  134. routing_res = await db.execute(
  135. select(VasProductRouting).where(
  136. VasProductRouting.product_id == order.product_id,
  137. VasProductRouting.is_active == 1,
  138. )
  139. )
  140. routings = routing_res.scalars().all()
  141. for routing in routings:
  142. exists_res = await db.execute(
  143. select(VasTask).where(
  144. VasTask.order_id == order.id,
  145. VasTask.routing_key == routing.routing_key,
  146. VasTask.script_version == routing.script_version,
  147. )
  148. )
  149. if exists_res.scalar_one_or_none():
  150. continue
  151. db.add(
  152. VasTask(
  153. order_id=order.id,
  154. routing_key=routing.routing_key,
  155. script_version=routing.script_version,
  156. priority=10,
  157. status="pending",
  158. user_inputs=order.user_inputs,
  159. config=routing.config,
  160. attempt_count=0,
  161. notify_count=0,
  162. expire_at=datetime.utcnow() + timedelta(days=60),
  163. created_at=datetime.utcnow(),
  164. )
  165. )
  166. # =========================
  167. # 工单拒绝逻辑
  168. # =========================
  169. @staticmethod
  170. async def _handle_rejection(
  171. db: AsyncSession,
  172. ticket: VasTicket,
  173. ) -> None:
  174. if not ticket.order_id:
  175. return
  176. result = await db.execute(
  177. select(VasOrder)
  178. .where(VasOrder.id == ticket.order_id)
  179. .with_for_update()
  180. )
  181. order = result.scalar_one_or_none()
  182. if not order:
  183. return
  184. if ticket.type == "refund" and order.status == "refund_pending":
  185. order.status = "paid"
  186. @staticmethod
  187. async def add_message(
  188. db: AsyncSession,
  189. ticket_id: int,
  190. sender_type: str, # "user" | "admin" | "system"
  191. sender_id: Optional[str] = None,
  192. content: str = "",
  193. attachments: Optional[dict] = None,
  194. ):
  195. ticket = await db.get(VasTicket, ticket_id)
  196. if not ticket:
  197. raise NotFoundError("Ticket not exist")
  198. message = VasTicketMessage(
  199. ticket_id=ticket_id,
  200. sender_type=sender_type,
  201. sender_id=sender_id,
  202. content=content,
  203. attachments=attachments,
  204. created_at=datetime.utcnow(),
  205. )
  206. db.add(message)
  207. ticket.updated_at = datetime.utcnow()
  208. await db.commit()
  209. await db.refresh(message)
  210. return message
  211. @staticmethod
  212. async def list_messages(
  213. db: AsyncSession,
  214. ticket_id: int,
  215. page: int = 1,
  216. size: int = 20,
  217. ):
  218. # 校验 ticket 是否存在
  219. exists = await db.scalar(
  220. select(VasTicket.id).where(VasTicket.id == ticket_id)
  221. )
  222. if not exists:
  223. raise NotFoundError("Ticket not exist")
  224. stmt = (
  225. select(VasTicketMessage)
  226. .where(VasTicketMessage.ticket_id == ticket_id)
  227. .order_by(VasTicketMessage.created_at.desc())
  228. )
  229. return await paginate(db, stmt, page, size)
  230. @staticmethod
  231. async def list_by_user(
  232. db: AsyncSession,
  233. user_id: str,
  234. page: int = 1,
  235. size: int = 20,
  236. keyword: Optional[str] = None,
  237. ):
  238. stmt = select(VasTicket).where(VasTicket.user_id == user_id)
  239. stmt = apply_keyword_search_stmt(
  240. stmt=stmt,
  241. model=VasTicket,
  242. keyword=keyword,
  243. fields=["order_id", "user_id", "reason", "admin_comment"],
  244. )
  245. stmt = stmt.order_by(VasTicket.id.desc())
  246. return await paginate(db, stmt, page, size)
  247. @staticmethod
  248. async def list_all(
  249. db: AsyncSession,
  250. page: int = 1,
  251. size: int = 20,
  252. keyword: Optional[str] = None,
  253. ):
  254. stmt = select(VasTicket)
  255. stmt = apply_keyword_search_stmt(
  256. stmt=stmt,
  257. model=VasTicket,
  258. keyword=keyword,
  259. fields=["order_id", "user_id", "reason", "admin_comment"],
  260. )
  261. stmt = stmt.order_by(VasTicket.id.desc())
  262. return await paginate(db, stmt, page, size)