| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286 |
- # app/services/task_handlers.py
- import logging
- from datetime import datetime
- from typing import Callable, Awaitable, Dict, Any
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy import select
- from app.models.user import VasUser
- from app.models.vas_task import VasTask
- from app.models.order import VasOrder
- # Import the concrete business service (used for sending e-mail, etc.)
- from app.services.email_authorizations_service import EmailAuthorizationService
- # Module-level logger named after this module.
- logger = logging.getLogger(__name__)
- # Type signature of a handler function: an awaitable callable that takes
- # (AsyncSession, VasTask, VasOrder) and returns None.
- HandlerFunc = Callable[[AsyncSession, VasTask, VasOrder], Awaitable[None]]
class TaskHandlerRegistry:
    """Registry of task post-processing handlers.

    Keeps a mapping from ``routing_key`` to the coroutine function that
    should run for tasks carrying that key.
    """

    def __init__(self):
        # routing_key -> handler coroutine function.
        self._handlers: Dict[str, HandlerFunc] = {}

    def register(self, *routing_keys: str):
        """Return a decorator that binds one or more routing keys to a handler.

        Usage::

            @task_processor.register("key1", "key2")
            async def my_handler(db, task, order):
                ...

        Registering a key that already has a handler replaces the previous
        handler and logs a warning. The decorated function is returned
        unchanged, so it stays directly callable.
        """
        def decorator(func: HandlerFunc):
            if not routing_keys:
                # A bare ``@register()`` would otherwise silently register
                # nothing, leaving the handler unreachable.
                func_name = getattr(func, "__name__", None) or repr(func)
                logger.warning(
                    f"⚠️ [TaskHandler] register() called without routing keys; "
                    f"handler '{func_name}' was not registered"
                )
            for key in routing_keys:
                if key in self._handlers:
                    logger.warning(f"⚠️ [TaskHandler] Overwriting handler for key: {key}")
                self._handlers[key] = func
            return func

        return decorator

    async def execute(self, routing_key: str, db: AsyncSession, task: VasTask, order: VasOrder):
        """Look up the handler for ``routing_key`` and run it.

        When no handler is registered the call is a logged no-op.
        Any exception raised while running the handler is logged and
        swallowed here, so it does not propagate to the caller (the original
        author's note: this keeps the main ``manual_confirm`` flow able to
        commit its transaction).
        """
        handler = self._handlers.get(routing_key)

        if not handler:
            # No handler registered: skipping is the expected behaviour.
            logger.info(f"ℹ️ [TaskHandler] No handler found for routing_key: {routing_key}, skipping.")
            return

        # Resolve a printable name up front. Callables such as
        # functools.partial have no __name__; reading it inside the try block
        # would raise, be swallowed below, and the handler would never run.
        handler_name = getattr(handler, "__name__", None) or repr(handler)

        try:
            logger.info(f"▶️ [TaskHandler] Executing handler '{handler_name}' for task {task.id} (key: {routing_key})")

            # Run the actual business logic.
            await handler(db, task, order)

            logger.info(f"✅ [TaskHandler] Handler '{handler_name}' completed successfully.")

        except Exception as e:
            # Deliberate catch-all boundary: a failing post-processor is
            # logged but not re-raised.
            logger.error(f"❌ [TaskHandler] Error executing handler for task {task.id}: {str(e)}", exc_info=True)

            # Optional (intentionally not enabled): persist the error on the
            # task via ``await self._record_error(db, task, str(e))``.

    async def _record_error(self, db: AsyncSession, task: VasTask, error_msg: str):
        """Best-effort helper: store ``error_msg`` under ``task.meta["post_process_error"]``.

        ``task.meta`` is replaced with a fresh dict rather than mutated in
        place. No commit is issued here; the original note says the outer
        ``manual_confirm`` flow commits. Failures are logged, never raised.
        """
        try:
            meta = dict(task.meta) if task.meta else {}
            meta["post_process_error"] = error_msg
            task.meta = meta
        except Exception:
            # Still best-effort, but leave a trace instead of failing silently.
            logger.warning(
                "⚠️ [TaskHandler] Failed to record post-process error on task meta",
                exc_info=True,
            )
- # =================================================================
- # Instantiate the registry (other modules import this instance)
- # =================================================================
- task_processor = TaskHandlerRegistry()
- # =================================================================
- # Concrete handler functions (Handlers)
- # =================================================================
@task_processor.register("auto.slot.dub.fr.tourist")
async def forward_troov_appointment_letter(db: AsyncSession, task: VasTask, order: VasOrder):
    """Forward the appointment e-mail for routing key ``auto.slot.dub.fr.tourist``.

    The destination address is ``task.user_inputs["email"]`` or, when that is
    empty, the e-mail of the user referenced by ``order.user_id``. If no
    destination can be determined a warning is logged and nothing is sent.

    The mail is selected with a fixed sender filter, the ``alias_email`` from
    ``task.config`` as recipient filter, and subject/body keywords. Both
    keyword lists can be overridden through ``subjectKeywords`` /
    ``bodyKeywords`` in ``task.user_inputs``; the default body keywords are
    built from the applicant's name and ``task.grabbed_history["book_date"]``.

    The service result is stored under ``task.meta["forward_result"]``. No
    commit is issued here.

    Raises:
        ValueError: if the default body keywords are needed but ``book_date``
            is missing or not in ``%Y-%m-%dT%H:%M:%S`` format, or if no
            e-mail authorization exists for the mailbox account.
    """
    # 1. Collect parameters from the task.
    inputs: Dict[str, Any] = task.user_inputs or {}
    grabbed_history = task.grabbed_history or {}
    task_config = task.config or {}

    email_account = "visafly666@gmail.com"

    forward_to = inputs.get("email", "")
    if not forward_to:
        # Fall back to the e-mail of the user who owns the order.
        stmt = select(VasUser).where(VasUser.id == order.user_id)
        user = (await db.execute(stmt)).scalar_one_or_none()
        # scalar_one_or_none() yields None when the user row does not exist;
        # treat that as "no destination" instead of crashing on ``.email``.
        forward_to = user.email if user is not None else ""

    # Validate before doing any parsing work so a missing destination is
    # reported as such rather than masked by a date-parsing error.
    if not email_account or not forward_to:
        logger.warning(f"Task {task.id} missing required inputs for email forwarding.")
        return

    recipient = task_config.get('alias_email', "")

    # Filter criteria.
    # NOTE(review): the sender filter uses " at " rather than "@" — confirm
    # this is the form the matching service expects.
    sender = "ne-pas-repondre at consulat.gouv.fr"

    subject_keywords = inputs.get("subjectKeywords", "Confirmed,appointment,Section,Ambassade,France,Irlande")

    if "bodyKeywords" in inputs:
        # Explicit override: the booking date is not needed at all.
        body_keywords = inputs["bodyKeywords"]
    else:
        # ``or ''`` guards against keys that are present but set to None.
        first_name = str(inputs.get('first_name') or '').capitalize()
        last_name = str(inputs.get('last_name') or '').capitalize()

        book_date_str = grabbed_history.get("book_date") or ""
        try:
            dt = datetime.strptime(book_date_str, "%Y-%m-%dT%H:%M:%S")
        except (TypeError, ValueError) as exc:
            raise ValueError(
                f"Task {task.id}: missing or invalid book_date {book_date_str!r}"
            ) from exc

        # Same text as strftime("%B %-d, %Y to %Hh%M"), but "%-d" is a
        # platform-specific extension; dt.day gives the unpadded day portably.
        formatted_date_time = f"{dt:%B} {dt.day}, {dt:%Y} to {dt:%Hh%M}"
        body_keywords = f"Concerned,person,{first_name},{last_name},{formatted_date_time}"

    logger.info(f"Task {task.id}: Forwarding emails from {email_account} to {forward_to}")

    # 2. Load the authorization record for the mailbox account.
    auth = await EmailAuthorizationService.get_by_email(db, email_account)
    if not auth:
        raise ValueError(f"Email authorization not found for account: {email_account}")

    # 3. Delegate the search-and-forward to the existing e-mail service.
    forward_result = await EmailAuthorizationService.forward_first_matching_email2(
        db=db,
        auth=auth,
        forward_to=forward_to,
        sender=sender,
        recipient=recipient,
        subject_keywords=subject_keywords,
        body_keywords=body_keywords
    )

    # 4. Write the result back to the task meta (fresh dict, reassigned).
    meta = dict(task.meta) if task.meta else {}
    meta["forward_result"] = forward_result
    task.meta = meta
-
-
@task_processor.register("auto.slot.dub.nl.tourist")
async def forward_vfs_appointment_letter(db: AsyncSession, task: VasTask, order: VasOrder):
    """Forward the appointment e-mail for routing key ``auto.slot.dub.nl.tourist``.

    The destination address is ``task.user_inputs["email"]`` or, when that is
    empty, the e-mail of the user referenced by ``order.user_id``. If no
    destination can be determined a warning is logged and nothing is sent.

    The mail is selected with a fixed sender filter, the ``alias_email`` from
    ``task.config`` as recipient filter, and subject/body keywords. Both
    keyword lists can be overridden through ``subjectKeywords`` /
    ``bodyKeywords`` in ``task.user_inputs``; the default body keyword is the
    URN found in ``task.grabbed_history["urn"]``.

    The service result is stored under ``task.meta["forward_result"]``. No
    commit is issued here.

    Raises:
        ValueError: if no e-mail authorization exists for the mailbox account.
    """
    # 1. Collect parameters from the task.
    inputs: Dict[str, Any] = task.user_inputs or {}
    grabbed_history = task.grabbed_history or {}
    task_config = task.config or {}

    email_account = "visafly666@gmail.com"

    forward_to = inputs.get("email", "")
    if not forward_to:
        # Fall back to the e-mail of the user who owns the order.
        stmt = select(VasUser).where(VasUser.id == order.user_id)
        user = (await db.execute(stmt)).scalar_one_or_none()
        # scalar_one_or_none() yields None when the user row does not exist;
        # treat that as "no destination" instead of crashing on ``.email``.
        forward_to = user.email if user is not None else ""

    if not email_account or not forward_to:
        logger.warning(f"Task {task.id} missing required inputs for email forwarding.")
        return

    urn = grabbed_history.get("urn", "")

    # Filter criteria.
    # NOTE(review): the sender filter uses " at " rather than "@" — confirm
    # this is the form the matching service expects.
    sender = "donotreply at vfsglobal.com"
    recipient = task_config.get('alias_email', "")
    subject_keywords = inputs.get("subjectKeywords", "Appointment,Confirm")

    # The previous default was the placeholder-less f-string f"urn", i.e. the
    # literal word "urn", which left the captured URN unused. Match on the
    # actual URN value; when none was captured, keep the old literal keyword
    # so behaviour in that case is unchanged.
    default_body_keywords = str(urn) if urn else "urn"
    body_keywords = inputs.get("bodyKeywords", default_body_keywords)

    logger.info(f"Task {task.id}: Forwarding emails from {email_account} to {forward_to}")

    # 2. Load the authorization record for the mailbox account.
    auth = await EmailAuthorizationService.get_by_email(db, email_account)
    if not auth:
        raise ValueError(f"Email authorization not found for account: {email_account}")

    # 3. Delegate the search-and-forward to the existing e-mail service.
    forward_result = await EmailAuthorizationService.forward_first_matching_email2(
        db=db,
        auth=auth,
        forward_to=forward_to,
        sender=sender,
        recipient=recipient,
        subject_keywords=subject_keywords,
        body_keywords=body_keywords
    )

    # 4. Write the result back to the task meta (fresh dict, reassigned).
    meta = dict(task.meta) if task.meta else {}
    meta["forward_result"] = forward_result
    task.meta = meta
-
@task_processor.register("auto.slot.dub.de.tourist")
async def forward_visametric_appointment_letter(db: AsyncSession, task: VasTask, order: VasOrder):
    """Forward the appointment e-mail for routing key ``auto.slot.dub.de.tourist``.

    The destination address is ``task.user_inputs["email"]`` or, when that is
    empty, the e-mail of the user referenced by ``order.user_id``. If no
    destination can be determined a warning is logged and nothing is sent.

    The mail is selected with a fixed sender filter, the ``alias_email`` from
    ``task.config`` as recipient filter, and subject/body keywords. Both
    keyword lists can be overridden through ``subjectKeywords`` /
    ``bodyKeywords`` in ``task.user_inputs``; the defaults are built from
    ``pnr_number``, ``slot_date`` and ``slot_time`` in
    ``task.grabbed_history`` plus the applicant's first and last name.

    The service result is stored under ``task.meta["forward_result"]``. No
    commit is issued here.

    Raises:
        ValueError: if no e-mail authorization exists for the mailbox account.
    """
    # 1. Collect parameters from the task.
    inputs: Dict[str, Any] = task.user_inputs or {}
    grabbed_history = task.grabbed_history or {}
    task_config = task.config or {}

    email_account = "visafly666@gmail.com"

    forward_to = inputs.get("email", "")
    if not forward_to:
        # Fall back to the e-mail of the user who owns the order.
        stmt = select(VasUser).where(VasUser.id == order.user_id)
        user = (await db.execute(stmt)).scalar_one_or_none()
        # scalar_one_or_none() yields None when the user row does not exist;
        # treat that as "no destination" instead of crashing on ``.email``.
        forward_to = user.email if user is not None else ""

    # Bail out before building any filters when there is nowhere to send to.
    if not email_account or not forward_to:
        logger.warning(f"Task {task.id} missing required inputs for email forwarding.")
        return

    first_name = inputs.get('first_name', '')
    last_name = inputs.get('last_name', '')

    pnr_number = grabbed_history.get('pnr_number', '')
    slot_date = grabbed_history.get('slot_date', '')
    slot_time = grabbed_history.get('slot_time', '')

    # Filter criteria.
    # NOTE(review): the sender filter uses " at " rather than "@" — confirm
    # this is the form the matching service expects.
    sender = "noreply at visametric.com"
    recipient = task_config.get("alias_email", "")
    subject_keywords = inputs.get("subjectKeywords", f"{pnr_number},Visametric,Appointment,Request")
    body_keywords = inputs.get("bodyKeywords", f"{pnr_number},{slot_date},{slot_time},{first_name},{last_name}")

    logger.info(f"Task {task.id}: Forwarding emails from {email_account} to {forward_to}")

    # 2. Load the authorization record for the mailbox account.
    auth = await EmailAuthorizationService.get_by_email(db, email_account)
    if not auth:
        raise ValueError(f"Email authorization not found for account: {email_account}")

    # 3. Delegate the search-and-forward to the existing e-mail service.
    forward_result = await EmailAuthorizationService.forward_first_matching_email2(
        db=db,
        auth=auth,
        forward_to=forward_to,
        sender=sender,
        recipient=recipient,
        subject_keywords=subject_keywords,
        body_keywords=body_keywords
    )

    # 4. Write the result back to the task meta (fresh dict, reassigned).
    meta = dict(task.meta) if task.meta else {}
    meta["forward_result"] = forward_result
    task.meta = meta
|