email_authorizations_service.py 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868
  1. import threading
  2. import socket
  3. import socks
  4. import imaplib
  5. import smtplib
  6. import email
  7. import asyncio
  8. import re
  9. import time
  10. from datetime import datetime, timedelta, timezone
  11. from email.message import EmailMessage
  12. from email.header import decode_header
  13. from typing import List, Optional
  14. from sqlalchemy.orm import Session
  15. from sqlalchemy.ext.asyncio import AsyncSession
  16. from sqlalchemy import select, text
  17. from starlette.concurrency import run_in_threadpool
  18. from app.core.logger import logger
  19. from app.core.biz_exception import NotFoundError, BizLogicError
  20. from app.models.email_authorizations import EmailAuthorization
  21. from app.schemas.email_authorizations import EmailAuthorizationCreate, EmailAuthorizationUpdate
# Process-wide lock shared by every worker thread. It is intended for
# serializing code that mutates global socket/proxy state (for example
# ``socks.setdefaultproxy`` combined with temporarily replacing
# ``socket.socket``), which would otherwise race between threads.
_PROXY_LOCK = threading.Lock()
class EmailAuthorizationService:
    """Service layer for stored mailbox authorizations.

    Offers async CRUD over ``EmailAuthorization`` rows plus helpers that read,
    search, forward and send mail over IMAP/SMTP, optionally through a SOCKS5
    proxy. The blocking IMAP/SMTP work is handed to a thread pool via
    ``run_in_threadpool`` so the event loop is not blocked.
    """

    # How many of the most recent INBOX messages are inspected per polling round.
    DEFAULT_READ_TOP_N_EMAIL = 10
    # Seconds to sleep between two polling rounds.
    RETRY_DELAY_SECONDS = 5

    # =================================================================
    # Database operations (CRUD) - using AsyncSession
    # =================================================================
  30. @staticmethod
  31. async def get_all(db: AsyncSession) -> List[EmailAuthorization]:
  32. # AsyncSession 不支持 db.query,需要用 select(Model)
  33. stmt = select(EmailAuthorization).order_by(EmailAuthorization.id.desc())
  34. result = await db.execute(stmt)
  35. return result.scalars().all()
  36. @staticmethod
  37. async def get_by_id(db: AsyncSession, id: int) -> Optional[EmailAuthorization]:
  38. stmt = select(EmailAuthorization).where(EmailAuthorization.id == id)
  39. result = await db.execute(stmt)
  40. # scalar_one_or_none 类似于 .first(),但更严格(如果有多个会报错,这里id是主键所以没问题)
  41. obj = result.scalar_one_or_none()
  42. if not obj:
  43. raise NotFoundError("Email authorization not found")
  44. return obj
  45. @staticmethod
  46. async def get_by_email(db: AsyncSession, email: str) -> Optional[EmailAuthorization]:
  47. stmt = select(EmailAuthorization).where(EmailAuthorization.email == email)
  48. result = await db.execute(stmt)
  49. obj = result.scalar_one_or_none()
  50. if not obj:
  51. raise NotFoundError("Email authorization not found")
  52. return obj
  53. @staticmethod
  54. async def create(db: AsyncSession, obj_in: EmailAuthorizationCreate) -> EmailAuthorization:
  55. # 先检查是否存在
  56. stmt = select(EmailAuthorization).where(EmailAuthorization.email == obj_in.email)
  57. result = await db.execute(stmt)
  58. if result.scalar_one_or_none():
  59. raise BizLogicError(f"Email {obj_in.email} already exist")
  60. # 创建对象
  61. db_obj = EmailAuthorization(**obj_in.dict(exclude_unset=True))
  62. # db.add 是同步方法(只是添加到 session 上下文)
  63. db.add(db_obj)
  64. # commit 和 refresh 是异步的
  65. await db.commit()
  66. await db.refresh(db_obj)
  67. return db_obj
  68. @staticmethod
  69. async def update(db: AsyncSession, id: int, obj_in: EmailAuthorizationUpdate) -> Optional[EmailAuthorization]:
  70. stmt = select(EmailAuthorization).where(EmailAuthorization.id == id)
  71. result = await db.execute(stmt)
  72. db_obj = result.scalar_one_or_none()
  73. if not db_obj:
  74. raise NotFoundError("Email authorization not found")
  75. for field, value in obj_in.dict(exclude_unset=True).items():
  76. setattr(db_obj, field, value)
  77. db.add(db_obj)
  78. await db.commit()
  79. await db.refresh(db_obj)
  80. return db_obj
  81. @staticmethod
  82. async def delete(db: AsyncSession, id: int) -> Optional[EmailAuthorization]:
  83. stmt = select(EmailAuthorization).where(EmailAuthorization.id == id)
  84. result = await db.execute(stmt)
  85. db_obj = result.scalar_one_or_none()
  86. if not db_obj:
  87. raise NotFoundError("Email authorization not found")
  88. # delete 也是同步标记
  89. await db.delete(db_obj)
  90. await db.commit()
  91. return db_obj
  92. @staticmethod
  93. def _connect_imap_with_proxy(
  94. host: str,
  95. port: int,
  96. proxy_host: Optional[str] = None,
  97. proxy_port: Optional[int] = None,
  98. proxy_user: Optional[str] = None,
  99. proxy_password: Optional[str] = None,
  100. ) -> imaplib.IMAP4_SSL:
  101. """
  102. 创建连接 (同步方法,将在线程中运行)
  103. 使用 Lock 确保 socket patching 不会影响其他并发请求
  104. """
  105. if proxy_host and proxy_port and proxy_port > 0:
  106. with _PROXY_LOCK: # 加锁,防止多线程同时修改全局 socket
  107. original_socket = socket.socket
  108. socks.setdefaultproxy(
  109. proxy_type=socks.SOCKS5,
  110. addr=proxy_host,
  111. port=proxy_port,
  112. username=proxy_user or None,
  113. password=proxy_password or None,
  114. )
  115. socket.socket = socks.socksocket
  116. try:
  117. imap = imaplib.IMAP4_SSL(host, port)
  118. finally:
  119. socket.socket = original_socket
  120. else:
  121. imap = imaplib.IMAP4_SSL(host, port)
  122. return imap
  123. @staticmethod
  124. def _connect_smtp_with_proxy(
  125. host: str,
  126. port: int,
  127. proxy_host: Optional[str] = None,
  128. proxy_port: Optional[int] = None,
  129. proxy_user: Optional[str] = None,
  130. proxy_password: Optional[str] = None,
  131. ) -> smtplib.SMTP_SSL:
  132. """
  133. 创建连接 (同步方法,将在线程中运行)
  134. """
  135. if proxy_host and proxy_port and proxy_port > 0:
  136. with _PROXY_LOCK: # 加锁
  137. original_socket = socket.socket
  138. socks.setdefaultproxy(
  139. proxy_type=socks.SOCKS5,
  140. addr=proxy_host,
  141. port=proxy_port,
  142. username=proxy_user or None,
  143. password=proxy_password or None,
  144. )
  145. socket.socket = socks.socksocket
  146. try:
  147. smtp = smtplib.SMTP_SSL(host, port)
  148. finally:
  149. socket.socket = original_socket
  150. else:
  151. smtp = smtplib.SMTP_SSL(host, port)
  152. return smtp
  153. @staticmethod
  154. async def fetch_email_authorizations2(
  155. db: Session,
  156. auth,
  157. sender: str,
  158. recipient: str,
  159. subject_keywords: str,
  160. body_keywords: str
  161. ):
  162. # =========================================================
  163. # 第一步:在数据库中查找最新的 UID (主线程/DB线程执行)
  164. # =========================================================
  165. # 1. 构建动态 SQL
  166. # 假设表名为 emails,字段为 uid, sender, recipient, subject, body_text
  167. sql = "SELECT uid, subject, body_text FROM emails WHERE 1=1"
  168. params = {}
  169. # 2. 处理发件人 (模糊匹配)
  170. if sender.strip():
  171. sql += " AND sender LIKE :sender"
  172. params['sender'] = f"%{sender.strip()}%"
  173. # 3. 处理收件人 (模糊匹配)
  174. if recipient.strip():
  175. sql += " AND recipient LIKE :recipient"
  176. params['recipient'] = f"%{recipient.strip()}%"
  177. # 4. 处理主题关键词 (OR 关系)
  178. subj_keys = [k.strip() for k in subject_keywords.split(',') if k.strip()]
  179. if subj_keys:
  180. for i, k in enumerate(subj_keys):
  181. key_name = f"subj_{i}"
  182. # 直接拼接到主 SQL 中,要求同时满足
  183. sql += f" AND subject LIKE :{key_name}"
  184. params[key_name] = f"%{k}%"
  185. # 5. 处理内容关键词 (OR 关系)
  186. body_keys = [k.strip() for k in body_keywords.split(',') if k.strip()]
  187. if body_keys:
  188. for i, k in enumerate(body_keys):
  189. key_name = f"body_{i}"
  190. # 直接拼接到主 SQL 中,要求同时满足
  191. sql += f" AND body_text LIKE :{key_name}"
  192. params[key_name] = f"%{k}%"
  193. # 6. 获取最新的一条
  194. sql += " ORDER BY uid DESC LIMIT 1"
  195. # 执行查询
  196. result_proxy = await db.execute(text(sql), params)
  197. result = result_proxy.fetchone()
  198. if not result:
  199. logger.info(f"DB Search: No email found for {sender} -> {recipient}")
  200. return None
  201. target_uid = result.uid
  202. target_subject = result.subject
  203. target_body_text = result.body_text
  204. logger.info(f"DB Search: Found UID {target_uid} matching criteria. Subject: {target_subject}")
  205. return f'{target_subject}\n{target_body_text}'
  206. @staticmethod
  207. async def fetch_email_authorizations(
  208. auth,
  209. sender: str,
  210. recipient: str,
  211. subject_keywords: str,
  212. body_keywords: str,
  213. sent_date: str,
  214. expiry: int = 300,
  215. only_text: bool = True
  216. ) -> Optional[str]:
  217. """
  218. 在有效期内循环读取邮箱,找到符合条件的邮件(使用最后一条 Received 头作为收件时间)
  219. """
  220. def _worker():
  221. EMAIL_ACCOUNT = auth.email
  222. EMAIL_PASSWORD = auth.authorization_code
  223. IMAP_SERVER = auth.imap_server
  224. IMAP_PORT = auth.imap_port
  225. subject_keys = [s.strip() for s in subject_keywords.split(",") if s.strip()]
  226. body_keys = [s.strip() for s in body_keywords.split(",") if s.strip()]
  227. # === 时间计算 ===
  228. sent_dt = datetime.strptime(sent_date, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
  229. max_wait_time = min(5 * 60, expiry) # 最长等待5分钟
  230. expiry_at = time.time() + max_wait_time
  231. def get_received_time(msg):
  232. """
  233. 使用最后一条 Received 头解析收件时间
  234. """
  235. received_headers = msg.get_all("Received", [])
  236. if not received_headers:
  237. return None
  238. for i, header in enumerate(received_headers, 1):
  239. logger.debug(f" [{i}] {header}")
  240. last_received = received_headers[-1]
  241. if ";" not in last_received:
  242. return None
  243. time_str = last_received.split(";")[-1].strip()
  244. dt_tuple = email.utils.parsedate_tz(time_str)
  245. if not dt_tuple:
  246. return None
  247. return datetime.fromtimestamp(email.utils.mktime_tz(dt_tuple), tz=timezone.utc)
  248. mail = EmailAuthorizationService._connect_imap_with_proxy(
  249. IMAP_SERVER,
  250. IMAP_PORT,
  251. auth.proxy_host,
  252. auth.proxy_port,
  253. auth.proxy_username,
  254. auth.proxy_password,
  255. )
  256. mail.login(EMAIL_ACCOUNT, EMAIL_PASSWORD)
  257. mail.select("INBOX")
  258. while time.time() < expiry_at:
  259. mail.noop() # 刷新邮箱状态
  260. _, data = mail.search(None, "ALL")
  261. mail_ids = data[0].split()
  262. if not mail_ids:
  263. time.sleep(EmailAuthorizationService.RETRY_DELAY_SECONDS)
  264. continue
  265. recent_ids = mail_ids[-EmailAuthorizationService.DEFAULT_READ_TOP_N_EMAIL:]
  266. messages = []
  267. debug = True
  268. for email_id in reversed(recent_ids):
  269. res, msg_data = mail.fetch(email_id, "(RFC822)")
  270. if res != "OK" or not msg_data:
  271. if debug:
  272. logger.debug(f"[WARN] 邮件 ID={email_id.decode()} 获取失败")
  273. continue
  274. msg_bytes = None
  275. for part in msg_data:
  276. if isinstance(part, tuple):
  277. msg_bytes = part[1]
  278. if not msg_bytes:
  279. if debug:
  280. logger.debug(f"[WARN] 邮件 ID={email_id.decode()} 无正文")
  281. continue
  282. msg = email.message_from_bytes(msg_bytes)
  283. received_dt = get_received_time(msg)
  284. if not received_dt:
  285. if debug:
  286. logger.debug(f"[WARN] 邮件 ID={email_id.decode()} 未解析出 Received 时间")
  287. continue
  288. messages.append((msg, received_dt))
  289. if debug:
  290. logger.debug(f"[DEBUG] 成功解析邮件数: {len(messages)}")
  291. logger.debug(f"[DEBUG] 收件时间列表: {[m[1] for m in messages]}")
  292. # 按收件时间降序排序
  293. messages.sort(key=lambda x: x[1], reverse=True)
  294. for msg, received_dt in messages:
  295. # 判断是否在发送时间后的有效窗口内
  296. if received_dt < sent_dt:
  297. if debug:
  298. logger.debug(f"[INFO] 邮件太旧: {received_dt}")
  299. continue
  300. if received_dt > sent_dt + timedelta(seconds=expiry):
  301. if debug:
  302. logger.debug(f"[INFO] 邮件太新: {received_dt}")
  303. continue
  304. # 匹配发件人/收件人
  305. msg_from = msg.get("From", "")
  306. msg_to = msg.get("To", "")
  307. if sender.lower() not in msg_from.lower():
  308. if debug:
  309. logger.debug("发件人不匹配")
  310. continue
  311. if recipient.lower() not in msg_to.lower():
  312. if debug:
  313. logger.debug("收件人不匹配")
  314. continue
  315. # 匹配主题
  316. subject, enc = decode_header(msg.get("Subject"))[0]
  317. if isinstance(subject, bytes):
  318. subject = subject.decode(enc or "utf-8", errors="ignore")
  319. if subject_keys and not any(k.lower() in subject.lower() for k in subject_keys):
  320. continue
  321. # 提取正文
  322. body = EmailAuthorizationService._extract_body(msg, only_text)
  323. if body_keys and not any(k.lower() in body.lower() for k in body_keys):
  324. continue
  325. # 找到匹配邮件 → 返回内容
  326. mail.close()
  327. mail.logout()
  328. return body.strip()
  329. # 未匹配到 → 等待重试
  330. time.sleep(EmailAuthorizationService.RETRY_DELAY_SECONDS)
  331. mail.close()
  332. mail.logout()
  333. raise NotFoundError("Get email timeout")
  334. return await run_in_threadpool(_worker)
  335. @staticmethod
  336. def _process_recent_emails_sync(
  337. mail, recent_ids, sent_dt, expiry, sender, recipient, subject_keys, body_keys, only_text
  338. ) -> Optional[str]:
  339. """
  340. 同步辅助函数:处理邮件解析逻辑。
  341. 将这段繁琐的逻辑放入线程运行,避免阻塞 Async Loop。
  342. """
  343. messages = []
  344. debug = True
  345. for email_id in reversed(recent_ids):
  346. res, msg_data = mail.fetch(email_id, "(RFC822)")
  347. if res != "OK" or not msg_data:
  348. continue
  349. msg_bytes = None
  350. for part in msg_data:
  351. if isinstance(part, tuple):
  352. msg_bytes = part[1]
  353. if not msg_bytes:
  354. continue
  355. msg = email.message_from_bytes(msg_bytes)
  356. # 解析时间
  357. received_dt = None
  358. received_headers = msg.get_all("Received", [])
  359. if received_headers:
  360. last_received = received_headers[-1]
  361. if ";" in last_received:
  362. time_str = last_received.split(";")[-1].strip()
  363. dt_tuple = email.utils.parsedate_tz(time_str)
  364. if dt_tuple:
  365. received_dt = datetime.fromtimestamp(email.utils.mktime_tz(dt_tuple), tz=timezone.utc)
  366. if not received_dt:
  367. continue
  368. messages.append((msg, received_dt))
  369. # 排序
  370. messages.sort(key=lambda x: x[1], reverse=True)
  371. for msg, received_dt in messages:
  372. # 时间判定
  373. if received_dt < sent_dt:
  374. continue
  375. if received_dt > sent_dt + timedelta(seconds=expiry):
  376. continue
  377. # 匹配逻辑
  378. msg_from = msg.get("From", "")
  379. msg_to = msg.get("To", "")
  380. if sender.lower() not in msg_from.lower():
  381. continue
  382. if recipient.lower() not in msg_to.lower():
  383. continue
  384. subject_raw = msg.get("Subject")
  385. subject = ""
  386. if subject_raw:
  387. decoded_list = decode_header(subject_raw)
  388. if decoded_list:
  389. sub_bytes, enc = decoded_list[0]
  390. if isinstance(sub_bytes, bytes):
  391. subject = sub_bytes.decode(enc or "utf-8", errors="ignore")
  392. else:
  393. subject = str(sub_bytes)
  394. if subject_keys and not any(k.lower() in subject.lower() for k in subject_keys):
  395. continue
  396. body = EmailAuthorizationService._extract_body(msg, only_text)
  397. if body_keys and not any(k.lower() in body.lower() for k in body_keys):
  398. continue
  399. return body.strip()
  400. return None
  401. @staticmethod
  402. async def fetch_email_authorizations_from_top_n(
  403. auth,
  404. sender: str,
  405. recipient: str,
  406. subject_keywords: str,
  407. body_keywords: str,
  408. top: int = 10,
  409. only_text: bool = True
  410. ) -> Optional[str]:
  411. # 定义一个纯同步的 worker 函数来执行所有 IMAP 逻辑
  412. def _worker():
  413. subject_keys = [s.strip() for s in subject_keywords.split(",") if s.strip()]
  414. body_keys = [s.strip() for s in body_keywords.split(",") if s.strip()]
  415. mail = EmailAuthorizationService._connect_imap_with_proxy(
  416. auth.imap_server,
  417. auth.imap_port,
  418. auth.proxy_host,
  419. auth.proxy_port,
  420. auth.proxy_username,
  421. auth.proxy_password,
  422. )
  423. try:
  424. mail.login(auth.email, auth.authorization_code)
  425. mail.select("INBOX")
  426. _, data = mail.search(None, "ALL")
  427. mail_ids = data[0].split()
  428. if not mail_ids:
  429. return None
  430. recent_ids = mail_ids[-top:]
  431. # 复用上面的解析逻辑,但稍作调整,因为这个方法不需要时间过滤
  432. # 这里为了简单,直接写精简版解析
  433. for email_id in reversed(recent_ids):
  434. res, msg_data = mail.fetch(email_id, "(RFC822)")
  435. if res != "OK" or not msg_data: continue
  436. msg_bytes = None
  437. for part in msg_data:
  438. if isinstance(part, tuple): msg_bytes = part[1]
  439. if not msg_bytes: continue
  440. msg = email.message_from_bytes(msg_bytes)
  441. # 匹配逻辑
  442. msg_from = msg.get("From", "")
  443. msg_to = msg.get("To", "")
  444. if sender.lower() not in msg_from.lower(): continue
  445. if recipient.lower() not in msg_to.lower(): continue
  446. subject_raw = msg.get("Subject")
  447. subject = ""
  448. if subject_raw:
  449. d = decode_header(subject_raw)[0]
  450. subject = d[0].decode(d[1] or "utf-8", errors="ignore") if isinstance(d[0], bytes) else str(d[0])
  451. if subject_keys and not any(k.lower() in subject.lower() for k in subject_keys): continue
  452. body = EmailAuthorizationService._extract_body(msg, only_text)
  453. if body_keys and not any(k.lower() in body.lower() for k in body_keys): continue
  454. return body.strip()
  455. return None
  456. finally:
  457. try:
  458. mail.close()
  459. mail.logout()
  460. except:
  461. pass
  462. return await run_in_threadpool(_worker)
  463. @staticmethod
  464. async def forward_first_matching_email(
  465. auth,
  466. forward_to: str,
  467. sender: str,
  468. recipient: str,
  469. subject_keywords: str,
  470. body_keywords: str
  471. ):
  472. # 整个逻辑比较复杂且都是 IO,直接打包扔进线程池
  473. def _worker():
  474. subject_keys = [s.strip() for s in subject_keywords.split(",") if s.strip()]
  475. body_keys = [s.strip() for s in body_keywords.split(",") if s.strip()]
  476. mail = EmailAuthorizationService._connect_imap_with_proxy(
  477. auth.imap_server, auth.imap_port, auth.proxy_host, auth.proxy_port, auth.proxy_username, auth.proxy_password
  478. )
  479. try:
  480. mail.login(auth.email, auth.authorization_code)
  481. mail.select("INBOX")
  482. target = recipient
  483. # 注意:IMAP search 语法对引号很敏感,确保 target 没有特殊字符破坏命令
  484. query = f'(HEADER To "{target}")'
  485. res, data = mail.uid("search", None, query)
  486. if res != "OK": return None
  487. uids = data[0].split()
  488. msgs = []
  489. for uid in uids:
  490. res, msg_data = mail.uid("fetch", uid, "(RFC822)")
  491. if res != "OK": continue
  492. msg = email.message_from_bytes(msg_data[0][1])
  493. # 处理 Date 头可能缺失的情况
  494. date_str = msg.get("Date")
  495. if date_str:
  496. date_ = email.utils.parsedate_to_datetime(date_str)
  497. msgs.append((date_, msg))
  498. msgs.sort(key=lambda x: x[0], reverse=True)
  499. for _, msg in msgs:
  500. msg_from = msg.get("From", "")
  501. if sender.lower() not in msg_from.lower(): continue
  502. subject_raw = msg.get("Subject", "")
  503. subject = ""
  504. d = decode_header(subject_raw)[0]
  505. subject = d[0].decode(d[1] or "utf-8", errors="ignore") if isinstance(d[0], bytes) else str(d[0])
  506. if subject_keys and not any(k.lower() in subject.lower() for k in subject_keys): continue
  507. body = EmailAuthorizationService._extract_body(msg, True)
  508. if body_keys and not any(k.lower() in body.lower() for k in body_keys): continue
  509. # 准备转发
  510. # 注意:直接修改 Header 转发可能会破坏 DKIM 签名,更好的方式是作为附件转发
  511. # 但这里保持原逻辑
  512. del msg['From']
  513. del msg['To']
  514. del msg['Subject']
  515. msg['From'] = auth.email
  516. msg['To'] = forward_to
  517. msg['Subject'] = f"FWD: {subject}"
  518. # 调用同步的 send 方法
  519. EmailAuthorizationService.send_email_smtp(auth, msg)
  520. return f"邮件 '{subject}' 已成功转发至: {forward_to}"
  521. return None
  522. finally:
  523. try:
  524. mail.logout()
  525. except: pass
  526. return await run_in_threadpool(_worker)
  527. @staticmethod
  528. async def forward_first_matching_email2(
  529. db: Session,
  530. auth,
  531. forward_to: str,
  532. sender: str,
  533. recipient: str,
  534. subject_keywords: str,
  535. body_keywords: str
  536. ):
  537. # =========================================================
  538. # 第一步:在数据库中查找最新的 UID (主线程/DB线程执行)
  539. # =========================================================
  540. # 1. 构建动态 SQL
  541. # 假设表名为 emails,字段为 uid, sender, recipient, subject, body_text
  542. sql = "SELECT uid, subject FROM emails WHERE 1=1"
  543. params = {}
  544. # 2. 处理发件人 (模糊匹配)
  545. if sender.strip():
  546. sql += " AND sender LIKE :sender"
  547. params['sender'] = f"%{sender.strip()}%"
  548. # 3. 处理收件人 (模糊匹配)
  549. if recipient.strip():
  550. sql += " AND recipient LIKE :recipient"
  551. params['recipient'] = f"%{recipient.strip()}%"
  552. # 4. 处理主题关键词 (OR 关系)
  553. subj_keys = [k.strip() for k in subject_keywords.split(',') if k.strip()]
  554. if subj_keys:
  555. for i, k in enumerate(subj_keys):
  556. key_name = f"subj_{i}"
  557. # 直接拼接到主 SQL 中,要求同时满足
  558. sql += f" AND subject LIKE :{key_name}"
  559. params[key_name] = f"%{k}%"
  560. # 5. 处理内容关键词 (OR 关系)
  561. body_keys = [k.strip() for k in body_keywords.split(',') if k.strip()]
  562. if body_keys:
  563. for i, k in enumerate(body_keys):
  564. key_name = f"body_{i}"
  565. # 直接拼接到主 SQL 中,要求同时满足
  566. sql += f" AND body_text LIKE :{key_name}"
  567. params[key_name] = f"%{k}%"
  568. # 6. 获取最新的一条
  569. sql += " ORDER BY uid DESC LIMIT 1"
  570. try:
  571. # 执行查询
  572. result_proxy = await db.execute(text(sql), params)
  573. result = result_proxy.fetchone()
  574. if not result:
  575. logger.info(f"DB Search: No email found for {sender} -> {recipient}")
  576. return None
  577. target_uid = result.uid
  578. target_subject = result.subject
  579. logger.info(f"DB Search: Found UID {target_uid} matching criteria. Subject: {target_subject}")
  580. except Exception as e:
  581. logger.error(f"DB Search Error: {e}")
  582. return f"数据库查询失败: {str(e)}"
  583. # =========================================================
  584. # 第二步:去 IMAP 拉取原始内容并转发 (放入线程池执行 IO 操作)
  585. # =========================================================
  586. def _worker():
  587. mail = None
  588. try:
  589. # 1. 连接 IMAP
  590. mail = EmailAuthorizationService._connect_imap_with_proxy(
  591. auth.imap_server, auth.imap_port,
  592. auth.proxy_host, auth.proxy_port,
  593. auth.proxy_username, auth.proxy_password
  594. )
  595. mail.login(auth.email, auth.authorization_code)
  596. mail.select("INBOX")
  597. # 2. 根据 UID 精准拉取 (使用 fetch)
  598. # 注意:IMAPClient 的 fetch 方法
  599. # UID 必须转为 int 或者 sequence set 字符串
  600. res, data = mail.uid('fetch', str(target_uid), '(RFC822)')
  601. # 🔴 修正点:不要写 if target_uid in res
  602. # res 是状态字符串 "OK",data 是包含邮件内容的列表
  603. if res != 'OK':
  604. return f"IMAP Fetch 失败,状态码: {res}"
  605. if not data or not data[0]:
  606. return f"未找到 UID {target_uid} 的邮件内容 (可能已被物理删除)"
  607. # data[0] 通常是 tuple (byte_header, byte_content),但也可能是 None
  608. if isinstance(data[0], tuple):
  609. raw_email_bytes = data[0][1]
  610. else:
  611. # 如果 data[0] 只是 bytes (例如 b')'),说明没拿到邮件体
  612. return f"邮件数据格式异常,无法解析: {str(data)}"
  613. msg = email.message_from_bytes(raw_email_bytes)
  614. # 3. 处理转发逻辑
  615. # 使用数据库中查到的标题,确保标题准确
  616. subject = target_subject
  617. # 修改 Header 进行转发
  618. del msg['From']
  619. del msg['To']
  620. del msg['Cc']
  621. del msg['Subject']
  622. msg['From'] = auth.email # 发件人覆写为当前授权账号
  623. msg['To'] = forward_to
  624. msg['Subject'] = f"FWD: {subject}"
  625. # 4. 发送邮件 (SMTP)
  626. EmailAuthorizationService.send_email_smtp(auth, msg)
  627. return f"邮件 '{subject}' (UID: {target_uid}) 已成功转发至: {forward_to}"
  628. except Exception as e:
  629. logger.error(f"IMAP Forward Error: {e}")
  630. return f"邮件转发过程出错: {str(e)}"
  631. finally:
  632. if mail:
  633. try:
  634. mail.logout()
  635. except: pass
  636. # 在线程池中运行耗时 IO 操作
  637. return await run_in_threadpool(_worker)
  638. @staticmethod
  639. async def send_email(
  640. auth,
  641. send_to: str,
  642. subject: str,
  643. content_type: str,
  644. content: str
  645. ):
  646. def _worker():
  647. msg = EmailMessage()
  648. msg["From"] = auth.email
  649. msg["To"] = send_to
  650. msg["Subject"] = subject
  651. if content_type.lower() == "html":
  652. msg.set_content("") # 占位
  653. msg.add_alternative(content, subtype="html")
  654. else:
  655. msg.set_content(content)
  656. EmailAuthorizationService.send_email_smtp(auth, msg)
  657. return f"邮件 '{subject}' 成功发送至: {send_to}"
  658. return await run_in_threadpool(_worker)
  659. @staticmethod
  660. async def send_email_bulk(
  661. auth,
  662. send_to: str,
  663. subject: str,
  664. content_type: str,
  665. content: str
  666. ):
  667. def _worker():
  668. bcc_list = [s.strip() for s in send_to.split(",") if s.strip()]
  669. msg = EmailMessage()
  670. msg["From"] = auth.email
  671. msg["To"] = bcc_list[0] if bcc_list else auth.email # Fallback
  672. msg["Subject"] = subject
  673. if content_type.lower() == "html":
  674. msg.set_content("")
  675. msg.add_alternative(content, subtype="html")
  676. else:
  677. msg.set_content(content)
  678. EmailAuthorizationService.send_email_smtp(auth, msg, bcc_list=bcc_list)
  679. return f"邮件 '{subject}' 成功发送至: {send_to}"
  680. return await run_in_threadpool(_worker)
  681. # ----------------------------------------------------------------------
  682. # 底层 SMTP 发送 (保持同步,供 Worker 调用)
  683. # ----------------------------------------------------------------------
  684. @staticmethod
  685. def send_email_smtp(auth, msg, bcc_list=None):
  686. if bcc_list is None:
  687. bcc_list = []
  688. # 这里的 connect 内部已经加了锁,是安全的
  689. mail = EmailAuthorizationService._connect_smtp_with_proxy(
  690. auth.smtp_server,
  691. auth.smtp_port,
  692. auth.proxy_host,
  693. auth.proxy_port,
  694. auth.proxy_username,
  695. auth.proxy_password,
  696. )
  697. try:
  698. mail.login(auth.email, auth.authorization_code)
  699. if bcc_list:
  700. mail.send_message(msg, to_addrs=bcc_list)
  701. else:
  702. mail.send_message(msg)
  703. finally:
  704. mail.quit()
  705. @staticmethod
  706. def _extract_body(msg, only_text: bool = True) -> str:
  707. # 纯 CPU 计算,不需要 async,保留原样
  708. body_parts = []
  709. if msg.is_multipart():
  710. for part in msg.walk():
  711. ctype = part.get_content_type()
  712. if only_text and ctype != "text/plain":
  713. continue
  714. if not only_text and ctype not in ["text/plain", "text/html"]:
  715. continue
  716. if part.get("Content-Disposition"):
  717. continue
  718. charset = part.get_content_charset() or "utf-8"
  719. try:
  720. payload = part.get_payload(decode=True)
  721. if payload:
  722. text = payload.decode(charset, errors="ignore")
  723. body_parts.append(text)
  724. except Exception:
  725. continue
  726. else:
  727. charset = msg.get_content_charset() or "utf-8"
  728. payload = msg.get_payload(decode=True)
  729. if payload:
  730. body_parts.append(payload.decode(charset, errors="ignore"))
  731. body = "\n".join(body_parts)
  732. return re.sub(r"\s+", " ", body.strip())