email_authorizations_service.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630
  1. import re
  2. import time
  3. import socket
  4. import smtplib
  5. import socks
  6. import imaplib
  7. import email
  8. from datetime import datetime, timedelta, timezone
  9. from email.message import EmailMessage
  10. from email.header import decode_header
  11. from sqlalchemy.orm import Session
  12. from typing import List, Optional
  13. from app.models.email_authorizations import EmailAuthorization
  14. from app.schemas.email_authorizations import EmailAuthorizationCreate, EmailAuthorizationUpdate
  15. class EmailAuthorizationService:
  16. DEFAULT_READ_TOP_N_EMAIL = 10 # 最多读取最近多少封邮件
  17. RETRY_DELAY_SECONDS = 5 # 每次轮询间隔
  18. @staticmethod
  19. def get_all(db: Session) -> List[EmailAuthorization]:
  20. return db.query(EmailAuthorization).order_by(EmailAuthorization.id.desc()).all()
  21. @staticmethod
  22. def get_by_id(db: Session, id: int) -> Optional[EmailAuthorization]:
  23. return db.query(EmailAuthorization).filter(EmailAuthorization.id == id).first()
  24. @staticmethod
  25. def get_by_email(db: Session, email: str) -> Optional[EmailAuthorization]:
  26. return db.query(EmailAuthorization).filter(EmailAuthorization.email == email).first()
  27. @staticmethod
  28. def create(db: Session, obj_in: EmailAuthorizationCreate) -> EmailAuthorization:
  29. db_obj = EmailAuthorization(**obj_in.dict(exclude_unset=True))
  30. db.add(db_obj)
  31. db.commit()
  32. db.refresh(db_obj)
  33. return db_obj
  34. @staticmethod
  35. def update(db: Session, id: int, obj_in: EmailAuthorizationUpdate) -> Optional[EmailAuthorization]:
  36. db_obj = db.query(EmailAuthorization).filter(EmailAuthorization.id == id).first()
  37. if not db_obj:
  38. return None
  39. for field, value in obj_in.dict(exclude_unset=True).items():
  40. setattr(db_obj, field, value)
  41. db.add(db_obj)
  42. db.commit()
  43. db.refresh(db_obj)
  44. return db_obj
  45. @staticmethod
  46. def delete(db: Session, id: int) -> Optional[EmailAuthorization]:
  47. db_obj = db.query(EmailAuthorization).filter(EmailAuthorization.id == id).first()
  48. if not db_obj:
  49. return None
  50. db.delete(db_obj)
  51. db.commit()
  52. return db_obj
  53. @staticmethod
  54. def _connect_imap_with_proxy(
  55. host: str,
  56. port: int,
  57. proxy_host: Optional[str] = None,
  58. proxy_port: Optional[int] = None,
  59. proxy_user: Optional[str] = None,
  60. proxy_password: Optional[str] = None,
  61. ) -> imaplib.IMAP4_SSL:
  62. """
  63. 创建支持 SOCKS5 / HTTP 代理的 IMAP SSL 连接
  64. """
  65. if proxy_host and proxy_port and proxy_port > 0:
  66. original_socket = socket.socket
  67. socks.setdefaultproxy(
  68. proxy_type=socks.SOCKS5, # 可改为 socks.HTTP
  69. addr=proxy_host,
  70. port=proxy_port,
  71. username=proxy_user or None,
  72. password=proxy_password or None,
  73. )
  74. socket.socket = socks.socksocket
  75. try:
  76. imap = imaplib.IMAP4_SSL(host, port)
  77. finally:
  78. socket.socket = original_socket # 恢复原始 socket
  79. else:
  80. imap = imaplib.IMAP4_SSL(host, port)
  81. return imap
  82. @staticmethod
  83. def _connect_smtp_with_proxy(
  84. host: str,
  85. port: int,
  86. proxy_host: Optional[str] = None,
  87. proxy_port: Optional[int] = None,
  88. proxy_user: Optional[str] = None,
  89. proxy_password: Optional[str] = None,
  90. ) -> smtplib.SMTP_SSL:
  91. """
  92. 创建支持 SOCKS5 / HTTP 代理的 SMTP SSL 连接
  93. (逻辑完全与 _connect_imap_with_proxy 保持一致,且强制 SSL)
  94. """
  95. if proxy_host and proxy_port and proxy_port > 0:
  96. original_socket = socket.socket
  97. socks.setdefaultproxy(
  98. proxy_type=socks.SOCKS5, # 如需改 HTTP,这里改
  99. addr=proxy_host,
  100. port=proxy_port,
  101. username=proxy_user or None,
  102. password=proxy_password or None,
  103. )
  104. socket.socket = socks.socksocket
  105. try:
  106. smtp = smtplib.SMTP_SSL(host, port)
  107. finally:
  108. socket.socket = original_socket # 恢复原始 socket
  109. else:
  110. # 无代理
  111. smtp = smtplib.SMTP_SSL(host, port)
  112. return smtp
  113. @staticmethod
  114. def fetch_email_authorizations(
  115. auth,
  116. sender: str,
  117. recipient: str,
  118. subject_keywords: str,
  119. body_keywords: str,
  120. sent_date: str,
  121. expiry: int = 300,
  122. only_text: bool = True
  123. ) -> Optional[str]:
  124. """
  125. 在有效期内循环读取邮箱,找到符合条件的邮件(使用最后一条 Received 头作为收件时间)
  126. """
  127. EMAIL_ACCOUNT = auth.email
  128. EMAIL_PASSWORD = auth.authorization_code
  129. IMAP_SERVER = auth.imap_server
  130. IMAP_PORT = auth.imap_port
  131. subject_keys = [s.strip() for s in subject_keywords.split(",") if s.strip()]
  132. body_keys = [s.strip() for s in body_keywords.split(",") if s.strip()]
  133. # === 时间计算 ===
  134. sent_dt = datetime.strptime(sent_date, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
  135. max_wait_time = min(5 * 60, expiry) # 最长等待5分钟
  136. expiry_at = time.time() + max_wait_time
  137. def get_received_time(msg):
  138. """
  139. 使用最后一条 Received 头解析收件时间
  140. """
  141. received_headers = msg.get_all("Received", [])
  142. if not received_headers:
  143. return None
  144. for i, header in enumerate(received_headers, 1):
  145. print(f" [{i}] {header}")
  146. last_received = received_headers[-1]
  147. if ";" not in last_received:
  148. return None
  149. time_str = last_received.split(";")[-1].strip()
  150. dt_tuple = email.utils.parsedate_tz(time_str)
  151. if not dt_tuple:
  152. return None
  153. return datetime.fromtimestamp(email.utils.mktime_tz(dt_tuple), tz=timezone.utc)
  154. try:
  155. mail = EmailAuthorizationService._connect_imap_with_proxy(
  156. IMAP_SERVER,
  157. IMAP_PORT,
  158. auth.proxy_host,
  159. auth.proxy_port,
  160. auth.proxy_username,
  161. auth.proxy_password,
  162. )
  163. mail.login(EMAIL_ACCOUNT, EMAIL_PASSWORD)
  164. mail.select("INBOX")
  165. while time.time() < expiry_at:
  166. mail.noop() # 刷新邮箱状态
  167. _, data = mail.search(None, "ALL")
  168. mail_ids = data[0].split()
  169. if not mail_ids:
  170. time.sleep(EmailAuthorizationService.RETRY_DELAY_SECONDS)
  171. continue
  172. recent_ids = mail_ids[-EmailAuthorizationService.DEFAULT_READ_TOP_N_EMAIL:]
  173. messages = []
  174. debug = True
  175. for email_id in reversed(recent_ids):
  176. res, msg_data = mail.fetch(email_id, "(RFC822)")
  177. if res != "OK" or not msg_data:
  178. if debug:
  179. print(f"[WARN] 邮件 ID={email_id.decode()} 获取失败")
  180. continue
  181. msg_bytes = None
  182. for part in msg_data:
  183. if isinstance(part, tuple):
  184. msg_bytes = part[1]
  185. if not msg_bytes:
  186. if debug:
  187. print(f"[WARN] 邮件 ID={email_id.decode()} 无正文")
  188. continue
  189. msg = email.message_from_bytes(msg_bytes)
  190. received_dt = get_received_time(msg)
  191. if not received_dt:
  192. if debug:
  193. print(f"[WARN] 邮件 ID={email_id.decode()} 未解析出 Received 时间")
  194. continue
  195. messages.append((msg, received_dt))
  196. if debug:
  197. print(f"[DEBUG] 成功解析邮件数: {len(messages)}")
  198. print(f"[DEBUG] 收件时间列表: {[m[1] for m in messages]}")
  199. # 按收件时间降序排序
  200. messages.sort(key=lambda x: x[1], reverse=True)
  201. for msg, received_dt in messages:
  202. # 判断是否在发送时间后的有效窗口内
  203. if received_dt < sent_dt:
  204. if debug:
  205. print(f"[INFO] 邮件太旧: {received_dt}")
  206. continue
  207. if received_dt > sent_dt + timedelta(seconds=expiry):
  208. if debug:
  209. print(f"[INFO] 邮件太新: {received_dt}")
  210. continue
  211. # 匹配发件人/收件人
  212. msg_from = msg.get("From", "")
  213. msg_to = msg.get("To", "")
  214. if sender.lower() not in msg_from.lower():
  215. if debug:
  216. print("发件人不匹配")
  217. continue
  218. if recipient.lower() not in msg_to.lower():
  219. if debug:
  220. print("收件人不匹配")
  221. continue
  222. # 匹配主题
  223. subject, enc = decode_header(msg.get("Subject"))[0]
  224. if isinstance(subject, bytes):
  225. subject = subject.decode(enc or "utf-8", errors="ignore")
  226. if subject_keys and not any(k.lower() in subject.lower() for k in subject_keys):
  227. continue
  228. # 提取正文
  229. body = EmailAuthorizationService._extract_body(msg, only_text)
  230. if body_keys and not any(k.lower() in body.lower() for k in body_keys):
  231. continue
  232. # 找到匹配邮件 → 返回内容
  233. mail.close()
  234. mail.logout()
  235. return body.strip()
  236. # 未匹配到 → 等待重试
  237. time.sleep(EmailAuthorizationService.RETRY_DELAY_SECONDS)
  238. mail.close()
  239. mail.logout()
  240. return None # 超时未找到
  241. except Exception as e:
  242. raise e
  243. @staticmethod
  244. def fetch_email_authorizations_from_top_n(
  245. auth,
  246. sender: str,
  247. recipient: str,
  248. subject_keywords: str,
  249. body_keywords: str,
  250. top: int = 10,
  251. only_text: bool = True
  252. ) -> Optional[str]:
  253. """
  254. 在有效期内循环读取邮箱,找到符合条件的邮件(使用最后一条 Received 头作为收件时间)
  255. """
  256. EMAIL_ACCOUNT = auth.email
  257. EMAIL_PASSWORD = auth.authorization_code
  258. IMAP_SERVER = auth.imap_server
  259. IMAP_PORT = auth.imap_port
  260. subject_keys = [s.strip() for s in subject_keywords.split(",") if s.strip()]
  261. body_keys = [s.strip() for s in body_keywords.split(",") if s.strip()]
  262. try:
  263. mail = EmailAuthorizationService._connect_imap_with_proxy(
  264. IMAP_SERVER,
  265. IMAP_PORT,
  266. auth.proxy_host,
  267. auth.proxy_port,
  268. auth.proxy_username,
  269. auth.proxy_password,
  270. )
  271. mail.login(EMAIL_ACCOUNT, EMAIL_PASSWORD)
  272. mail.select("INBOX")
  273. _, data = mail.search(None, "ALL")
  274. mail_ids = data[0].split()
  275. if not mail_ids:
  276. return None
  277. recent_ids = mail_ids[-top:]
  278. messages = []
  279. debug = True
  280. for email_id in reversed(recent_ids):
  281. res, msg_data = mail.fetch(email_id, "(RFC822)")
  282. if res != "OK" or not msg_data:
  283. if debug:
  284. print(f"[WARN] 邮件 ID={email_id.decode()} 获取失败")
  285. continue
  286. msg_bytes = None
  287. for part in msg_data:
  288. if isinstance(part, tuple):
  289. msg_bytes = part[1]
  290. if not msg_bytes:
  291. if debug:
  292. print(f"[WARN] 邮件 ID={email_id.decode()} 无正文")
  293. continue
  294. msg = email.message_from_bytes(msg_bytes)
  295. messages.append(msg)
  296. if debug:
  297. print(f"[DEBUG] 成功解析邮件数: {len(messages)}")
  298. for msg in messages:
  299. # 匹配发件人/收件人
  300. msg_from = msg.get("From", "")
  301. msg_to = msg.get("To", "")
  302. if sender.lower() not in msg_from.lower():
  303. if debug:
  304. print("发件人不匹配")
  305. continue
  306. if recipient.lower() not in msg_to.lower():
  307. if debug:
  308. print("收件人不匹配")
  309. continue
  310. # 匹配主题
  311. subject, enc = decode_header(msg.get("Subject"))[0]
  312. if isinstance(subject, bytes):
  313. subject = subject.decode(enc or "utf-8", errors="ignore")
  314. if subject_keys and not any(k.lower() in subject.lower() for k in subject_keys):
  315. continue
  316. # 提取正文
  317. body = EmailAuthorizationService._extract_body(msg, only_text)
  318. if body_keys and not any(k.lower() in body.lower() for k in body_keys):
  319. continue
  320. # 找到匹配邮件 → 返回内容
  321. mail.close()
  322. mail.logout()
  323. return body.strip()
  324. mail.close()
  325. mail.logout()
  326. return None # 超时未找到
  327. except Exception as e:
  328. raise e
  329. # ----------------------------------------------------------------------
  330. # 主方法:模仿 Java forwardFirstMatchingEmail
  331. # ----------------------------------------------------------------------
  332. def forward_first_matching_email(
  333. auth,
  334. forward_to: str,
  335. sender: str,
  336. recipient: str,
  337. subject_keywords: str,
  338. body_keywords: str
  339. ):
  340. EMAIL_ACCOUNT = auth.email
  341. EMAIL_PASSWORD = auth.authorization_code
  342. IMAP_SERVER = auth.imap_server
  343. IMAP_PORT = auth.imap_port
  344. subject_keys = [s.strip() for s in subject_keywords.split(",") if s.strip()]
  345. body_keys = [s.strip() for s in body_keywords.split(",") if s.strip()]
  346. try:
  347. mail = EmailAuthorizationService._connect_imap_with_proxy(
  348. IMAP_SERVER,
  349. IMAP_PORT,
  350. auth.proxy_host,
  351. auth.proxy_port,
  352. auth.proxy_username,
  353. auth.proxy_password,
  354. )
  355. mail.login(EMAIL_ACCOUNT, EMAIL_PASSWORD)
  356. mail.select("INBOX")
  357. target = recipient
  358. query = f'(HEADER To "{target}")'
  359. res, data = mail.uid("search", None, query)
  360. if res != "OK":
  361. return None
  362. uids = data[0].split()
  363. # 获取邮件,按 sentDate 排序
  364. msgs = []
  365. for uid in uids:
  366. res, msg_data = mail.uid("fetch", uid, "(RFC822)")
  367. if res != "OK":
  368. continue
  369. msg = email.message_from_bytes(msg_data[0][1])
  370. date_ = email.utils.parsedate_to_datetime(msg.get("Date"))
  371. msgs.append((date_, msg))
  372. msgs.sort(key=lambda x: x[0], reverse=True)
  373. # -----------------------------------------------------------
  374. # 搜索匹配邮件
  375. # -----------------------------------------------------------
  376. for _, msg in msgs:
  377. # 匹配发件人/收件人
  378. msg_from = msg.get("From", "")
  379. msg_to = msg.get("To", "")
  380. if sender.lower() not in msg_from.lower():
  381. if debug:
  382. print("发件人不匹配")
  383. continue
  384. if recipient.lower() not in msg_to.lower():
  385. if debug:
  386. print("收件人不匹配")
  387. continue
  388. # 匹配主题
  389. subject, enc = decode_header(msg.get("Subject"))[0]
  390. if isinstance(subject, bytes):
  391. subject = subject.decode(enc or "utf-8", errors="ignore")
  392. if subject_keys and not any(k.lower() in subject.lower() for k in subject_keys):
  393. continue
  394. # 提取正文
  395. body = EmailAuthorizationService._extract_body(msg, True)
  396. if body_keys and not any(k.lower() in body.lower() for k in body_keys):
  397. continue
  398. msg.replace_header("From", EMAIL_ACCOUNT)
  399. msg.replace_header("To", forward_to)
  400. # 可选:修改 Subject
  401. subject_raw = msg.get("Subject", "")
  402. msg.replace_header("Subject", f"FWD: {subject_raw}")
  403. # 发送
  404. EmailAuthorizationService.send_email_smtp(
  405. auth,
  406. msg
  407. )
  408. return f"邮件 '{subject}' 已成功转发至: {forward_to}"
  409. return None
  410. except Exception as e:
  411. raise e
  412. # ----------------------------------------------------------------------
  413. # 主方法:模仿 Java send_email
  414. # ----------------------------------------------------------------------
  415. def send_email(
  416. auth,
  417. send_to: str,
  418. subject: str,
  419. content_type: str,
  420. content: str
  421. ):
  422. try:
  423. EMAIL_ACCOUNT = auth.email
  424. msg = EmailMessage()
  425. msg["From"] = EMAIL_ACCOUNT
  426. msg["To"] = send_to
  427. msg["Subject"] = subject
  428. html_content = None;
  429. text_content = None;
  430. if ("html" == content_type.lower()):
  431. html_content = content
  432. else:
  433. text_content = content
  434. # 设置正文内容
  435. if html_content and text_content:
  436. # multipart/alternative
  437. msg.set_content(text_content)
  438. msg.add_alternative(html_content, subtype="html")
  439. elif html_content:
  440. msg.add_alternative(html_content, subtype="html")
  441. elif text_content:
  442. msg.set_content(text_content)
  443. else:
  444. msg.set_content("") # 空邮件
  445. # 发送
  446. EmailAuthorizationService.send_email_smtp(
  447. auth,
  448. msg
  449. )
  450. return f"邮件 '{subject}' 成功发送至: {send_to}"
  451. except Exception as e:
  452. raise e
  453. # ----------------------------------------------------------------------
  454. # 主方法:模仿 Java send_email
  455. # ----------------------------------------------------------------------
  456. def send_email_bulk(
  457. auth,
  458. send_to: str,
  459. subject: str,
  460. content_type: str,
  461. content: str
  462. ):
  463. try:
  464. bcc_list = [s.strip() for s in send_to.split(",") if s.strip()]
  465. EMAIL_ACCOUNT = auth.email
  466. msg = EmailMessage()
  467. msg["From"] = EMAIL_ACCOUNT
  468. # TO 可以留空或放一个默认收件人
  469. msg["To"] = bcc_list[0] # 或者固定一个自己邮箱作为 TO
  470. # BCC 添加所有收件人
  471. bcc_list = bcc_list
  472. msg["Subject"] = subject
  473. html_content = None;
  474. text_content = None;
  475. if ("html" == content_type.lower()):
  476. html_content = content
  477. else:
  478. text_content = content
  479. # 设置正文内容
  480. if html_content and text_content:
  481. # multipart/alternative
  482. msg.set_content(text_content)
  483. msg.add_alternative(html_content, subtype="html")
  484. elif html_content:
  485. msg.add_alternative(html_content, subtype="html")
  486. elif text_content:
  487. msg.set_content(text_content)
  488. else:
  489. msg.set_content("") # 空邮件
  490. # 发送
  491. EmailAuthorizationService.send_email_smtp(
  492. auth,
  493. msg,
  494. bcc_list = bcc_list
  495. )
  496. return f"邮件 '{subject}' 成功发送至: {send_to}"
  497. except Exception as e:
  498. raise e
  499. # ----------------------------------------------------------------------
  500. # SMTP 发送邮件
  501. # ----------------------------------------------------------------------
  502. @staticmethod
  503. def send_email_smtp(auth, msg, bcc_list=[]):
  504. EMAIL_ACCOUNT = auth.email
  505. EMAIL_PASSWORD = auth.authorization_code
  506. SMTP_SERVER = auth.smtp_server
  507. SMTP_PORT = auth.smtp_port
  508. mail = EmailAuthorizationService._connect_smtp_with_proxy(
  509. SMTP_SERVER,
  510. SMTP_PORT,
  511. auth.proxy_host,
  512. auth.proxy_port,
  513. auth.proxy_username,
  514. auth.proxy_password,
  515. )
  516. mail.login(EMAIL_ACCOUNT, EMAIL_PASSWORD)
  517. if bcc_list:
  518. mail.send_message(msg, to_addrs=bcc_list)
  519. else:
  520. mail.send_message(msg)
  521. # ==============================================================
  522. # 辅助函数:提取邮件正文
  523. # ==============================================================
  524. @staticmethod
  525. def _extract_body(msg, only_text: bool = True) -> str:
  526. """根据 only_text 参数提取邮件内容"""
  527. body_parts = []
  528. if msg.is_multipart():
  529. for part in msg.walk():
  530. ctype = part.get_content_type()
  531. if only_text and ctype != "text/plain":
  532. continue
  533. if not only_text and ctype not in ["text/plain", "text/html"]:
  534. continue
  535. if part.get("Content-Disposition"):
  536. continue
  537. charset = part.get_content_charset() or "utf-8"
  538. try:
  539. text = part.get_payload(decode=True).decode(charset, errors="ignore")
  540. body_parts.append(text)
  541. except Exception:
  542. continue
  543. else:
  544. charset = msg.get_content_charset() or "utf-8"
  545. body_parts.append(msg.get_payload(decode=True).decode(charset, errors="ignore"))
  546. body = "\n".join(body_parts)
  547. return re.sub(r"\s+", " ", body.strip())