| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868 |
- import threading
- import socket
- import socks
- import imaplib
- import smtplib
- import email
- import asyncio
- import re
- import time
- from datetime import datetime, timedelta, timezone
- from email.message import EmailMessage
- from email.header import decode_header
- from typing import List, Optional
- from sqlalchemy.orm import Session
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy import select, text
- from starlette.concurrency import run_in_threadpool
- from app.core.logger import logger
- from app.core.biz_exception import NotFoundError, BizLogicError
- from app.models.email_authorizations import EmailAuthorization
- from app.schemas.email_authorizations import EmailAuthorizationCreate, EmailAuthorizationUpdate
- # 保持锁逻辑不变
- _PROXY_LOCK = threading.Lock()
- class EmailAuthorizationService:
-
- DEFAULT_READ_TOP_N_EMAIL = 10
- RETRY_DELAY_SECONDS = 5
-
- # =================================================================
- # 数据库操作 (DB CRUD) - 使用 AsyncSession
- # =================================================================
- @staticmethod
- async def get_all(db: AsyncSession) -> List[EmailAuthorization]:
- # AsyncSession 不支持 db.query,需要用 select(Model)
- stmt = select(EmailAuthorization).order_by(EmailAuthorization.id.desc())
- result = await db.execute(stmt)
- return result.scalars().all()
- @staticmethod
- async def get_by_id(db: AsyncSession, id: int) -> Optional[EmailAuthorization]:
- stmt = select(EmailAuthorization).where(EmailAuthorization.id == id)
- result = await db.execute(stmt)
- # scalar_one_or_none 类似于 .first(),但更严格(如果有多个会报错,这里id是主键所以没问题)
- obj = result.scalar_one_or_none()
- if not obj:
- raise NotFoundError("Email authorization not found")
- return obj
-
- @staticmethod
- async def get_by_email(db: AsyncSession, email: str) -> Optional[EmailAuthorization]:
- stmt = select(EmailAuthorization).where(EmailAuthorization.email == email)
- result = await db.execute(stmt)
- obj = result.scalar_one_or_none()
- if not obj:
- raise NotFoundError("Email authorization not found")
- return obj
-
- @staticmethod
- async def create(db: AsyncSession, obj_in: EmailAuthorizationCreate) -> EmailAuthorization:
- # 先检查是否存在
- stmt = select(EmailAuthorization).where(EmailAuthorization.email == obj_in.email)
- result = await db.execute(stmt)
- if result.scalar_one_or_none():
- raise BizLogicError(f"Email {obj_in.email} already exist")
-
- # 创建对象
- db_obj = EmailAuthorization(**obj_in.dict(exclude_unset=True))
-
- # db.add 是同步方法(只是添加到 session 上下文)
- db.add(db_obj)
-
- # commit 和 refresh 是异步的
- await db.commit()
- await db.refresh(db_obj)
- return db_obj
- @staticmethod
- async def update(db: AsyncSession, id: int, obj_in: EmailAuthorizationUpdate) -> Optional[EmailAuthorization]:
- stmt = select(EmailAuthorization).where(EmailAuthorization.id == id)
- result = await db.execute(stmt)
- db_obj = result.scalar_one_or_none()
-
- if not db_obj:
- raise NotFoundError("Email authorization not found")
-
- for field, value in obj_in.dict(exclude_unset=True).items():
- setattr(db_obj, field, value)
-
- db.add(db_obj)
- await db.commit()
- await db.refresh(db_obj)
- return db_obj
- @staticmethod
- async def delete(db: AsyncSession, id: int) -> Optional[EmailAuthorization]:
- stmt = select(EmailAuthorization).where(EmailAuthorization.id == id)
- result = await db.execute(stmt)
- db_obj = result.scalar_one_or_none()
-
- if not db_obj:
- raise NotFoundError("Email authorization not found")
-
- # delete 也是同步标记
- await db.delete(db_obj)
- await db.commit()
- return db_obj
- @staticmethod
- def _connect_imap_with_proxy(
- host: str,
- port: int,
- proxy_host: Optional[str] = None,
- proxy_port: Optional[int] = None,
- proxy_user: Optional[str] = None,
- proxy_password: Optional[str] = None,
- ) -> imaplib.IMAP4_SSL:
- """
- 创建连接 (同步方法,将在线程中运行)
- 使用 Lock 确保 socket patching 不会影响其他并发请求
- """
- if proxy_host and proxy_port and proxy_port > 0:
- with _PROXY_LOCK: # 加锁,防止多线程同时修改全局 socket
- original_socket = socket.socket
- socks.setdefaultproxy(
- proxy_type=socks.SOCKS5,
- addr=proxy_host,
- port=proxy_port,
- username=proxy_user or None,
- password=proxy_password or None,
- )
- socket.socket = socks.socksocket
- try:
- imap = imaplib.IMAP4_SSL(host, port)
- finally:
- socket.socket = original_socket
- else:
- imap = imaplib.IMAP4_SSL(host, port)
- return imap
-
- @staticmethod
- def _connect_smtp_with_proxy(
- host: str,
- port: int,
- proxy_host: Optional[str] = None,
- proxy_port: Optional[int] = None,
- proxy_user: Optional[str] = None,
- proxy_password: Optional[str] = None,
- ) -> smtplib.SMTP_SSL:
- """
- 创建连接 (同步方法,将在线程中运行)
- """
- if proxy_host and proxy_port and proxy_port > 0:
- with _PROXY_LOCK: # 加锁
- original_socket = socket.socket
- socks.setdefaultproxy(
- proxy_type=socks.SOCKS5,
- addr=proxy_host,
- port=proxy_port,
- username=proxy_user or None,
- password=proxy_password or None,
- )
- socket.socket = socks.socksocket
- try:
- smtp = smtplib.SMTP_SSL(host, port)
- finally:
- socket.socket = original_socket
- else:
- smtp = smtplib.SMTP_SSL(host, port)
- return smtp
-
- @staticmethod
- async def fetch_email_authorizations2(
- db: Session,
- auth,
- sender: str,
- recipient: str,
- subject_keywords: str,
- body_keywords: str
- ):
- # =========================================================
- # 第一步:在数据库中查找最新的 UID (主线程/DB线程执行)
- # =========================================================
-
- # 1. 构建动态 SQL
- # 假设表名为 emails,字段为 uid, sender, recipient, subject, body_text
- sql = "SELECT uid, subject, body_text FROM emails WHERE 1=1"
- params = {}
- # 2. 处理发件人 (模糊匹配)
- if sender.strip():
- sql += " AND sender LIKE :sender"
- params['sender'] = f"%{sender.strip()}%"
- # 3. 处理收件人 (模糊匹配)
- if recipient.strip():
- sql += " AND recipient LIKE :recipient"
- params['recipient'] = f"%{recipient.strip()}%"
- # 4. 处理主题关键词 (OR 关系)
- subj_keys = [k.strip() for k in subject_keywords.split(',') if k.strip()]
- if subj_keys:
- for i, k in enumerate(subj_keys):
- key_name = f"subj_{i}"
- # 直接拼接到主 SQL 中,要求同时满足
- sql += f" AND subject LIKE :{key_name}"
- params[key_name] = f"%{k}%"
- # 5. 处理内容关键词 (OR 关系)
- body_keys = [k.strip() for k in body_keywords.split(',') if k.strip()]
- if body_keys:
- for i, k in enumerate(body_keys):
- key_name = f"body_{i}"
- # 直接拼接到主 SQL 中,要求同时满足
- sql += f" AND body_text LIKE :{key_name}"
- params[key_name] = f"%{k}%"
- # 6. 获取最新的一条
- sql += " ORDER BY uid DESC LIMIT 1"
-
- # 执行查询
- result_proxy = await db.execute(text(sql), params)
- result = result_proxy.fetchone()
-
- if not result:
- logger.info(f"DB Search: No email found for {sender} -> {recipient}")
- return None
-
- target_uid = result.uid
- target_subject = result.subject
- target_body_text = result.body_text
- logger.info(f"DB Search: Found UID {target_uid} matching criteria. Subject: {target_subject}")
- return f'{target_subject}\n{target_body_text}'
- @staticmethod
- async def fetch_email_authorizations(
- auth,
- sender: str,
- recipient: str,
- subject_keywords: str,
- body_keywords: str,
- sent_date: str,
- expiry: int = 300,
- only_text: bool = True
- ) -> Optional[str]:
- """
- 在有效期内循环读取邮箱,找到符合条件的邮件(使用最后一条 Received 头作为收件时间)
- """
- def _worker():
- EMAIL_ACCOUNT = auth.email
- EMAIL_PASSWORD = auth.authorization_code
- IMAP_SERVER = auth.imap_server
- IMAP_PORT = auth.imap_port
- subject_keys = [s.strip() for s in subject_keywords.split(",") if s.strip()]
- body_keys = [s.strip() for s in body_keywords.split(",") if s.strip()]
- # === 时间计算 ===
- sent_dt = datetime.strptime(sent_date, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
- max_wait_time = min(5 * 60, expiry) # 最长等待5分钟
- expiry_at = time.time() + max_wait_time
- def get_received_time(msg):
- """
- 使用最后一条 Received 头解析收件时间
- """
- received_headers = msg.get_all("Received", [])
- if not received_headers:
- return None
- for i, header in enumerate(received_headers, 1):
- logger.debug(f" [{i}] {header}")
- last_received = received_headers[-1]
- if ";" not in last_received:
- return None
- time_str = last_received.split(";")[-1].strip()
- dt_tuple = email.utils.parsedate_tz(time_str)
- if not dt_tuple:
- return None
- return datetime.fromtimestamp(email.utils.mktime_tz(dt_tuple), tz=timezone.utc)
- mail = EmailAuthorizationService._connect_imap_with_proxy(
- IMAP_SERVER,
- IMAP_PORT,
- auth.proxy_host,
- auth.proxy_port,
- auth.proxy_username,
- auth.proxy_password,
- )
- mail.login(EMAIL_ACCOUNT, EMAIL_PASSWORD)
- mail.select("INBOX")
- while time.time() < expiry_at:
- mail.noop() # 刷新邮箱状态
- _, data = mail.search(None, "ALL")
- mail_ids = data[0].split()
- if not mail_ids:
- time.sleep(EmailAuthorizationService.RETRY_DELAY_SECONDS)
- continue
- recent_ids = mail_ids[-EmailAuthorizationService.DEFAULT_READ_TOP_N_EMAIL:]
- messages = []
- debug = True
- for email_id in reversed(recent_ids):
- res, msg_data = mail.fetch(email_id, "(RFC822)")
- if res != "OK" or not msg_data:
- if debug:
- logger.debug(f"[WARN] 邮件 ID={email_id.decode()} 获取失败")
- continue
- msg_bytes = None
- for part in msg_data:
- if isinstance(part, tuple):
- msg_bytes = part[1]
- if not msg_bytes:
- if debug:
- logger.debug(f"[WARN] 邮件 ID={email_id.decode()} 无正文")
- continue
- msg = email.message_from_bytes(msg_bytes)
- received_dt = get_received_time(msg)
- if not received_dt:
- if debug:
- logger.debug(f"[WARN] 邮件 ID={email_id.decode()} 未解析出 Received 时间")
- continue
- messages.append((msg, received_dt))
- if debug:
- logger.debug(f"[DEBUG] 成功解析邮件数: {len(messages)}")
- logger.debug(f"[DEBUG] 收件时间列表: {[m[1] for m in messages]}")
- # 按收件时间降序排序
- messages.sort(key=lambda x: x[1], reverse=True)
- for msg, received_dt in messages:
- # 判断是否在发送时间后的有效窗口内
- if received_dt < sent_dt:
- if debug:
- logger.debug(f"[INFO] 邮件太旧: {received_dt}")
- continue
- if received_dt > sent_dt + timedelta(seconds=expiry):
- if debug:
- logger.debug(f"[INFO] 邮件太新: {received_dt}")
- continue
- # 匹配发件人/收件人
- msg_from = msg.get("From", "")
- msg_to = msg.get("To", "")
- if sender.lower() not in msg_from.lower():
- if debug:
- logger.debug("发件人不匹配")
- continue
- if recipient.lower() not in msg_to.lower():
- if debug:
- logger.debug("收件人不匹配")
- continue
- # 匹配主题
- subject, enc = decode_header(msg.get("Subject"))[0]
- if isinstance(subject, bytes):
- subject = subject.decode(enc or "utf-8", errors="ignore")
- if subject_keys and not any(k.lower() in subject.lower() for k in subject_keys):
- continue
- # 提取正文
- body = EmailAuthorizationService._extract_body(msg, only_text)
- if body_keys and not any(k.lower() in body.lower() for k in body_keys):
- continue
- # 找到匹配邮件 → 返回内容
- mail.close()
- mail.logout()
- return body.strip()
- # 未匹配到 → 等待重试
- time.sleep(EmailAuthorizationService.RETRY_DELAY_SECONDS)
- mail.close()
- mail.logout()
- raise NotFoundError("Get email timeout")
-
- return await run_in_threadpool(_worker)
-
- @staticmethod
- def _process_recent_emails_sync(
- mail, recent_ids, sent_dt, expiry, sender, recipient, subject_keys, body_keys, only_text
- ) -> Optional[str]:
- """
- 同步辅助函数:处理邮件解析逻辑。
- 将这段繁琐的逻辑放入线程运行,避免阻塞 Async Loop。
- """
- messages = []
- debug = True
- for email_id in reversed(recent_ids):
- res, msg_data = mail.fetch(email_id, "(RFC822)")
- if res != "OK" or not msg_data:
- continue
- msg_bytes = None
- for part in msg_data:
- if isinstance(part, tuple):
- msg_bytes = part[1]
- if not msg_bytes:
- continue
- msg = email.message_from_bytes(msg_bytes)
-
- # 解析时间
- received_dt = None
- received_headers = msg.get_all("Received", [])
- if received_headers:
- last_received = received_headers[-1]
- if ";" in last_received:
- time_str = last_received.split(";")[-1].strip()
- dt_tuple = email.utils.parsedate_tz(time_str)
- if dt_tuple:
- received_dt = datetime.fromtimestamp(email.utils.mktime_tz(dt_tuple), tz=timezone.utc)
- if not received_dt:
- continue
- messages.append((msg, received_dt))
- # 排序
- messages.sort(key=lambda x: x[1], reverse=True)
- for msg, received_dt in messages:
- # 时间判定
- if received_dt < sent_dt:
- continue
- if received_dt > sent_dt + timedelta(seconds=expiry):
- continue
- # 匹配逻辑
- msg_from = msg.get("From", "")
- msg_to = msg.get("To", "")
-
- if sender.lower() not in msg_from.lower():
- continue
- if recipient.lower() not in msg_to.lower():
- continue
- subject_raw = msg.get("Subject")
- subject = ""
- if subject_raw:
- decoded_list = decode_header(subject_raw)
- if decoded_list:
- sub_bytes, enc = decoded_list[0]
- if isinstance(sub_bytes, bytes):
- subject = sub_bytes.decode(enc or "utf-8", errors="ignore")
- else:
- subject = str(sub_bytes)
-
- if subject_keys and not any(k.lower() in subject.lower() for k in subject_keys):
- continue
- body = EmailAuthorizationService._extract_body(msg, only_text)
- if body_keys and not any(k.lower() in body.lower() for k in body_keys):
- continue
- return body.strip()
-
- return None
- @staticmethod
- async def fetch_email_authorizations_from_top_n(
- auth,
- sender: str,
- recipient: str,
- subject_keywords: str,
- body_keywords: str,
- top: int = 10,
- only_text: bool = True
- ) -> Optional[str]:
-
- # 定义一个纯同步的 worker 函数来执行所有 IMAP 逻辑
- def _worker():
- subject_keys = [s.strip() for s in subject_keywords.split(",") if s.strip()]
- body_keys = [s.strip() for s in body_keywords.split(",") if s.strip()]
- mail = EmailAuthorizationService._connect_imap_with_proxy(
- auth.imap_server,
- auth.imap_port,
- auth.proxy_host,
- auth.proxy_port,
- auth.proxy_username,
- auth.proxy_password,
- )
- try:
- mail.login(auth.email, auth.authorization_code)
- mail.select("INBOX")
- _, data = mail.search(None, "ALL")
- mail_ids = data[0].split()
- if not mail_ids:
- return None
- recent_ids = mail_ids[-top:]
-
- # 复用上面的解析逻辑,但稍作调整,因为这个方法不需要时间过滤
- # 这里为了简单,直接写精简版解析
- for email_id in reversed(recent_ids):
- res, msg_data = mail.fetch(email_id, "(RFC822)")
- if res != "OK" or not msg_data: continue
- msg_bytes = None
- for part in msg_data:
- if isinstance(part, tuple): msg_bytes = part[1]
-
- if not msg_bytes: continue
- msg = email.message_from_bytes(msg_bytes)
- # 匹配逻辑
- msg_from = msg.get("From", "")
- msg_to = msg.get("To", "")
- if sender.lower() not in msg_from.lower(): continue
- if recipient.lower() not in msg_to.lower(): continue
- subject_raw = msg.get("Subject")
- subject = ""
- if subject_raw:
- d = decode_header(subject_raw)[0]
- subject = d[0].decode(d[1] or "utf-8", errors="ignore") if isinstance(d[0], bytes) else str(d[0])
- if subject_keys and not any(k.lower() in subject.lower() for k in subject_keys): continue
-
- body = EmailAuthorizationService._extract_body(msg, only_text)
- if body_keys and not any(k.lower() in body.lower() for k in body_keys): continue
-
- return body.strip()
-
- return None
- finally:
- try:
- mail.close()
- mail.logout()
- except:
- pass
- return await run_in_threadpool(_worker)
- @staticmethod
- async def forward_first_matching_email(
- auth,
- forward_to: str,
- sender: str,
- recipient: str,
- subject_keywords: str,
- body_keywords: str
- ):
- # 整个逻辑比较复杂且都是 IO,直接打包扔进线程池
- def _worker():
- subject_keys = [s.strip() for s in subject_keywords.split(",") if s.strip()]
- body_keys = [s.strip() for s in body_keywords.split(",") if s.strip()]
-
- mail = EmailAuthorizationService._connect_imap_with_proxy(
- auth.imap_server, auth.imap_port, auth.proxy_host, auth.proxy_port, auth.proxy_username, auth.proxy_password
- )
- try:
- mail.login(auth.email, auth.authorization_code)
- mail.select("INBOX")
-
- target = recipient
- # 注意:IMAP search 语法对引号很敏感,确保 target 没有特殊字符破坏命令
- query = f'(HEADER To "{target}")'
- res, data = mail.uid("search", None, query)
- if res != "OK": return None
-
- uids = data[0].split()
- msgs = []
- for uid in uids:
- res, msg_data = mail.uid("fetch", uid, "(RFC822)")
- if res != "OK": continue
- msg = email.message_from_bytes(msg_data[0][1])
- # 处理 Date 头可能缺失的情况
- date_str = msg.get("Date")
- if date_str:
- date_ = email.utils.parsedate_to_datetime(date_str)
- msgs.append((date_, msg))
-
- msgs.sort(key=lambda x: x[0], reverse=True)
- for _, msg in msgs:
- msg_from = msg.get("From", "")
- if sender.lower() not in msg_from.lower(): continue
-
- subject_raw = msg.get("Subject", "")
- subject = ""
- d = decode_header(subject_raw)[0]
- subject = d[0].decode(d[1] or "utf-8", errors="ignore") if isinstance(d[0], bytes) else str(d[0])
-
- if subject_keys and not any(k.lower() in subject.lower() for k in subject_keys): continue
-
- body = EmailAuthorizationService._extract_body(msg, True)
- if body_keys and not any(k.lower() in body.lower() for k in body_keys): continue
- # 准备转发
- # 注意:直接修改 Header 转发可能会破坏 DKIM 签名,更好的方式是作为附件转发
- # 但这里保持原逻辑
- del msg['From']
- del msg['To']
- del msg['Subject']
- msg['From'] = auth.email
- msg['To'] = forward_to
- msg['Subject'] = f"FWD: {subject}"
-
- # 调用同步的 send 方法
- EmailAuthorizationService.send_email_smtp(auth, msg)
-
- return f"邮件 '{subject}' 已成功转发至: {forward_to}"
- return None
- finally:
- try:
- mail.logout()
- except: pass
- return await run_in_threadpool(_worker)
-
- @staticmethod
- async def forward_first_matching_email2(
- db: Session,
- auth,
- forward_to: str,
- sender: str,
- recipient: str,
- subject_keywords: str,
- body_keywords: str
- ):
- # =========================================================
- # 第一步:在数据库中查找最新的 UID (主线程/DB线程执行)
- # =========================================================
-
- # 1. 构建动态 SQL
- # 假设表名为 emails,字段为 uid, sender, recipient, subject, body_text
- sql = "SELECT uid, subject FROM emails WHERE 1=1"
- params = {}
- # 2. 处理发件人 (模糊匹配)
- if sender.strip():
- sql += " AND sender LIKE :sender"
- params['sender'] = f"%{sender.strip()}%"
- # 3. 处理收件人 (模糊匹配)
- if recipient.strip():
- sql += " AND recipient LIKE :recipient"
- params['recipient'] = f"%{recipient.strip()}%"
- # 4. 处理主题关键词 (OR 关系)
- subj_keys = [k.strip() for k in subject_keywords.split(',') if k.strip()]
- if subj_keys:
- for i, k in enumerate(subj_keys):
- key_name = f"subj_{i}"
- # 直接拼接到主 SQL 中,要求同时满足
- sql += f" AND subject LIKE :{key_name}"
- params[key_name] = f"%{k}%"
- # 5. 处理内容关键词 (OR 关系)
- body_keys = [k.strip() for k in body_keywords.split(',') if k.strip()]
- if body_keys:
- for i, k in enumerate(body_keys):
- key_name = f"body_{i}"
- # 直接拼接到主 SQL 中,要求同时满足
- sql += f" AND body_text LIKE :{key_name}"
- params[key_name] = f"%{k}%"
- # 6. 获取最新的一条
- sql += " ORDER BY uid DESC LIMIT 1"
- try:
- # 执行查询
- result_proxy = await db.execute(text(sql), params)
- result = result_proxy.fetchone()
-
- if not result:
- logger.info(f"DB Search: No email found for {sender} -> {recipient}")
- return None
-
- target_uid = result.uid
- target_subject = result.subject
- logger.info(f"DB Search: Found UID {target_uid} matching criteria. Subject: {target_subject}")
-
- except Exception as e:
- logger.error(f"DB Search Error: {e}")
- return f"数据库查询失败: {str(e)}"
- # =========================================================
- # 第二步:去 IMAP 拉取原始内容并转发 (放入线程池执行 IO 操作)
- # =========================================================
-
- def _worker():
- mail = None
- try:
- # 1. 连接 IMAP
- mail = EmailAuthorizationService._connect_imap_with_proxy(
- auth.imap_server, auth.imap_port,
- auth.proxy_host, auth.proxy_port,
- auth.proxy_username, auth.proxy_password
- )
- mail.login(auth.email, auth.authorization_code)
- mail.select("INBOX")
-
- # 2. 根据 UID 精准拉取 (使用 fetch)
- # 注意:IMAPClient 的 fetch 方法
- # UID 必须转为 int 或者 sequence set 字符串
- res, data = mail.uid('fetch', str(target_uid), '(RFC822)')
-
- # 🔴 修正点:不要写 if target_uid in res
- # res 是状态字符串 "OK",data 是包含邮件内容的列表
- if res != 'OK':
- return f"IMAP Fetch 失败,状态码: {res}"
-
- if not data or not data[0]:
- return f"未找到 UID {target_uid} 的邮件内容 (可能已被物理删除)"
- # data[0] 通常是 tuple (byte_header, byte_content),但也可能是 None
- if isinstance(data[0], tuple):
- raw_email_bytes = data[0][1]
- else:
- # 如果 data[0] 只是 bytes (例如 b')'),说明没拿到邮件体
- return f"邮件数据格式异常,无法解析: {str(data)}"
- msg = email.message_from_bytes(raw_email_bytes)
- # 3. 处理转发逻辑
- # 使用数据库中查到的标题,确保标题准确
- subject = target_subject
- # 修改 Header 进行转发
- del msg['From']
- del msg['To']
- del msg['Cc']
- del msg['Subject']
-
- msg['From'] = auth.email # 发件人覆写为当前授权账号
- msg['To'] = forward_to
- msg['Subject'] = f"FWD: {subject}"
-
- # 4. 发送邮件 (SMTP)
- EmailAuthorizationService.send_email_smtp(auth, msg)
-
- return f"邮件 '{subject}' (UID: {target_uid}) 已成功转发至: {forward_to}"
-
- except Exception as e:
- logger.error(f"IMAP Forward Error: {e}")
- return f"邮件转发过程出错: {str(e)}"
- finally:
- if mail:
- try:
- mail.logout()
- except: pass
- # 在线程池中运行耗时 IO 操作
- return await run_in_threadpool(_worker)
- @staticmethod
- async def send_email(
- auth,
- send_to: str,
- subject: str,
- content_type: str,
- content: str
- ):
- def _worker():
- msg = EmailMessage()
- msg["From"] = auth.email
- msg["To"] = send_to
- msg["Subject"] = subject
-
- if content_type.lower() == "html":
- msg.set_content("") # 占位
- msg.add_alternative(content, subtype="html")
- else:
- msg.set_content(content)
- EmailAuthorizationService.send_email_smtp(auth, msg)
- return f"邮件 '{subject}' 成功发送至: {send_to}"
- return await run_in_threadpool(_worker)
- @staticmethod
- async def send_email_bulk(
- auth,
- send_to: str,
- subject: str,
- content_type: str,
- content: str
- ):
- def _worker():
- bcc_list = [s.strip() for s in send_to.split(",") if s.strip()]
- msg = EmailMessage()
- msg["From"] = auth.email
- msg["To"] = bcc_list[0] if bcc_list else auth.email # Fallback
- msg["Subject"] = subject
-
- if content_type.lower() == "html":
- msg.set_content("")
- msg.add_alternative(content, subtype="html")
- else:
- msg.set_content(content)
-
- EmailAuthorizationService.send_email_smtp(auth, msg, bcc_list=bcc_list)
- return f"邮件 '{subject}' 成功发送至: {send_to}"
- return await run_in_threadpool(_worker)
- # ----------------------------------------------------------------------
- # 底层 SMTP 发送 (保持同步,供 Worker 调用)
- # ----------------------------------------------------------------------
- @staticmethod
- def send_email_smtp(auth, msg, bcc_list=None):
- if bcc_list is None:
- bcc_list = []
-
- # 这里的 connect 内部已经加了锁,是安全的
- mail = EmailAuthorizationService._connect_smtp_with_proxy(
- auth.smtp_server,
- auth.smtp_port,
- auth.proxy_host,
- auth.proxy_port,
- auth.proxy_username,
- auth.proxy_password,
- )
- try:
- mail.login(auth.email, auth.authorization_code)
- if bcc_list:
- mail.send_message(msg, to_addrs=bcc_list)
- else:
- mail.send_message(msg)
- finally:
- mail.quit()
- @staticmethod
- def _extract_body(msg, only_text: bool = True) -> str:
- # 纯 CPU 计算,不需要 async,保留原样
- body_parts = []
- if msg.is_multipart():
- for part in msg.walk():
- ctype = part.get_content_type()
- if only_text and ctype != "text/plain":
- continue
- if not only_text and ctype not in ["text/plain", "text/html"]:
- continue
- if part.get("Content-Disposition"):
- continue
- charset = part.get_content_charset() or "utf-8"
- try:
- payload = part.get_payload(decode=True)
- if payload:
- text = payload.decode(charset, errors="ignore")
- body_parts.append(text)
- except Exception:
- continue
- else:
- charset = msg.get_content_charset() or "utf-8"
- payload = msg.get_payload(decode=True)
- if payload:
- body_parts.append(payload.decode(charset, errors="ignore"))
-
- body = "\n".join(body_parts)
- return re.sub(r"\s+", " ", body.strip())
|