# app/services/task_handlers.py import logging from datetime import datetime from typing import Callable, Awaitable, Dict, Any from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from redis.asyncio import Redis from app.models.user import VasUser from app.models.vas_task import VasTask from app.models.order import VasOrder from app.services.notification_service import NotificationService from app.services.email_authorizations_service import EmailAuthorizationService logger = logging.getLogger(__name__) # 1. 修改 HandlerFunc 类型签名,增加 redis_client: Redis HandlerFunc = Callable[[AsyncSession, Redis, VasTask, VasOrder], Awaitable[None]] class TaskHandlerRegistry: """ 任务后处理注册表 用于管理 routing_key 与处理函数的对应关系 """ def __init__(self): self._handlers: Dict[str, HandlerFunc] = {} def register(self, *routing_keys: str): def decorator(func: HandlerFunc): for key in routing_keys: if key in self._handlers: logger.warning(f"⚠️ [TaskHandler] Overwriting handler for key: {key}") self._handlers[key] = func return func return decorator # 2. 修改 execute 方法,接收 redis_client async def execute(self, routing_key: str, db: AsyncSession, redis_client: Redis, task: VasTask, order: VasOrder): """ 根据 routing_key 查找并执行对应的处理函数 """ handler = self._handlers.get(routing_key) if not handler: logger.info(f"ℹ️ [TaskHandler] No handler found for routing_key: {routing_key}, skipping.") return try: logger.info(f"▶️ [TaskHandler] Executing handler '{handler.__name__}' for task {task.id} (key: {routing_key})") # 传递 redis_client 给具体处理函数 await handler(db, redis_client, task, order) logger.info(f"✅ [TaskHandler] Handler '{handler.__name__}' completed successfully.") except Exception as e: logger.error(f"❌ [TaskHandler] Error executing handler for task {task.id}: {str(e)}", exc_info=True) # await self._record_error(db, task, str(e)) async def _record_error(self, db: AsyncSession, task: VasTask, error_msg: str): try: meta = dict(task.meta) if task.meta else {} meta["post_process_error"] = error_msg task.meta = meta except Exception: pass task_processor = TaskHandlerRegistry() # ================================================================= # 具体的处理函数 (Handlers) # ================================================================= # 辅助函数:获取用户信息(用于发通知) async def _get_user_by_id(db: AsyncSession, user_id: int) -> VasUser: stmt = select(VasUser).where(VasUser.id == user_id) result = await db.execute(stmt) return result.scalar_one_or_none() @task_processor.register("auto.slot.dub.fr.tourist") async def forward_troov_appointment_letter(db: AsyncSession, redis_client: Redis, task: VasTask, order: VasOrder): """ Troov (法国) 预约信转发及通知 """ # 1. 解析参数 inputs: Dict[str, Any] = task.user_inputs or {} grabbed_history = task.grabbed_history or {} task_config = task.config or {} email_account = "visafly666@gmail.com" # 获取用户以发送通知 user = await _get_user_by_id(db, order.user_id) forward_to = inputs.get("email") or (user.email if user else "") recipient = task_config.get('alias_email', "") first_name = inputs.get('first_name', '').capitalize() last_name = inputs.get('last_name', '').capitalize() # 处理日期格式用于关键词匹配 book_date_str = grabbed_history.get("book_date", "") formatted_date_time = "" if book_date_str: try: # 假设 input 是 ISO 格式 "2024-03-15T09:00:00" dt = datetime.strptime(book_date_str, "%Y-%m-%dT%H:%M:%S") formatted_date_time = dt.strftime("%B %-d, %Y to %Hh%M") # Troov 邮件中的特殊格式 # 为邮件通知准备通用格式 notification_date_str = dt.strftime("%d %b %Y, %H:%M") except ValueError: formatted_date_time = book_date_str notification_date_str = book_date_str # 过滤条件 sender = "ne-pas-repondre at consulat.gouv.fr" # 注意实际匹配时可能需要处理 @ 符号 subject_keywords = inputs.get("subjectKeywords", "Confirmed,appointment,Section,Ambassade,France,Irlande") body_keywords = inputs.get("bodyKeywords", f"Concerned,person,{first_name},{last_name},{formatted_date_time}") if not email_account or not forward_to: logger.warning(f"Task {task.id} missing inputs for email forwarding.") return logger.info(f"Task {task.id}: Forwarding emails from {email_account} to {forward_to}") # 2. 获取授权并转发邮件 auth = await EmailAuthorizationService.get_by_email(db, email_account) if not auth: raise ValueError(f"Email authorization not found: {email_account}") result = await EmailAuthorizationService.forward_first_matching_email2( db=db, auth=auth, forward_to=forward_to, sender=sender, recipient=recipient, subject_keywords=subject_keywords, body_keywords=body_keywords ) # 3. 更新 Task Meta meta = dict(task.meta) if task.meta else {} meta["forward_result"] = result task.meta = meta # 4. 发送预约成功通知邮件给用户 (利用 NotificationService + Redis) if user and user.email: logger.info(f"📧 Sending appointment confirmation email to {user.email}") await NotificationService.post_message( db=db, channel="email", payload={ "template_id": "appointment_confirmation", # 对应之前定义的模板ID "receiver": user.email, "payload": { "username": user.nickname or first_name, "order_id": str(order.id), "country": "France", "city": "Dublin", # 根据 routing_key 或 inputs 推断 "appointment_date": formatted_date_time, "visa_type": "Short Stay Any Purpose", "user_email": forward_to } }, ) @task_processor.register("auto.slot.dub.nl.tourist") async def forward_vfs_appointment_letter(db: AsyncSession, redis_client: Redis, task: VasTask, order: VasOrder): """ VFS (荷兰) 预约信转发及通知 """ inputs: Dict[str, Any] = task.user_inputs or {} grabbed_history = task.grabbed_history or {} task_config = task.config or {} email_account = "visafly666@gmail.com" user = await _get_user_by_id(db, order.user_id) forward_to = inputs.get("email") or (user.email if user else "") urn = grabbed_history.get("urn", "") sender = "donotreply at vfsglobal.com" recipient = task_config.get('alias_email', "") subject_keywords = inputs.get("subjectKeywords", "Appointment,Confirm") body_keywords = inputs.get("bodyKeywords", f"urn") # VFS 通常用 URN 匹配 if not email_account or not forward_to: logger.warning(f"Task {task.id} missing inputs.") return auth = await EmailAuthorizationService.get_by_email(db, email_account) if not auth: raise ValueError(f"Email auth not found: {email_account}") result = await EmailAuthorizationService.forward_first_matching_email2( db=db, auth=auth, forward_to=forward_to, sender=sender, recipient=recipient, subject_keywords=subject_keywords, body_keywords=body_keywords ) meta = dict(task.meta) if task.meta else {} meta["forward_result"] = result task.meta = meta # 发送通知 if user and user.email: await NotificationService.post_message( db=db, channel="email", payload={ "template_id": "appointment_confirmation", "receiver": user.email, "payload": { "username": user.nickname or 'consumer', "order_id": str(order.id), "country": "Netherlands", "city": "Dublin", "appointment_date": "Check attachment", # VFS 可能没有直接的日期在 history "visa_type": "Tourist", "user_email": forward_to } }, ) @task_processor.register("auto.slot.dub.de.tourist") async def forward_visametric_appointment_letter(db: AsyncSession, redis_client: Redis, task: VasTask, order: VasOrder): """ Visametric (德国) 预约信转发及通知 """ inputs: Dict[str, Any] = task.user_inputs or {} grabbed_history = task.grabbed_history or {} task_config = task.config or {} email_account = "visafly666@gmail.com" user = await _get_user_by_id(db, order.user_id) forward_to = inputs.get("email") or (user.email if user else "") first_name = inputs.get('first_name', '') last_name = inputs.get('last_name', '') pnr_number = grabbed_history.get('pnr_number', '') slot_date = grabbed_history.get('slot_date', '') # YYYY-MM-DD slot_time = grabbed_history.get('slot_time', '') sender = "noreply at visametric.com" recipient = task_config.get("alias_email", "") subject_keywords = inputs.get("subjectKeywords", f"{pnr_number},Visametric,Appointment,Request") body_keywords = inputs.get("bodyKeywords", f"{pnr_number},{slot_date},{slot_time},{first_name},{last_name}") if not email_account or not forward_to: logger.warning(f"Task {task.id} missing inputs.") return auth = await EmailAuthorizationService.get_by_email(db, email_account) if not auth: raise ValueError(f"Email auth not found: {email_account}") result = await EmailAuthorizationService.forward_first_matching_email2( db=db, auth=auth, forward_to=forward_to, sender=sender, recipient=recipient, subject_keywords=subject_keywords, body_keywords=body_keywords ) meta = dict(task.meta) if task.meta else {} meta["forward_result"] = result task.meta = meta # 发送通知 if user and user.email: display_date = f"{slot_date} {slot_time}" if slot_date else "Confirmed" await NotificationService.post_message( db=db, channel="email", payload={ "template_id": "appointment_confirmation", "receiver": user.email, "payload": { "username": user.nickname or first_name, "order_id": str(order.id), "country": "Germany", "city": "Dublin", "appointment_date": display_date, "visa_type": "Tourist", "user_email": forward_to } }, )