import re
import time
import socket
import smtplib
import socks
import imaplib
import email
import email.utils
from datetime import datetime, timedelta, timezone
from email.message import EmailMessage
from email.header import decode_header
from sqlalchemy.orm import Session
from typing import List, Optional

from app.core.logger import logger
from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
from app.models.email_authorizations import EmailAuthorization
from app.schemas.email_authorizations import EmailAuthorizationCreate, EmailAuthorizationUpdate


class EmailAuthorizationService:
    """CRUD for ``EmailAuthorization`` rows plus IMAP/SMTP helpers that use them.

    All methods are static. The mail helpers take an ``auth`` object and read
    ``email``, ``authorization_code``, ``imap_server``/``imap_port``,
    ``smtp_server``/``smtp_port`` and the ``proxy_*`` attributes from it.
    """

    DEFAULT_READ_TOP_N_EMAIL = 10  # how many of the most recent emails are read per poll
    RETRY_DELAY_SECONDS = 5  # delay between two polls of the mailbox

    # ------------------------------------------------------------------
    # CRUD
    # ------------------------------------------------------------------
    @staticmethod
    def get_all(db: Session) -> List[EmailAuthorization]:
        """Return every authorization, newest id first."""
        return db.query(EmailAuthorization).order_by(EmailAuthorization.id.desc()).all()

    @staticmethod
    def get_by_id(db: Session, id: int) -> Optional[EmailAuthorization]:
        """Return the authorization with the given id.

        Raises:
            NotFoundError: if no row has that id.
        """
        obj = db.query(EmailAuthorization).filter(EmailAuthorization.id == id).first()
        if not obj:
            raise NotFoundError("Email authorization not found")
        return obj

    @staticmethod
    def get_by_email(db: Session, email: str) -> Optional[EmailAuthorization]:
        """Return the authorization registered for ``email``.

        Raises:
            NotFoundError: if no row has that address.
        """
        obj = db.query(EmailAuthorization).filter(EmailAuthorization.email == email).first()
        if not obj:
            raise NotFoundError("Email authorization not found")
        return obj

    @staticmethod
    def create(db: Session, obj_in: EmailAuthorizationCreate) -> EmailAuthorization:
        """Insert a new authorization.

        Raises:
            BizLogicError: if the email address is already registered.
        """
        # Query directly instead of calling get_by_email(): that helper raises
        # NotFoundError when the address is absent, which is exactly the case
        # in which creation must proceed.
        existing = (
            db.query(EmailAuthorization)
            .filter(EmailAuthorization.email == obj_in.email)
            .first()
        )
        if existing:
            raise BizLogicError(f"Email {obj_in.email} already exist")

        db_obj = EmailAuthorization(**obj_in.dict(exclude_unset=True))
        db.add(db_obj)
        db.commit()
        db.refresh(db_obj)
        return db_obj

    @staticmethod
    def update(db: Session, id: int, obj_in: EmailAuthorizationUpdate) -> Optional[EmailAuthorization]:
        """Apply the fields explicitly set on ``obj_in`` to the row with ``id``.

        Raises:
            NotFoundError: if no row has that id.
        """
        db_obj = EmailAuthorizationService.get_by_id(db, id)
        for field, value in obj_in.dict(exclude_unset=True).items():
            setattr(db_obj, field, value)
        db.add(db_obj)
        db.commit()
        db.refresh(db_obj)
        return db_obj

    @staticmethod
    def delete(db: Session, id: int) -> Optional[EmailAuthorization]:
        """Delete the row with ``id`` and return the deleted object.

        Raises:
            NotFoundError: if no row has that id.
        """
        db_obj = EmailAuthorizationService.get_by_id(db, id)
        db.delete(db_obj)
        db.commit()
        return db_obj

    # ------------------------------------------------------------------
    # Connections
    # ------------------------------------------------------------------
    @staticmethod
    def _connect_with_proxy(
        factory,
        host: str,
        port: int,
        proxy_host: Optional[str] = None,
        proxy_port: Optional[int] = None,
        proxy_user: Optional[str] = None,
        proxy_password: Optional[str] = None,
    ):
        """Call ``factory(host, port)``, routed through a SOCKS5 proxy if one is given.

        A proxy is used only when ``proxy_host`` is set and ``proxy_port`` is a
        positive number.
        """
        if not (proxy_host and proxy_port and proxy_port > 0):
            return factory(host, port)

        original_socket = socket.socket
        socks.setdefaultproxy(
            proxy_type=socks.SOCKS5,  # change to socks.HTTP for an HTTP proxy
            addr=proxy_host,
            port=proxy_port,
            username=proxy_user or None,
            password=proxy_password or None,
        )
        # NOTE: this swaps the module-level socket.socket, so every socket
        # created anywhere in the process while the connection is being opened
        # is a socks socket as well.
        socket.socket = socks.socksocket
        try:
            return factory(host, port)
        finally:
            socket.socket = original_socket  # always restore the real socket class

    @staticmethod
    def _connect_imap_with_proxy(
        host: str,
        port: int,
        proxy_host: Optional[str] = None,
        proxy_port: Optional[int] = None,
        proxy_user: Optional[str] = None,
        proxy_password: Optional[str] = None,
    ) -> imaplib.IMAP4_SSL:
        """Open an IMAP-over-SSL connection, optionally through a SOCKS5 proxy."""
        return EmailAuthorizationService._connect_with_proxy(
            imaplib.IMAP4_SSL, host, port, proxy_host, proxy_port, proxy_user, proxy_password
        )

    @staticmethod
    def _connect_smtp_with_proxy(
        host: str,
        port: int,
        proxy_host: Optional[str] = None,
        proxy_port: Optional[int] = None,
        proxy_user: Optional[str] = None,
        proxy_password: Optional[str] = None,
    ) -> smtplib.SMTP_SSL:
        """Open an SMTP-over-SSL connection, optionally through a SOCKS5 proxy."""
        return EmailAuthorizationService._connect_with_proxy(
            smtplib.SMTP_SSL, host, port, proxy_host, proxy_port, proxy_user, proxy_password
        )

    @staticmethod
    def _open_inbox(auth) -> imaplib.IMAP4_SSL:
        """Connect to the account's IMAP server, log in and select INBOX.

        The connection is closed again if login or select fails.
        """
        mail = EmailAuthorizationService._connect_imap_with_proxy(
            auth.imap_server,
            auth.imap_port,
            auth.proxy_host,
            auth.proxy_port,
            auth.proxy_username,
            auth.proxy_password,
        )
        try:
            mail.login(auth.email, auth.authorization_code)
            mail.select("INBOX")
        except Exception:
            EmailAuthorizationService._close_imap(mail)
            raise
        return mail

    @staticmethod
    def _close_imap(mail) -> None:
        """Close the selected mailbox and log out, ignoring cleanup failures."""
        for action in (mail.close, mail.logout):
            try:
                action()
            except (imaplib.IMAP4.error, OSError):
                # close() fails when no mailbox is selected and either call can
                # fail on a dead connection; neither should mask the real
                # result or error of the caller.
                pass

    # ------------------------------------------------------------------
    # Message helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _split_keywords(raw: Optional[str]) -> List[str]:
        """Split a comma-separated string into stripped, non-empty items."""
        return [item.strip() for item in (raw or "").split(",") if item.strip()]

    @staticmethod
    def _payload_from_fetch(msg_data) -> Optional[bytes]:
        """Return the message bytes from an imaplib FETCH response, or None.

        The payload is the second element of the last tuple in the response.
        """
        payload = None
        for part in msg_data or []:
            if isinstance(part, tuple):
                payload = part[1]
        return payload or None

    @staticmethod
    def _load_messages(mail, email_ids) -> list:
        """Fetch and parse the given message ids.

        Returns a list of ``(email_id, message)`` pairs in the order of
        ``email_ids``; ids that cannot be fetched are logged and skipped.
        """
        loaded = []
        for email_id in email_ids:
            res, msg_data = mail.fetch(email_id, "(RFC822)")
            if res != "OK" or not msg_data:
                logger.debug(f"[WARN] 邮件 ID={email_id.decode()} 获取失败")
                continue
            payload = EmailAuthorizationService._payload_from_fetch(msg_data)
            if not payload:
                logger.debug(f"[WARN] 邮件 ID={email_id.decode()} 无正文")
                continue
            loaded.append((email_id, email.message_from_bytes(payload)))
        return loaded

    @staticmethod
    def _received_time(msg) -> Optional[datetime]:
        """Parse the timestamp of the last ``Received`` header as a UTC datetime.

        Returns None when the header is missing or carries no parsable date.
        """
        received_headers = msg.get_all("Received", [])
        if not received_headers:
            return None
        for i, header in enumerate(received_headers, 1):
            logger.debug(f" [{i}] {header}")

        last_received = str(received_headers[-1])
        if ";" not in last_received:
            return None
        # The date follows the final ';' of a Received header.
        time_str = last_received.split(";")[-1].strip()
        dt_tuple = email.utils.parsedate_tz(time_str)
        if not dt_tuple:
            return None
        return datetime.fromtimestamp(email.utils.mktime_tz(dt_tuple), tz=timezone.utc)

    @staticmethod
    def _message_date(msg) -> datetime:
        """Return the ``Date`` header as an aware datetime.

        A missing or unparsable date yields the Unix epoch (UTC) so such
        messages sort last; a date without usable zone info is taken as UTC so
        all values stay mutually comparable.
        """
        fallback = datetime(1970, 1, 1, tzinfo=timezone.utc)
        raw = msg.get("Date")
        if not raw:
            return fallback
        try:
            parsed = email.utils.parsedate_to_datetime(str(raw))
        except (TypeError, ValueError):
            return fallback
        if parsed is None:
            return fallback
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    @staticmethod
    def _decode_subject(msg) -> str:
        """Return the fully decoded Subject header ('' when it is missing)."""
        raw = msg.get("Subject")
        if raw is None:
            return ""
        pieces = []
        for chunk, charset in decode_header(raw):
            if isinstance(chunk, bytes):
                try:
                    chunk = chunk.decode(charset or "utf-8", errors="ignore")
                except LookupError:
                    # Charset name Python does not know: fall back to UTF-8.
                    chunk = chunk.decode("utf-8", errors="ignore")
            pieces.append(chunk)
        return "".join(pieces)

    @staticmethod
    def _match_message(
        msg,
        sender: str,
        recipient: str,
        subject_keys: List[str],
        body_keys: List[str],
        only_text: bool,
    ) -> Optional[str]:
        """Return the extracted body if ``msg`` passes all filters, else None.

        ``sender``/``recipient`` are case-insensitive substrings of the From/To
        headers. An empty keyword list disables that filter; otherwise at least
        one keyword must occur (case-insensitively) in the subject/body.
        """
        msg_from = str(msg.get("From", ""))
        msg_to = str(msg.get("To", ""))
        if (sender or "").lower() not in msg_from.lower():
            logger.debug("发件人不匹配")
            return None
        if (recipient or "").lower() not in msg_to.lower():
            logger.debug("收件人不匹配")
            return None

        subject = EmailAuthorizationService._decode_subject(msg).lower()
        if subject_keys and not any(k.lower() in subject for k in subject_keys):
            return None

        body = EmailAuthorizationService._extract_body(msg, only_text)
        lowered_body = body.lower()
        if body_keys and not any(k.lower() in lowered_body for k in body_keys):
            return None
        return body

    @staticmethod
    def _set_header(msg, name: str, value: str) -> None:
        """Replace header ``name`` in place, or add it when it is absent."""
        if name in msg:
            msg.replace_header(name, value)
        else:
            msg[name] = value

    @staticmethod
    def _build_message(
        from_addr: str, to_addr: str, subject: str, content_type: str, content: str
    ) -> EmailMessage:
        """Build an ``EmailMessage`` whose body is HTML or plain text.

        The body is HTML when ``content_type`` equals "html" (case-insensitive),
        plain text otherwise; empty content gives an empty plain-text body.
        """
        msg = EmailMessage()
        msg["From"] = from_addr
        msg["To"] = to_addr
        msg["Subject"] = subject
        if not content:
            msg.set_content("")  # empty email
        elif (content_type or "").lower() == "html":
            msg.add_alternative(content, subtype="html")
        else:
            msg.set_content(content)
        return msg

    # ------------------------------------------------------------------
    # Reading
    # ------------------------------------------------------------------
    @staticmethod
    def fetch_email_authorizations(
        auth,
        sender: str,
        recipient: str,
        subject_keywords: str,
        body_keywords: str,
        sent_date: str,
        expiry: int = 300,
        only_text: bool = True
    ) -> Optional[str]:
        """Poll INBOX until a matching email arrives and return its body.

        Each poll reads the last ``DEFAULT_READ_TOP_N_EMAIL`` messages. A message
        qualifies when the time in its last ``Received`` header lies within
        ``[sent_date, sent_date + expiry seconds]`` and it passes the
        sender/recipient/subject/body filters.

        Args:
            auth: account settings (see class docstring).
            sender: substring expected in the From header.
            recipient: substring expected in the To header.
            subject_keywords: comma-separated keywords, any of which must match.
            body_keywords: comma-separated keywords, any of which must match.
            sent_date: ``"%Y-%m-%d %H:%M:%S"`` timestamp, interpreted as UTC.
            expiry: acceptance window in seconds; polling lasts
                ``min(300, expiry)`` seconds.
            only_text: extract only ``text/plain`` parts when True.

        Returns:
            The whitespace-normalized body of the first match.

        Raises:
            NotFoundError: if nothing matched before polling timed out.
        """
        service = EmailAuthorizationService
        subject_keys = service._split_keywords(subject_keywords)
        body_keys = service._split_keywords(body_keywords)

        sent_dt = datetime.strptime(sent_date, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
        window_end = sent_dt + timedelta(seconds=expiry)
        max_wait_time = min(5 * 60, expiry)  # never poll longer than 5 minutes
        expiry_at = time.time() + max_wait_time

        mail = service._open_inbox(auth)
        try:
            while time.time() < expiry_at:
                mail.noop()  # refresh the mailbox state
                _, data = mail.search(None, "ALL")
                mail_ids = (data[0] or b"").split()
                if not mail_ids:
                    time.sleep(service.RETRY_DELAY_SECONDS)
                    continue

                recent_ids = mail_ids[-service.DEFAULT_READ_TOP_N_EMAIL:]
                messages = []
                for email_id, msg in service._load_messages(mail, reversed(recent_ids)):
                    received_dt = service._received_time(msg)
                    if not received_dt:
                        logger.debug(f"[WARN] 邮件 ID={email_id.decode()} 未解析出 Received 时间")
                        continue
                    messages.append((msg, received_dt))

                logger.debug(f"[DEBUG] 成功解析邮件数: {len(messages)}")
                logger.debug(f"[DEBUG] 收件时间列表: {[m[1] for m in messages]}")

                # Newest first, by receive time.
                messages.sort(key=lambda item: item[1], reverse=True)

                for msg, received_dt in messages:
                    if received_dt < sent_dt:
                        logger.debug(f"[INFO] 邮件太旧: {received_dt}")
                        continue
                    if received_dt > window_end:
                        logger.debug(f"[INFO] 邮件太新: {received_dt}")
                        continue
                    body = service._match_message(
                        msg, sender, recipient, subject_keys, body_keys, only_text
                    )
                    if body is not None:
                        return body.strip()

                # Nothing matched yet: wait and poll again.
                time.sleep(service.RETRY_DELAY_SECONDS)

            raise NotFoundError("Get email timeout")
        finally:
            # Runs on success, timeout and errors alike, so the connection
            # never leaks.
            service._close_imap(mail)

    @staticmethod
    def fetch_email_authorizations_from_top_n(
        auth,
        sender: str,
        recipient: str,
        subject_keywords: str,
        body_keywords: str,
        top: int = 10,
        only_text: bool = True
    ) -> Optional[str]:
        """Scan the last ``top`` INBOX messages once and return the first match.

        Unlike ``fetch_email_authorizations`` this neither polls nor filters by
        receive time. Messages are checked from the highest sequence number
        down.

        Returns:
            The whitespace-normalized body of the first match, or None when the
            mailbox is empty or nothing matches.
        """
        service = EmailAuthorizationService
        subject_keys = service._split_keywords(subject_keywords)
        body_keys = service._split_keywords(body_keywords)

        mail = service._open_inbox(auth)
        try:
            _, data = mail.search(None, "ALL")
            mail_ids = (data[0] or b"").split()
            if not mail_ids:
                return None

            recent_ids = mail_ids[-top:]
            messages = [msg for _, msg in service._load_messages(mail, reversed(recent_ids))]
            logger.debug(f"[DEBUG] 成功解析邮件数: {len(messages)}")

            for msg in messages:
                body = service._match_message(
                    msg, sender, recipient, subject_keys, body_keys, only_text
                )
                if body is not None:
                    return body.strip()
            return None
        finally:
            service._close_imap(mail)

    # ------------------------------------------------------------------
    # Main method: mirrors the Java forwardFirstMatchingEmail
    # ------------------------------------------------------------------
    @staticmethod
    def forward_first_matching_email(
        auth,
        forward_to: str,
        sender: str,
        recipient: str,
        subject_keywords: str,
        body_keywords: str
    ):
        """Forward the newest INBOX message that matches the filters.

        Candidates are found with an IMAP ``HEADER To`` search for
        ``recipient`` and ordered by their ``Date`` header, newest first. The
        first match gets its From/To rewritten and its subject prefixed with
        ``FWD:``, then it is sent to ``forward_to`` only.

        Returns:
            A confirmation string, or None when the search fails or nothing
            matches.
        """
        service = EmailAuthorizationService
        EMAIL_ACCOUNT = auth.email
        subject_keys = service._split_keywords(subject_keywords)
        body_keys = service._split_keywords(body_keywords)

        mail = service._open_inbox(auth)
        try:
            # Escape backslashes and quotes so the value cannot break out of
            # the IMAP quoted string.
            quoted = (recipient or "").replace("\\", "\\\\").replace('"', '\\"')
            query = f'(HEADER To "{quoted}")'
            res, data = mail.uid("search", None, query)
            if res != "OK":
                return None
            uids = (data[0] or b"").split()

            # Load the candidates and order them by sent date, newest first.
            msgs = []
            for uid in uids:
                res, msg_data = mail.uid("fetch", uid, "(RFC822)")
                if res != "OK":
                    continue
                payload = service._payload_from_fetch(msg_data)
                if not payload:
                    continue
                msg = email.message_from_bytes(payload)
                msgs.append((service._message_date(msg), msg))
            msgs.sort(key=lambda item: item[0], reverse=True)

            for _, msg in msgs:
                body = service._match_message(
                    msg, sender, recipient, subject_keys, body_keys, True
                )
                if body is None:
                    continue

                subject = service._decode_subject(msg)
                subject_raw = msg.get("Subject", "")
                service._set_header(msg, "From", EMAIL_ACCOUNT)
                service._set_header(msg, "To", forward_to)
                service._set_header(msg, "Subject", f"FWD: {subject_raw}")

                # Pass the envelope recipients explicitly: left to itself,
                # send_message() would also deliver to the Cc/Bcc/Resent-*
                # addresses still present on the original message.
                targets = [
                    addr for _, addr in email.utils.getaddresses([forward_to or ""]) if addr
                ]
                service.send_email_smtp(auth, msg, bcc_list=targets)
                return f"邮件 '{subject}' 已成功转发至: {forward_to}"

            return None
        finally:
            service._close_imap(mail)

    # ------------------------------------------------------------------
    # Main method: mirrors the Java send_email
    # ------------------------------------------------------------------
    @staticmethod
    def send_email(
        auth,
        send_to: str,
        subject: str,
        content_type: str,
        content: str
    ):
        """Send one email from the account in ``auth`` to ``send_to``.

        ``content`` is sent as HTML when ``content_type`` is "html"
        (case-insensitive), as plain text otherwise.

        Returns:
            A confirmation string.
        """
        msg = EmailAuthorizationService._build_message(
            auth.email, send_to, subject, content_type, content
        )
        EmailAuthorizationService.send_email_smtp(auth, msg)
        return f"邮件 '{subject}' 成功发送至: {send_to}"

    # ------------------------------------------------------------------
    # Main method: bulk variant of send_email
    # ------------------------------------------------------------------
    @staticmethod
    def send_email_bulk(
        auth,
        send_to: str,
        subject: str,
        content_type: str,
        content: str
    ):
        """Send one email to every address in the comma-separated ``send_to``.

        Only the first address appears in the ``To`` header; all addresses are
        passed as envelope recipients, so the others receive it like a Bcc.

        Returns:
            A confirmation string.

        Raises:
            BizLogicError: if ``send_to`` contains no address.
        """
        bcc_list = EmailAuthorizationService._split_keywords(send_to)
        if not bcc_list:
            raise BizLogicError("No recipients specified")

        msg = EmailAuthorizationService._build_message(
            auth.email, bcc_list[0], subject, content_type, content
        )
        EmailAuthorizationService.send_email_smtp(auth, msg, bcc_list=bcc_list)
        return f"邮件 '{subject}' 成功发送至: {send_to}"

    # ------------------------------------------------------------------
    # SMTP delivery
    # ------------------------------------------------------------------
    @staticmethod
    def send_email_smtp(auth, msg, bcc_list=None):
        """Log in to the account's SMTP server and send ``msg``.

        Args:
            auth: account settings (see class docstring).
            msg: the message to send.
            bcc_list: explicit envelope recipients. When empty or None the
                recipients are taken from the message headers.
        """
        EMAIL_ACCOUNT = auth.email
        mail = EmailAuthorizationService._connect_smtp_with_proxy(
            auth.smtp_server,
            auth.smtp_port,
            auth.proxy_host,
            auth.proxy_port,
            auth.proxy_username,
            auth.proxy_password,
        )
        # The context manager sends QUIT and closes the socket even on errors.
        with mail:
            mail.login(EMAIL_ACCOUNT, auth.authorization_code)
            # The envelope sender is the authenticated account, not a Sender
            # header that a forwarded message may still carry.
            mail.send_message(msg, from_addr=EMAIL_ACCOUNT, to_addrs=bcc_list or None)

    # ==============================================================
    # Helper: extract the message body
    # ==============================================================
    @staticmethod
    def _decode_part(part) -> Optional[str]:
        """Decode one MIME part to text, or return None if it has no payload."""
        payload = part.get_payload(decode=True)
        if payload is None:
            return None
        charset = part.get_content_charset() or "utf-8"
        try:
            return payload.decode(charset, errors="ignore")
        except LookupError:
            # Charset name Python does not know: fall back to UTF-8.
            return payload.decode("utf-8", errors="ignore")

    @staticmethod
    def _extract_body(msg, only_text: bool = True) -> str:
        """Return the message text with all whitespace runs collapsed to a space.

        For multipart messages the ``text/plain`` parts (plus ``text/html``
        when ``only_text`` is False) are joined with newlines; parts carrying a
        ``Content-Disposition`` header are skipped. A single-part message is
        decoded as a whole.
        """
        wanted = ("text/plain",) if only_text else ("text/plain", "text/html")
        body_parts = []
        if msg.is_multipart():
            for part in msg.walk():
                if part.get_content_type() not in wanted:
                    continue
                if part.get("Content-Disposition"):
                    continue
                text = EmailAuthorizationService._decode_part(part)
                if text is not None:
                    body_parts.append(text)
        else:
            text = EmailAuthorizationService._decode_part(msg)
            if text is not None:
                body_parts.append(text)

        body = "\n".join(body_parts)
        return re.sub(r"\s+", " ", body.strip())