task_handlers.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. # app/services/task_handlers.py
import logging
from datetime import datetime
from typing import Any, Awaitable, Callable, Dict, Optional

from redis.asyncio import Redis
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.order import VasOrder
from app.models.user import VasUser
from app.models.vas_task import VasTask
from app.services.email_authorizations_service import EmailAuthorizationService
from app.services.notification_service import NotificationService
logger = logging.getLogger(__name__)

# Signature shared by every handler: it takes the DB session, the Redis
# client, the task and the order, and is awaited with no return value.
# (The Redis client parameter was added to this signature.)
HandlerFunc = Callable[[AsyncSession, Redis, VasTask, VasOrder], Awaitable[None]]
  16. class TaskHandlerRegistry:
  17. """
  18. 任务后处理注册表
  19. 用于管理 routing_key 与处理函数的对应关系
  20. """
  21. def __init__(self):
  22. self._handlers: Dict[str, HandlerFunc] = {}
  23. def register(self, *routing_keys: str):
  24. def decorator(func: HandlerFunc):
  25. for key in routing_keys:
  26. if key in self._handlers:
  27. logger.warning(f"⚠️ [TaskHandler] Overwriting handler for key: {key}")
  28. self._handlers[key] = func
  29. return func
  30. return decorator
  31. # 2. 修改 execute 方法,接收 redis_client
  32. async def execute(self, routing_key: str, db: AsyncSession, redis_client: Redis, task: VasTask, order: VasOrder):
  33. """
  34. 根据 routing_key 查找并执行对应的处理函数
  35. """
  36. handler = self._handlers.get(routing_key)
  37. if not handler:
  38. logger.info(f"ℹ️ [TaskHandler] No handler found for routing_key: {routing_key}, skipping.")
  39. return
  40. try:
  41. logger.info(f"▶️ [TaskHandler] Executing handler '{handler.__name__}' for task {task.id} (key: {routing_key})")
  42. # 传递 redis_client 给具体处理函数
  43. await handler(db, redis_client, task, order)
  44. logger.info(f"✅ [TaskHandler] Handler '{handler.__name__}' completed successfully.")
  45. except Exception as e:
  46. logger.error(f"❌ [TaskHandler] Error executing handler for task {task.id}: {str(e)}", exc_info=True)
  47. # await self._record_error(db, task, str(e))
  48. async def _record_error(self, db: AsyncSession, task: VasTask, error_msg: str):
  49. try:
  50. meta = dict(task.meta) if task.meta else {}
  51. meta["post_process_error"] = error_msg
  52. task.meta = meta
  53. except Exception:
  54. pass
# Module-level registry instance used by the @task_processor.register
# decorators in this module.
task_processor = TaskHandlerRegistry()

# =================================================================
# Concrete handler functions (Handlers)
# =================================================================
  59. # 辅助函数:获取用户信息(用于发通知)
  60. async def _get_user_by_id(db: AsyncSession, user_id: int) -> VasUser:
  61. stmt = select(VasUser).where(VasUser.id == user_id)
  62. result = await db.execute(stmt)
  63. return result.scalar_one_or_none()
  64. @task_processor.register("auto.slot.dub.fr.tourist")
  65. async def forward_troov_appointment_letter(db: AsyncSession, redis_client: Redis, task: VasTask, order: VasOrder):
  66. """
  67. Troov (法国) 预约信转发及通知
  68. """
  69. # 1. 解析参数
  70. inputs: Dict[str, Any] = task.user_inputs or {}
  71. grabbed_history = task.grabbed_history or {}
  72. task_config = task.config or {}
  73. email_account = "visafly666@gmail.com"
  74. # 获取用户以发送通知
  75. user = await _get_user_by_id(db, order.user_id)
  76. forward_to = inputs.get("email") or (user.email if user else "")
  77. recipient = task_config.get('alias_email', "")
  78. first_name = inputs.get('first_name', '').capitalize()
  79. last_name = inputs.get('last_name', '').capitalize()
  80. # 处理日期格式用于关键词匹配
  81. book_date_str = grabbed_history.get("book_date", "")
  82. formatted_date_time = ""
  83. if book_date_str:
  84. try:
  85. # 假设 input 是 ISO 格式 "2024-03-15T09:00:00"
  86. dt = datetime.strptime(book_date_str, "%Y-%m-%dT%H:%M:%S")
  87. formatted_date_time = dt.strftime("%B %-d, %Y to %Hh%M") # Troov 邮件中的特殊格式
  88. # 为邮件通知准备通用格式
  89. notification_date_str = dt.strftime("%d %b %Y, %H:%M")
  90. except ValueError:
  91. formatted_date_time = book_date_str
  92. notification_date_str = book_date_str
  93. # 过滤条件
  94. sender = "ne-pas-repondre at consulat.gouv.fr" # 注意实际匹配时可能需要处理 @ 符号
  95. subject_keywords = inputs.get("subjectKeywords", "Confirmed,appointment,Section,Ambassade,France,Irlande")
  96. body_keywords = inputs.get("bodyKeywords", f"Concerned,person,{first_name},{last_name},{formatted_date_time}")
  97. if not email_account or not forward_to:
  98. logger.warning(f"Task {task.id} missing inputs for email forwarding.")
  99. return
  100. logger.info(f"Task {task.id}: Forwarding emails from {email_account} to {forward_to}")
  101. # 2. 获取授权并转发邮件
  102. auth = await EmailAuthorizationService.get_by_email(db, email_account)
  103. if not auth:
  104. raise ValueError(f"Email authorization not found: {email_account}")
  105. result = await EmailAuthorizationService.forward_first_matching_email2(
  106. db=db, auth=auth, forward_to=forward_to, sender=sender, recipient=recipient,
  107. subject_keywords=subject_keywords, body_keywords=body_keywords
  108. )
  109. # 3. 更新 Task Meta
  110. meta = dict(task.meta) if task.meta else {}
  111. meta["forward_result"] = result
  112. task.meta = meta
  113. # 4. 发送预约成功通知邮件给用户 (利用 NotificationService + Redis)
  114. if user and user.email:
  115. logger.info(f"📧 Sending appointment confirmation email to {user.email}")
  116. await NotificationService.post_email(
  117. redis_client=redis_client,
  118. receiver=user.email,
  119. template_id="appointment_confirmation", # 对应之前定义的模板ID
  120. payload={
  121. "username": user.nickname or first_name,
  122. "order_id": str(order.id),
  123. "country": "France",
  124. "city": "Dublin", # 根据 routing_key 或 inputs 推断
  125. "appointment_date": formatted_date_time,
  126. "visa_type": "Short Stay Any Purpose",
  127. "user_email": forward_to
  128. }
  129. )
  130. @task_processor.register("auto.slot.dub.nl.tourist")
  131. async def forward_vfs_appointment_letter(db: AsyncSession, redis_client: Redis, task: VasTask, order: VasOrder):
  132. """
  133. VFS (荷兰) 预约信转发及通知
  134. """
  135. inputs: Dict[str, Any] = task.user_inputs or {}
  136. grabbed_history = task.grabbed_history or {}
  137. task_config = task.config or {}
  138. email_account = "visafly666@gmail.com"
  139. user = await _get_user_by_id(db, order.user_id)
  140. forward_to = inputs.get("email") or (user.email if user else "")
  141. urn = grabbed_history.get("urn", "")
  142. sender = "donotreply at vfsglobal.com"
  143. recipient = task_config.get('alias_email', "")
  144. subject_keywords = inputs.get("subjectKeywords", "Appointment,Confirm")
  145. body_keywords = inputs.get("bodyKeywords", f"urn") # VFS 通常用 URN 匹配
  146. if not email_account or not forward_to:
  147. logger.warning(f"Task {task.id} missing inputs.")
  148. return
  149. auth = await EmailAuthorizationService.get_by_email(db, email_account)
  150. if not auth:
  151. raise ValueError(f"Email auth not found: {email_account}")
  152. result = await EmailAuthorizationService.forward_first_matching_email2(
  153. db=db, auth=auth, forward_to=forward_to, sender=sender, recipient=recipient,
  154. subject_keywords=subject_keywords, body_keywords=body_keywords
  155. )
  156. meta = dict(task.meta) if task.meta else {}
  157. meta["forward_result"] = result
  158. task.meta = meta
  159. # 发送通知
  160. if user and user.email:
  161. await NotificationService.post_email(
  162. redis_client=redis_client,
  163. receiver=user.email,
  164. template_id="appointment_confirmation",
  165. payload={
  166. "username": user.nickname or 'consumer',
  167. "order_id": str(order.id),
  168. "country": "Netherlands",
  169. "city": "Dublin",
  170. "appointment_date": "Check attachment", # VFS 可能没有直接的日期在 history
  171. "visa_type": "Tourist",
  172. "user_email": forward_to
  173. }
  174. )
  175. @task_processor.register("auto.slot.dub.de.tourist")
  176. async def forward_visametric_appointment_letter(db: AsyncSession, redis_client: Redis, task: VasTask, order: VasOrder):
  177. """
  178. Visametric (德国) 预约信转发及通知
  179. """
  180. inputs: Dict[str, Any] = task.user_inputs or {}
  181. grabbed_history = task.grabbed_history or {}
  182. task_config = task.config or {}
  183. email_account = "visafly666@gmail.com"
  184. user = await _get_user_by_id(db, order.user_id)
  185. forward_to = inputs.get("email") or (user.email if user else "")
  186. first_name = inputs.get('first_name', '')
  187. last_name = inputs.get('last_name', '')
  188. pnr_number = grabbed_history.get('pnr_number', '')
  189. slot_date = grabbed_history.get('slot_date', '') # YYYY-MM-DD
  190. slot_time = grabbed_history.get('slot_time', '')
  191. sender = "noreply at visametric.com"
  192. recipient = task_config.get("alias_email", "")
  193. subject_keywords = inputs.get("subjectKeywords", f"{pnr_number},Visametric,Appointment,Request")
  194. body_keywords = inputs.get("bodyKeywords", f"{pnr_number},{slot_date},{slot_time},{first_name},{last_name}")
  195. if not email_account or not forward_to:
  196. logger.warning(f"Task {task.id} missing inputs.")
  197. return
  198. auth = await EmailAuthorizationService.get_by_email(db, email_account)
  199. if not auth:
  200. raise ValueError(f"Email auth not found: {email_account}")
  201. result = await EmailAuthorizationService.forward_first_matching_email2(
  202. db=db, auth=auth, forward_to=forward_to, sender=sender, recipient=recipient,
  203. subject_keywords=subject_keywords, body_keywords=body_keywords
  204. )
  205. meta = dict(task.meta) if task.meta else {}
  206. meta["forward_result"] = result
  207. task.meta = meta
  208. # 发送通知
  209. if user and user.email:
  210. display_date = f"{slot_date} {slot_time}" if slot_date else "Confirmed"
  211. await NotificationService.post_email(
  212. redis_client=redis_client,
  213. receiver=user.email,
  214. template_id="appointment_confirmation",
  215. payload={
  216. "username": user.nickname or first_name,
  217. "order_id": str(order.id),
  218. "country": "Germany",
  219. "city": "Dublin",
  220. "appointment_date": display_date,
  221. "visa_type": "Tourist",
  222. "user_email": forward_to
  223. }
  224. )