# app/services/task_handlers.py
import logging
from datetime import datetime
from typing import Any, Awaitable, Callable, Dict, Optional

from redis.asyncio import Redis
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.order import VasOrder
from app.models.user import VasUser
from app.models.vas_task import VasTask
from app.services.email_authorizations_service import EmailAuthorizationService
from app.services.notification_service import NotificationService
logger = logging.getLogger(__name__)

# Signature every post-processing handler must satisfy:
# (db session, Redis client, task, order) -> awaitable returning None.
HandlerFunc = Callable[[AsyncSession, Redis, VasTask, VasOrder], Awaitable[None]]
  16. class TaskHandlerRegistry:
  17. """
  18. 任务后处理注册表
  19. 用于管理 routing_key 与处理函数的对应关系
  20. """
  21. def __init__(self):
  22. self._handlers: Dict[str, HandlerFunc] = {}
  23. def register(self, *routing_keys: str):
  24. def decorator(func: HandlerFunc):
  25. for key in routing_keys:
  26. if key in self._handlers:
  27. logger.warning(f"⚠️ [TaskHandler] Overwriting handler for key: {key}")
  28. self._handlers[key] = func
  29. return func
  30. return decorator
  31. # 2. 修改 execute 方法,接收 redis_client
  32. async def execute(self, routing_key: str, db: AsyncSession, redis_client: Redis, task: VasTask, order: VasOrder):
  33. """
  34. 根据 routing_key 查找并执行对应的处理函数
  35. """
  36. handler = self._handlers.get(routing_key)
  37. if not handler:
  38. logger.info(f"ℹ️ [TaskHandler] No handler found for routing_key: {routing_key}, skipping.")
  39. return
  40. try:
  41. logger.info(f"▶️ [TaskHandler] Executing handler '{handler.__name__}' for task {task.id} (key: {routing_key})")
  42. # 传递 redis_client 给具体处理函数
  43. await handler(db, redis_client, task, order)
  44. logger.info(f"✅ [TaskHandler] Handler '{handler.__name__}' completed successfully.")
  45. except Exception as e:
  46. logger.error(f"❌ [TaskHandler] Error executing handler for task {task.id}: {str(e)}", exc_info=True)
  47. # await self._record_error(db, task, str(e))
  48. async def _record_error(self, db: AsyncSession, task: VasTask, error_msg: str):
  49. try:
  50. meta = dict(task.meta) if task.meta else {}
  51. meta["post_process_error"] = error_msg
  52. task.meta = meta
  53. except Exception:
  54. pass
# Shared registry instance; the handlers defined in this module attach to it
# through the @task_processor.register(...) decorator.
task_processor = TaskHandlerRegistry()

# =================================================================
# Concrete handler functions (Handlers)
# =================================================================
  59. # 辅助函数:获取用户信息(用于发通知)
  60. async def _get_user_by_id(db: AsyncSession, user_id: int) -> VasUser:
  61. stmt = select(VasUser).where(VasUser.id == user_id)
  62. result = await db.execute(stmt)
  63. return result.scalar_one_or_none()
  64. @task_processor.register("auto.slot.dub.nl.tourist")
  65. async def forward_vfs_appointment_letter(db: AsyncSession, redis_client: Redis, task: VasTask, order: VasOrder):
  66. """
  67. VFS (荷兰) 预约信转发及通知
  68. """
  69. inputs: Dict[str, Any] = task.user_inputs or {}
  70. grabbed_history = task.grabbed_history or {}
  71. task_config = task.config or {}
  72. email_account = "hujiarui8@gmail.com"
  73. user = await _get_user_by_id(db, order.user_id)
  74. forward_to = inputs.get("email") or (user.email if user else "")
  75. urn = grabbed_history.get("urn", "")
  76. sender = "donotreply at vfsglobal.com"
  77. recipient = task_config.get('alias_email', "")
  78. subject_keywords = inputs.get("subjectKeywords", "Appointment,Confirm")
  79. body_keywords = inputs.get("bodyKeywords", f"urn") # VFS 通常用 URN 匹配
  80. if not email_account or not forward_to:
  81. logger.warning(f"Task {task.id} missing inputs.")
  82. return
  83. auth = await EmailAuthorizationService.get_by_email(db, email_account)
  84. if not auth:
  85. raise ValueError(f"Email auth not found: {email_account}")
  86. result = await EmailAuthorizationService.forward_first_matching_email2(
  87. db=db, auth=auth, forward_to=forward_to, sender=sender, recipient=recipient,
  88. subject_keywords=subject_keywords, body_keywords=body_keywords
  89. )
  90. meta = dict(task.meta) if task.meta else {}
  91. meta["forward_result"] = result
  92. task.meta = meta
  93. # 发送通知
  94. if user and user.email:
  95. await NotificationService.post_message(
  96. db=db,
  97. channel="email",
  98. payload={
  99. "template_id": "appointment_confirmation",
  100. "receiver": user.email,
  101. "payload": {
  102. "username": user.nickname or 'consumer',
  103. "order_id": str(order.id),
  104. "country": "Netherlands",
  105. "city": "Dublin",
  106. "appointment_date": "Check attachment", # VFS 可能没有直接的日期在 history
  107. "visa_type": "Tourist",
  108. "user_email": forward_to
  109. }
  110. },
  111. )
  112. @task_processor.register("auto.slot.dub.de.tourist")
  113. async def forward_visametric_appointment_letter(db: AsyncSession, redis_client: Redis, task: VasTask, order: VasOrder):
  114. """
  115. Visametric (德国) 预约信转发及通知
  116. """
  117. inputs: Dict[str, Any] = task.user_inputs or {}
  118. grabbed_history = task.grabbed_history or {}
  119. task_config = task.config or {}
  120. email_account = "hujiarui8@gmail.com"
  121. user = await _get_user_by_id(db, order.user_id)
  122. forward_to = inputs.get("email") or (user.email if user else "")
  123. first_name = inputs.get('first_name', '')
  124. last_name = inputs.get('last_name', '')
  125. pnr_number = grabbed_history.get('pnr_number', '')
  126. slot_date = grabbed_history.get('slot_date', '') # YYYY-MM-DD
  127. slot_time = grabbed_history.get('slot_time', '')
  128. sender = "noreply at visametric.com"
  129. recipient = task_config.get("alias_email", "")
  130. subject_keywords = inputs.get("subjectKeywords", f"{pnr_number},Visametric,Appointment,Request")
  131. body_keywords = inputs.get("bodyKeywords", f"{pnr_number},{slot_date},{slot_time},{first_name},{last_name}")
  132. if not email_account or not forward_to:
  133. logger.warning(f"Task {task.id} missing inputs.")
  134. return
  135. auth = await EmailAuthorizationService.get_by_email(db, email_account)
  136. if not auth:
  137. raise ValueError(f"Email auth not found: {email_account}")
  138. result = await EmailAuthorizationService.forward_first_matching_email2(
  139. db=db, auth=auth, forward_to=forward_to, sender=sender, recipient=recipient,
  140. subject_keywords=subject_keywords, body_keywords=body_keywords
  141. )
  142. meta = dict(task.meta) if task.meta else {}
  143. meta["forward_result"] = result
  144. task.meta = meta
  145. # 发送通知
  146. if user and user.email:
  147. display_date = f"{slot_date} {slot_time}" if slot_date else "Confirmed"
  148. await NotificationService.post_message(
  149. db=db,
  150. channel="email",
  151. payload={
  152. "template_id": "appointment_confirmation",
  153. "receiver": user.email,
  154. "payload": {
  155. "username": user.nickname or first_name,
  156. "order_id": str(order.id),
  157. "country": "Germany",
  158. "city": "Dublin",
  159. "appointment_date": display_date,
  160. "visa_type": "Tourist",
  161. "user_email": forward_to
  162. }
  163. },
  164. )