|
@@ -5,17 +5,18 @@ from datetime import datetime
|
|
|
from typing import Callable, Awaitable, Dict, Any
|
|
from typing import Callable, Awaitable, Dict, Any
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy import select
|
|
|
|
|
+from redis.asyncio import Redis
|
|
|
|
|
+
|
|
|
from app.models.user import VasUser
|
|
from app.models.user import VasUser
|
|
|
from app.models.vas_task import VasTask
|
|
from app.models.vas_task import VasTask
|
|
|
from app.models.order import VasOrder
|
|
from app.models.order import VasOrder
|
|
|
-
|
|
|
|
|
-# 导入具体的业务服务 (用于发邮件等)
|
|
|
|
|
|
|
+from app.services.notification_service import NotificationService
|
|
|
from app.services.email_authorizations_service import EmailAuthorizationService
|
|
from app.services.email_authorizations_service import EmailAuthorizationService
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
-# 定义处理器函数的类型签名
|
|
|
|
|
-HandlerFunc = Callable[[AsyncSession, VasTask, VasOrder], Awaitable[None]]
|
|
|
|
|
|
|
+# 1. 修改 HandlerFunc 类型签名,增加 redis_client: Redis
|
|
|
|
|
+HandlerFunc = Callable[[AsyncSession, Redis, VasTask, VasOrder], Awaitable[None]]
|
|
|
|
|
|
|
|
class TaskHandlerRegistry:
|
|
class TaskHandlerRegistry:
|
|
|
"""
|
|
"""
|
|
@@ -26,14 +27,6 @@ class TaskHandlerRegistry:
|
|
|
self._handlers: Dict[str, HandlerFunc] = {}
|
|
self._handlers: Dict[str, HandlerFunc] = {}
|
|
|
|
|
|
|
|
def register(self, *routing_keys: str):
|
|
def register(self, *routing_keys: str):
|
|
|
- """
|
|
|
|
|
- 装饰器:将一个或多个 routing_key 注册到同一个处理函数
|
|
|
|
|
-
|
|
|
|
|
- 用法:
|
|
|
|
|
- @task_processor.register("key1", "key2")
|
|
|
|
|
- async def my_handler(...):
|
|
|
|
|
- pass
|
|
|
|
|
- """
|
|
|
|
|
def decorator(func: HandlerFunc):
|
|
def decorator(func: HandlerFunc):
|
|
|
for key in routing_keys:
|
|
for key in routing_keys:
|
|
|
if key in self._handlers:
|
|
if key in self._handlers:
|
|
@@ -42,47 +35,38 @@ class TaskHandlerRegistry:
|
|
|
return func
|
|
return func
|
|
|
return decorator
|
|
return decorator
|
|
|
|
|
|
|
|
- async def execute(self, routing_key: str, db: AsyncSession, task: VasTask, order: VasOrder):
|
|
|
|
|
|
|
+ # 2. 修改 execute 方法,接收 redis_client
|
|
|
|
|
+ async def execute(self, routing_key: str, db: AsyncSession, redis_client: Redis, task: VasTask, order: VasOrder):
|
|
|
"""
|
|
"""
|
|
|
根据 routing_key 查找并执行对应的处理函数
|
|
根据 routing_key 查找并执行对应的处理函数
|
|
|
"""
|
|
"""
|
|
|
handler = self._handlers.get(routing_key)
|
|
handler = self._handlers.get(routing_key)
|
|
|
|
|
|
|
|
if not handler:
|
|
if not handler:
|
|
|
- # 如果没有注册处理器,通常直接跳过即可
|
|
|
|
|
logger.info(f"ℹ️ [TaskHandler] No handler found for routing_key: {routing_key}, skipping.")
|
|
logger.info(f"ℹ️ [TaskHandler] No handler found for routing_key: {routing_key}, skipping.")
|
|
|
return
|
|
return
|
|
|
|
|
|
|
|
try:
|
|
try:
|
|
|
logger.info(f"▶️ [TaskHandler] Executing handler '{handler.__name__}' for task {task.id} (key: {routing_key})")
|
|
logger.info(f"▶️ [TaskHandler] Executing handler '{handler.__name__}' for task {task.id} (key: {routing_key})")
|
|
|
|
|
|
|
|
- # 执行具体的业务逻辑
|
|
|
|
|
- await handler(db, task, order)
|
|
|
|
|
|
|
+ # 传递 redis_client 给具体处理函数
|
|
|
|
|
+ await handler(db, redis_client, task, order)
|
|
|
|
|
|
|
|
logger.info(f"✅ [TaskHandler] Handler '{handler.__name__}' completed successfully.")
|
|
logger.info(f"✅ [TaskHandler] Handler '{handler.__name__}' completed successfully.")
|
|
|
|
|
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
- # 捕获异常,防止阻断主流程 (manual_confirm) 的事务提交
|
|
|
|
|
- # 也可以选择在这里将错误写入 task.meta
|
|
|
|
|
logger.error(f"❌ [TaskHandler] Error executing handler for task {task.id}: {str(e)}", exc_info=True)
|
|
logger.error(f"❌ [TaskHandler] Error executing handler for task {task.id}: {str(e)}", exc_info=True)
|
|
|
-
|
|
|
|
|
- # 可选:记录错误到 Task Meta
|
|
|
|
|
# await self._record_error(db, task, str(e))
|
|
# await self._record_error(db, task, str(e))
|
|
|
|
|
|
|
|
async def _record_error(self, db: AsyncSession, task: VasTask, error_msg: str):
|
|
async def _record_error(self, db: AsyncSession, task: VasTask, error_msg: str):
|
|
|
- """辅助方法:将错误信息回写到 Task (可选)"""
|
|
|
|
|
try:
|
|
try:
|
|
|
meta = dict(task.meta) if task.meta else {}
|
|
meta = dict(task.meta) if task.meta else {}
|
|
|
meta["post_process_error"] = error_msg
|
|
meta["post_process_error"] = error_msg
|
|
|
task.meta = meta
|
|
task.meta = meta
|
|
|
- # 注意:这里不需要 commit,因为外层 manual_confirm 会统一 commit
|
|
|
|
|
except Exception:
|
|
except Exception:
|
|
|
pass
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
-# =================================================================
|
|
|
|
|
-# 实例化注册表 (在其他地方 import 这个实例)
|
|
|
|
|
-# =================================================================
|
|
|
|
|
task_processor = TaskHandlerRegistry()
|
|
task_processor = TaskHandlerRegistry()
|
|
|
|
|
|
|
|
|
|
|
|
@@ -90,197 +74,203 @@ task_processor = TaskHandlerRegistry()
|
|
|
# 具体的处理函数 (Handlers)
|
|
# 具体的处理函数 (Handlers)
|
|
|
# =================================================================
|
|
# =================================================================
|
|
|
|
|
|
|
|
|
|
+# 辅助函数:获取用户信息(用于发通知)
|
|
|
|
|
+async def _get_user_by_id(db: AsyncSession, user_id: int) -> VasUser:
|
|
|
|
|
+ stmt = select(VasUser).where(VasUser.id == user_id)
|
|
|
|
|
+ result = await db.execute(stmt)
|
|
|
|
|
+ return result.scalar_one_or_none()
|
|
|
|
|
+
|
|
|
@task_processor.register("auto.slot.dub.fr.tourist")
|
|
@task_processor.register("auto.slot.dub.fr.tourist")
|
|
|
-async def forward_troov_appointment_letter(db: AsyncSession, task: VasTask, order: VasOrder):
|
|
|
|
|
|
|
+async def forward_troov_appointment_letter(db: AsyncSession, redis_client: Redis, task: VasTask, order: VasOrder):
|
|
|
"""
|
|
"""
|
|
|
- 处理邮件转发相关的任务
|
|
|
|
|
- 支持 routing_key: email.forward, email.auto_reply
|
|
|
|
|
|
|
+ Troov (法国) 预约信转发及通知
|
|
|
"""
|
|
"""
|
|
|
# 1. 解析参数
|
|
# 1. 解析参数
|
|
|
- # 假设参数存储在 task.user_inputs 或 task.config 中
|
|
|
|
|
inputs: Dict[str, Any] = task.user_inputs or {}
|
|
inputs: Dict[str, Any] = task.user_inputs or {}
|
|
|
grabbed_history = task.grabbed_history or {}
|
|
grabbed_history = task.grabbed_history or {}
|
|
|
-
|
|
|
|
|
task_config = task.config or {}
|
|
task_config = task.config or {}
|
|
|
|
|
|
|
|
-
|
|
|
|
|
email_account = "visafly666@gmail.com"
|
|
email_account = "visafly666@gmail.com"
|
|
|
- forward_to = inputs.get("email", "")
|
|
|
|
|
- if not forward_to:
|
|
|
|
|
- uid = order.user_id
|
|
|
|
|
- stmt = select(VasUser).where(VasUser.id == uid)
|
|
|
|
|
- result = await db.execute(stmt)
|
|
|
|
|
- user = result.scalar_one_or_none()
|
|
|
|
|
- forward_to = user.email
|
|
|
|
|
-
|
|
|
|
|
- recipient = task_config.get('alias_email', "")
|
|
|
|
|
|
|
|
|
|
- first_name = inputs.get('first_name', '')
|
|
|
|
|
- last_name = inputs.get('last_name', '')
|
|
|
|
|
|
|
+ # 获取用户以发送通知
|
|
|
|
|
+ user = await _get_user_by_id(db, order.user_id)
|
|
|
|
|
+ forward_to = inputs.get("email") or (user.email if user else "")
|
|
|
|
|
|
|
|
- first_name = first_name.capitalize()
|
|
|
|
|
- last_name = last_name.capitalize()
|
|
|
|
|
|
|
+ recipient = task_config.get('alias_email', "")
|
|
|
|
|
+ first_name = inputs.get('first_name', '').capitalize()
|
|
|
|
|
+ last_name = inputs.get('last_name', '').capitalize()
|
|
|
|
|
|
|
|
|
|
+ # 处理日期格式用于关键词匹配
|
|
|
book_date_str = grabbed_history.get("book_date", "")
|
|
book_date_str = grabbed_history.get("book_date", "")
|
|
|
- dt = datetime.strptime(book_date_str, "%Y-%m-%dT%H:%M:%S")
|
|
|
|
|
- formatted_date_time = dt.strftime("%B %-d, %Y to %Hh%M")
|
|
|
|
|
-
|
|
|
|
|
|
|
+ formatted_date_time = ""
|
|
|
|
|
+ if book_date_str:
|
|
|
|
|
+ try:
|
|
|
|
|
+ # 假设 input 是 ISO 格式 "2024-03-15T09:00:00"
|
|
|
|
|
+ dt = datetime.strptime(book_date_str, "%Y-%m-%dT%H:%M:%S")
|
|
|
|
|
+ formatted_date_time = dt.strftime("%B %-d, %Y to %Hh%M") # Troov 邮件中的特殊格式
|
|
|
|
|
+
|
|
|
|
|
+ # 为邮件通知准备通用格式
|
|
|
|
|
+ notification_date_str = dt.strftime("%d %b %Y, %H:%M")
|
|
|
|
|
+ except ValueError:
|
|
|
|
|
+ formatted_date_time = book_date_str
|
|
|
|
|
+ notification_date_str = book_date_str
|
|
|
|
|
+
|
|
|
# 过滤条件
|
|
# 过滤条件
|
|
|
- sender = "ne-pas-repondre at consulat.gouv.fr"
|
|
|
|
|
-
|
|
|
|
|
|
|
+ sender = "ne-pas-repondre at consulat.gouv.fr" # 注意实际匹配时可能需要处理 @ 符号
|
|
|
subject_keywords = inputs.get("subjectKeywords", "Confirmed,appointment,Section,Ambassade,France,Irlande")
|
|
subject_keywords = inputs.get("subjectKeywords", "Confirmed,appointment,Section,Ambassade,France,Irlande")
|
|
|
body_keywords = inputs.get("bodyKeywords", f"Concerned,person,{first_name},{last_name},{formatted_date_time}")
|
|
body_keywords = inputs.get("bodyKeywords", f"Concerned,person,{first_name},{last_name},{formatted_date_time}")
|
|
|
|
|
|
|
|
if not email_account or not forward_to:
|
|
if not email_account or not forward_to:
|
|
|
- logger.warning(f"Task {task.id} missing required inputs for email forwarding.")
|
|
|
|
|
|
|
+ logger.warning(f"Task {task.id} missing inputs for email forwarding.")
|
|
|
return
|
|
return
|
|
|
|
|
|
|
|
logger.info(f"Task {task.id}: Forwarding emails from {email_account} to {forward_to}")
|
|
logger.info(f"Task {task.id}: Forwarding emails from {email_account} to {forward_to}")
|
|
|
|
|
|
|
|
- # 2. 获取授权配置
|
|
|
|
|
|
|
+ # 2. 获取授权并转发邮件
|
|
|
auth = await EmailAuthorizationService.get_by_email(db, email_account)
|
|
auth = await EmailAuthorizationService.get_by_email(db, email_account)
|
|
|
if not auth:
|
|
if not auth:
|
|
|
- raise ValueError(f"Email authorization not found for account: {email_account}")
|
|
|
|
|
|
|
+ raise ValueError(f"Email authorization not found: {email_account}")
|
|
|
|
|
|
|
|
- # 3. 调用邮件服务执行转发
|
|
|
|
|
- # 这里复用你之前写的 Service 方法
|
|
|
|
|
result = await EmailAuthorizationService.forward_first_matching_email2(
|
|
result = await EmailAuthorizationService.forward_first_matching_email2(
|
|
|
- db=db,
|
|
|
|
|
- auth=auth,
|
|
|
|
|
- forward_to=forward_to,
|
|
|
|
|
- sender=sender,
|
|
|
|
|
- recipient=recipient,
|
|
|
|
|
- subject_keywords=subject_keywords,
|
|
|
|
|
- body_keywords=body_keywords
|
|
|
|
|
|
|
+ db=db, auth=auth, forward_to=forward_to, sender=sender, recipient=recipient,
|
|
|
|
|
+ subject_keywords=subject_keywords, body_keywords=body_keywords
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
- # 4. (可选) 将结果回写到 Task Meta
|
|
|
|
|
|
|
+ # 3. 更新 Task Meta
|
|
|
meta = dict(task.meta) if task.meta else {}
|
|
meta = dict(task.meta) if task.meta else {}
|
|
|
meta["forward_result"] = result
|
|
meta["forward_result"] = result
|
|
|
task.meta = meta
|
|
task.meta = meta
|
|
|
|
|
|
|
|
-
|
|
|
|
|
|
|
+ # 4. 发送预约成功通知邮件给用户 (利用 NotificationService + Redis)
|
|
|
|
|
+ if user and user.email:
|
|
|
|
|
+ logger.info(f"📧 Sending appointment confirmation email to {user.email}")
|
|
|
|
|
+ await NotificationService.post_email(
|
|
|
|
|
+ redis_client=redis_client,
|
|
|
|
|
+ receiver=user.email,
|
|
|
|
|
+ template_id="appointment_confirmation", # 对应之前定义的模板ID
|
|
|
|
|
+ payload={
|
|
|
|
|
+ "username": user.nickname or first_name,
|
|
|
|
|
+ "order_id": str(order.id),
|
|
|
|
|
+ "country": "France",
|
|
|
|
|
+ "city": "Dublin", # 根据 routing_key 或 inputs 推断
|
|
|
|
|
+ "appointment_date": formatted_date_time,
|
|
|
|
|
+ "visa_type": "Short Stay Any Purpose",
|
|
|
|
|
+ "user_email": forward_to
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
@task_processor.register("auto.slot.dub.nl.tourist")
|
|
@task_processor.register("auto.slot.dub.nl.tourist")
|
|
|
-async def forward_vfs_appointment_letter(db: AsyncSession, task: VasTask, order: VasOrder):
|
|
|
|
|
|
|
+async def forward_vfs_appointment_letter(db: AsyncSession, redis_client: Redis, task: VasTask, order: VasOrder):
|
|
|
"""
|
|
"""
|
|
|
- 处理邮件转发相关的任务
|
|
|
|
|
- 支持 routing_key: email.forward, email.auto_reply
|
|
|
|
|
|
|
+ VFS (荷兰) 预约信转发及通知
|
|
|
"""
|
|
"""
|
|
|
- # 1. 解析参数
|
|
|
|
|
- # 假设参数存储在 task.user_inputs 或 task.config 中
|
|
|
|
|
inputs: Dict[str, Any] = task.user_inputs or {}
|
|
inputs: Dict[str, Any] = task.user_inputs or {}
|
|
|
grabbed_history = task.grabbed_history or {}
|
|
grabbed_history = task.grabbed_history or {}
|
|
|
-
|
|
|
|
|
task_config = task.config or {}
|
|
task_config = task.config or {}
|
|
|
|
|
|
|
|
-
|
|
|
|
|
email_account = "visafly666@gmail.com"
|
|
email_account = "visafly666@gmail.com"
|
|
|
- forward_to = inputs.get("email", "")
|
|
|
|
|
- if not forward_to:
|
|
|
|
|
- uid = order.user_id
|
|
|
|
|
- stmt = select(VasUser).where(VasUser.id == uid)
|
|
|
|
|
- result = await db.execute(stmt)
|
|
|
|
|
- user = result.scalar_one_or_none()
|
|
|
|
|
- forward_to = user.email
|
|
|
|
|
-
|
|
|
|
|
|
|
+ user = await _get_user_by_id(db, order.user_id)
|
|
|
|
|
+ forward_to = inputs.get("email") or (user.email if user else "")
|
|
|
|
|
|
|
|
-
|
|
|
|
|
urn = grabbed_history.get("urn", "")
|
|
urn = grabbed_history.get("urn", "")
|
|
|
|
|
|
|
|
- # 过滤条件
|
|
|
|
|
sender = "donotreply at vfsglobal.com"
|
|
sender = "donotreply at vfsglobal.com"
|
|
|
recipient = task_config.get('alias_email', "")
|
|
recipient = task_config.get('alias_email', "")
|
|
|
subject_keywords = inputs.get("subjectKeywords", "Appointment,Confirm")
|
|
subject_keywords = inputs.get("subjectKeywords", "Appointment,Confirm")
|
|
|
- body_keywords = inputs.get("bodyKeywords", f"urn")
|
|
|
|
|
|
|
+ body_keywords = inputs.get("bodyKeywords", f"urn") # VFS 通常用 URN 匹配
|
|
|
|
|
|
|
|
if not email_account or not forward_to:
|
|
if not email_account or not forward_to:
|
|
|
- logger.warning(f"Task {task.id} missing required inputs for email forwarding.")
|
|
|
|
|
|
|
+ logger.warning(f"Task {task.id} missing inputs.")
|
|
|
return
|
|
return
|
|
|
|
|
|
|
|
- logger.info(f"Task {task.id}: Forwarding emails from {email_account} to {forward_to}")
|
|
|
|
|
-
|
|
|
|
|
- # 2. 获取授权配置
|
|
|
|
|
auth = await EmailAuthorizationService.get_by_email(db, email_account)
|
|
auth = await EmailAuthorizationService.get_by_email(db, email_account)
|
|
|
if not auth:
|
|
if not auth:
|
|
|
- raise ValueError(f"Email authorization not found for account: {email_account}")
|
|
|
|
|
|
|
+ raise ValueError(f"Email auth not found: {email_account}")
|
|
|
|
|
|
|
|
- # 3. 调用邮件服务执行转发
|
|
|
|
|
- # 这里复用你之前写的 Service 方法
|
|
|
|
|
result = await EmailAuthorizationService.forward_first_matching_email2(
|
|
result = await EmailAuthorizationService.forward_first_matching_email2(
|
|
|
- db=db,
|
|
|
|
|
- auth=auth,
|
|
|
|
|
- forward_to=forward_to,
|
|
|
|
|
- sender=sender,
|
|
|
|
|
- recipient=recipient,
|
|
|
|
|
- subject_keywords=subject_keywords,
|
|
|
|
|
- body_keywords=body_keywords
|
|
|
|
|
|
|
+ db=db, auth=auth, forward_to=forward_to, sender=sender, recipient=recipient,
|
|
|
|
|
+ subject_keywords=subject_keywords, body_keywords=body_keywords
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
- # 4. (可选) 将结果回写到 Task Meta
|
|
|
|
|
meta = dict(task.meta) if task.meta else {}
|
|
meta = dict(task.meta) if task.meta else {}
|
|
|
meta["forward_result"] = result
|
|
meta["forward_result"] = result
|
|
|
task.meta = meta
|
|
task.meta = meta
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
|
|
+ # 发送通知
|
|
|
|
|
+ if user and user.email:
|
|
|
|
|
+ await NotificationService.post_email(
|
|
|
|
|
+ redis_client=redis_client,
|
|
|
|
|
+ receiver=user.email,
|
|
|
|
|
+ template_id="appointment_confirmation",
|
|
|
|
|
+ payload={
|
|
|
|
|
+ "username": user.nickname or 'consumer',
|
|
|
|
|
+ "order_id": str(order.id),
|
|
|
|
|
+ "country": "Netherlands",
|
|
|
|
|
+ "city": "Dublin",
|
|
|
|
|
+ "appointment_date": "Check attachment", # VFS 可能没有直接的日期在 history
|
|
|
|
|
+ "visa_type": "Tourist",
|
|
|
|
|
+ "user_email": forward_to
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
@task_processor.register("auto.slot.dub.de.tourist")
|
|
@task_processor.register("auto.slot.dub.de.tourist")
|
|
|
-async def forward_visametric_appointment_letter(db: AsyncSession, task: VasTask, order: VasOrder):
|
|
|
|
|
|
|
+async def forward_visametric_appointment_letter(db: AsyncSession, redis_client: Redis, task: VasTask, order: VasOrder):
|
|
|
"""
|
|
"""
|
|
|
- 处理邮件转发相关的任务
|
|
|
|
|
- 支持 routing_key: email.forward, email.auto_reply
|
|
|
|
|
|
|
+ Visametric (德国) 预约信转发及通知
|
|
|
"""
|
|
"""
|
|
|
- # 1. 解析参数
|
|
|
|
|
- # 假设参数存储在 task.user_inputs 或 task.config 中
|
|
|
|
|
inputs: Dict[str, Any] = task.user_inputs or {}
|
|
inputs: Dict[str, Any] = task.user_inputs or {}
|
|
|
grabbed_history = task.grabbed_history or {}
|
|
grabbed_history = task.grabbed_history or {}
|
|
|
-
|
|
|
|
|
task_config = task.config or {}
|
|
task_config = task.config or {}
|
|
|
|
|
|
|
|
-
|
|
|
|
|
email_account = "visafly666@gmail.com"
|
|
email_account = "visafly666@gmail.com"
|
|
|
- forward_to = inputs.get("email", "")
|
|
|
|
|
- if not forward_to:
|
|
|
|
|
- uid = order.user_id
|
|
|
|
|
- stmt = select(VasUser).where(VasUser.id == uid)
|
|
|
|
|
- result = await db.execute(stmt)
|
|
|
|
|
- user = result.scalar_one_or_none()
|
|
|
|
|
- forward_to = user.email
|
|
|
|
|
|
|
+ user = await _get_user_by_id(db, order.user_id)
|
|
|
|
|
+ forward_to = inputs.get("email") or (user.email if user else "")
|
|
|
|
|
|
|
|
first_name = inputs.get('first_name', '')
|
|
first_name = inputs.get('first_name', '')
|
|
|
last_name = inputs.get('last_name', '')
|
|
last_name = inputs.get('last_name', '')
|
|
|
-
|
|
|
|
|
pnr_number = grabbed_history.get('pnr_number', '')
|
|
pnr_number = grabbed_history.get('pnr_number', '')
|
|
|
- slot_date = grabbed_history.get('slot_date', '')
|
|
|
|
|
|
|
+ slot_date = grabbed_history.get('slot_date', '') # YYYY-MM-DD
|
|
|
slot_time = grabbed_history.get('slot_time', '')
|
|
slot_time = grabbed_history.get('slot_time', '')
|
|
|
|
|
|
|
|
- # 过滤条件
|
|
|
|
|
sender = "noreply at visametric.com"
|
|
sender = "noreply at visametric.com"
|
|
|
recipient = task_config.get("alias_email", "")
|
|
recipient = task_config.get("alias_email", "")
|
|
|
subject_keywords = inputs.get("subjectKeywords", f"{pnr_number},Visametric,Appointment,Request")
|
|
subject_keywords = inputs.get("subjectKeywords", f"{pnr_number},Visametric,Appointment,Request")
|
|
|
body_keywords = inputs.get("bodyKeywords", f"{pnr_number},{slot_date},{slot_time},{first_name},{last_name}")
|
|
body_keywords = inputs.get("bodyKeywords", f"{pnr_number},{slot_date},{slot_time},{first_name},{last_name}")
|
|
|
|
|
|
|
|
if not email_account or not forward_to:
|
|
if not email_account or not forward_to:
|
|
|
- logger.warning(f"Task {task.id} missing required inputs for email forwarding.")
|
|
|
|
|
|
|
+ logger.warning(f"Task {task.id} missing inputs.")
|
|
|
return
|
|
return
|
|
|
|
|
|
|
|
- logger.info(f"Task {task.id}: Forwarding emails from {email_account} to {forward_to}")
|
|
|
|
|
-
|
|
|
|
|
- # 2. 获取授权配置
|
|
|
|
|
auth = await EmailAuthorizationService.get_by_email(db, email_account)
|
|
auth = await EmailAuthorizationService.get_by_email(db, email_account)
|
|
|
if not auth:
|
|
if not auth:
|
|
|
- raise ValueError(f"Email authorization not found for account: {email_account}")
|
|
|
|
|
|
|
+ raise ValueError(f"Email auth not found: {email_account}")
|
|
|
|
|
|
|
|
- # 3. 调用邮件服务执行转发
|
|
|
|
|
- # 这里复用你之前写的 Service 方法
|
|
|
|
|
result = await EmailAuthorizationService.forward_first_matching_email2(
|
|
result = await EmailAuthorizationService.forward_first_matching_email2(
|
|
|
- db=db,
|
|
|
|
|
- auth=auth,
|
|
|
|
|
- forward_to=forward_to,
|
|
|
|
|
- sender=sender,
|
|
|
|
|
- recipient=recipient,
|
|
|
|
|
- subject_keywords=subject_keywords,
|
|
|
|
|
- body_keywords=body_keywords
|
|
|
|
|
|
|
+ db=db, auth=auth, forward_to=forward_to, sender=sender, recipient=recipient,
|
|
|
|
|
+ subject_keywords=subject_keywords, body_keywords=body_keywords
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
- # 4. (可选) 将结果回写到 Task Meta
|
|
|
|
|
meta = dict(task.meta) if task.meta else {}
|
|
meta = dict(task.meta) if task.meta else {}
|
|
|
meta["forward_result"] = result
|
|
meta["forward_result"] = result
|
|
|
- task.meta = meta
|
|
|
|
|
|
|
+ task.meta = meta
|
|
|
|
|
+
|
|
|
|
|
+ # 发送通知
|
|
|
|
|
+ if user and user.email:
|
|
|
|
|
+ display_date = f"{slot_date} {slot_time}" if slot_date else "Confirmed"
|
|
|
|
|
+
|
|
|
|
|
+ await NotificationService.post_email(
|
|
|
|
|
+ redis_client=redis_client,
|
|
|
|
|
+ receiver=user.email,
|
|
|
|
|
+ template_id="appointment_confirmation",
|
|
|
|
|
+ payload={
|
|
|
|
|
+ "username": user.nickname or first_name,
|
|
|
|
|
+ "order_id": str(order.id),
|
|
|
|
|
+ "country": "Germany",
|
|
|
|
|
+ "city": "Dublin",
|
|
|
|
|
+ "appointment_date": display_date,
|
|
|
|
|
+ "visa_type": "Tourist",
|
|
|
|
|
+ "user_email": forward_to
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|