task_handlers.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. # app/services/task_handlers.py
  2. import logging
  3. from datetime import datetime
  4. from typing import Callable, Awaitable, Dict, Any
  5. from sqlalchemy.ext.asyncio import AsyncSession
  6. from sqlalchemy import select
  7. from app.models.user import VasUser
  8. from app.models.vas_task import VasTask
  9. from app.models.order import VasOrder
  10. # 导入具体的业务服务 (用于发邮件等)
  11. from app.services.email_authorizations_service import EmailAuthorizationService
  12. logger = logging.getLogger(__name__)
  13. # 定义处理器函数的类型签名
  14. HandlerFunc = Callable[[AsyncSession, VasTask, VasOrder], Awaitable[None]]
  15. class TaskHandlerRegistry:
  16. """
  17. 任务后处理注册表
  18. 用于管理 routing_key 与处理函数的对应关系
  19. """
  20. def __init__(self):
  21. self._handlers: Dict[str, HandlerFunc] = {}
  22. def register(self, *routing_keys: str):
  23. """
  24. 装饰器:将一个或多个 routing_key 注册到同一个处理函数
  25. 用法:
  26. @task_processor.register("key1", "key2")
  27. async def my_handler(...):
  28. pass
  29. """
  30. def decorator(func: HandlerFunc):
  31. for key in routing_keys:
  32. if key in self._handlers:
  33. logger.warning(f"⚠️ [TaskHandler] Overwriting handler for key: {key}")
  34. self._handlers[key] = func
  35. return func
  36. return decorator
  37. async def execute(self, routing_key: str, db: AsyncSession, task: VasTask, order: VasOrder):
  38. """
  39. 根据 routing_key 查找并执行对应的处理函数
  40. """
  41. handler = self._handlers.get(routing_key)
  42. if not handler:
  43. # 如果没有注册处理器,通常直接跳过即可
  44. logger.debug(f"ℹ️ [TaskHandler] No handler found for routing_key: {routing_key}, skipping.")
  45. return
  46. try:
  47. logger.info(f"▶️ [TaskHandler] Executing handler '{handler.__name__}' for task {task.id} (key: {routing_key})")
  48. # 执行具体的业务逻辑
  49. await handler(db, task, order)
  50. logger.info(f"✅ [TaskHandler] Handler '{handler.__name__}' completed successfully.")
  51. except Exception as e:
  52. # 捕获异常,防止阻断主流程 (manual_confirm) 的事务提交
  53. # 也可以选择在这里将错误写入 task.meta
  54. logger.error(f"❌ [TaskHandler] Error executing handler for task {task.id}: {str(e)}", exc_info=True)
  55. # 可选:记录错误到 Task Meta
  56. # await self._record_error(db, task, str(e))
  57. async def _record_error(self, db: AsyncSession, task: VasTask, error_msg: str):
  58. """辅助方法:将错误信息回写到 Task (可选)"""
  59. try:
  60. meta = dict(task.meta) if task.meta else {}
  61. meta["post_process_error"] = error_msg
  62. task.meta = meta
  63. # 注意:这里不需要 commit,因为外层 manual_confirm 会统一 commit
  64. except Exception:
  65. pass
  66. # =================================================================
  67. # 实例化注册表 (在其他地方 import 这个实例)
  68. # =================================================================
  69. task_processor = TaskHandlerRegistry()
  70. # =================================================================
  71. # 具体的处理函数 (Handlers)
  72. # =================================================================
  73. @task_processor.register("auto.slot.dub.fr.tourist")
  74. async def forward_troov_appointment_letter(db: AsyncSession, task: VasTask, order: VasOrder):
  75. """
  76. 处理邮件转发相关的任务
  77. 支持 routing_key: email.forward, email.auto_reply
  78. """
  79. # 1. 解析参数
  80. # 假设参数存储在 task.user_inputs 或 task.config 中
  81. inputs: Dict[str, Any] = task.user_inputs or {}
  82. grabbed_history = task.grabbed_history or {}
  83. task_config = task.config or {}
  84. email_account = "visafly666@gmail.com"
  85. forward_to = inputs.get("email", "")
  86. if not forward_to:
  87. uid = order.user_id
  88. stmt = select(VasUser).where(VasUser.id == uid)
  89. result = await db.execute(stmt)
  90. user = result.scalar_one_or_none()
  91. forward_to = user.email
  92. recipient = task_config.get('alias_email', "")
  93. first_name = inputs.get('first_name', '')
  94. last_name = inputs.get('last_name', '')
  95. first_name = first_name.capitalize()
  96. last_name = last_name.capitalize()
  97. book_date_str = grabbed_history.get("book_date", "")
  98. dt = datetime.strptime(book_date_str, "%Y-%m-%dT%H:%M:%S")
  99. formatted_date_time = dt.strftime("%B %-d, %Y to %Hh%M")
  100. # 过滤条件
  101. sender = "ne-pas-repondre at consulat.gouv.fr"
  102. subject_keywords = inputs.get("subjectKeywords", "Confirmed,appointment,Section,Ambassade,France,Irlande")
  103. body_keywords = inputs.get("bodyKeywords", f"Concerned,person,{first_name},{last_name},{formatted_date_time}")
  104. if not email_account or not forward_to:
  105. logger.warning(f"Task {task.id} missing required inputs for email forwarding.")
  106. return
  107. logger.info(f"Task {task.id}: Forwarding emails from {email_account} to {forward_to}")
  108. # 2. 获取授权配置
  109. auth = await EmailAuthorizationService.get_by_email(db, email_account)
  110. if not auth:
  111. raise ValueError(f"Email authorization not found for account: {email_account}")
  112. # 3. 调用邮件服务执行转发
  113. # 这里复用你之前写的 Service 方法
  114. result = await EmailAuthorizationService.forward_first_matching_email2(
  115. db=db,
  116. auth=auth,
  117. forward_to=forward_to,
  118. sender=sender,
  119. recipient=recipient,
  120. subject_keywords=subject_keywords,
  121. body_keywords=body_keywords
  122. )
  123. # 4. (可选) 将结果回写到 Task Meta
  124. meta = dict(task.meta) if task.meta else {}
  125. meta["forward_result"] = result
  126. task.meta = meta
  127. @task_processor.register("auto.slot.dub.nl.tourist")
  128. async def forward_vfs_appointment_letter(db: AsyncSession, task: VasTask, order: VasOrder):
  129. """
  130. 处理邮件转发相关的任务
  131. 支持 routing_key: email.forward, email.auto_reply
  132. """
  133. # 1. 解析参数
  134. # 假设参数存储在 task.user_inputs 或 task.config 中
  135. inputs: Dict[str, Any] = task.user_inputs or {}
  136. grabbed_history = task.grabbed_history or {}
  137. task_config = task.config or {}
  138. email_account = "visafly666@gmail.com"
  139. forward_to = inputs.get("email", "")
  140. if not forward_to:
  141. uid = order.user_id
  142. stmt = select(VasUser).where(VasUser.id == uid)
  143. result = await db.execute(stmt)
  144. user = result.scalar_one_or_none()
  145. forward_to = user.email
  146. urn = grabbed_history.get("urn", "")
  147. # 过滤条件
  148. sender = "donotreply at vfsglobal.com"
  149. recipient = task_config.get('alias_email', "")
  150. subject_keywords = inputs.get("subjectKeywords", "Appointment,Confirm")
  151. body_keywords = inputs.get("bodyKeywords", f"urn")
  152. if not email_account or not forward_to:
  153. logger.warning(f"Task {task.id} missing required inputs for email forwarding.")
  154. return
  155. logger.info(f"Task {task.id}: Forwarding emails from {email_account} to {forward_to}")
  156. # 2. 获取授权配置
  157. auth = await EmailAuthorizationService.get_by_email(db, email_account)
  158. if not auth:
  159. raise ValueError(f"Email authorization not found for account: {email_account}")
  160. # 3. 调用邮件服务执行转发
  161. # 这里复用你之前写的 Service 方法
  162. result = await EmailAuthorizationService.forward_first_matching_email2(
  163. db=db,
  164. auth=auth,
  165. forward_to=forward_to,
  166. sender=sender,
  167. recipient=recipient,
  168. subject_keywords=subject_keywords,
  169. body_keywords=body_keywords
  170. )
  171. # 4. (可选) 将结果回写到 Task Meta
  172. meta = dict(task.meta) if task.meta else {}
  173. meta["forward_result"] = result
  174. task.meta = meta
  175. @task_processor.register("auto.slot.dub.de.tourist")
  176. async def forward_visametric_appointment_letter(db: AsyncSession, task: VasTask, order: VasOrder):
  177. """
  178. 处理邮件转发相关的任务
  179. 支持 routing_key: email.forward, email.auto_reply
  180. """
  181. # 1. 解析参数
  182. # 假设参数存储在 task.user_inputs 或 task.config 中
  183. inputs: Dict[str, Any] = task.user_inputs or {}
  184. grabbed_history = task.grabbed_history or {}
  185. task_config = task.config or {}
  186. email_account = "visafly666@gmail.com"
  187. forward_to = inputs.get("email", "")
  188. if not forward_to:
  189. uid = order.user_id
  190. stmt = select(VasUser).where(VasUser.id == uid)
  191. result = await db.execute(stmt)
  192. user = result.scalar_one_or_none()
  193. forward_to = user.email
  194. first_name = inputs.get('first_name', '')
  195. last_name = inputs.get('last_name', '')
  196. pnr_number = grabbed_history.get('pnr_number', '')
  197. slot_date = grabbed_history.get('slot_date', '')
  198. slot_time = grabbed_history.get('slot_time', '')
  199. # 过滤条件
  200. sender = "noreply at visametric.com"
  201. recipient = task_config.get("alias_email", "")
  202. subject_keywords = inputs.get("subjectKeywords", f"{pnr_number},Visametric,Appointment,Request")
  203. body_keywords = inputs.get("bodyKeywords", f"{pnr_number},{slot_date},{slot_time},{first_name},{last_name}")
  204. if not email_account or not forward_to:
  205. logger.warning(f"Task {task.id} missing required inputs for email forwarding.")
  206. return
  207. logger.info(f"Task {task.id}: Forwarding emails from {email_account} to {forward_to}")
  208. # 2. 获取授权配置
  209. auth = await EmailAuthorizationService.get_by_email(db, email_account)
  210. if not auth:
  211. raise ValueError(f"Email authorization not found for account: {email_account}")
  212. # 3. 调用邮件服务执行转发
  213. # 这里复用你之前写的 Service 方法
  214. result = await EmailAuthorizationService.forward_first_matching_email2(
  215. db=db,
  216. auth=auth,
  217. forward_to=forward_to,
  218. sender=sender,
  219. recipient=recipient,
  220. subject_keywords=subject_keywords,
  221. body_keywords=body_keywords
  222. )
  223. # 4. (可选) 将结果回写到 Task Meta
  224. meta = dict(task.meta) if task.meta else {}
  225. meta["forward_result"] = result
  226. task.meta = meta