# app/services/task_handlers.py import logging from datetime import datetime from typing import Callable, Awaitable, Dict, Any from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from app.models.user import VasUser from app.models.vas_task import VasTask from app.models.order import VasOrder # 导入具体的业务服务 (用于发邮件等) from app.services.email_authorizations_service import EmailAuthorizationService logger = logging.getLogger(__name__) # 定义处理器函数的类型签名 HandlerFunc = Callable[[AsyncSession, VasTask, VasOrder], Awaitable[None]] class TaskHandlerRegistry: """ 任务后处理注册表 用于管理 routing_key 与处理函数的对应关系 """ def __init__(self): self._handlers: Dict[str, HandlerFunc] = {} def register(self, *routing_keys: str): """ 装饰器:将一个或多个 routing_key 注册到同一个处理函数 用法: @task_processor.register("key1", "key2") async def my_handler(...): pass """ def decorator(func: HandlerFunc): for key in routing_keys: if key in self._handlers: logger.warning(f"⚠️ [TaskHandler] Overwriting handler for key: {key}") self._handlers[key] = func return func return decorator async def execute(self, routing_key: str, db: AsyncSession, task: VasTask, order: VasOrder): """ 根据 routing_key 查找并执行对应的处理函数 """ handler = self._handlers.get(routing_key) if not handler: # 如果没有注册处理器,通常直接跳过即可 logger.debug(f"ℹ️ [TaskHandler] No handler found for routing_key: {routing_key}, skipping.") return try: logger.info(f"▶️ [TaskHandler] Executing handler '{handler.__name__}' for task {task.id} (key: {routing_key})") # 执行具体的业务逻辑 await handler(db, task, order) logger.info(f"✅ [TaskHandler] Handler '{handler.__name__}' completed successfully.") except Exception as e: # 捕获异常,防止阻断主流程 (manual_confirm) 的事务提交 # 也可以选择在这里将错误写入 task.meta logger.error(f"❌ [TaskHandler] Error executing handler for task {task.id}: {str(e)}", exc_info=True) # 可选:记录错误到 Task Meta # await self._record_error(db, task, str(e)) async def _record_error(self, db: AsyncSession, task: VasTask, error_msg: str): """辅助方法:将错误信息回写到 Task (可选)""" try: meta = dict(task.meta) if task.meta else {} meta["post_process_error"] = error_msg task.meta = meta # 注意:这里不需要 commit,因为外层 manual_confirm 会统一 commit except Exception: pass # ================================================================= # 实例化注册表 (在其他地方 import 这个实例) # ================================================================= task_processor = TaskHandlerRegistry() # ================================================================= # 具体的处理函数 (Handlers) # ================================================================= @task_processor.register("auto.slot.dub.fr.tourist") async def forward_troov_appointment_letter(db: AsyncSession, task: VasTask, order: VasOrder): """ 处理邮件转发相关的任务 支持 routing_key: email.forward, email.auto_reply """ # 1. 解析参数 # 假设参数存储在 task.user_inputs 或 task.config 中 inputs: Dict[str, Any] = task.user_inputs or {} grabbed_history = task.grabbed_history or {} task_config = task.config or {} email_account = "visafly666@gmail.com" forward_to = inputs.get("email", "") if not forward_to: uid = order.user_id stmt = select(VasUser).where(VasUser.id == uid) result = await db.execute(stmt) user = result.scalar_one_or_none() forward_to = user.email recipient = task_config.get('alias_email', "") first_name = inputs.get('first_name', '') last_name = inputs.get('last_name', '') first_name = first_name.capitalize() last_name = last_name.capitalize() book_date_str = grabbed_history.get("book_date", "") dt = datetime.strptime(book_date_str, "%Y-%m-%dT%H:%M:%S") formatted_date_time = dt.strftime("%B %-d, %Y to %Hh%M") # 过滤条件 sender = "ne-pas-repondre at consulat.gouv.fr" subject_keywords = inputs.get("subjectKeywords", "Confirmed,appointment,Section,Ambassade,France,Irlande") body_keywords = inputs.get("bodyKeywords", f"Concerned,person,{first_name},{last_name},{formatted_date_time}") if not email_account or not forward_to: logger.warning(f"Task {task.id} missing required inputs for email forwarding.") return logger.info(f"Task {task.id}: Forwarding emails from {email_account} to {forward_to}") # 2. 获取授权配置 auth = await EmailAuthorizationService.get_by_email(db, email_account) if not auth: raise ValueError(f"Email authorization not found for account: {email_account}") # 3. 调用邮件服务执行转发 # 这里复用你之前写的 Service 方法 result = await EmailAuthorizationService.forward_first_matching_email2( db=db, auth=auth, forward_to=forward_to, sender=sender, recipient=recipient, subject_keywords=subject_keywords, body_keywords=body_keywords ) # 4. (可选) 将结果回写到 Task Meta meta = dict(task.meta) if task.meta else {} meta["forward_result"] = result task.meta = meta @task_processor.register("auto.slot.dub.nl.tourist") async def forward_vfs_appointment_letter(db: AsyncSession, task: VasTask, order: VasOrder): """ 处理邮件转发相关的任务 支持 routing_key: email.forward, email.auto_reply """ # 1. 解析参数 # 假设参数存储在 task.user_inputs 或 task.config 中 inputs: Dict[str, Any] = task.user_inputs or {} grabbed_history = task.grabbed_history or {} task_config = task.config or {} email_account = "visafly666@gmail.com" forward_to = inputs.get("email", "") if not forward_to: uid = order.user_id stmt = select(VasUser).where(VasUser.id == uid) result = await db.execute(stmt) user = result.scalar_one_or_none() forward_to = user.email urn = grabbed_history.get("urn", "") # 过滤条件 sender = "donotreply at vfsglobal.com" recipient = task_config.get('alias_email', "") subject_keywords = inputs.get("subjectKeywords", "Appointment,Confirm") body_keywords = inputs.get("bodyKeywords", f"urn") if not email_account or not forward_to: logger.warning(f"Task {task.id} missing required inputs for email forwarding.") return logger.info(f"Task {task.id}: Forwarding emails from {email_account} to {forward_to}") # 2. 获取授权配置 auth = await EmailAuthorizationService.get_by_email(db, email_account) if not auth: raise ValueError(f"Email authorization not found for account: {email_account}") # 3. 调用邮件服务执行转发 # 这里复用你之前写的 Service 方法 result = await EmailAuthorizationService.forward_first_matching_email2( db=db, auth=auth, forward_to=forward_to, sender=sender, recipient=recipient, subject_keywords=subject_keywords, body_keywords=body_keywords ) # 4. (可选) 将结果回写到 Task Meta meta = dict(task.meta) if task.meta else {} meta["forward_result"] = result task.meta = meta @task_processor.register("auto.slot.dub.de.tourist") async def forward_visametric_appointment_letter(db: AsyncSession, task: VasTask, order: VasOrder): """ 处理邮件转发相关的任务 支持 routing_key: email.forward, email.auto_reply """ # 1. 解析参数 # 假设参数存储在 task.user_inputs 或 task.config 中 inputs: Dict[str, Any] = task.user_inputs or {} grabbed_history = task.grabbed_history or {} task_config = task.config or {} email_account = "visafly666@gmail.com" forward_to = inputs.get("email", "") if not forward_to: uid = order.user_id stmt = select(VasUser).where(VasUser.id == uid) result = await db.execute(stmt) user = result.scalar_one_or_none() forward_to = user.email first_name = inputs.get('first_name', '') last_name = inputs.get('last_name', '') pnr_number = grabbed_history.get('pnr_number', '') slot_date = grabbed_history.get('slot_date', '') slot_time = grabbed_history.get('slot_time', '') # 过滤条件 sender = "noreply at visametric.com" recipient = task_config.get("alias_email", "") subject_keywords = inputs.get("subjectKeywords", f"{pnr_number},Visametric,Appointment,Request") body_keywords = inputs.get("bodyKeywords", f"{pnr_number},{slot_date},{slot_time},{first_name},{last_name}") if not email_account or not forward_to: logger.warning(f"Task {task.id} missing required inputs for email forwarding.") return logger.info(f"Task {task.id}: Forwarding emails from {email_account} to {forward_to}") # 2. 获取授权配置 auth = await EmailAuthorizationService.get_by_email(db, email_account) if not auth: raise ValueError(f"Email authorization not found for account: {email_account}") # 3. 调用邮件服务执行转发 # 这里复用你之前写的 Service 方法 result = await EmailAuthorizationService.forward_first_matching_email2( db=db, auth=auth, forward_to=forward_to, sender=sender, recipient=recipient, subject_keywords=subject_keywords, body_keywords=body_keywords ) # 4. (可选) 将结果回写到 Task Meta meta = dict(task.meta) if task.meta else {} meta["forward_result"] = result task.meta = meta