| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636 |
- import re
- import time
- import socket
- import smtplib
- import socks
- import imaplib
- import email
- from datetime import datetime, timedelta, timezone
- from email.message import EmailMessage
- from email.header import decode_header
- from sqlalchemy.orm import Session
- from typing import List, Optional
- from app.core.logger import logger
- from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
- from app.models.email_authorizations import EmailAuthorization
- from app.schemas.email_authorizations import EmailAuthorizationCreate, EmailAuthorizationUpdate
- class EmailAuthorizationService:
-
- DEFAULT_READ_TOP_N_EMAIL = 10 # 最多读取最近多少封邮件
- RETRY_DELAY_SECONDS = 5 # 每次轮询间隔
-
- @staticmethod
- def get_all(db: Session) -> List[EmailAuthorization]:
- return db.query(EmailAuthorization).order_by(EmailAuthorization.id.desc()).all()
- @staticmethod
- def get_by_id(db: Session, id: int) -> Optional[EmailAuthorization]:
- obj = db.query(EmailAuthorization).filter(EmailAuthorization.id == id).first()
- if not obj:
- raise NotFoundError("Email authorization not found")
- return obj
-
- @staticmethod
- def get_by_email(db: Session, email: str) -> Optional[EmailAuthorization]:
- obj = db.query(EmailAuthorization).filter(EmailAuthorization.email == email).first()
- if not obj:
- raise NotFoundError("Email authorization not found")
- return obj
-
- @staticmethod
- def create(db: Session, obj_in: EmailAuthorizationCreate) -> EmailAuthorization:
- existing = EmailAuthorizationService.get_by_email(db, obj_in.email)
- if existing:
- raise BizLogicError(f"Email {obj_in.email} already exist")
- db_obj = EmailAuthorization(**obj_in.dict(exclude_unset=True))
- db.add(db_obj)
- db.commit()
- db.refresh(db_obj)
- return db_obj
- @staticmethod
- def update(db: Session, id: int, obj_in: EmailAuthorizationUpdate) -> Optional[EmailAuthorization]:
- db_obj = db.query(EmailAuthorization).filter(EmailAuthorization.id == id).first()
- if not db_obj:
- raise NotFoundError("Email authorization not found")
- for field, value in obj_in.dict(exclude_unset=True).items():
- setattr(db_obj, field, value)
- db.add(db_obj)
- db.commit()
- db.refresh(db_obj)
- return db_obj
- @staticmethod
- def delete(db: Session, id: int) -> Optional[EmailAuthorization]:
- db_obj = db.query(EmailAuthorization).filter(EmailAuthorization.id == id).first()
- if not db_obj:
- raise NotFoundError("Email authorization not found")
- db.delete(db_obj)
- db.commit()
- return db_obj
-
- @staticmethod
- def _connect_imap_with_proxy(
- host: str,
- port: int,
- proxy_host: Optional[str] = None,
- proxy_port: Optional[int] = None,
- proxy_user: Optional[str] = None,
- proxy_password: Optional[str] = None,
- ) -> imaplib.IMAP4_SSL:
- """
- 创建支持 SOCKS5 / HTTP 代理的 IMAP SSL 连接
- """
- if proxy_host and proxy_port and proxy_port > 0:
- original_socket = socket.socket
- socks.setdefaultproxy(
- proxy_type=socks.SOCKS5, # 可改为 socks.HTTP
- addr=proxy_host,
- port=proxy_port,
- username=proxy_user or None,
- password=proxy_password or None,
- )
- socket.socket = socks.socksocket
- try:
- imap = imaplib.IMAP4_SSL(host, port)
- finally:
- socket.socket = original_socket # 恢复原始 socket
- else:
- imap = imaplib.IMAP4_SSL(host, port)
- return imap
-
-
- @staticmethod
- def _connect_smtp_with_proxy(
- host: str,
- port: int,
- proxy_host: Optional[str] = None,
- proxy_port: Optional[int] = None,
- proxy_user: Optional[str] = None,
- proxy_password: Optional[str] = None,
- ) -> smtplib.SMTP_SSL:
- """
- 创建支持 SOCKS5 / HTTP 代理的 SMTP SSL 连接
- (逻辑完全与 _connect_imap_with_proxy 保持一致,且强制 SSL)
- """
- if proxy_host and proxy_port and proxy_port > 0:
- original_socket = socket.socket
- socks.setdefaultproxy(
- proxy_type=socks.SOCKS5, # 如需改 HTTP,这里改
- addr=proxy_host,
- port=proxy_port,
- username=proxy_user or None,
- password=proxy_password or None,
- )
- socket.socket = socks.socksocket
- try:
- smtp = smtplib.SMTP_SSL(host, port)
- finally:
- socket.socket = original_socket # 恢复原始 socket
- else:
- # 无代理
- smtp = smtplib.SMTP_SSL(host, port)
- return smtp
- @staticmethod
- def fetch_email_authorizations(
- auth,
- sender: str,
- recipient: str,
- subject_keywords: str,
- body_keywords: str,
- sent_date: str,
- expiry: int = 300,
- only_text: bool = True
- ) -> Optional[str]:
- """
- 在有效期内循环读取邮箱,找到符合条件的邮件(使用最后一条 Received 头作为收件时间)
- """
- EMAIL_ACCOUNT = auth.email
- EMAIL_PASSWORD = auth.authorization_code
- IMAP_SERVER = auth.imap_server
- IMAP_PORT = auth.imap_port
- subject_keys = [s.strip() for s in subject_keywords.split(",") if s.strip()]
- body_keys = [s.strip() for s in body_keywords.split(",") if s.strip()]
- # === 时间计算 ===
- sent_dt = datetime.strptime(sent_date, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
- max_wait_time = min(5 * 60, expiry) # 最长等待5分钟
- expiry_at = time.time() + max_wait_time
- def get_received_time(msg):
- """
- 使用最后一条 Received 头解析收件时间
- """
- received_headers = msg.get_all("Received", [])
- if not received_headers:
- return None
- for i, header in enumerate(received_headers, 1):
- logger.debug(f" [{i}] {header}")
- last_received = received_headers[-1]
- if ";" not in last_received:
- return None
- time_str = last_received.split(";")[-1].strip()
- dt_tuple = email.utils.parsedate_tz(time_str)
- if not dt_tuple:
- return None
- return datetime.fromtimestamp(email.utils.mktime_tz(dt_tuple), tz=timezone.utc)
- mail = EmailAuthorizationService._connect_imap_with_proxy(
- IMAP_SERVER,
- IMAP_PORT,
- auth.proxy_host,
- auth.proxy_port,
- auth.proxy_username,
- auth.proxy_password,
- )
- mail.login(EMAIL_ACCOUNT, EMAIL_PASSWORD)
- mail.select("INBOX")
- while time.time() < expiry_at:
- mail.noop() # 刷新邮箱状态
- _, data = mail.search(None, "ALL")
- mail_ids = data[0].split()
- if not mail_ids:
- time.sleep(EmailAuthorizationService.RETRY_DELAY_SECONDS)
- continue
- recent_ids = mail_ids[-EmailAuthorizationService.DEFAULT_READ_TOP_N_EMAIL:]
- messages = []
- debug = True
- for email_id in reversed(recent_ids):
- res, msg_data = mail.fetch(email_id, "(RFC822)")
- if res != "OK" or not msg_data:
- if debug:
- logger.debug(f"[WARN] 邮件 ID={email_id.decode()} 获取失败")
- continue
- msg_bytes = None
- for part in msg_data:
- if isinstance(part, tuple):
- msg_bytes = part[1]
- if not msg_bytes:
- if debug:
- logger.debug(f"[WARN] 邮件 ID={email_id.decode()} 无正文")
- continue
- msg = email.message_from_bytes(msg_bytes)
- received_dt = get_received_time(msg)
- if not received_dt:
- if debug:
- logger.debug(f"[WARN] 邮件 ID={email_id.decode()} 未解析出 Received 时间")
- continue
- messages.append((msg, received_dt))
- if debug:
- logger.debug(f"[DEBUG] 成功解析邮件数: {len(messages)}")
- logger.debug(f"[DEBUG] 收件时间列表: {[m[1] for m in messages]}")
- # 按收件时间降序排序
- messages.sort(key=lambda x: x[1], reverse=True)
- for msg, received_dt in messages:
- # 判断是否在发送时间后的有效窗口内
- if received_dt < sent_dt:
- if debug:
- logger.debug(f"[INFO] 邮件太旧: {received_dt}")
- continue
- if received_dt > sent_dt + timedelta(seconds=expiry):
- if debug:
- logger.debug(f"[INFO] 邮件太新: {received_dt}")
- continue
- # 匹配发件人/收件人
- msg_from = msg.get("From", "")
- msg_to = msg.get("To", "")
- if sender.lower() not in msg_from.lower():
- if debug:
- logger.debug("发件人不匹配")
- continue
- if recipient.lower() not in msg_to.lower():
- if debug:
- logger.debug("收件人不匹配")
- continue
- # 匹配主题
- subject, enc = decode_header(msg.get("Subject"))[0]
- if isinstance(subject, bytes):
- subject = subject.decode(enc or "utf-8", errors="ignore")
- if subject_keys and not any(k.lower() in subject.lower() for k in subject_keys):
- continue
- # 提取正文
- body = EmailAuthorizationService._extract_body(msg, only_text)
- if body_keys and not any(k.lower() in body.lower() for k in body_keys):
- continue
- # 找到匹配邮件 → 返回内容
- mail.close()
- mail.logout()
- return body.strip()
- # 未匹配到 → 等待重试
- time.sleep(EmailAuthorizationService.RETRY_DELAY_SECONDS)
- mail.close()
- mail.logout()
- raise NotFoundError("Get email timeout")
-
- @staticmethod
- def fetch_email_authorizations_from_top_n(
- auth,
- sender: str,
- recipient: str,
- subject_keywords: str,
- body_keywords: str,
- top: int = 10,
- only_text: bool = True
- ) -> Optional[str]:
- """
- 在有效期内循环读取邮箱,找到符合条件的邮件(使用最后一条 Received 头作为收件时间)
- """
- EMAIL_ACCOUNT = auth.email
- EMAIL_PASSWORD = auth.authorization_code
- IMAP_SERVER = auth.imap_server
- IMAP_PORT = auth.imap_port
- subject_keys = [s.strip() for s in subject_keywords.split(",") if s.strip()]
- body_keys = [s.strip() for s in body_keywords.split(",") if s.strip()]
- try:
- mail = EmailAuthorizationService._connect_imap_with_proxy(
- IMAP_SERVER,
- IMAP_PORT,
- auth.proxy_host,
- auth.proxy_port,
- auth.proxy_username,
- auth.proxy_password,
- )
- mail.login(EMAIL_ACCOUNT, EMAIL_PASSWORD)
- mail.select("INBOX")
- _, data = mail.search(None, "ALL")
- mail_ids = data[0].split()
- if not mail_ids:
- return None
- recent_ids = mail_ids[-top:]
- messages = []
- debug = True
- for email_id in reversed(recent_ids):
- res, msg_data = mail.fetch(email_id, "(RFC822)")
- if res != "OK" or not msg_data:
- if debug:
- logger.debug(f"[WARN] 邮件 ID={email_id.decode()} 获取失败")
- continue
- msg_bytes = None
- for part in msg_data:
- if isinstance(part, tuple):
- msg_bytes = part[1]
- if not msg_bytes:
- if debug:
- logger.debug(f"[WARN] 邮件 ID={email_id.decode()} 无正文")
- continue
- msg = email.message_from_bytes(msg_bytes)
- messages.append(msg)
- if debug:
- logger.debug(f"[DEBUG] 成功解析邮件数: {len(messages)}")
- for msg in messages:
-
- # 匹配发件人/收件人
- msg_from = msg.get("From", "")
- msg_to = msg.get("To", "")
- if sender.lower() not in msg_from.lower():
- if debug:
- logger.debug("发件人不匹配")
- continue
- if recipient.lower() not in msg_to.lower():
- if debug:
- logger.debug("收件人不匹配")
- continue
- # 匹配主题
- subject, enc = decode_header(msg.get("Subject"))[0]
- if isinstance(subject, bytes):
- subject = subject.decode(enc or "utf-8", errors="ignore")
- if subject_keys and not any(k.lower() in subject.lower() for k in subject_keys):
- continue
- # 提取正文
- body = EmailAuthorizationService._extract_body(msg, only_text)
- if body_keys and not any(k.lower() in body.lower() for k in body_keys):
- continue
- # 找到匹配邮件 → 返回内容
- mail.close()
- mail.logout()
- return body.strip()
- mail.close()
- mail.logout()
- return None # 超时未找到
- except Exception as e:
- raise e
-
- # ----------------------------------------------------------------------
- # 主方法:模仿 Java forwardFirstMatchingEmail
- # ----------------------------------------------------------------------
- def forward_first_matching_email(
- auth,
- forward_to: str,
- sender: str,
- recipient: str,
- subject_keywords: str,
- body_keywords: str
- ):
- EMAIL_ACCOUNT = auth.email
- EMAIL_PASSWORD = auth.authorization_code
- IMAP_SERVER = auth.imap_server
- IMAP_PORT = auth.imap_port
- subject_keys = [s.strip() for s in subject_keywords.split(",") if s.strip()]
- body_keys = [s.strip() for s in body_keywords.split(",") if s.strip()]
- try:
- mail = EmailAuthorizationService._connect_imap_with_proxy(
- IMAP_SERVER,
- IMAP_PORT,
- auth.proxy_host,
- auth.proxy_port,
- auth.proxy_username,
- auth.proxy_password,
- )
- mail.login(EMAIL_ACCOUNT, EMAIL_PASSWORD)
- mail.select("INBOX")
- target = recipient
- query = f'(HEADER To "{target}")'
- res, data = mail.uid("search", None, query)
- if res != "OK":
- return None
- uids = data[0].split()
- # 获取邮件,按 sentDate 排序
- msgs = []
- for uid in uids:
- res, msg_data = mail.uid("fetch", uid, "(RFC822)")
- if res != "OK":
- continue
- msg = email.message_from_bytes(msg_data[0][1])
- date_ = email.utils.parsedate_to_datetime(msg.get("Date"))
- msgs.append((date_, msg))
- msgs.sort(key=lambda x: x[0], reverse=True)
- # -----------------------------------------------------------
- # 搜索匹配邮件
- # -----------------------------------------------------------
- for _, msg in msgs:
- # 匹配发件人/收件人
- msg_from = msg.get("From", "")
- msg_to = msg.get("To", "")
- if sender.lower() not in msg_from.lower():
- if debug:
- logger.debug("发件人不匹配")
- continue
- if recipient.lower() not in msg_to.lower():
- if debug:
- logger.debug("收件人不匹配")
- continue
- # 匹配主题
- subject, enc = decode_header(msg.get("Subject"))[0]
- if isinstance(subject, bytes):
- subject = subject.decode(enc or "utf-8", errors="ignore")
- if subject_keys and not any(k.lower() in subject.lower() for k in subject_keys):
- continue
- # 提取正文
- body = EmailAuthorizationService._extract_body(msg, True)
- if body_keys and not any(k.lower() in body.lower() for k in body_keys):
- continue
-
- msg.replace_header("From", EMAIL_ACCOUNT)
- msg.replace_header("To", forward_to)
- # 可选:修改 Subject
- subject_raw = msg.get("Subject", "")
- msg.replace_header("Subject", f"FWD: {subject_raw}")
- # 发送
- EmailAuthorizationService.send_email_smtp(
- auth,
- msg
- )
- return f"邮件 '{subject}' 已成功转发至: {forward_to}"
- return None
- except Exception as e:
- raise e
-
- # ----------------------------------------------------------------------
- # 主方法:模仿 Java send_email
- # ----------------------------------------------------------------------
- def send_email(
- auth,
- send_to: str,
- subject: str,
- content_type: str,
- content: str
- ):
- try:
- EMAIL_ACCOUNT = auth.email
- msg = EmailMessage()
- msg["From"] = EMAIL_ACCOUNT
- msg["To"] = send_to
- msg["Subject"] = subject
-
- html_content = None;
- text_content = None;
- if ("html" == content_type.lower()):
- html_content = content
- else:
- text_content = content
- # 设置正文内容
- if html_content and text_content:
- # multipart/alternative
- msg.set_content(text_content)
- msg.add_alternative(html_content, subtype="html")
- elif html_content:
- msg.add_alternative(html_content, subtype="html")
- elif text_content:
- msg.set_content(text_content)
- else:
- msg.set_content("") # 空邮件
-
- # 发送
- EmailAuthorizationService.send_email_smtp(
- auth,
- msg
- )
- return f"邮件 '{subject}' 成功发送至: {send_to}"
- except Exception as e:
- raise e
-
-
- # ----------------------------------------------------------------------
- # 主方法:模仿 Java send_email
- # ----------------------------------------------------------------------
- def send_email_bulk(
- auth,
- send_to: str,
- subject: str,
- content_type: str,
- content: str
- ):
- bcc_list = [s.strip() for s in send_to.split(",") if s.strip()]
- EMAIL_ACCOUNT = auth.email
- msg = EmailMessage()
- msg["From"] = EMAIL_ACCOUNT
- # TO 可以留空或放一个默认收件人
- msg["To"] = bcc_list[0] # 或者固定一个自己邮箱作为 TO
- # BCC 添加所有收件人
- bcc_list = bcc_list
- msg["Subject"] = subject
-
- html_content = None;
- text_content = None;
- if ("html" == content_type.lower()):
- html_content = content
- else:
- text_content = content
- # 设置正文内容
- if html_content and text_content:
- # multipart/alternative
- msg.set_content(text_content)
- msg.add_alternative(html_content, subtype="html")
- elif html_content:
- msg.add_alternative(html_content, subtype="html")
- elif text_content:
- msg.set_content(text_content)
- else:
- msg.set_content("") # 空邮件
-
- # 发送
- EmailAuthorizationService.send_email_smtp(
- auth,
- msg,
- bcc_list = bcc_list
- )
- return f"邮件 '{subject}' 成功发送至: {send_to}"
- # ----------------------------------------------------------------------
- # SMTP 发送邮件
- # ----------------------------------------------------------------------
- @staticmethod
- def send_email_smtp(auth, msg, bcc_list=[]):
- EMAIL_ACCOUNT = auth.email
- EMAIL_PASSWORD = auth.authorization_code
- SMTP_SERVER = auth.smtp_server
- SMTP_PORT = auth.smtp_port
- mail = EmailAuthorizationService._connect_smtp_with_proxy(
- SMTP_SERVER,
- SMTP_PORT,
- auth.proxy_host,
- auth.proxy_port,
- auth.proxy_username,
- auth.proxy_password,
- )
- mail.login(EMAIL_ACCOUNT, EMAIL_PASSWORD)
- if bcc_list:
- mail.send_message(msg, to_addrs=bcc_list)
- else:
- mail.send_message(msg)
- # ==============================================================
- # 辅助函数:提取邮件正文
- # ==============================================================
- @staticmethod
- def _extract_body(msg, only_text: bool = True) -> str:
- """根据 only_text 参数提取邮件内容"""
- body_parts = []
- if msg.is_multipart():
- for part in msg.walk():
- ctype = part.get_content_type()
- if only_text and ctype != "text/plain":
- continue
- if not only_text and ctype not in ["text/plain", "text/html"]:
- continue
- if part.get("Content-Disposition"):
- continue
- charset = part.get_content_charset() or "utf-8"
- try:
- text = part.get_payload(decode=True).decode(charset, errors="ignore")
- body_parts.append(text)
- except Exception:
- continue
- else:
- charset = msg.get_content_charset() or "utf-8"
- body_parts.append(msg.get_payload(decode=True).decode(charset, errors="ignore"))
- body = "\n".join(body_parts)
- return re.sub(r"\s+", " ", body.strip())
|