| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276 |
- # app/services/task_handlers.py
- import logging
- from datetime import datetime
- from typing import Callable, Awaitable, Dict, Any
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy import select
- from redis.asyncio import Redis
- from app.models.user import VasUser
- from app.models.vas_task import VasTask
- from app.models.order import VasOrder
- from app.services.notification_service import NotificationService
- from app.services.email_authorizations_service import EmailAuthorizationService
- logger = logging.getLogger(__name__)
- # 1. 修改 HandlerFunc 类型签名,增加 redis_client: Redis
- HandlerFunc = Callable[[AsyncSession, Redis, VasTask, VasOrder], Awaitable[None]]
- class TaskHandlerRegistry:
- """
- 任务后处理注册表
- 用于管理 routing_key 与处理函数的对应关系
- """
- def __init__(self):
- self._handlers: Dict[str, HandlerFunc] = {}
- def register(self, *routing_keys: str):
- def decorator(func: HandlerFunc):
- for key in routing_keys:
- if key in self._handlers:
- logger.warning(f"⚠️ [TaskHandler] Overwriting handler for key: {key}")
- self._handlers[key] = func
- return func
- return decorator
- # 2. 修改 execute 方法,接收 redis_client
- async def execute(self, routing_key: str, db: AsyncSession, redis_client: Redis, task: VasTask, order: VasOrder):
- """
- 根据 routing_key 查找并执行对应的处理函数
- """
- handler = self._handlers.get(routing_key)
-
- if not handler:
- logger.info(f"ℹ️ [TaskHandler] No handler found for routing_key: {routing_key}, skipping.")
- return
- try:
- logger.info(f"▶️ [TaskHandler] Executing handler '{handler.__name__}' for task {task.id} (key: {routing_key})")
-
- # 传递 redis_client 给具体处理函数
- await handler(db, redis_client, task, order)
-
- logger.info(f"✅ [TaskHandler] Handler '{handler.__name__}' completed successfully.")
-
- except Exception as e:
- logger.error(f"❌ [TaskHandler] Error executing handler for task {task.id}: {str(e)}", exc_info=True)
- # await self._record_error(db, task, str(e))
- async def _record_error(self, db: AsyncSession, task: VasTask, error_msg: str):
- try:
- meta = dict(task.meta) if task.meta else {}
- meta["post_process_error"] = error_msg
- task.meta = meta
- except Exception:
- pass
- task_processor = TaskHandlerRegistry()
- # =================================================================
- # 具体的处理函数 (Handlers)
- # =================================================================
- # 辅助函数:获取用户信息(用于发通知)
- async def _get_user_by_id(db: AsyncSession, user_id: int) -> VasUser:
- stmt = select(VasUser).where(VasUser.id == user_id)
- result = await db.execute(stmt)
- return result.scalar_one_or_none()
- @task_processor.register("auto.slot.dub.fr.tourist")
- async def forward_troov_appointment_letter(db: AsyncSession, redis_client: Redis, task: VasTask, order: VasOrder):
- """
- Troov (法国) 预约信转发及通知
- """
- # 1. 解析参数
- inputs: Dict[str, Any] = task.user_inputs or {}
- grabbed_history = task.grabbed_history or {}
- task_config = task.config or {}
- email_account = "visafly666@gmail.com"
-
- # 获取用户以发送通知
- user = await _get_user_by_id(db, order.user_id)
- forward_to = inputs.get("email") or (user.email if user else "")
-
- recipient = task_config.get('alias_email', "")
- first_name = inputs.get('first_name', '').capitalize()
- last_name = inputs.get('last_name', '').capitalize()
-
- # 处理日期格式用于关键词匹配
- book_date_str = grabbed_history.get("book_date", "")
- formatted_date_time = ""
- if book_date_str:
- try:
- # 假设 input 是 ISO 格式 "2024-03-15T09:00:00"
- dt = datetime.strptime(book_date_str, "%Y-%m-%dT%H:%M:%S")
- formatted_date_time = dt.strftime("%B %-d, %Y to %Hh%M") # Troov 邮件中的特殊格式
-
- # 为邮件通知准备通用格式
- notification_date_str = dt.strftime("%d %b %Y, %H:%M")
- except ValueError:
- formatted_date_time = book_date_str
- notification_date_str = book_date_str
- # 过滤条件
- sender = "ne-pas-repondre at consulat.gouv.fr" # 注意实际匹配时可能需要处理 @ 符号
- subject_keywords = inputs.get("subjectKeywords", "Confirmed,appointment,Section,Ambassade,France,Irlande")
- body_keywords = inputs.get("bodyKeywords", f"Concerned,person,{first_name},{last_name},{formatted_date_time}")
- if not email_account or not forward_to:
- logger.warning(f"Task {task.id} missing inputs for email forwarding.")
- return
- logger.info(f"Task {task.id}: Forwarding emails from {email_account} to {forward_to}")
- # 2. 获取授权并转发邮件
- auth = await EmailAuthorizationService.get_by_email(db, email_account)
- if not auth:
- raise ValueError(f"Email authorization not found: {email_account}")
- result = await EmailAuthorizationService.forward_first_matching_email2(
- db=db, auth=auth, forward_to=forward_to, sender=sender, recipient=recipient,
- subject_keywords=subject_keywords, body_keywords=body_keywords
- )
- # 3. 更新 Task Meta
- meta = dict(task.meta) if task.meta else {}
- meta["forward_result"] = result
- task.meta = meta
-
- # 4. 发送预约成功通知邮件给用户 (利用 NotificationService + Redis)
- if user and user.email:
- logger.info(f"📧 Sending appointment confirmation email to {user.email}")
- await NotificationService.post_email(
- redis_client=redis_client,
- receiver=user.email,
- template_id="appointment_confirmation", # 对应之前定义的模板ID
- payload={
- "username": user.nickname or first_name,
- "order_id": str(order.id),
- "country": "France",
- "city": "Dublin", # 根据 routing_key 或 inputs 推断
- "appointment_date": formatted_date_time,
- "visa_type": "Short Stay Any Purpose",
- "user_email": forward_to
- }
- )
- @task_processor.register("auto.slot.dub.nl.tourist")
- async def forward_vfs_appointment_letter(db: AsyncSession, redis_client: Redis, task: VasTask, order: VasOrder):
- """
- VFS (荷兰) 预约信转发及通知
- """
- inputs: Dict[str, Any] = task.user_inputs or {}
- grabbed_history = task.grabbed_history or {}
- task_config = task.config or {}
- email_account = "visafly666@gmail.com"
- user = await _get_user_by_id(db, order.user_id)
- forward_to = inputs.get("email") or (user.email if user else "")
- urn = grabbed_history.get("urn", "")
-
- sender = "donotreply at vfsglobal.com"
- recipient = task_config.get('alias_email', "")
- subject_keywords = inputs.get("subjectKeywords", "Appointment,Confirm")
- body_keywords = inputs.get("bodyKeywords", f"urn") # VFS 通常用 URN 匹配
- if not email_account or not forward_to:
- logger.warning(f"Task {task.id} missing inputs.")
- return
- auth = await EmailAuthorizationService.get_by_email(db, email_account)
- if not auth:
- raise ValueError(f"Email auth not found: {email_account}")
- result = await EmailAuthorizationService.forward_first_matching_email2(
- db=db, auth=auth, forward_to=forward_to, sender=sender, recipient=recipient,
- subject_keywords=subject_keywords, body_keywords=body_keywords
- )
- meta = dict(task.meta) if task.meta else {}
- meta["forward_result"] = result
- task.meta = meta
- # 发送通知
- if user and user.email:
- await NotificationService.post_email(
- redis_client=redis_client,
- receiver=user.email,
- template_id="appointment_confirmation",
- payload={
- "username": user.nickname or 'consumer',
- "order_id": str(order.id),
- "country": "Netherlands",
- "city": "Dublin",
- "appointment_date": "Check attachment", # VFS 可能没有直接的日期在 history
- "visa_type": "Tourist",
- "user_email": forward_to
- }
- )
- @task_processor.register("auto.slot.dub.de.tourist")
- async def forward_visametric_appointment_letter(db: AsyncSession, redis_client: Redis, task: VasTask, order: VasOrder):
- """
- Visametric (德国) 预约信转发及通知
- """
- inputs: Dict[str, Any] = task.user_inputs or {}
- grabbed_history = task.grabbed_history or {}
- task_config = task.config or {}
- email_account = "visafly666@gmail.com"
- user = await _get_user_by_id(db, order.user_id)
- forward_to = inputs.get("email") or (user.email if user else "")
-
- first_name = inputs.get('first_name', '')
- last_name = inputs.get('last_name', '')
- pnr_number = grabbed_history.get('pnr_number', '')
- slot_date = grabbed_history.get('slot_date', '') # YYYY-MM-DD
- slot_time = grabbed_history.get('slot_time', '')
-
- sender = "noreply at visametric.com"
- recipient = task_config.get("alias_email", "")
- subject_keywords = inputs.get("subjectKeywords", f"{pnr_number},Visametric,Appointment,Request")
- body_keywords = inputs.get("bodyKeywords", f"{pnr_number},{slot_date},{slot_time},{first_name},{last_name}")
- if not email_account or not forward_to:
- logger.warning(f"Task {task.id} missing inputs.")
- return
- auth = await EmailAuthorizationService.get_by_email(db, email_account)
- if not auth:
- raise ValueError(f"Email auth not found: {email_account}")
- result = await EmailAuthorizationService.forward_first_matching_email2(
- db=db, auth=auth, forward_to=forward_to, sender=sender, recipient=recipient,
- subject_keywords=subject_keywords, body_keywords=body_keywords
- )
- meta = dict(task.meta) if task.meta else {}
- meta["forward_result"] = result
- task.meta = meta
- # 发送通知
- if user and user.email:
- display_date = f"{slot_date} {slot_time}" if slot_date else "Confirmed"
-
- await NotificationService.post_email(
- redis_client=redis_client,
- receiver=user.email,
- template_id="appointment_confirmation",
- payload={
- "username": user.nickname or first_name,
- "order_id": str(order.id),
- "country": "Germany",
- "city": "Dublin",
- "appointment_date": display_date,
- "visa_type": "Tourist",
- "user_email": forward_to
- }
- )
|