jerry 4 달 전
부모
커밋
71f58a75a6
7개의 변경된 파일270개의 추가작업 그리고 44개의 파일을 삭제
  1. 1 1
      .env
  2. 23 2
      app/api/router.py
  3. 21 0
      app/models/emails.py
  4. 129 0
      app/services/email_authorizations_service.py
  5. 1 1
      app/services/llm_service.py
  6. 85 36
      app/services/seaweedfs_service.py
  7. 10 4
      docker-compose.yml

+ 1 - 1
.env

@@ -1,4 +1,4 @@
-ENV=DEV
+ENV=PROD
 DATABASE_URL=mysql+asyncmy://root:GqLLL7Bofj0WaaOpp.0@visafly.top:3306/book_user_info?charset=utf8mb4
 REDIS_URL=redis://:STEs2x6ML0U1HlpE9SojM6YU7QPhqzY8@45.137.220.138:6379/0
 OPENAI_API_KEY=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

+ 23 - 2
app/api/router.py

@@ -294,6 +294,27 @@ async def email_authorizations_forward_email(
     )
     return success(data={"body": result})
 
+@admin_required_router.post("/email-authorizations/forward2", summary="转发邮件(新)", tags=["邮箱接口"], response_model=ApiResponse[EmailContent])
+async def email_authorizations_forward_email2(
+    emailAccount: str = Query(..., description="收件邮箱账号, 格式: xxx@xxx.xxx"),
+    forwardTo: str = Query(..., description="转发到哪个邮箱地址, 格式: xxx@xxx.xxx"),
+    sender: str = Query(..., description="发件人邮箱账号或者名字"),
+    recipient: str = Query(..., description="收件人账号或者名字"),
+    subjectKeywords: str = Query("", description="邮件主题关键词, 支持多个关键词, 用逗号隔开"),
+    bodyKeywords: str = Query("", description="邮件内容关键词, 支持多个关键词, 用逗号隔开"),
+    db: Session = Depends(get_db)
+):
+    auth = await EmailAuthorizationService.get_by_email(db, emailAccount)
+    result = await EmailAuthorizationService.forward_first_matching_email2(
+        auth,
+        forward_to = forwardTo,
+        sender = sender,
+        recipient = recipient,
+        subject_keywords = subjectKeywords,
+        body_keywords = bodyKeywords
+    )
+    return success(data={"body": result})
+
 @admin_required_router.post("/email-authorizations/sendmail", summary="发送邮件", tags=["邮箱接口"], response_model=ApiResponse[EmailContent])
 async def email_authorizations_send_email(
     emailAccount: str = Query(..., description="收件邮箱账号, 格式: xxx@xxx.xxx"),
@@ -734,7 +755,7 @@ async def vas_order_create(
     schema = await SchemaService.get(db, product.schema_id)
     # ② 校验 user_inputs
     validate_user_inputs(
-        schema_json=schema.schema_content,
+        schema_json=schema.schema_json,
         user_inputs=payload.user_inputs,
     )
     created_order = await OrderService.create(db, payload, product, current_user, redis_client)
@@ -752,7 +773,7 @@ async def vas_order_create_by_admin(
     schema = await SchemaService.get(db, product.schema_id)
     # ② 校验 user_inputs
     validate_user_inputs(
-        schema_json=schema.schema_content,
+        schema_json=schema.schema_json,
         user_inputs=payload.user_inputs,
     )
     created_order = await OrderService.create_by_admin(db, payload, product, current_user, redis_client)

+ 21 - 0
app/models/emails.py

@@ -0,0 +1,21 @@
+from sqlalchemy import Column, BigInteger, String, Text, TIMESTAMP
+from sqlalchemy.sql import func
+from sqlalchemy.ext.declarative import declarative_base
+
+Base = declarative_base()
+
+
+class VasEmail(Base):
+    __tablename__ = "emails"
+
+    uid = Column(BigInteger, primary_key=True, comment="邮件UID")
+    subject = Column(String(1024), comment="邮件标题")
+    sender = Column(String(255), comment="发件人")
+    recipient = Column(String(500), comment="收件人(别名)")
+    receive_time = Column(String(100), comment="接收时间字符串")
+    body_text = Column(Text, comment="邮件正文")
+    created_at = Column(
+        TIMESTAMP,
+        server_default=func.current_timestamp(),
+        comment="入库时间"
+    )

+ 129 - 0
app/services/email_authorizations_service.py

@@ -560,6 +560,135 @@ class EmailAuthorizationService:
                 except: pass
 
         return await run_in_threadpool(_worker)
+    
+    @staticmethod
+    async def forward_first_matching_email2(
+            db: Session,
+            auth,
+            forward_to: str,
+            sender: str,
+            recipient: str,
+            subject_keywords: str,
+            body_keywords: str
+    ):
+        # =========================================================
+        # 第一步:在数据库中查找最新的 UID (主线程/DB线程执行)
+        # =========================================================
+        
+        # 1. 构建动态 SQL
+        # 假设表名为 emails,字段为 uid, sender, recipient, subject, body_text
+        sql = "SELECT uid, subject FROM emails WHERE 1=1"
+        params = {}
+
+        # 2. 处理发件人 (模糊匹配)
+        if sender.strip():
+            sql += " AND sender LIKE :sender"
+            params['sender'] = f"%{sender.strip()}%"
+
+        # 3. 处理收件人 (模糊匹配)
+        if recipient.strip():
+            sql += " AND recipient LIKE :recipient"
+            params['recipient'] = f"%{recipient.strip()}%"
+
+        # 4. 处理主题关键词 (OR 关系)
+        subj_keys = [k.strip() for k in subject_keywords.split(',') if k.strip()]
+        if subj_keys:
+            or_clauses = []
+            for i, k in enumerate(subj_keys):
+                key_name = f"subj_{i}"
+                or_clauses.append(f"subject LIKE :{key_name}")
+                params[key_name] = f"%{k}%"
+            sql += f" AND ({' OR '.join(or_clauses)})"
+
+        # 5. 处理内容关键词 (OR 关系)
+        body_keys = [k.strip() for k in body_keywords.split(',') if k.strip()]
+        if body_keys:
+            or_clauses = []
+            for i, k in enumerate(body_keys):
+                key_name = f"body_{i}"
+                or_clauses.append(f"body_text LIKE :{key_name}")
+                params[key_name] = f"%{k}%"
+            sql += f" AND ({' OR '.join(or_clauses)})"
+
+        # 6. 获取最新的一条
+        sql += " ORDER BY uid DESC LIMIT 1"
+
+        try:
+            # 执行查询
+            result = db.execute(text(sql), params).fetchone()
+            
+            if not result:
+                logging.info(f"DB Search: No email found for {sender} -> {recipient}")
+                return None
+                
+            target_uid = result.uid
+            target_subject = result.subject
+            logging.info(f"DB Search: Found UID {target_uid} matching criteria. Subject: {target_subject}")
+            
+        except Exception as e:
+            logging.error(f"DB Search Error: {e}")
+            return f"数据库查询失败: {str(e)}"
+
+        # =========================================================
+        # 第二步:去 IMAP 拉取原始内容并转发 (放入线程池执行 IO 操作)
+        # =========================================================
+        
+        def _worker():
+            mail = None
+            try:
+                # 1. 连接 IMAP
+                mail = EmailAuthorizationService._connect_imap_with_proxy(
+                    auth.imap_server, auth.imap_port, 
+                    auth.proxy_host, auth.proxy_port, 
+                    auth.proxy_username, auth.proxy_password
+                )
+                mail.login(auth.email, auth.authorization_code)
+                mail.select("INBOX")
+                
+                # 2. 根据 UID 精准拉取 (使用 fetch)
+                # 注意:IMAPClient 的 fetch 方法
+                # UID 必须转为 int 或者 sequence set 字符串
+                res = mail.fetch([target_uid], ["RFC822"])
+                
+                if not res or target_uid not in res:
+                    return f"数据库中存在 UID {target_uid},但在邮箱服务器上未找到 (可能已被删除)"
+                
+                msg_data = res[target_uid][b"RFC822"]
+                msg = email.message_from_bytes(msg_data)
+
+                # 3. 处理转发逻辑
+                # 解析原标题 (为了做转发前缀)
+                subject_raw = msg.get("Subject", "")
+                subject = target_subject # 优先用数据库查出来的,或者重新解析也可
+                
+                # 修改 Header 进行转发
+                # 警告:这种方式会破坏原邮件的 DKIM 签名,导致进入垃圾箱概率增加
+                # 但为了保持你原有的逻辑,这里继续使用 Header 修改法
+                del msg['From']
+                del msg['To']
+                del msg['Cc'] # 转发通常去掉抄送
+                del msg['Subject']
+                
+                msg['From'] = auth.email  # 发件人必须是当前授权账号
+                msg['To'] = forward_to
+                msg['Subject'] = f"FWD: {subject}"
+                
+                # 4. 发送邮件 (SMTP)
+                EmailAuthorizationService.send_email_smtp(auth, msg)
+                
+                return f"邮件 '{subject}' (UID: {target_uid}) 已成功转发至: {forward_to}"
+                
+            except Exception as e:
+                logging.error(f"IMAP Forward Error: {e}")
+                return f"邮件转发过程出错: {str(e)}"
+            finally:
+                if mail:
+                    try:
+                        mail.logout()
+                    except: pass
+
+        # 在线程池中运行耗时 IO 操作
+        return await run_in_threadpool(_worker)
 
     @staticmethod
     async def send_email(

+ 1 - 1
app/services/llm_service.py

@@ -21,7 +21,7 @@ class LlmService:
         obj = (await db.execute(stmt)).scalar_one_or_none()
         if not obj:
             raise NotFoundError("Schema not exist")
-        parsed_obj = await LlmService.parse_data_async(payload.input_raw_str, obj.schema_content)
+        parsed_obj = await LlmService.parse_data_async(payload.input_raw_str, obj.schema_json)
         out = ParseUserInputsOut(parsed_obj=parsed_obj)
         return out
     

+ 85 - 36
app/services/seaweedfs_service.py

@@ -1,112 +1,161 @@
-# app/services/seaweedfs_service.py
-
+import os
 import httpx
 from fastapi import UploadFile
 from app.core.biz_exception import BizLogicError
 from app.core.logger import logger
 
-
 class SeaweedFSService:
-    MASTER_URL = "http://127.0.0.1:9333"  # SeaweedFS master 地址
-    DOWNLOAD_GATEWAY = "http://45.137.220.138:8888/api/resource/download_file"
+    # 1. 优先读取环境变量,默认使用 Docker 内部网络的服务名
+    # 只要后端和 SeaweedFS 在同一个 Docker 网络 (visafly-net),这个地址就是通的
+    MASTER_URL = os.getenv("SEAWEEDFS_MASTER_URL", "http://seaweedfs-master:9333")
+    
+    # 文件下载的公网入口(经过你的 Nginx + Backend 代理)
+    DOWNLOAD_GATEWAY = "https://visafly.top/api/resource/download_file"
 
     @classmethod
     async def upload(cls, file: UploadFile):
-        """上传文件到 SeaweedFS(异步)"""
+        """
+        上传文件到 SeaweedFS
+        """
         try:
-            async with httpx.AsyncClient(timeout=10) as client:
-                # 1️⃣ 获取 volume
-                assign_resp = await client.get(f"{cls.MASTER_URL}/dir/assign")
-                assign_resp.raise_for_status()
-                assign_data = assign_resp.json()
+            async with httpx.AsyncClient(timeout=10.0) as client:
+                # 1️⃣ 向 Master 申请 fid
+                # logger.info(f"Connecting to SeaweedFS Master: {cls.MASTER_URL}")
+                try:
+                    assign_resp = await client.get(f"{cls.MASTER_URL}/dir/assign")
+                    assign_resp.raise_for_status()
+                except Exception as e:
+                    logger.error(f"Failed to connect to Master at {cls.MASTER_URL}. Error: {e}")
+                    raise BizLogicError("Storage service unavailable")
 
-                fid = assign_data["fid"]
-                public_url = assign_data["publicUrl"]
+                assign_data = assign_resp.json()
+                fid = assign_data.get("fid")
+                
+                # 2️⃣ 获取上传地址
+                # 在 Docker 共享网络模式下,Master 返回的 publicUrl 就是 "seaweedfs-volume:8080"
+                # 后端可以直接解析并访问这个地址,无需任何修改
+                public_url = assign_data.get("publicUrl")
+                
+                if not public_url:
+                    raise BizLogicError("No volume server available")
 
                 upload_url = f"http://{public_url}/{fid}"
                 download_url = f"{cls.DOWNLOAD_GATEWAY}?fid={fid}"
 
-                # 2️⃣ 上传文件
+                # 3️⃣ 上传文件数据
+                # 读取文件内容
+                file_content = await file.read()
                 files = {
                     "file": (
                         file.filename,
-                        await file.read(),
-                        file.content_type,
+                        file_content,
+                        file.content_type or "application/octet-stream",
                     )
                 }
 
+                # logger.info(f"Uploading to Volume Node: {upload_url}")
                 upload_resp = await client.post(upload_url, files=files)
 
                 if upload_resp.status_code == 201:
                     return {
                         "fid": fid,
                         "url": download_url,
+                        "size": len(file_content)
                     }
 
-                raise BizLogicError(f"file upload error: {upload_resp.text}")
+                logger.error(f"Upload failed. Status: {upload_resp.status_code}, Body: {upload_resp.text}")
+                raise BizLogicError(f"Storage upload error: {upload_resp.text}")
 
+        except BizLogicError as e:
+            raise e
         except Exception as e:
-            logger.exception("SeaweedFS upload failed")
-            raise BizLogicError(f"file upload exception: {e}")
+            logger.exception("SeaweedFS upload unexpected error")
+            raise BizLogicError(f"Upload exception: {str(e)}")
 
     @classmethod
     async def get(cls, fid: str):
-        """根据 fid 读取文件(异步)"""
+        """
+        根据 fid 读取文件流
+        """
         try:
+            if not fid:
+                return None
+                
+            # fid 格式通常是 "3,016a...",逗号前是 volumeId
             volume_id = fid.split(",")[0]
 
-            async with httpx.AsyncClient(timeout=10) as client:
+            async with httpx.AsyncClient(timeout=10.0) as client:
+                # 1️⃣ 查询文件位置
                 lookup_resp = await client.get(
                     f"{cls.MASTER_URL}/dir/lookup",
                     params={"volumeId": volume_id},
                 )
-                lookup_resp.raise_for_status()
+                
+                if lookup_resp.status_code != 200:
+                    logger.warning(f"Lookup failed for fid {fid}: {lookup_resp.text}")
+                    return None
+                    
                 data = lookup_resp.json()
+                locations = data.get("locations")
 
-                if not data.get("locations"):
+                if not locations:
                     return None
 
-                public_url = data["locations"][0]["publicUrl"]
+                # 2️⃣ 直接请求 Volume Server
+                # 同样直接使用 Docker 内部地址
+                public_url = locations[0]["publicUrl"]
                 file_url = f"http://{public_url}/{fid}"
 
                 file_resp = await client.get(file_url)
+                
                 if file_resp.status_code == 200:
                     return (
                         file_resp.content,
-                        file_resp.headers.get(
-                            "Content-Type", "application/octet-stream"
-                        ),
+                        file_resp.headers.get("Content-Type", "application/octet-stream"),
                     )
 
                 return None
 
         except Exception as e:
-            logger.exception(f"SeaweedFS get failed, reason={e}")
+            logger.exception(f"SeaweedFS get failed for fid {fid}")
             return None
 
     @classmethod
     async def delete(cls, fid: str) -> bool:
-        """删除文件(异步)"""
+        """
+        删除文件
+        """
         try:
+            if not fid:
+                return False
+
             volume_id = fid.split(",")[0]
 
-            async with httpx.AsyncClient(timeout=10) as client:
+            async with httpx.AsyncClient(timeout=5.0) as client:
+                # 1️⃣ 查找位置
                 lookup_resp = await client.get(
                     f"{cls.MASTER_URL}/dir/lookup",
                     params={"volumeId": volume_id},
                 )
-                lookup_resp.raise_for_status()
+                
+                if lookup_resp.status_code != 200:
+                    return False
+                    
                 data = lookup_resp.json()
+                locations = data.get("locations")
 
-                if not data.get("locations"):
+                if not locations:
                     return False
 
-                public_url = data["locations"][0]["publicUrl"]
+                # 2️⃣ 发送删除请求
+                public_url = locations[0]["publicUrl"]
                 delete_url = f"http://{public_url}/{fid}"
 
                 del_resp = await client.delete(delete_url)
-                return del_resp.status_code == 202
+                
+                # SeaweedFS 删除成功通常返回 200 或 202
+                return del_resp.status_code in [200, 202]
 
         except Exception as e:
-            logger.exception(f"SeaweedFS delete failed, reason={e}")
-            return False
+            logger.error(f"SeaweedFS delete failed for fid {fid}: {e}")
+            return False

+ 10 - 4
docker-compose.yml

@@ -17,7 +17,13 @@ services:
     env_file:
       - .env
 
-    # 这是一个技巧:允许容器通过 'host.docker.internal' 访问宿主机
-    # 如果你的 DB_HOST 填公网 IP 连不上,可以试着填 host.docker.internal
-    extra_hosts:
-      - "host.docker.internal:host-gateway"
+    environment:
+      # 🔴 直接用容器名访问,简单粗暴
+      - SEAWEEDFS_MASTER_URL=http://seaweedfs-master:9333
+    networks:
+      - visafly-net # 加入同一个网络
+    # 不需要 extra_hosts 了,删掉它
+
+networks:
+  visafly-net:
+    external: true