# email_authorizations_service.py
import email
import email.utils
import imaplib
import re
import smtplib
import socket
import time
from datetime import datetime, timedelta, timezone
from email.header import decode_header
from email.message import EmailMessage
from typing import List, Optional

import socks
from sqlalchemy.orm import Session

from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
from app.core.logger import logger
from app.models.email_authorizations import EmailAuthorization
from app.schemas.email_authorizations import EmailAuthorizationCreate, EmailAuthorizationUpdate


  17. class EmailAuthorizationService:
  18. DEFAULT_READ_TOP_N_EMAIL = 10 # 最多读取最近多少封邮件
  19. RETRY_DELAY_SECONDS = 5 # 每次轮询间隔
  20. @staticmethod
  21. def get_all(db: Session) -> List[EmailAuthorization]:
  22. return db.query(EmailAuthorization).order_by(EmailAuthorization.id.desc()).all()
  23. @staticmethod
  24. def get_by_id(db: Session, id: int) -> Optional[EmailAuthorization]:
  25. obj = db.query(EmailAuthorization).filter(EmailAuthorization.id == id).first()
  26. if not obj:
  27. raise NotFoundError("Email authorization not found")
  28. return obj
  29. @staticmethod
  30. def get_by_email(db: Session, email: str) -> Optional[EmailAuthorization]:
  31. obj = db.query(EmailAuthorization).filter(EmailAuthorization.email == email).first()
  32. if not obj:
  33. raise NotFoundError("Email authorization not found")
  34. return obj
  35. @staticmethod
  36. def create(db: Session, obj_in: EmailAuthorizationCreate) -> EmailAuthorization:
  37. existing = EmailAuthorizationService.get_by_email(db, obj_in.email)
  38. if existing:
  39. raise BizLogicError(f"Email {obj_in.email} already exist")
  40. db_obj = EmailAuthorization(**obj_in.dict(exclude_unset=True))
  41. db.add(db_obj)
  42. db.commit()
  43. db.refresh(db_obj)
  44. return db_obj
  45. @staticmethod
  46. def update(db: Session, id: int, obj_in: EmailAuthorizationUpdate) -> Optional[EmailAuthorization]:
  47. db_obj = db.query(EmailAuthorization).filter(EmailAuthorization.id == id).first()
  48. if not db_obj:
  49. raise NotFoundError("Email authorization not found")
  50. for field, value in obj_in.dict(exclude_unset=True).items():
  51. setattr(db_obj, field, value)
  52. db.add(db_obj)
  53. db.commit()
  54. db.refresh(db_obj)
  55. return db_obj
  56. @staticmethod
  57. def delete(db: Session, id: int) -> Optional[EmailAuthorization]:
  58. db_obj = db.query(EmailAuthorization).filter(EmailAuthorization.id == id).first()
  59. if not db_obj:
  60. raise NotFoundError("Email authorization not found")
  61. db.delete(db_obj)
  62. db.commit()
  63. return db_obj
  64. @staticmethod
  65. def _connect_imap_with_proxy(
  66. host: str,
  67. port: int,
  68. proxy_host: Optional[str] = None,
  69. proxy_port: Optional[int] = None,
  70. proxy_user: Optional[str] = None,
  71. proxy_password: Optional[str] = None,
  72. ) -> imaplib.IMAP4_SSL:
  73. """
  74. 创建支持 SOCKS5 / HTTP 代理的 IMAP SSL 连接
  75. """
  76. if proxy_host and proxy_port and proxy_port > 0:
  77. original_socket = socket.socket
  78. socks.setdefaultproxy(
  79. proxy_type=socks.SOCKS5, # 可改为 socks.HTTP
  80. addr=proxy_host,
  81. port=proxy_port,
  82. username=proxy_user or None,
  83. password=proxy_password or None,
  84. )
  85. socket.socket = socks.socksocket
  86. try:
  87. imap = imaplib.IMAP4_SSL(host, port)
  88. finally:
  89. socket.socket = original_socket # 恢复原始 socket
  90. else:
  91. imap = imaplib.IMAP4_SSL(host, port)
  92. return imap
  93. @staticmethod
  94. def _connect_smtp_with_proxy(
  95. host: str,
  96. port: int,
  97. proxy_host: Optional[str] = None,
  98. proxy_port: Optional[int] = None,
  99. proxy_user: Optional[str] = None,
  100. proxy_password: Optional[str] = None,
  101. ) -> smtplib.SMTP_SSL:
  102. """
  103. 创建支持 SOCKS5 / HTTP 代理的 SMTP SSL 连接
  104. (逻辑完全与 _connect_imap_with_proxy 保持一致,且强制 SSL)
  105. """
  106. if proxy_host and proxy_port and proxy_port > 0:
  107. original_socket = socket.socket
  108. socks.setdefaultproxy(
  109. proxy_type=socks.SOCKS5, # 如需改 HTTP,这里改
  110. addr=proxy_host,
  111. port=proxy_port,
  112. username=proxy_user or None,
  113. password=proxy_password or None,
  114. )
  115. socket.socket = socks.socksocket
  116. try:
  117. smtp = smtplib.SMTP_SSL(host, port)
  118. finally:
  119. socket.socket = original_socket # 恢复原始 socket
  120. else:
  121. # 无代理
  122. smtp = smtplib.SMTP_SSL(host, port)
  123. return smtp
  124. @staticmethod
  125. def fetch_email_authorizations(
  126. auth,
  127. sender: str,
  128. recipient: str,
  129. subject_keywords: str,
  130. body_keywords: str,
  131. sent_date: str,
  132. expiry: int = 300,
  133. only_text: bool = True
  134. ) -> Optional[str]:
  135. """
  136. 在有效期内循环读取邮箱,找到符合条件的邮件(使用最后一条 Received 头作为收件时间)
  137. """
  138. EMAIL_ACCOUNT = auth.email
  139. EMAIL_PASSWORD = auth.authorization_code
  140. IMAP_SERVER = auth.imap_server
  141. IMAP_PORT = auth.imap_port
  142. subject_keys = [s.strip() for s in subject_keywords.split(",") if s.strip()]
  143. body_keys = [s.strip() for s in body_keywords.split(",") if s.strip()]
  144. # === 时间计算 ===
  145. sent_dt = datetime.strptime(sent_date, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
  146. max_wait_time = min(5 * 60, expiry) # 最长等待5分钟
  147. expiry_at = time.time() + max_wait_time
  148. def get_received_time(msg):
  149. """
  150. 使用最后一条 Received 头解析收件时间
  151. """
  152. received_headers = msg.get_all("Received", [])
  153. if not received_headers:
  154. return None
  155. for i, header in enumerate(received_headers, 1):
  156. logger.debug(f" [{i}] {header}")
  157. last_received = received_headers[-1]
  158. if ";" not in last_received:
  159. return None
  160. time_str = last_received.split(";")[-1].strip()
  161. dt_tuple = email.utils.parsedate_tz(time_str)
  162. if not dt_tuple:
  163. return None
  164. return datetime.fromtimestamp(email.utils.mktime_tz(dt_tuple), tz=timezone.utc)
  165. mail = EmailAuthorizationService._connect_imap_with_proxy(
  166. IMAP_SERVER,
  167. IMAP_PORT,
  168. auth.proxy_host,
  169. auth.proxy_port,
  170. auth.proxy_username,
  171. auth.proxy_password,
  172. )
  173. mail.login(EMAIL_ACCOUNT, EMAIL_PASSWORD)
  174. mail.select("INBOX")
  175. while time.time() < expiry_at:
  176. mail.noop() # 刷新邮箱状态
  177. _, data = mail.search(None, "ALL")
  178. mail_ids = data[0].split()
  179. if not mail_ids:
  180. time.sleep(EmailAuthorizationService.RETRY_DELAY_SECONDS)
  181. continue
  182. recent_ids = mail_ids[-EmailAuthorizationService.DEFAULT_READ_TOP_N_EMAIL:]
  183. messages = []
  184. debug = True
  185. for email_id in reversed(recent_ids):
  186. res, msg_data = mail.fetch(email_id, "(RFC822)")
  187. if res != "OK" or not msg_data:
  188. if debug:
  189. logger.debug(f"[WARN] 邮件 ID={email_id.decode()} 获取失败")
  190. continue
  191. msg_bytes = None
  192. for part in msg_data:
  193. if isinstance(part, tuple):
  194. msg_bytes = part[1]
  195. if not msg_bytes:
  196. if debug:
  197. logger.debug(f"[WARN] 邮件 ID={email_id.decode()} 无正文")
  198. continue
  199. msg = email.message_from_bytes(msg_bytes)
  200. received_dt = get_received_time(msg)
  201. if not received_dt:
  202. if debug:
  203. logger.debug(f"[WARN] 邮件 ID={email_id.decode()} 未解析出 Received 时间")
  204. continue
  205. messages.append((msg, received_dt))
  206. if debug:
  207. logger.debug(f"[DEBUG] 成功解析邮件数: {len(messages)}")
  208. logger.debug(f"[DEBUG] 收件时间列表: {[m[1] for m in messages]}")
  209. # 按收件时间降序排序
  210. messages.sort(key=lambda x: x[1], reverse=True)
  211. for msg, received_dt in messages:
  212. # 判断是否在发送时间后的有效窗口内
  213. if received_dt < sent_dt:
  214. if debug:
  215. logger.debug(f"[INFO] 邮件太旧: {received_dt}")
  216. continue
  217. if received_dt > sent_dt + timedelta(seconds=expiry):
  218. if debug:
  219. logger.debug(f"[INFO] 邮件太新: {received_dt}")
  220. continue
  221. # 匹配发件人/收件人
  222. msg_from = msg.get("From", "")
  223. msg_to = msg.get("To", "")
  224. if sender.lower() not in msg_from.lower():
  225. if debug:
  226. logger.debug("发件人不匹配")
  227. continue
  228. if recipient.lower() not in msg_to.lower():
  229. if debug:
  230. logger.debug("收件人不匹配")
  231. continue
  232. # 匹配主题
  233. subject, enc = decode_header(msg.get("Subject"))[0]
  234. if isinstance(subject, bytes):
  235. subject = subject.decode(enc or "utf-8", errors="ignore")
  236. if subject_keys and not any(k.lower() in subject.lower() for k in subject_keys):
  237. continue
  238. # 提取正文
  239. body = EmailAuthorizationService._extract_body(msg, only_text)
  240. if body_keys and not any(k.lower() in body.lower() for k in body_keys):
  241. continue
  242. # 找到匹配邮件 → 返回内容
  243. mail.close()
  244. mail.logout()
  245. return body.strip()
  246. # 未匹配到 → 等待重试
  247. time.sleep(EmailAuthorizationService.RETRY_DELAY_SECONDS)
  248. mail.close()
  249. mail.logout()
  250. raise NotFoundError("Get email timeout")
  251. @staticmethod
  252. def fetch_email_authorizations_from_top_n(
  253. auth,
  254. sender: str,
  255. recipient: str,
  256. subject_keywords: str,
  257. body_keywords: str,
  258. top: int = 10,
  259. only_text: bool = True
  260. ) -> Optional[str]:
  261. """
  262. 在有效期内循环读取邮箱,找到符合条件的邮件(使用最后一条 Received 头作为收件时间)
  263. """
  264. EMAIL_ACCOUNT = auth.email
  265. EMAIL_PASSWORD = auth.authorization_code
  266. IMAP_SERVER = auth.imap_server
  267. IMAP_PORT = auth.imap_port
  268. subject_keys = [s.strip() for s in subject_keywords.split(",") if s.strip()]
  269. body_keys = [s.strip() for s in body_keywords.split(",") if s.strip()]
  270. try:
  271. mail = EmailAuthorizationService._connect_imap_with_proxy(
  272. IMAP_SERVER,
  273. IMAP_PORT,
  274. auth.proxy_host,
  275. auth.proxy_port,
  276. auth.proxy_username,
  277. auth.proxy_password,
  278. )
  279. mail.login(EMAIL_ACCOUNT, EMAIL_PASSWORD)
  280. mail.select("INBOX")
  281. _, data = mail.search(None, "ALL")
  282. mail_ids = data[0].split()
  283. if not mail_ids:
  284. return None
  285. recent_ids = mail_ids[-top:]
  286. messages = []
  287. debug = True
  288. for email_id in reversed(recent_ids):
  289. res, msg_data = mail.fetch(email_id, "(RFC822)")
  290. if res != "OK" or not msg_data:
  291. if debug:
  292. logger.debug(f"[WARN] 邮件 ID={email_id.decode()} 获取失败")
  293. continue
  294. msg_bytes = None
  295. for part in msg_data:
  296. if isinstance(part, tuple):
  297. msg_bytes = part[1]
  298. if not msg_bytes:
  299. if debug:
  300. logger.debug(f"[WARN] 邮件 ID={email_id.decode()} 无正文")
  301. continue
  302. msg = email.message_from_bytes(msg_bytes)
  303. messages.append(msg)
  304. if debug:
  305. logger.debug(f"[DEBUG] 成功解析邮件数: {len(messages)}")
  306. for msg in messages:
  307. # 匹配发件人/收件人
  308. msg_from = msg.get("From", "")
  309. msg_to = msg.get("To", "")
  310. if sender.lower() not in msg_from.lower():
  311. if debug:
  312. logger.debug("发件人不匹配")
  313. continue
  314. if recipient.lower() not in msg_to.lower():
  315. if debug:
  316. logger.debug("收件人不匹配")
  317. continue
  318. # 匹配主题
  319. subject, enc = decode_header(msg.get("Subject"))[0]
  320. if isinstance(subject, bytes):
  321. subject = subject.decode(enc or "utf-8", errors="ignore")
  322. if subject_keys and not any(k.lower() in subject.lower() for k in subject_keys):
  323. continue
  324. # 提取正文
  325. body = EmailAuthorizationService._extract_body(msg, only_text)
  326. if body_keys and not any(k.lower() in body.lower() for k in body_keys):
  327. continue
  328. # 找到匹配邮件 → 返回内容
  329. mail.close()
  330. mail.logout()
  331. return body.strip()
  332. mail.close()
  333. mail.logout()
  334. return None # 超时未找到
  335. except Exception as e:
  336. raise e
  337. # ----------------------------------------------------------------------
  338. # 主方法:模仿 Java forwardFirstMatchingEmail
  339. # ----------------------------------------------------------------------
  340. def forward_first_matching_email(
  341. auth,
  342. forward_to: str,
  343. sender: str,
  344. recipient: str,
  345. subject_keywords: str,
  346. body_keywords: str
  347. ):
  348. EMAIL_ACCOUNT = auth.email
  349. EMAIL_PASSWORD = auth.authorization_code
  350. IMAP_SERVER = auth.imap_server
  351. IMAP_PORT = auth.imap_port
  352. subject_keys = [s.strip() for s in subject_keywords.split(",") if s.strip()]
  353. body_keys = [s.strip() for s in body_keywords.split(",") if s.strip()]
  354. try:
  355. mail = EmailAuthorizationService._connect_imap_with_proxy(
  356. IMAP_SERVER,
  357. IMAP_PORT,
  358. auth.proxy_host,
  359. auth.proxy_port,
  360. auth.proxy_username,
  361. auth.proxy_password,
  362. )
  363. mail.login(EMAIL_ACCOUNT, EMAIL_PASSWORD)
  364. mail.select("INBOX")
  365. target = recipient
  366. query = f'(HEADER To "{target}")'
  367. res, data = mail.uid("search", None, query)
  368. if res != "OK":
  369. return None
  370. uids = data[0].split()
  371. # 获取邮件,按 sentDate 排序
  372. msgs = []
  373. for uid in uids:
  374. res, msg_data = mail.uid("fetch", uid, "(RFC822)")
  375. if res != "OK":
  376. continue
  377. msg = email.message_from_bytes(msg_data[0][1])
  378. date_ = email.utils.parsedate_to_datetime(msg.get("Date"))
  379. msgs.append((date_, msg))
  380. msgs.sort(key=lambda x: x[0], reverse=True)
  381. # -----------------------------------------------------------
  382. # 搜索匹配邮件
  383. # -----------------------------------------------------------
  384. for _, msg in msgs:
  385. # 匹配发件人/收件人
  386. msg_from = msg.get("From", "")
  387. msg_to = msg.get("To", "")
  388. if sender.lower() not in msg_from.lower():
  389. if debug:
  390. logger.debug("发件人不匹配")
  391. continue
  392. if recipient.lower() not in msg_to.lower():
  393. if debug:
  394. logger.debug("收件人不匹配")
  395. continue
  396. # 匹配主题
  397. subject, enc = decode_header(msg.get("Subject"))[0]
  398. if isinstance(subject, bytes):
  399. subject = subject.decode(enc or "utf-8", errors="ignore")
  400. if subject_keys and not any(k.lower() in subject.lower() for k in subject_keys):
  401. continue
  402. # 提取正文
  403. body = EmailAuthorizationService._extract_body(msg, True)
  404. if body_keys and not any(k.lower() in body.lower() for k in body_keys):
  405. continue
  406. msg.replace_header("From", EMAIL_ACCOUNT)
  407. msg.replace_header("To", forward_to)
  408. # 可选:修改 Subject
  409. subject_raw = msg.get("Subject", "")
  410. msg.replace_header("Subject", f"FWD: {subject_raw}")
  411. # 发送
  412. EmailAuthorizationService.send_email_smtp(
  413. auth,
  414. msg
  415. )
  416. return f"邮件 '{subject}' 已成功转发至: {forward_to}"
  417. return None
  418. except Exception as e:
  419. raise e
  420. # ----------------------------------------------------------------------
  421. # 主方法:模仿 Java send_email
  422. # ----------------------------------------------------------------------
  423. def send_email(
  424. auth,
  425. send_to: str,
  426. subject: str,
  427. content_type: str,
  428. content: str
  429. ):
  430. try:
  431. EMAIL_ACCOUNT = auth.email
  432. msg = EmailMessage()
  433. msg["From"] = EMAIL_ACCOUNT
  434. msg["To"] = send_to
  435. msg["Subject"] = subject
  436. html_content = None;
  437. text_content = None;
  438. if ("html" == content_type.lower()):
  439. html_content = content
  440. else:
  441. text_content = content
  442. # 设置正文内容
  443. if html_content and text_content:
  444. # multipart/alternative
  445. msg.set_content(text_content)
  446. msg.add_alternative(html_content, subtype="html")
  447. elif html_content:
  448. msg.add_alternative(html_content, subtype="html")
  449. elif text_content:
  450. msg.set_content(text_content)
  451. else:
  452. msg.set_content("") # 空邮件
  453. # 发送
  454. EmailAuthorizationService.send_email_smtp(
  455. auth,
  456. msg
  457. )
  458. return f"邮件 '{subject}' 成功发送至: {send_to}"
  459. except Exception as e:
  460. raise e
  461. # ----------------------------------------------------------------------
  462. # 主方法:模仿 Java send_email
  463. # ----------------------------------------------------------------------
  464. def send_email_bulk(
  465. auth,
  466. send_to: str,
  467. subject: str,
  468. content_type: str,
  469. content: str
  470. ):
  471. bcc_list = [s.strip() for s in send_to.split(",") if s.strip()]
  472. EMAIL_ACCOUNT = auth.email
  473. msg = EmailMessage()
  474. msg["From"] = EMAIL_ACCOUNT
  475. # TO 可以留空或放一个默认收件人
  476. msg["To"] = bcc_list[0] # 或者固定一个自己邮箱作为 TO
  477. # BCC 添加所有收件人
  478. bcc_list = bcc_list
  479. msg["Subject"] = subject
  480. html_content = None;
  481. text_content = None;
  482. if ("html" == content_type.lower()):
  483. html_content = content
  484. else:
  485. text_content = content
  486. # 设置正文内容
  487. if html_content and text_content:
  488. # multipart/alternative
  489. msg.set_content(text_content)
  490. msg.add_alternative(html_content, subtype="html")
  491. elif html_content:
  492. msg.add_alternative(html_content, subtype="html")
  493. elif text_content:
  494. msg.set_content(text_content)
  495. else:
  496. msg.set_content("") # 空邮件
  497. # 发送
  498. EmailAuthorizationService.send_email_smtp(
  499. auth,
  500. msg,
  501. bcc_list = bcc_list
  502. )
  503. return f"邮件 '{subject}' 成功发送至: {send_to}"
  504. # ----------------------------------------------------------------------
  505. # SMTP 发送邮件
  506. # ----------------------------------------------------------------------
  507. @staticmethod
  508. def send_email_smtp(auth, msg, bcc_list=[]):
  509. EMAIL_ACCOUNT = auth.email
  510. EMAIL_PASSWORD = auth.authorization_code
  511. SMTP_SERVER = auth.smtp_server
  512. SMTP_PORT = auth.smtp_port
  513. mail = EmailAuthorizationService._connect_smtp_with_proxy(
  514. SMTP_SERVER,
  515. SMTP_PORT,
  516. auth.proxy_host,
  517. auth.proxy_port,
  518. auth.proxy_username,
  519. auth.proxy_password,
  520. )
  521. mail.login(EMAIL_ACCOUNT, EMAIL_PASSWORD)
  522. if bcc_list:
  523. mail.send_message(msg, to_addrs=bcc_list)
  524. else:
  525. mail.send_message(msg)
  526. # ==============================================================
  527. # 辅助函数:提取邮件正文
  528. # ==============================================================
  529. @staticmethod
  530. def _extract_body(msg, only_text: bool = True) -> str:
  531. """根据 only_text 参数提取邮件内容"""
  532. body_parts = []
  533. if msg.is_multipart():
  534. for part in msg.walk():
  535. ctype = part.get_content_type()
  536. if only_text and ctype != "text/plain":
  537. continue
  538. if not only_text and ctype not in ["text/plain", "text/html"]:
  539. continue
  540. if part.get("Content-Disposition"):
  541. continue
  542. charset = part.get_content_charset() or "utf-8"
  543. try:
  544. text = part.get_payload(decode=True).decode(charset, errors="ignore")
  545. body_parts.append(text)
  546. except Exception:
  547. continue
  548. else:
  549. charset = msg.get_content_charset() or "utf-8"
  550. body_parts.append(msg.get_payload(decode=True).decode(charset, errors="ignore"))
  551. body = "\n".join(body_parts)
  552. return re.sub(r"\s+", " ", body.strip())