jerry 4 miesięcy temu
rodzic
commit
8527ab28ea

+ 1 - 0
app/api/router.py

@@ -306,6 +306,7 @@ async def email_authorizations_forward_email2(
 ):
     auth = await EmailAuthorizationService.get_by_email(db, emailAccount)
     result = await EmailAuthorizationService.forward_first_matching_email2(
+        db,
         auth,
         forward_to = forwardTo,
         sender = sender,

+ 32 - 25
app/services/email_authorizations_service.py

@@ -14,7 +14,7 @@ from typing import List, Optional
 
 from sqlalchemy.orm import Session
 from sqlalchemy.ext.asyncio import AsyncSession
-from sqlalchemy import select 
+from sqlalchemy import select, text
 from starlette.concurrency import run_in_threadpool
 
 from app.core.logger import logger
@@ -593,40 +593,39 @@ class EmailAuthorizationService:
         # 4. 处理主题关键词 (OR 关系)
         subj_keys = [k.strip() for k in subject_keywords.split(',') if k.strip()]
         if subj_keys:
-            or_clauses = []
             for i, k in enumerate(subj_keys):
                 key_name = f"subj_{i}"
-                or_clauses.append(f"subject LIKE :{key_name}")
+                # 直接拼接到主 SQL 中,要求同时满足
+                sql += f" AND subject LIKE :{key_name}"
                 params[key_name] = f"%{k}%"
-            sql += f" AND ({' OR '.join(or_clauses)})"
 
         # 5. 处理内容关键词 (OR 关系)
         body_keys = [k.strip() for k in body_keywords.split(',') if k.strip()]
         if body_keys:
-            or_clauses = []
             for i, k in enumerate(body_keys):
                 key_name = f"body_{i}"
-                or_clauses.append(f"body_text LIKE :{key_name}")
+                # 直接拼接到主 SQL 中,要求同时满足
+                sql += f" AND body_text LIKE :{key_name}"
                 params[key_name] = f"%{k}%"
-            sql += f" AND ({' OR '.join(or_clauses)})"
 
         # 6. 获取最新的一条
         sql += " ORDER BY uid DESC LIMIT 1"
 
         try:
             # 执行查询
-            result = db.execute(text(sql), params).fetchone()
+            result_proxy = await db.execute(text(sql), params)
+            result = result_proxy.fetchone()
             
             if not result:
-                logging.info(f"DB Search: No email found for {sender} -> {recipient}")
+                logger.info(f"DB Search: No email found for {sender} -> {recipient}")
                 return None
                 
             target_uid = result.uid
             target_subject = result.subject
-            logging.info(f"DB Search: Found UID {target_uid} matching criteria. Subject: {target_subject}")
+            logger.info(f"DB Search: Found UID {target_uid} matching criteria. Subject: {target_subject}")
             
         except Exception as e:
-            logging.error(f"DB Search Error: {e}")
+            logger.error(f"DB Search Error: {e}")
             return f"数据库查询失败: {str(e)}"
 
         # =========================================================
@@ -648,28 +647,36 @@ class EmailAuthorizationService:
                 # 2. 根据 UID 精准拉取 (使用 fetch)
                 # 注意:IMAPClient 的 fetch 方法
                 # UID 必须转为 int 或者 sequence set 字符串
-                res = mail.fetch([target_uid], ["RFC822"])
+                res, data = mail.uid('fetch', str(target_uid), '(RFC822)')
                 
-                if not res or target_uid not in res:
-                    return f"数据库中存在 UID {target_uid},但在邮箱服务器上未找到 (可能已被删除)"
+                # 🔴 修正点:不要写 if target_uid in res
+                # res 是状态字符串 "OK",data 是包含邮件内容的列表
+                if res != 'OK':
+                    return f"IMAP Fetch 失败,状态码: {res}"
                 
-                msg_data = res[target_uid][b"RFC822"]
-                msg = email.message_from_bytes(msg_data)
+                if not data or not data[0]:
+                    return f"未找到 UID {target_uid} 的邮件内容 (可能已被物理删除)"
+
+                # data[0] 通常是 tuple (byte_header, byte_content),但也可能是 None
+                if isinstance(data[0], tuple):
+                    raw_email_bytes = data[0][1]
+                else:
+                    # 如果 data[0] 只是 bytes (例如 b')'),说明没拿到邮件体
+                    return f"邮件数据格式异常,无法解析: {str(data)}"
+
+                msg = email.message_from_bytes(raw_email_bytes)
 
                 # 3. 处理转发逻辑
-                # 解析原标题 (为了做转发前缀)
-                subject_raw = msg.get("Subject", "")
-                subject = target_subject # 优先用数据库查出来的,或者重新解析也可
-                
+                # 使用数据库中查到的标题,确保标题准确
+                subject = target_subject 
+
                 # 修改 Header 进行转发
-                # 警告:这种方式会破坏原邮件的 DKIM 签名,导致进入垃圾箱概率增加
-                # 但为了保持你原有的逻辑,这里继续使用 Header 修改法
                 del msg['From']
                 del msg['To']
-                del msg['Cc'] # 转发通常去掉抄送
+                del msg['Cc']
                 del msg['Subject']
                 
-                msg['From'] = auth.email  # 发件人必须是当前授权账号
+                msg['From'] = auth.email  # 发件人覆写为当前授权账号
                 msg['To'] = forward_to
                 msg['Subject'] = f"FWD: {subject}"
                 
@@ -679,7 +686,7 @@ class EmailAuthorizationService:
                 return f"邮件 '{subject}' (UID: {target_uid}) 已成功转发至: {forward_to}"
                 
             except Exception as e:
-                logging.error(f"IMAP Forward Error: {e}")
+                logger.error(f"IMAP Forward Error: {e}")
                 return f"邮件转发过程出错: {str(e)}"
             finally:
                 if mail:

+ 286 - 0
app/services/task_handlers.py

@@ -0,0 +1,286 @@
+# app/services/task_handlers.py
+
+import logging
+from datetime import datetime
+from typing import Callable, Awaitable, Dict, Any
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy import select
+from app.models.user import VasUser
+from app.models.vas_task import VasTask
+from app.models.order import VasOrder
+
+# 导入具体的业务服务 (用于发邮件等)
+from app.services.email_authorizations_service import EmailAuthorizationService
+
+logger = logging.getLogger(__name__)
+
+# 定义处理器函数的类型签名
+HandlerFunc = Callable[[AsyncSession, VasTask, VasOrder], Awaitable[None]]
+
+class TaskHandlerRegistry:
+    """
+    任务后处理注册表
+    用于管理 routing_key 与处理函数的对应关系
+    """
+    def __init__(self):
+        self._handlers: Dict[str, HandlerFunc] = {}
+
+    def register(self, *routing_keys: str):
+        """
+        装饰器:将一个或多个 routing_key 注册到同一个处理函数
+        
+        用法:
+        @task_processor.register("key1", "key2")
+        async def my_handler(...):
+            pass
+        """
+        def decorator(func: HandlerFunc):
+            for key in routing_keys:
+                if key in self._handlers:
+                    logger.warning(f"⚠️ [TaskHandler] Overwriting handler for key: {key}")
+                self._handlers[key] = func
+            return func
+        return decorator
+
+    async def execute(self, routing_key: str, db: AsyncSession, task: VasTask, order: VasOrder):
+        """
+        根据 routing_key 查找并执行对应的处理函数
+        """
+        handler = self._handlers.get(routing_key)
+        
+        if not handler:
+            # 如果没有注册处理器,通常直接跳过即可
+            logger.debug(f"ℹ️ [TaskHandler] No handler found for routing_key: {routing_key}, skipping.")
+            return
+
+        try:
+            logger.info(f"▶️ [TaskHandler] Executing handler '{handler.__name__}' for task {task.id} (key: {routing_key})")
+            
+            # 执行具体的业务逻辑
+            await handler(db, task, order)
+            
+            logger.info(f"✅ [TaskHandler] Handler '{handler.__name__}' completed successfully.")
+            
+        except Exception as e:
+            # 捕获异常,防止阻断主流程 (manual_confirm) 的事务提交
+            # 也可以选择在这里将错误写入 task.meta
+            logger.error(f"❌ [TaskHandler] Error executing handler for task {task.id}: {str(e)}", exc_info=True)
+            
+            # 可选:记录错误到 Task Meta
+            # await self._record_error(db, task, str(e))
+
+    async def _record_error(self, db: AsyncSession, task: VasTask, error_msg: str):
+        """辅助方法:将错误信息回写到 Task (可选)"""
+        try:
+            meta = dict(task.meta) if task.meta else {}
+            meta["post_process_error"] = error_msg
+            task.meta = meta
+            # 注意:这里不需要 commit,因为外层 manual_confirm 会统一 commit
+        except Exception:
+            pass
+
+
+# =================================================================
+# 实例化注册表 (在其他地方 import 这个实例)
+# =================================================================
+task_processor = TaskHandlerRegistry()
+
+
+# =================================================================
+# 具体的处理函数 (Handlers)
+# =================================================================
+
+@task_processor.register("auto.slot.dub.fr.tourist")
+async def forward_troov_appointment_letter(db: AsyncSession, task: VasTask, order: VasOrder):
+    """
+    处理邮件转发相关的任务
+    支持 routing_key: email.forward, email.auto_reply
+    """
+    # 1. 解析参数
+    # 假设参数存储在 task.user_inputs 或 task.config 中
+    inputs: Dict[str, Any] = task.user_inputs or {}
+    grabbed_history = task.grabbed_history or {}
+    
+    task_config = task.config or {}
+
+    
+    email_account = "visafly666@gmail.com"
+    forward_to = inputs.get("email", "")
+    if not forward_to:
+        uid = order.user_id
+        stmt = select(VasUser).where(VasUser.id == uid)
+        result = await db.execute(stmt)
+        user = result.scalar_one_or_none()
+        forward_to = user.email
+        
+    recipient = task_config.get('alias_email', "")
+    
+    first_name = inputs.get('first_name', '')
+    last_name = inputs.get('last_name', '')
+    
+    first_name = first_name.capitalize()
+    last_name = last_name.capitalize()
+    
+    book_date_str = grabbed_history.get("book_date", "")
+    dt = datetime.strptime(book_date_str, "%Y-%m-%dT%H:%M:%S")
+    formatted_date_time = dt.strftime("%B %-d, %Y to %Hh%M")
+    
+    # 过滤条件
+    sender = "ne-pas-repondre at consulat.gouv.fr"
+    
+    subject_keywords = inputs.get("subjectKeywords", "Confirmed,appointment,Section,Ambassade,France,Irlande")
+    body_keywords = inputs.get("bodyKeywords", f"Concerned,person,{first_name},{last_name},{formatted_date_time}")
+
+    if not email_account or not forward_to:
+        logger.warning(f"Task {task.id} missing required inputs for email forwarding.")
+        return
+
+    logger.info(f"Task {task.id}: Forwarding emails from {email_account} to {forward_to}")
+
+    # 2. 获取授权配置
+    auth = await EmailAuthorizationService.get_by_email(db, email_account)
+    if not auth:
+        raise ValueError(f"Email authorization not found for account: {email_account}")
+
+    # 3. 调用邮件服务执行转发
+    # 这里复用你之前写的 Service 方法
+    result = await EmailAuthorizationService.forward_first_matching_email2(
+        db=db,
+        auth=auth,
+        forward_to=forward_to,
+        sender=sender,
+        recipient=recipient,
+        subject_keywords=subject_keywords,
+        body_keywords=body_keywords
+    )
+
+    # 4. (可选) 将结果回写到 Task Meta
+    meta = dict(task.meta) if task.meta else {}
+    meta["forward_result"] = result
+    task.meta = meta
+    
+    
+@task_processor.register("auto.slot.dub.nl.tourist")
+async def forward_vfs_appointment_letter(db: AsyncSession, task: VasTask, order: VasOrder):
+    """
+    处理邮件转发相关的任务
+    支持 routing_key: email.forward, email.auto_reply
+    """
+    # 1. 解析参数
+    # 假设参数存储在 task.user_inputs 或 task.config 中
+    inputs: Dict[str, Any] = task.user_inputs or {}
+    grabbed_history = task.grabbed_history or {}
+    
+    task_config = task.config or {}
+
+    
+    email_account = "visafly666@gmail.com"
+    forward_to = inputs.get("email", "")
+    if not forward_to:
+        uid = order.user_id
+        stmt = select(VasUser).where(VasUser.id == uid)
+        result = await db.execute(stmt)
+        user = result.scalar_one_or_none()
+        forward_to = user.email
+    
+
+    
+    urn = grabbed_history.get("urn", "")
+    
+    # 过滤条件
+    sender = "donotreply at vfsglobal.com"
+    recipient = task_config.get('alias_email', "")
+    subject_keywords = inputs.get("subjectKeywords", "Appointment,Confirm")
+    body_keywords = inputs.get("bodyKeywords", f"urn")
+
+    if not email_account or not forward_to:
+        logger.warning(f"Task {task.id} missing required inputs for email forwarding.")
+        return
+
+    logger.info(f"Task {task.id}: Forwarding emails from {email_account} to {forward_to}")
+
+    # 2. 获取授权配置
+    auth = await EmailAuthorizationService.get_by_email(db, email_account)
+    if not auth:
+        raise ValueError(f"Email authorization not found for account: {email_account}")
+
+    # 3. 调用邮件服务执行转发
+    # 这里复用你之前写的 Service 方法
+    result = await EmailAuthorizationService.forward_first_matching_email2(
+        db=db,
+        auth=auth,
+        forward_to=forward_to,
+        sender=sender,
+        recipient=recipient,
+        subject_keywords=subject_keywords,
+        body_keywords=body_keywords
+    )
+
+    # 4. (可选) 将结果回写到 Task Meta
+    meta = dict(task.meta) if task.meta else {}
+    meta["forward_result"] = result
+    task.meta = meta
+    
+@task_processor.register("auto.slot.dub.de.tourist")
+async def forward_visametric_appointment_letter(db: AsyncSession, task: VasTask, order: VasOrder):
+    """
+    处理邮件转发相关的任务
+    支持 routing_key: email.forward, email.auto_reply
+    """
+    # 1. 解析参数
+    # 假设参数存储在 task.user_inputs 或 task.config 中
+    inputs: Dict[str, Any] = task.user_inputs or {}
+    grabbed_history = task.grabbed_history or {}
+    
+    task_config = task.config or {}
+
+    
+    email_account = "visafly666@gmail.com"
+    forward_to = inputs.get("email", "")
+    if not forward_to:
+        uid = order.user_id
+        stmt = select(VasUser).where(VasUser.id == uid)
+        result = await db.execute(stmt)
+        user = result.scalar_one_or_none()
+        forward_to = user.email
+    
+    first_name = inputs.get('first_name', '')
+    last_name = inputs.get('last_name', '')
+    
+    pnr_number = grabbed_history.get('pnr_number', '')
+    slot_date = grabbed_history.get('slot_date', '')
+    slot_time = grabbed_history.get('slot_time', '')
+    
+    # 过滤条件
+    sender = "noreply at visametric.com"
+    recipient = task_config.get("alias_email", "")
+    subject_keywords = inputs.get("subjectKeywords", f"{pnr_number},Visametric,Appointment,Request")
+    body_keywords = inputs.get("bodyKeywords", f"{pnr_number},{slot_date},{slot_time},{first_name},{last_name}")
+
+    if not email_account or not forward_to:
+        logger.warning(f"Task {task.id} missing required inputs for email forwarding.")
+        return
+
+    logger.info(f"Task {task.id}: Forwarding emails from {email_account} to {forward_to}")
+
+    # 2. 获取授权配置
+    auth = await EmailAuthorizationService.get_by_email(db, email_account)
+    if not auth:
+        raise ValueError(f"Email authorization not found for account: {email_account}")
+
+    # 3. 调用邮件服务执行转发
+    # 这里复用你之前写的 Service 方法
+    result = await EmailAuthorizationService.forward_first_matching_email2(
+        db=db,
+        auth=auth,
+        forward_to=forward_to,
+        sender=sender,
+        recipient=recipient,
+        subject_keywords=subject_keywords,
+        body_keywords=body_keywords
+    )
+
+    # 4. (可选) 将结果回写到 Task Meta
+    meta = dict(task.meta) if task.meta else {}
+    meta["forward_result"] = result
+    task.meta = meta

+ 4 - 1
app/services/vas_task_service.py

@@ -14,6 +14,7 @@ from app.core.biz_exception import NotFoundError,BizLogicError
 from app.models.vas_task import VasTask
 from app.models.order import VasOrder
 from app.schemas.vas_task import VasTaskCreate, VasTaskUpdate
+from app.services.task_handlers import task_processor
 
 
 class VasTaskService:
@@ -150,7 +151,9 @@ class VasTaskService:
             raise NotFoundError("Order not exist")
 
         order.status = "completed"
-
+        
+        await task_processor.execute(task.routing_key, db, task, order)
         await db.commit()
         await db.refresh(task)
         return task
+