import threading
import socket
import socks
import imaplib
import smtplib
import email
import asyncio
import re
import time
from datetime import datetime, timedelta, timezone
import email.policy
from email.message import EmailMessage
from email.utils import formatdate, make_msgid
from email.header import decode_header
from typing import List, Optional
from sqlalchemy.orm import Session
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, text
from starlette.concurrency import run_in_threadpool
from app.core.logger import logger
from app.core.biz_exception import NotFoundError, BizLogicError
from app.models.email_authorizations import EmailAuthorization
from app.schemas.email_authorizations import EmailAuthorizationCreate, EmailAuthorizationUpdate
# Module-level lock taken by the proxy-aware connection helpers in this file
# while they temporarily replace ``socket.socket`` with the SOCKS socket class.
# (Original note: keep the locking logic unchanged.)
_PROXY_LOCK = threading.Lock()
class EmailAuthorizationService:
    """Static helpers for email-authorization records and the mailboxes they describe.

    The methods fall into two groups: async database CRUD on
    ``EmailAuthorization`` rows, and blocking IMAP/SMTP helpers that connect
    (optionally through a SOCKS5 proxy) using the stored credentials.
    """

    # Number of most recent INBOX messages inspected on each polling pass.
    DEFAULT_READ_TOP_N_EMAIL = 10
    # Seconds slept between polling passes while waiting for a matching email.
    RETRY_DELAY_SECONDS = 5

    # =================================================================
    # Database operations (DB CRUD) - using AsyncSession
    # =================================================================
@staticmethod
async def get_all(db: AsyncSession) -> List[EmailAuthorization]:
    """Return every email authorization, ordered by ``id`` descending.

    A ``select()`` statement is used because AsyncSession does not expose
    the legacy ``query()`` API.
    """
    newest_first = select(EmailAuthorization).order_by(EmailAuthorization.id.desc())
    rows = await db.execute(newest_first)
    return rows.scalars().all()
@staticmethod
async def get_by_id(db: AsyncSession, id: int) -> Optional[EmailAuthorization]:
    """Load a single email authorization by its ``id``.

    Raises:
        NotFoundError: when no row has the given ``id``.
    """
    lookup = select(EmailAuthorization).where(EmailAuthorization.id == id)
    # scalar_one_or_none() is stricter than first(): it raises if several
    # rows match. ``id`` is expected to be the primary key, so at most one can.
    record = (await db.execute(lookup)).scalar_one_or_none()
    if record:
        return record
    raise NotFoundError("Email authorization not found")
@staticmethod
async def get_by_email(db: AsyncSession, email: str) -> Optional[EmailAuthorization]:
    """Load the email authorization whose ``email`` column equals ``email``.

    Inside this function the ``email`` parameter hides the stdlib ``email``
    module imported at the top of the file.

    Raises:
        NotFoundError: when no row matches the address.
    """
    lookup = select(EmailAuthorization).where(EmailAuthorization.email == email)
    record = (await db.execute(lookup)).scalar_one_or_none()
    if record:
        return record
    raise NotFoundError("Email authorization not found")
@staticmethod
async def create(db: AsyncSession, obj_in: EmailAuthorizationCreate) -> EmailAuthorization:
    """Insert a new email authorization and return the refreshed row.

    Only the fields explicitly set on ``obj_in`` are passed to the model.

    Raises:
        BizLogicError: when a row with the same email already exists.
    """
    duplicate_lookup = select(EmailAuthorization).where(
        EmailAuthorization.email == obj_in.email
    )
    already_there = (await db.execute(duplicate_lookup)).scalar_one_or_none()
    if already_there:
        raise BizLogicError(f"Email {obj_in.email} already exist")

    payload = obj_in.dict(exclude_unset=True)
    record = EmailAuthorization(**payload)
    # add() only registers the object with the session and is not awaited;
    # commit() and refresh() perform I/O and are.
    db.add(record)
    await db.commit()
    await db.refresh(record)
    return record
@staticmethod
async def update(db: AsyncSession, id: int, obj_in: EmailAuthorizationUpdate) -> Optional[EmailAuthorization]:
    """Apply the explicitly-set fields of ``obj_in`` to the row with ``id``.

    Returns the refreshed row after commit.

    Raises:
        NotFoundError: when no row has the given ``id``.
    """
    lookup = select(EmailAuthorization).where(EmailAuthorization.id == id)
    record = (await db.execute(lookup)).scalar_one_or_none()
    if not record:
        raise NotFoundError("Email authorization not found")

    changes = obj_in.dict(exclude_unset=True)
    for attr_name in changes:
        setattr(record, attr_name, changes[attr_name])

    db.add(record)
    await db.commit()
    await db.refresh(record)
    return record
@staticmethod
async def delete(db: AsyncSession, id: int) -> Optional[EmailAuthorization]:
    """Delete the row with ``id`` and return the deleted object.

    Raises:
        NotFoundError: when no row has the given ``id``.
    """
    lookup = select(EmailAuthorization).where(EmailAuthorization.id == id)
    doomed = (await db.execute(lookup)).scalar_one_or_none()
    if not doomed:
        raise NotFoundError("Email authorization not found")

    # Unlike add(), AsyncSession.delete() is awaited here; the row is
    # actually removed by the commit that follows.
    await db.delete(doomed)
    await db.commit()
    return doomed
@staticmethod
def _connect_imap_with_proxy(
    host: str,
    port: int,
    proxy_host: Optional[str] = None,
    proxy_port: Optional[int] = None,
    proxy_user: Optional[str] = None,
    proxy_password: Optional[str] = None,
) -> imaplib.IMAP4_SSL:
    """
    Open an IMAP-over-SSL connection, optionally through a SOCKS5 proxy.

    Synchronous; the callers in this file run it inside a worker thread.

    A proxy is used only when ``proxy_host`` is set and ``proxy_port`` is a
    positive number. In that case ``socket.socket`` is temporarily replaced
    by ``socks.socksocket`` while the connection is created. The swap is
    done under ``_PROXY_LOCK`` so two proxied connects never overlap, and
    both ``socket.socket`` and the PySocks default proxy are restored
    afterwards, so the proxy address and credentials do not stay installed
    process-wide once this call returns.

    NOTE(review): the direct (no proxy) branch does not take the lock, and
    ``socket.socket`` is replaced for the whole process while the lock is
    held, so sockets created by other threads in that window are affected.

    Returns:
        The connected (not yet logged in) ``imaplib.IMAP4_SSL`` client.
    """
    if not (proxy_host and proxy_port and proxy_port > 0):
        return imaplib.IMAP4_SSL(host, port)

    with _PROXY_LOCK:  # serialize modification of the global socket class
        original_socket = socket.socket
        original_default_proxy = socks.socksocket.default_proxy
        try:
            socks.setdefaultproxy(
                proxy_type=socks.SOCKS5,
                addr=proxy_host,
                port=proxy_port,
                username=proxy_user or None,
                password=proxy_password or None,
            )
            socket.socket = socks.socksocket
            return imaplib.IMAP4_SSL(host, port)
        finally:
            # Undo both pieces of global state, even when connecting fails.
            socket.socket = original_socket
            socks.socksocket.default_proxy = original_default_proxy
@staticmethod
def _connect_smtp_with_proxy(
    host: str,
    port: int,
    proxy_host: Optional[str] = None,
    proxy_port: Optional[int] = None,
    proxy_user: Optional[str] = None,
    proxy_password: Optional[str] = None,
) -> smtplib.SMTP_SSL:
    """
    Open an SMTP-over-SSL connection, optionally through a SOCKS5 proxy.

    Synchronous helper, meant to be run in a worker thread.

    A proxy is used only when ``proxy_host`` is set and ``proxy_port`` is a
    positive number. In that case ``socket.socket`` is temporarily replaced
    by ``socks.socksocket`` under ``_PROXY_LOCK`` while the connection is
    created. Both ``socket.socket`` and the PySocks default proxy are
    restored afterwards, so the proxy address and credentials do not stay
    installed process-wide once this call returns.

    NOTE(review): the direct (no proxy) branch does not take the lock, and
    ``socket.socket`` is replaced for the whole process while the lock is
    held, so sockets created by other threads in that window are affected.

    Returns:
        The connected (not yet logged in) ``smtplib.SMTP_SSL`` client.
    """
    if not (proxy_host and proxy_port and proxy_port > 0):
        return smtplib.SMTP_SSL(host, port)

    with _PROXY_LOCK:  # serialize modification of the global socket class
        original_socket = socket.socket
        original_default_proxy = socks.socksocket.default_proxy
        try:
            socks.setdefaultproxy(
                proxy_type=socks.SOCKS5,
                addr=proxy_host,
                port=proxy_port,
                username=proxy_user or None,
                password=proxy_password or None,
            )
            socket.socket = socks.socksocket
            return smtplib.SMTP_SSL(host, port)
        finally:
            # Undo both pieces of global state, even when connecting fails.
            socket.socket = original_socket
            socks.socksocket.default_proxy = original_default_proxy
@staticmethod
async def fetch_email_authorizations2(
    db: AsyncSession,
    auth,
    sender: str,
    recipient: str,
    subject_keywords: str,
    body_keywords: str
):
    """
    Look up the newest stored email matching the filters in the ``emails`` table.

    The query is built dynamically but every value is passed as a bound
    parameter. All supplied filters are combined with AND:

    * ``sender`` / ``recipient``: substring match (``LIKE %value%``),
      skipped when blank.
    * ``subject_keywords`` / ``body_keywords``: comma-separated lists; every
      non-blank keyword must occur in the subject / body text.

    ``None`` is treated like an empty string for all four filters.
    ``auth`` is accepted but not used by this method.

    Returns:
        ``"<subject>\\n<body_text>"`` of the matching row with the highest
        ``uid``, or ``None`` when nothing matches.
    """
    def _split_keywords(raw):
        # Comma-separated list -> stripped, non-empty keywords.
        return [k.strip() for k in (raw or "").split(',') if k.strip()]

    # "1=1" lets every real condition be appended uniformly with AND.
    conditions = ["1=1"]
    params = {}

    sender_filter = (sender or "").strip()
    if sender_filter:
        conditions.append("sender LIKE :sender")
        params['sender'] = f"%{sender_filter}%"

    recipient_filter = (recipient or "").strip()
    if recipient_filter:
        conditions.append("recipient LIKE :recipient")
        params['recipient'] = f"%{recipient_filter}%"

    # Each keyword becomes its own AND-ed LIKE clause (all must match).
    for i, k in enumerate(_split_keywords(subject_keywords)):
        key_name = f"subj_{i}"
        conditions.append(f"subject LIKE :{key_name}")
        params[key_name] = f"%{k}%"

    for i, k in enumerate(_split_keywords(body_keywords)):
        key_name = f"body_{i}"
        conditions.append(f"body_text LIKE :{key_name}")
        params[key_name] = f"%{k}%"

    sql = (
        "SELECT uid, subject, body_text FROM emails WHERE "
        + " AND ".join(conditions)
        + " ORDER BY uid DESC LIMIT 1"
    )

    result_proxy = await db.execute(text(sql), params)
    result = result_proxy.fetchone()
    if not result:
        logger.info(f"DB Search: No email found for {sender} -> {recipient}")
        return None

    target_uid = result.uid
    target_subject = result.subject
    target_body_text = result.body_text
    logger.info(f"DB Search: Found UID {target_uid} matching criteria. Subject: {target_subject}")
    return f'{target_subject}\n{target_body_text}'
@staticmethod
async def fetch_email_authorizations(
    auth,
    sender: str,
    recipient: str,
    subject_keywords: str,
    body_keywords: str,
    sent_date: str,
    expiry: int = 300,
    only_text: bool = True
) -> Optional[str]:
    """
    Poll the mailbox until a matching email arrives, then return its body.

    The blocking IMAP work runs in a thread pool. Each pass inspects the
    ``DEFAULT_READ_TOP_N_EMAIL`` most recent INBOX messages and uses the
    last ``Received`` header of each message as its arrival time.

    A message matches when:

    * its arrival time lies in ``[sent_date, sent_date + expiry seconds]``;
    * ``sender`` / ``recipient`` occur (case-insensitively) in From / To;
    * any of the comma-separated ``subject_keywords`` occurs in the decoded
      subject (skipped when the list is empty);
    * any of the comma-separated ``body_keywords`` occurs in the extracted
      body (skipped when the list is empty).

    Args:
        auth: object providing ``email``, ``authorization_code``,
            ``imap_server``, ``imap_port`` and the ``proxy_*`` settings.
        sent_date: ``"%Y-%m-%d %H:%M:%S"`` timestamp, interpreted as UTC.
        expiry: width of the accepted arrival window in seconds; polling
            itself lasts at most ``min(300, expiry)`` seconds.
        only_text: forwarded to ``_extract_body``.

    Returns:
        The stripped body of the first matching message.

    Raises:
        NotFoundError: no matching message arrived before polling timed out.
    """
    def _decode_subject(msg) -> str:
        # A missing Subject header yields "" instead of crashing decode_header.
        raw_subject = msg.get("Subject")
        if raw_subject is None:
            return ""
        pieces = []
        # Join every fragment so keywords after the first encoded word match too.
        for fragment, charset in decode_header(raw_subject):
            if isinstance(fragment, bytes):
                try:
                    pieces.append(fragment.decode(charset or "utf-8", errors="ignore"))
                except LookupError:
                    # Unknown charset name: fall back to UTF-8.
                    pieces.append(fragment.decode("utf-8", errors="ignore"))
            else:
                pieces.append(str(fragment))
        return "".join(pieces)

    def get_received_time(msg):
        """
        Parse the arrival time from the last Received header.
        """
        received_headers = msg.get_all("Received", [])
        if not received_headers:
            return None
        for i, header in enumerate(received_headers, 1):
            logger.debug(f"  [{i}] {header}")
        last_received = received_headers[-1]
        if ";" not in last_received:
            return None
        # The timestamp is the text after the final ';' of the header.
        time_str = last_received.split(";")[-1].strip()
        dt_tuple = email.utils.parsedate_tz(time_str)
        if not dt_tuple:
            return None
        return datetime.fromtimestamp(email.utils.mktime_tz(dt_tuple), tz=timezone.utc)

    def _load_recent(mail, recent_ids):
        # Fetch and parse the given messages, newest sequence number first.
        messages = []
        for email_id in reversed(recent_ids):
            res, msg_data = mail.fetch(email_id, "(RFC822)")
            if res != "OK" or not msg_data:
                logger.debug(f"[WARN] 邮件 ID={email_id.decode()} 获取失败")
                continue
            msg_bytes = None
            for part in msg_data:
                if isinstance(part, tuple):
                    msg_bytes = part[1]
            if not msg_bytes:
                logger.debug(f"[WARN] 邮件 ID={email_id.decode()} 无正文")
                continue
            msg = email.message_from_bytes(msg_bytes)
            received_dt = get_received_time(msg)
            if not received_dt:
                logger.debug(f"[WARN] 邮件 ID={email_id.decode()} 未解析出 Received 时间")
                continue
            messages.append((msg, received_dt))
        return messages

    def _worker():
        subject_keys = [s.strip() for s in subject_keywords.split(",") if s.strip()]
        body_keys = [s.strip() for s in body_keywords.split(",") if s.strip()]

        # === time window ===
        sent_dt = datetime.strptime(sent_date, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
        window_end = sent_dt + timedelta(seconds=expiry)
        max_wait_time = min(5 * 60, expiry)  # poll for at most 5 minutes
        expiry_at = time.time() + max_wait_time

        mail = EmailAuthorizationService._connect_imap_with_proxy(
            auth.imap_server,
            auth.imap_port,
            auth.proxy_host,
            auth.proxy_port,
            auth.proxy_username,
            auth.proxy_password,
        )
        try:
            mail.login(auth.email, auth.authorization_code)
            mail.select("INBOX")

            while time.time() < expiry_at:
                mail.noop()  # refresh mailbox state
                _, data = mail.search(None, "ALL")
                mail_ids = data[0].split()
                if not mail_ids:
                    time.sleep(EmailAuthorizationService.RETRY_DELAY_SECONDS)
                    continue

                recent_ids = mail_ids[-EmailAuthorizationService.DEFAULT_READ_TOP_N_EMAIL:]
                messages = _load_recent(mail, recent_ids)
                logger.debug(f"[DEBUG] 成功解析邮件数: {len(messages)}")
                logger.debug(f"[DEBUG] 收件时间列表: {[m[1] for m in messages]}")

                # Newest arrival first.
                messages.sort(key=lambda x: x[1], reverse=True)
                for msg, received_dt in messages:
                    # Arrival must fall inside the window that starts at sent_dt.
                    if received_dt < sent_dt:
                        logger.debug(f"[INFO] 邮件太旧: {received_dt}")
                        continue
                    if received_dt > window_end:
                        logger.debug(f"[INFO] 邮件太新: {received_dt}")
                        continue

                    msg_from = msg.get("From", "")
                    msg_to = msg.get("To", "")
                    if sender.lower() not in msg_from.lower():
                        logger.debug("发件人不匹配")
                        continue
                    if recipient.lower() not in msg_to.lower():
                        logger.debug("收件人不匹配")
                        continue

                    subject = _decode_subject(msg)
                    if subject_keys and not any(k.lower() in subject.lower() for k in subject_keys):
                        continue

                    body = EmailAuthorizationService._extract_body(msg, only_text)
                    if body_keys and not any(k.lower() in body.lower() for k in body_keys):
                        continue

                    return body.strip()

                # Nothing matched in this pass: wait, then poll again.
                time.sleep(EmailAuthorizationService.RETRY_DELAY_SECONDS)

            raise NotFoundError("Get email timeout")
        finally:
            # Always release the connection, including on login failure or
            # timeout. close() is attempted before logout(); errors from
            # either are logged and otherwise ignored (best-effort cleanup).
            for shutdown_step in (mail.close, mail.logout):
                try:
                    shutdown_step()
                except (imaplib.IMAP4.error, OSError) as exc:
                    logger.debug(f"IMAP cleanup step failed: {exc}")

    return await run_in_threadpool(_worker)
@staticmethod
def _process_recent_emails_sync(
    mail, recent_ids, sent_dt, expiry, sender, recipient, subject_keys, body_keys, only_text
) -> Optional[str]:
    """
    Synchronous helper: fetch the given messages and return the first matching body.

    Intended to be run in a worker thread so the blocking IMAP calls do not
    stall the async event loop.

    Args:
        mail: IMAP client already logged in with a mailbox selected; only
            ``mail.fetch`` is used.
        recent_ids: message ids to inspect; they are fetched in reverse order.
        sent_dt: timezone-aware lower bound for the arrival time.
        expiry: width, in seconds, of the accepted arrival window after ``sent_dt``.
        sender, recipient: substrings that must occur (case-insensitively)
            in the From / To headers.
        subject_keys, body_keys: keyword lists; when non-empty, at least one
            keyword must occur (case-insensitively) in the subject / body.
        only_text: forwarded to ``_extract_body``.

    Returns:
        The stripped body of the newest matching message, or ``None``.
        Messages whose arrival time cannot be parsed from the last
        ``Received`` header are ignored.
    """
    def _received_time(msg):
        # The arrival time is the text after the final ';' of the last Received header.
        received_headers = msg.get_all("Received", [])
        if not received_headers:
            return None
        last_received = received_headers[-1]
        if ";" not in last_received:
            return None
        time_str = last_received.split(";")[-1].strip()
        dt_tuple = email.utils.parsedate_tz(time_str)
        if not dt_tuple:
            return None
        return datetime.fromtimestamp(email.utils.mktime_tz(dt_tuple), tz=timezone.utc)

    def _decode_subject(msg) -> str:
        subject_raw = msg.get("Subject")
        if not subject_raw:
            return ""
        pieces = []
        # Join every fragment so keywords after the first encoded word match too.
        for fragment, charset in decode_header(subject_raw):
            if isinstance(fragment, bytes):
                try:
                    pieces.append(fragment.decode(charset or "utf-8", errors="ignore"))
                except LookupError:
                    # Unknown charset name: fall back to UTF-8.
                    pieces.append(fragment.decode("utf-8", errors="ignore"))
            else:
                pieces.append(str(fragment))
        return "".join(pieces)

    messages = []
    for email_id in reversed(recent_ids):
        res, msg_data = mail.fetch(email_id, "(RFC822)")
        if res != "OK" or not msg_data:
            continue
        msg_bytes = None
        for part in msg_data:
            if isinstance(part, tuple):
                msg_bytes = part[1]
        if not msg_bytes:
            continue
        msg = email.message_from_bytes(msg_bytes)
        received_dt = _received_time(msg)
        if not received_dt:
            continue
        messages.append((msg, received_dt))

    # Newest arrival first.
    messages.sort(key=lambda x: x[1], reverse=True)
    window_end = sent_dt + timedelta(seconds=expiry)

    for msg, received_dt in messages:
        # Arrival must fall inside [sent_dt, sent_dt + expiry].
        if received_dt < sent_dt or received_dt > window_end:
            continue

        msg_from = msg.get("From", "")
        msg_to = msg.get("To", "")
        if sender.lower() not in msg_from.lower():
            continue
        if recipient.lower() not in msg_to.lower():
            continue

        subject = _decode_subject(msg)
        if subject_keys and not any(k.lower() in subject.lower() for k in subject_keys):
            continue

        body = EmailAuthorizationService._extract_body(msg, only_text)
        if body_keys and not any(k.lower() in body.lower() for k in body_keys):
            continue
        return body.strip()
    return None
@staticmethod
async def fetch_email_authorizations_from_top_n(
    auth,
    sender: str,
    recipient: str,
    subject_keywords: str,
    body_keywords: str,
    top: int = 10,
    only_text: bool = True
) -> Optional[str]:
    """
    Scan the ``top`` most recent INBOX messages once and return the first matching body.

    Unlike the polling variant there is no time filter and no retry. The
    blocking IMAP work runs in a thread pool.

    A message matches when ``sender`` / ``recipient`` occur
    (case-insensitively) in From / To, and, for each non-empty
    comma-separated keyword list, at least one keyword occurs in the decoded
    subject / extracted body.

    Args:
        auth: object providing ``email``, ``authorization_code``,
            ``imap_server``, ``imap_port`` and the ``proxy_*`` settings.
        top: how many of the most recent messages to inspect. Values
            ``<= 0`` inspect nothing and return ``None`` (previously
            ``top=0`` scanned the entire mailbox because ``ids[-0:]`` is the
            whole list).
        only_text: forwarded to ``_extract_body``.

    Returns:
        The stripped body of the first match (highest sequence number
        first), or ``None`` when nothing matches.
    """
    def _decode_subject(msg) -> str:
        subject_raw = msg.get("Subject")
        if not subject_raw:
            return ""
        pieces = []
        # Join every fragment so keywords after the first encoded word match too.
        for fragment, charset in decode_header(subject_raw):
            if isinstance(fragment, bytes):
                try:
                    pieces.append(fragment.decode(charset or "utf-8", errors="ignore"))
                except LookupError:
                    # Unknown charset name: fall back to UTF-8.
                    pieces.append(fragment.decode("utf-8", errors="ignore"))
            else:
                pieces.append(str(fragment))
        return "".join(pieces)

    # Purely synchronous worker that performs all IMAP work.
    def _worker():
        if top <= 0:
            return None

        subject_keys = [s.strip() for s in subject_keywords.split(",") if s.strip()]
        body_keys = [s.strip() for s in body_keywords.split(",") if s.strip()]

        mail = EmailAuthorizationService._connect_imap_with_proxy(
            auth.imap_server,
            auth.imap_port,
            auth.proxy_host,
            auth.proxy_port,
            auth.proxy_username,
            auth.proxy_password,
        )
        try:
            mail.login(auth.email, auth.authorization_code)
            mail.select("INBOX")
            _, data = mail.search(None, "ALL")
            mail_ids = data[0].split()
            if not mail_ids:
                return None

            for email_id in reversed(mail_ids[-top:]):
                res, msg_data = mail.fetch(email_id, "(RFC822)")
                if res != "OK" or not msg_data:
                    continue
                msg_bytes = None
                for part in msg_data:
                    if isinstance(part, tuple):
                        msg_bytes = part[1]
                if not msg_bytes:
                    continue
                msg = email.message_from_bytes(msg_bytes)

                msg_from = msg.get("From", "")
                msg_to = msg.get("To", "")
                if sender.lower() not in msg_from.lower():
                    continue
                if recipient.lower() not in msg_to.lower():
                    continue

                subject = _decode_subject(msg)
                if subject_keys and not any(k.lower() in subject.lower() for k in subject_keys):
                    continue

                body = EmailAuthorizationService._extract_body(msg, only_text)
                if body_keys and not any(k.lower() in body.lower() for k in body_keys):
                    continue
                return body.strip()
            return None
        finally:
            # Best-effort cleanup: attempt close() and logout() independently
            # so a failing close() (e.g. no mailbox selected after a failed
            # login) does not skip the logout.
            for shutdown_step in (mail.close, mail.logout):
                try:
                    shutdown_step()
                except (imaplib.IMAP4.error, OSError):
                    pass

    return await run_in_threadpool(_worker)
@staticmethod
async def forward_first_matching_email(
auth,
forward_to: str,
sender: str,
recipient: str,
subject_keywords: str,
body_keywords: str
):
def _worker():
subject_keys = [s.strip() for s in subject_keywords.split(",") if s.strip()]
body_keys = [s.strip() for s in body_keywords.split(",") if s.strip()]
mail = EmailAuthorizationService._connect_imap_with_proxy(
auth.imap_server, auth.imap_port, auth.proxy_host, auth.proxy_port, auth.proxy_username, auth.proxy_password
)
try:
mail.login(auth.email, auth.authorization_code)
mail.select("INBOX")
# 1. 搜索目标邮件
target = recipient
query = f'(HEADER To "{target}")'
res, data = mail.uid("search", None, query)
if res != "OK": return None
uids = data[0].split()
msgs_to_check = []
for uid in uids:
# 使用 RFC822 获取完整内容
res, msg_data = mail.uid("fetch", uid, "(RFC822)")
if res != "OK" or not msg_data: continue
# 临时解析用于排序和初步过滤
raw_bytes = msg_data[0][1]
tmp_msg = email.message_from_bytes(raw_bytes, policy=email.policy.default)
date_str = tmp_msg.get("Date")
if date_str:
try:
date_dt = parsedate_to_datetime(date_str)
msgs_to_check.append((date_dt, tmp_msg, raw_bytes))
except:
continue
# 按时间降序排序(最新的优先)
msgs_to_check.sort(key=lambda x: x[0], reverse=True)
for _, orig_msg, raw_bytes in msgs_to_check:
# --- 过滤逻辑 ---
msg_from = orig_msg.get("From", "")
if sender.lower() not in msg_from.lower(): continue
subject = orig_msg.get("Subject", "")
if subject_keys and not any(k.lower() in subject.lower() for k in subject_keys): continue
body_content = EmailAuthorizationService._extract_body(orig_msg, True)
if body_keys and not any(k.lower() in body_content.lower() for k in body_keys): continue
# --- 匹配成功:开始构造转发邮件 ---
# 1. 提取原始信息用于视觉转发头
orig_from = orig_msg.get("From", "Unknown")
orig_date = orig_msg.get("Date", "Unknown")
orig_subject = orig_msg.get("Subject", "No Subject")
orig_to = orig_msg.get("To", "Unknown")
orig_msg_id = orig_msg.get("Message-ID")
fwd_info = (
f"\n\n---------- Forwarded message ----------\n"
f"From: {orig_from}\n"
f"Date: {orig_date}\n"
f"Subject: {orig_subject}\n"
f"To: {orig_to}\n\n"
)
# 2. 构造新的邮件对象 (重新基于原始字节解析,确保附件完整)
msg = email.message_from_bytes(raw_bytes, policy=email.policy.default)
# 3. 清理并重置 Header
headers_to_clean = ['From', 'To', 'Cc', 'Bcc', 'Subject', 'Date', 'Message-ID', 'In-Reply-To', 'References']
for h in headers_to_clean:
del msg[h]
msg['From'] = auth.email
msg['To'] = forward_to
msg['Subject'] = f"Fwd: {orig_subject}"
msg['Date'] = formatdate(localtime=True)
msg['Message-ID'] = make_msgid(domain=auth.email.split('@')[-1])
# 4. 【核心】建立上下文关联 (Threading)
if orig_msg_id:
msg['In-Reply-To'] = orig_msg_id
msg['References'] = orig_msg_id
# 5. 【核心】注入视觉转发头 (Visual Prepend)
try:
if msg.is_multipart():
# 遍历部分,找到主要正文并插入
for part in msg.walk():
ctype = part.get_content_type()
if ctype == "text/plain":
part.set_content(fwd_info + part.get_content())
break
elif ctype == "text/html":
html_fwd = fwd_info.replace("\n", "
")
part.set_content(f"