# email_authorizations_service.py (41 KB)
import asyncio
import email
import email.policy
import email.utils
import imaplib
import re
import smtplib
import socket
import threading
import time
from datetime import datetime, timedelta, timezone
from email.header import decode_header
from email.message import EmailMessage
from email.utils import formatdate, make_msgid
from typing import List, Optional

import socks
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session
from starlette.concurrency import run_in_threadpool

from app.core.biz_exception import NotFoundError, BizLogicError
from app.core.logger import logger
from app.models.email_authorizations import EmailAuthorization
from app.schemas.email_authorizations import EmailAuthorizationCreate, EmailAuthorizationUpdate
  24. # 保持锁逻辑不变
  25. _PROXY_LOCK = threading.Lock()
class EmailAuthorizationService:
    """Service layer for stored email authorizations and mailbox operations.

    Offers async CRUD helpers over ``EmailAuthorization`` rows plus IMAP/SMTP
    helpers (optionally tunnelled through a SOCKS5 proxy) that read, forward
    and send mail on behalf of an authorization. The methods are static.
    """

    # Number of newest INBOX messages scanned per polling round.
    DEFAULT_READ_TOP_N_EMAIL = 10
    # Seconds slept between IMAP polling rounds.
    RETRY_DELAY_SECONDS = 5

    # =================================================================
    # Database operations (DB CRUD) - using AsyncSession
    # =================================================================
  32. @staticmethod
  33. async def get_all(db: AsyncSession) -> List[EmailAuthorization]:
  34. # AsyncSession 不支持 db.query,需要用 select(Model)
  35. stmt = select(EmailAuthorization).order_by(EmailAuthorization.id.desc())
  36. result = await db.execute(stmt)
  37. return result.scalars().all()
  38. @staticmethod
  39. async def get_by_id(db: AsyncSession, id: int) -> Optional[EmailAuthorization]:
  40. stmt = select(EmailAuthorization).where(EmailAuthorization.id == id)
  41. result = await db.execute(stmt)
  42. # scalar_one_or_none 类似于 .first(),但更严格(如果有多个会报错,这里id是主键所以没问题)
  43. obj = result.scalar_one_or_none()
  44. if not obj:
  45. raise NotFoundError("Email authorization not found")
  46. return obj
  47. @staticmethod
  48. async def get_by_email(db: AsyncSession, email: str) -> Optional[EmailAuthorization]:
  49. stmt = select(EmailAuthorization).where(EmailAuthorization.email == email)
  50. result = await db.execute(stmt)
  51. obj = result.scalar_one_or_none()
  52. if not obj:
  53. raise NotFoundError("Email authorization not found")
  54. return obj
  55. @staticmethod
  56. async def create(db: AsyncSession, obj_in: EmailAuthorizationCreate) -> EmailAuthorization:
  57. # 先检查是否存在
  58. stmt = select(EmailAuthorization).where(EmailAuthorization.email == obj_in.email)
  59. result = await db.execute(stmt)
  60. if result.scalar_one_or_none():
  61. raise BizLogicError(f"Email {obj_in.email} already exist")
  62. # 创建对象
  63. db_obj = EmailAuthorization(**obj_in.dict(exclude_unset=True))
  64. # db.add 是同步方法(只是添加到 session 上下文)
  65. db.add(db_obj)
  66. # commit 和 refresh 是异步的
  67. await db.commit()
  68. await db.refresh(db_obj)
  69. return db_obj
  70. @staticmethod
  71. async def update(db: AsyncSession, id: int, obj_in: EmailAuthorizationUpdate) -> Optional[EmailAuthorization]:
  72. stmt = select(EmailAuthorization).where(EmailAuthorization.id == id)
  73. result = await db.execute(stmt)
  74. db_obj = result.scalar_one_or_none()
  75. if not db_obj:
  76. raise NotFoundError("Email authorization not found")
  77. for field, value in obj_in.dict(exclude_unset=True).items():
  78. setattr(db_obj, field, value)
  79. db.add(db_obj)
  80. await db.commit()
  81. await db.refresh(db_obj)
  82. return db_obj
  83. @staticmethod
  84. async def delete(db: AsyncSession, id: int) -> Optional[EmailAuthorization]:
  85. stmt = select(EmailAuthorization).where(EmailAuthorization.id == id)
  86. result = await db.execute(stmt)
  87. db_obj = result.scalar_one_or_none()
  88. if not db_obj:
  89. raise NotFoundError("Email authorization not found")
  90. # delete 也是同步标记
  91. await db.delete(db_obj)
  92. await db.commit()
  93. return db_obj
  94. @staticmethod
  95. def _connect_imap_with_proxy(
  96. host: str,
  97. port: int,
  98. proxy_host: Optional[str] = None,
  99. proxy_port: Optional[int] = None,
  100. proxy_user: Optional[str] = None,
  101. proxy_password: Optional[str] = None,
  102. ) -> imaplib.IMAP4_SSL:
  103. """
  104. 创建连接 (同步方法,将在线程中运行)
  105. 使用 Lock 确保 socket patching 不会影响其他并发请求
  106. """
  107. if proxy_host and proxy_port and proxy_port > 0:
  108. with _PROXY_LOCK: # 加锁,防止多线程同时修改全局 socket
  109. original_socket = socket.socket
  110. socks.setdefaultproxy(
  111. proxy_type=socks.SOCKS5,
  112. addr=proxy_host,
  113. port=proxy_port,
  114. username=proxy_user or None,
  115. password=proxy_password or None,
  116. )
  117. socket.socket = socks.socksocket
  118. try:
  119. imap = imaplib.IMAP4_SSL(host, port)
  120. finally:
  121. socket.socket = original_socket
  122. else:
  123. imap = imaplib.IMAP4_SSL(host, port)
  124. return imap
  125. @staticmethod
  126. def _connect_smtp_with_proxy(
  127. host: str,
  128. port: int,
  129. proxy_host: Optional[str] = None,
  130. proxy_port: Optional[int] = None,
  131. proxy_user: Optional[str] = None,
  132. proxy_password: Optional[str] = None,
  133. ) -> smtplib.SMTP_SSL:
  134. """
  135. 创建连接 (同步方法,将在线程中运行)
  136. """
  137. if proxy_host and proxy_port and proxy_port > 0:
  138. with _PROXY_LOCK: # 加锁
  139. original_socket = socket.socket
  140. socks.setdefaultproxy(
  141. proxy_type=socks.SOCKS5,
  142. addr=proxy_host,
  143. port=proxy_port,
  144. username=proxy_user or None,
  145. password=proxy_password or None,
  146. )
  147. socket.socket = socks.socksocket
  148. try:
  149. smtp = smtplib.SMTP_SSL(host, port)
  150. finally:
  151. socket.socket = original_socket
  152. else:
  153. smtp = smtplib.SMTP_SSL(host, port)
  154. return smtp
  155. @staticmethod
  156. async def fetch_email_authorizations2(
  157. db: Session,
  158. auth,
  159. sender: str,
  160. recipient: str,
  161. subject_keywords: str,
  162. body_keywords: str
  163. ):
  164. # =========================================================
  165. # 第一步:在数据库中查找最新的 UID (主线程/DB线程执行)
  166. # =========================================================
  167. # 1. 构建动态 SQL
  168. # 假设表名为 emails,字段为 uid, sender, recipient, subject, body_text
  169. sql = "SELECT uid, subject, body_text FROM emails WHERE 1=1"
  170. params = {}
  171. # 2. 处理发件人 (模糊匹配)
  172. if sender.strip():
  173. sql += " AND sender LIKE :sender"
  174. params['sender'] = f"%{sender.strip()}%"
  175. # 3. 处理收件人 (模糊匹配)
  176. if recipient.strip():
  177. sql += " AND recipient LIKE :recipient"
  178. params['recipient'] = f"%{recipient.strip()}%"
  179. # 4. 处理主题关键词 (OR 关系)
  180. subj_keys = [k.strip() for k in subject_keywords.split(',') if k.strip()]
  181. if subj_keys:
  182. for i, k in enumerate(subj_keys):
  183. key_name = f"subj_{i}"
  184. # 直接拼接到主 SQL 中,要求同时满足
  185. sql += f" AND subject LIKE :{key_name}"
  186. params[key_name] = f"%{k}%"
  187. # 5. 处理内容关键词 (OR 关系)
  188. body_keys = [k.strip() for k in body_keywords.split(',') if k.strip()]
  189. if body_keys:
  190. for i, k in enumerate(body_keys):
  191. key_name = f"body_{i}"
  192. # 直接拼接到主 SQL 中,要求同时满足
  193. sql += f" AND body_text LIKE :{key_name}"
  194. params[key_name] = f"%{k}%"
  195. # 6. 获取最新的一条
  196. sql += " ORDER BY uid DESC LIMIT 1"
  197. # 执行查询
  198. result_proxy = await db.execute(text(sql), params)
  199. result = result_proxy.fetchone()
  200. if not result:
  201. logger.info(f"DB Search: No email found for {sender} -> {recipient}")
  202. return None
  203. target_uid = result.uid
  204. target_subject = result.subject
  205. target_body_text = result.body_text
  206. logger.info(f"DB Search: Found UID {target_uid} matching criteria. Subject: {target_subject}")
  207. return f'{target_subject}\n{target_body_text}'
  208. @staticmethod
  209. async def fetch_email_authorizations(
  210. auth,
  211. sender: str,
  212. recipient: str,
  213. subject_keywords: str,
  214. body_keywords: str,
  215. sent_date: str,
  216. expiry: int = 300,
  217. only_text: bool = True
  218. ) -> Optional[str]:
  219. """
  220. 在有效期内循环读取邮箱,找到符合条件的邮件(使用最后一条 Received 头作为收件时间)
  221. """
  222. def _worker():
  223. EMAIL_ACCOUNT = auth.email
  224. EMAIL_PASSWORD = auth.authorization_code
  225. IMAP_SERVER = auth.imap_server
  226. IMAP_PORT = auth.imap_port
  227. subject_keys = [s.strip() for s in subject_keywords.split(",") if s.strip()]
  228. body_keys = [s.strip() for s in body_keywords.split(",") if s.strip()]
  229. # === 时间计算 ===
  230. sent_dt = datetime.strptime(sent_date, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
  231. max_wait_time = min(5 * 60, expiry) # 最长等待5分钟
  232. expiry_at = time.time() + max_wait_time
  233. def get_received_time(msg):
  234. """
  235. 使用最后一条 Received 头解析收件时间
  236. """
  237. received_headers = msg.get_all("Received", [])
  238. if not received_headers:
  239. return None
  240. for i, header in enumerate(received_headers, 1):
  241. logger.debug(f" [{i}] {header}")
  242. last_received = received_headers[-1]
  243. if ";" not in last_received:
  244. return None
  245. time_str = last_received.split(";")[-1].strip()
  246. dt_tuple = email.utils.parsedate_tz(time_str)
  247. if not dt_tuple:
  248. return None
  249. return datetime.fromtimestamp(email.utils.mktime_tz(dt_tuple), tz=timezone.utc)
  250. mail = EmailAuthorizationService._connect_imap_with_proxy(
  251. IMAP_SERVER,
  252. IMAP_PORT,
  253. auth.proxy_host,
  254. auth.proxy_port,
  255. auth.proxy_username,
  256. auth.proxy_password,
  257. )
  258. mail.login(EMAIL_ACCOUNT, EMAIL_PASSWORD)
  259. mail.select("INBOX")
  260. while time.time() < expiry_at:
  261. mail.noop() # 刷新邮箱状态
  262. _, data = mail.search(None, "ALL")
  263. mail_ids = data[0].split()
  264. if not mail_ids:
  265. time.sleep(EmailAuthorizationService.RETRY_DELAY_SECONDS)
  266. continue
  267. recent_ids = mail_ids[-EmailAuthorizationService.DEFAULT_READ_TOP_N_EMAIL:]
  268. messages = []
  269. debug = True
  270. for email_id in reversed(recent_ids):
  271. res, msg_data = mail.fetch(email_id, "(RFC822)")
  272. if res != "OK" or not msg_data:
  273. if debug:
  274. logger.debug(f"[WARN] 邮件 ID={email_id.decode()} 获取失败")
  275. continue
  276. msg_bytes = None
  277. for part in msg_data:
  278. if isinstance(part, tuple):
  279. msg_bytes = part[1]
  280. if not msg_bytes:
  281. if debug:
  282. logger.debug(f"[WARN] 邮件 ID={email_id.decode()} 无正文")
  283. continue
  284. msg = email.message_from_bytes(msg_bytes)
  285. received_dt = get_received_time(msg)
  286. if not received_dt:
  287. if debug:
  288. logger.debug(f"[WARN] 邮件 ID={email_id.decode()} 未解析出 Received 时间")
  289. continue
  290. messages.append((msg, received_dt))
  291. if debug:
  292. logger.debug(f"[DEBUG] 成功解析邮件数: {len(messages)}")
  293. logger.debug(f"[DEBUG] 收件时间列表: {[m[1] for m in messages]}")
  294. # 按收件时间降序排序
  295. messages.sort(key=lambda x: x[1], reverse=True)
  296. for msg, received_dt in messages:
  297. # 判断是否在发送时间后的有效窗口内
  298. if received_dt < sent_dt:
  299. if debug:
  300. logger.debug(f"[INFO] 邮件太旧: {received_dt}")
  301. continue
  302. if received_dt > sent_dt + timedelta(seconds=expiry):
  303. if debug:
  304. logger.debug(f"[INFO] 邮件太新: {received_dt}")
  305. continue
  306. # 匹配发件人/收件人
  307. msg_from = msg.get("From", "")
  308. msg_to = msg.get("To", "")
  309. if sender.lower() not in msg_from.lower():
  310. if debug:
  311. logger.debug("发件人不匹配")
  312. continue
  313. if recipient.lower() not in msg_to.lower():
  314. if debug:
  315. logger.debug("收件人不匹配")
  316. continue
  317. # 匹配主题
  318. subject, enc = decode_header(msg.get("Subject"))[0]
  319. if isinstance(subject, bytes):
  320. subject = subject.decode(enc or "utf-8", errors="ignore")
  321. if subject_keys and not any(k.lower() in subject.lower() for k in subject_keys):
  322. continue
  323. # 提取正文
  324. body = EmailAuthorizationService._extract_body(msg, only_text)
  325. if body_keys and not any(k.lower() in body.lower() for k in body_keys):
  326. continue
  327. # 找到匹配邮件 → 返回内容
  328. mail.close()
  329. mail.logout()
  330. return body.strip()
  331. # 未匹配到 → 等待重试
  332. time.sleep(EmailAuthorizationService.RETRY_DELAY_SECONDS)
  333. mail.close()
  334. mail.logout()
  335. raise NotFoundError("Get email timeout")
  336. return await run_in_threadpool(_worker)
  337. @staticmethod
  338. def _process_recent_emails_sync(
  339. mail, recent_ids, sent_dt, expiry, sender, recipient, subject_keys, body_keys, only_text
  340. ) -> Optional[str]:
  341. """
  342. 同步辅助函数:处理邮件解析逻辑。
  343. 将这段繁琐的逻辑放入线程运行,避免阻塞 Async Loop。
  344. """
  345. messages = []
  346. debug = True
  347. for email_id in reversed(recent_ids):
  348. res, msg_data = mail.fetch(email_id, "(RFC822)")
  349. if res != "OK" or not msg_data:
  350. continue
  351. msg_bytes = None
  352. for part in msg_data:
  353. if isinstance(part, tuple):
  354. msg_bytes = part[1]
  355. if not msg_bytes:
  356. continue
  357. msg = email.message_from_bytes(msg_bytes)
  358. # 解析时间
  359. received_dt = None
  360. received_headers = msg.get_all("Received", [])
  361. if received_headers:
  362. last_received = received_headers[-1]
  363. if ";" in last_received:
  364. time_str = last_received.split(";")[-1].strip()
  365. dt_tuple = email.utils.parsedate_tz(time_str)
  366. if dt_tuple:
  367. received_dt = datetime.fromtimestamp(email.utils.mktime_tz(dt_tuple), tz=timezone.utc)
  368. if not received_dt:
  369. continue
  370. messages.append((msg, received_dt))
  371. # 排序
  372. messages.sort(key=lambda x: x[1], reverse=True)
  373. for msg, received_dt in messages:
  374. # 时间判定
  375. if received_dt < sent_dt:
  376. continue
  377. if received_dt > sent_dt + timedelta(seconds=expiry):
  378. continue
  379. # 匹配逻辑
  380. msg_from = msg.get("From", "")
  381. msg_to = msg.get("To", "")
  382. if sender.lower() not in msg_from.lower():
  383. continue
  384. if recipient.lower() not in msg_to.lower():
  385. continue
  386. subject_raw = msg.get("Subject")
  387. subject = ""
  388. if subject_raw:
  389. decoded_list = decode_header(subject_raw)
  390. if decoded_list:
  391. sub_bytes, enc = decoded_list[0]
  392. if isinstance(sub_bytes, bytes):
  393. subject = sub_bytes.decode(enc or "utf-8", errors="ignore")
  394. else:
  395. subject = str(sub_bytes)
  396. if subject_keys and not any(k.lower() in subject.lower() for k in subject_keys):
  397. continue
  398. body = EmailAuthorizationService._extract_body(msg, only_text)
  399. if body_keys and not any(k.lower() in body.lower() for k in body_keys):
  400. continue
  401. return body.strip()
  402. return None
  403. @staticmethod
  404. async def fetch_email_authorizations_from_top_n(
  405. auth,
  406. sender: str,
  407. recipient: str,
  408. subject_keywords: str,
  409. body_keywords: str,
  410. top: int = 10,
  411. only_text: bool = True
  412. ) -> Optional[str]:
  413. # 定义一个纯同步的 worker 函数来执行所有 IMAP 逻辑
  414. def _worker():
  415. subject_keys = [s.strip() for s in subject_keywords.split(",") if s.strip()]
  416. body_keys = [s.strip() for s in body_keywords.split(",") if s.strip()]
  417. mail = EmailAuthorizationService._connect_imap_with_proxy(
  418. auth.imap_server,
  419. auth.imap_port,
  420. auth.proxy_host,
  421. auth.proxy_port,
  422. auth.proxy_username,
  423. auth.proxy_password,
  424. )
  425. try:
  426. mail.login(auth.email, auth.authorization_code)
  427. mail.select("INBOX")
  428. _, data = mail.search(None, "ALL")
  429. mail_ids = data[0].split()
  430. if not mail_ids:
  431. return None
  432. recent_ids = mail_ids[-top:]
  433. # 复用上面的解析逻辑,但稍作调整,因为这个方法不需要时间过滤
  434. # 这里为了简单,直接写精简版解析
  435. for email_id in reversed(recent_ids):
  436. res, msg_data = mail.fetch(email_id, "(RFC822)")
  437. if res != "OK" or not msg_data: continue
  438. msg_bytes = None
  439. for part in msg_data:
  440. if isinstance(part, tuple): msg_bytes = part[1]
  441. if not msg_bytes: continue
  442. msg = email.message_from_bytes(msg_bytes)
  443. # 匹配逻辑
  444. msg_from = msg.get("From", "")
  445. msg_to = msg.get("To", "")
  446. if sender.lower() not in msg_from.lower(): continue
  447. if recipient.lower() not in msg_to.lower(): continue
  448. subject_raw = msg.get("Subject")
  449. subject = ""
  450. if subject_raw:
  451. d = decode_header(subject_raw)[0]
  452. subject = d[0].decode(d[1] or "utf-8", errors="ignore") if isinstance(d[0], bytes) else str(d[0])
  453. if subject_keys and not any(k.lower() in subject.lower() for k in subject_keys): continue
  454. body = EmailAuthorizationService._extract_body(msg, only_text)
  455. if body_keys and not any(k.lower() in body.lower() for k in body_keys): continue
  456. return body.strip()
  457. return None
  458. finally:
  459. try:
  460. mail.close()
  461. mail.logout()
  462. except:
  463. pass
  464. return await run_in_threadpool(_worker)
  465. @staticmethod
  466. async def forward_first_matching_email(
  467. auth,
  468. forward_to: str,
  469. sender: str,
  470. recipient: str,
  471. subject_keywords: str,
  472. body_keywords: str
  473. ):
  474. def _worker():
  475. subject_keys = [s.strip() for s in subject_keywords.split(",") if s.strip()]
  476. body_keys = [s.strip() for s in body_keywords.split(",") if s.strip()]
  477. mail = EmailAuthorizationService._connect_imap_with_proxy(
  478. auth.imap_server, auth.imap_port, auth.proxy_host, auth.proxy_port, auth.proxy_username, auth.proxy_password
  479. )
  480. try:
  481. mail.login(auth.email, auth.authorization_code)
  482. mail.select("INBOX")
  483. # 1. 搜索目标邮件
  484. target = recipient
  485. query = f'(HEADER To "{target}")'
  486. res, data = mail.uid("search", None, query)
  487. if res != "OK": return None
  488. uids = data[0].split()
  489. msgs_to_check = []
  490. for uid in uids:
  491. # 使用 RFC822 获取完整内容
  492. res, msg_data = mail.uid("fetch", uid, "(RFC822)")
  493. if res != "OK" or not msg_data: continue
  494. # 临时解析用于排序和初步过滤
  495. raw_bytes = msg_data[0][1]
  496. tmp_msg = email.message_from_bytes(raw_bytes, policy=email.policy.default)
  497. date_str = tmp_msg.get("Date")
  498. if date_str:
  499. try:
  500. date_dt = parsedate_to_datetime(date_str)
  501. msgs_to_check.append((date_dt, tmp_msg, raw_bytes))
  502. except:
  503. continue
  504. # 按时间降序排序(最新的优先)
  505. msgs_to_check.sort(key=lambda x: x[0], reverse=True)
  506. for _, orig_msg, raw_bytes in msgs_to_check:
  507. # --- 过滤逻辑 ---
  508. msg_from = orig_msg.get("From", "")
  509. if sender.lower() not in msg_from.lower(): continue
  510. subject = orig_msg.get("Subject", "")
  511. if subject_keys and not any(k.lower() in subject.lower() for k in subject_keys): continue
  512. body_content = EmailAuthorizationService._extract_body(orig_msg, True)
  513. if body_keys and not any(k.lower() in body_content.lower() for k in body_keys): continue
  514. # --- 匹配成功:开始构造转发邮件 ---
  515. # 1. 提取原始信息用于视觉转发头
  516. orig_from = orig_msg.get("From", "Unknown")
  517. orig_date = orig_msg.get("Date", "Unknown")
  518. orig_subject = orig_msg.get("Subject", "No Subject")
  519. orig_to = orig_msg.get("To", "Unknown")
  520. orig_msg_id = orig_msg.get("Message-ID")
  521. fwd_info = (
  522. f"\n\n---------- Forwarded message ----------\n"
  523. f"From: {orig_from}\n"
  524. f"Date: {orig_date}\n"
  525. f"Subject: {orig_subject}\n"
  526. f"To: {orig_to}\n\n"
  527. )
  528. # 2. 构造新的邮件对象 (重新基于原始字节解析,确保附件完整)
  529. msg = email.message_from_bytes(raw_bytes, policy=email.policy.default)
  530. # 3. 清理并重置 Header
  531. headers_to_clean = ['From', 'To', 'Cc', 'Bcc', 'Subject', 'Date', 'Message-ID', 'In-Reply-To', 'References']
  532. for h in headers_to_clean:
  533. del msg[h]
  534. msg['From'] = auth.email
  535. msg['To'] = forward_to
  536. msg['Subject'] = f"Fwd: {orig_subject}"
  537. msg['Date'] = formatdate(localtime=True)
  538. msg['Message-ID'] = make_msgid(domain=auth.email.split('@')[-1])
  539. # 4. 【核心】建立上下文关联 (Threading)
  540. if orig_msg_id:
  541. msg['In-Reply-To'] = orig_msg_id
  542. msg['References'] = orig_msg_id
  543. # 5. 【核心】注入视觉转发头 (Visual Prepend)
  544. try:
  545. if msg.is_multipart():
  546. # 遍历部分,找到主要正文并插入
  547. for part in msg.walk():
  548. ctype = part.get_content_type()
  549. if ctype == "text/plain":
  550. part.set_content(fwd_info + part.get_content())
  551. break
  552. elif ctype == "text/html":
  553. html_fwd = fwd_info.replace("\n", "<br>")
  554. part.set_content(f"<div>{html_fwd}</div>" + part.get_content(), subtype="html")
  555. break
  556. else:
  557. msg.set_content(fwd_info + msg.get_content())
  558. except Exception as e:
  559. logger.warning(f"Prepend visual header failed: {e}")
  560. # 6. 发送邮件
  561. EmailAuthorizationService.send_email_smtp(auth, msg)
  562. # 7. 同步发件记录 (IMAP Sent)
  563. EmailAuthorizationService._append_to_sent(auth, msg)
  564. return f"邮件 '{orig_subject}' 已成功关联转发至: {forward_to}"
  565. return None
  566. except Exception as e:
  567. logger.error(f"Forward matching email error: {e}")
  568. return None
  569. finally:
  570. try:
  571. mail.logout()
  572. except: pass
  573. return await run_in_threadpool(_worker)
  574. @staticmethod
  575. async def forward_first_matching_email2(
  576. db: Session,
  577. auth,
  578. forward_to: str,
  579. sender: str,
  580. recipient: str,
  581. subject_keywords: str,
  582. body_keywords: str
  583. ):
  584. # =========================================================
  585. # 第一步:在数据库中查找最新的 UID (主线程/DB线程执行)
  586. # =========================================================
  587. # 1. 构建动态 SQL
  588. # 假设表名为 emails,字段为 uid, sender, recipient, subject, body_text
  589. sql = "SELECT uid, subject FROM emails WHERE 1=1"
  590. params = {}
  591. # 2. 处理发件人 (模糊匹配)
  592. if sender.strip():
  593. sql += " AND sender LIKE :sender"
  594. params['sender'] = f"%{sender.strip()}%"
  595. # 3. 处理收件人 (模糊匹配)
  596. if recipient.strip():
  597. sql += " AND recipient LIKE :recipient"
  598. params['recipient'] = f"%{recipient.strip()}%"
  599. # 4. 处理主题关键词 (OR 关系)
  600. subj_keys = [k.strip() for k in subject_keywords.split(',') if k.strip()]
  601. if subj_keys:
  602. for i, k in enumerate(subj_keys):
  603. key_name = f"subj_{i}"
  604. # 直接拼接到主 SQL 中,要求同时满足
  605. sql += f" AND subject LIKE :{key_name}"
  606. params[key_name] = f"%{k}%"
  607. # 5. 处理内容关键词 (OR 关系)
  608. body_keys = [k.strip() for k in body_keywords.split(',') if k.strip()]
  609. if body_keys:
  610. for i, k in enumerate(body_keys):
  611. key_name = f"body_{i}"
  612. # 直接拼接到主 SQL 中,要求同时满足
  613. sql += f" AND body_text LIKE :{key_name}"
  614. params[key_name] = f"%{k}%"
  615. # 6. 获取最新的一条
  616. sql += " ORDER BY uid DESC LIMIT 1"
  617. try:
  618. # 执行查询
  619. result_proxy = await db.execute(text(sql), params)
  620. result = result_proxy.fetchone()
  621. if not result:
  622. logger.info(f"DB Search: No email found for {sender} -> {recipient}")
  623. return None
  624. target_uid = result.uid
  625. target_subject = result.subject
  626. logger.info(f"DB Search: Found UID {target_uid} matching criteria. Subject: {target_subject}")
  627. except Exception as e:
  628. logger.error(f"DB Search Error: {e}")
  629. return f"数据库查询失败: {str(e)}"
  630. # =========================================================
  631. # 第二步:去 IMAP 拉取原始内容并转发 (放入线程池执行 IO 操作)
  632. # =========================================================
  633. def _worker():
  634. mail = None
  635. try:
  636. # 1. 连接 IMAP
  637. mail = EmailAuthorizationService._connect_imap_with_proxy(
  638. auth.imap_server, auth.imap_port,
  639. auth.proxy_host, auth.proxy_port,
  640. auth.proxy_username, auth.proxy_password
  641. )
  642. mail.login(auth.email, auth.authorization_code)
  643. mail.select("INBOX")
  644. # 2. 根据 UID 精准拉取 (使用 fetch)
  645. # 注意:IMAPClient 的 fetch 方法
  646. # UID 必须转为 int 或者 sequence set 字符串
  647. res, data = mail.uid('fetch', str(target_uid), '(RFC822)')
  648. # 🔴 修正点:不要写 if target_uid in res
  649. # res 是状态字符串 "OK",data 是包含邮件内容的列表
  650. if res != 'OK':
  651. return f"IMAP Fetch 失败,状态码: {res}"
  652. if not data or not data[0]:
  653. return f"未找到 UID {target_uid} 的邮件内容 (可能已被物理删除)"
  654. # data[0] 通常是 tuple (byte_header, byte_content),但也可能是 None
  655. if isinstance(data[0], tuple):
  656. raw_email_bytes = data[0][1]
  657. else:
  658. # 如果 data[0] 只是 bytes (例如 b')'),说明没拿到邮件体
  659. return f"邮件数据格式异常,无法解析: {str(data)}"
  660. # 使用 default policy 解析,方便后续修改
  661. orig_msg = email.message_from_bytes(data[0][1], policy=email.policy.default)
  662. # --- 1. 提取原始邮件信息用于构造转发头 ---
  663. orig_from = orig_msg.get("From", "Unknown")
  664. orig_date = orig_msg.get("Date", "Unknown")
  665. orig_subject = orig_msg.get("Subject", "No Subject")
  666. orig_to = orig_msg.get("To", "Unknown")
  667. orig_msg_id = orig_msg.get("Message-ID")
  668. # --- 2. 构造视觉上的“转发信息栏” ---
  669. fwd_header_text = (
  670. f"\n\n---------- Forwarded message ----------\n"
  671. f"From: {orig_from}\n"
  672. f"Date: {orig_date}\n"
  673. f"Subject: {orig_subject}\n"
  674. f"To: {orig_to}\n\n"
  675. )
  676. # --- 3. 构造新的邮件对象 ---
  677. # 为了保持上下文关联,我们克隆或重新构造,并设置 Threading Headers
  678. msg = email.message_from_bytes(data[0][1], policy=email.policy.default)
  679. # 清除旧头
  680. for h in ['From', 'To', 'Cc', 'Bcc', 'Subject', 'Date', 'Message-ID', 'In-Reply-To', 'References']:
  681. del msg[h]
  682. msg['From'] = auth.email
  683. msg['To'] = forward_to
  684. msg['Subject'] = f"Fwd: {target_subject}"
  685. msg['Date'] = formatdate(localtime=True)
  686. msg['Message-ID'] = make_msgid(domain=auth.email.split('@')[-1])
  687. # --- 4. 关键:建立线索关联 (Threading) ---
  688. if orig_msg_id:
  689. # 这两个头告诉 Gmail 这封信是原邮件的后续
  690. msg['In-Reply-To'] = orig_msg_id
  691. msg['References'] = orig_msg_id
  692. # --- 5. 修改正文,注入转发视觉头 ---
  693. # 处理 Multipart 或简单邮件,将 fwd_header_text 插入到正文最前面
  694. try:
  695. if msg.is_multipart():
  696. # 找到第一个文本部分并修改
  697. for part in msg.walk():
  698. if part.get_content_type() == "text/plain":
  699. content = part.get_content()
  700. part.set_content(fwd_header_text + content)
  701. break
  702. elif part.get_content_type() == "text/html":
  703. # HTML 转发头稍微复杂点,这里简单处理
  704. content = part.get_content()
  705. html_fwd = fwd_header_text.replace("\n", "<br>")
  706. part.set_content(f"<div>{html_fwd}</div>" + content, subtype="html")
  707. break
  708. else:
  709. content = msg.get_content()
  710. msg.set_content(fwd_header_text + content)
  711. except Exception as e:
  712. logger.warning(f"Failed to prepend forward header: {e}")
  713. # 4. 发送邮件 (SMTP)
  714. EmailAuthorizationService.send_email_smtp(auth, msg)
  715. return f"邮件 '{target_subject}' (UID: {target_uid}) 已成功转发至: {forward_to}"
  716. except Exception as e:
  717. logger.error(f"IMAP Forward Error: {e}")
  718. return f"邮件转发过程出错: {str(e)}"
  719. finally:
  720. if mail:
  721. try:
  722. mail.logout()
  723. except: pass
  724. # 在线程池中运行耗时 IO 操作
  725. return await run_in_threadpool(_worker)
  726. @staticmethod
  727. def _append_to_sent(auth, msg: EmailMessage):
  728. """
  729. 同步发件记录到 IMAP Sent 文件夹
  730. """
  731. imap = None
  732. try:
  733. # 确保消息包含必要的指纹,否则同步后 Gmail 搜索不到
  734. if 'Date' not in msg:
  735. msg["Date"] = formatdate(localtime=True)
  736. if 'Message-ID' not in msg:
  737. msg["Message-ID"] = make_msgid(domain=auth.email.split('@')[-1])
  738. imap = EmailAuthorizationService._connect_imap_with_proxy(
  739. auth.imap_server, auth.imap_port,
  740. auth.proxy_host, auth.proxy_port,
  741. auth.proxy_username, auth.proxy_password,
  742. )
  743. imap.login(auth.email, auth.authorization_code)
  744. # --- 自动探测已发送文件夹 (兼容 Gmail/Outlook/域名邮) ---
  745. sent_folder = None
  746. typ, data = imap.list()
  747. if typ == "OK":
  748. for entry in data:
  749. line = entry.decode()
  750. # 寻找包含 \Sent 属性的系统文件夹
  751. if '\\Sent' in line:
  752. # 兼容各种分隔符,提取最后一个引号内的内容
  753. parts = re.findall(r'"([^"]+)"', line)
  754. if parts:
  755. sent_folder = f'"{parts[-1]}"' # 强制带引号防止空格导致 BAD
  756. break
  757. # 兜底逻辑
  758. if not sent_folder:
  759. sent_folder = '"[Gmail]/Sent Mail"' if "gmail" in auth.email.lower() else '"Sent"'
  760. # 执行写入 (使用 \\Seen 标记为已读)
  761. # imap.append 的参数顺序: 文件夹, 标志, 时间, 内容
  762. imap.append(
  763. sent_folder,
  764. '(\\Seen)',
  765. imaplib.Time2Internaldate(time.time()),
  766. msg.as_bytes()
  767. )
  768. logger.info(f"Successfully synced to folder: {sent_folder}")
  769. except Exception as e:
  770. logger.error(f"Append sent mail failed: {str(e)}")
  771. finally:
  772. if imap:
  773. try: imap.logout()
  774. except: pass
  775. @staticmethod
  776. async def send_email(
  777. auth,
  778. send_to: str,
  779. subject: str,
  780. content_type: str,
  781. content: str
  782. ):
  783. def _worker():
  784. msg = EmailMessage()
  785. msg["From"] = auth.email
  786. msg["To"] = send_to
  787. msg["Subject"] = subject
  788. msg["Date"] = formatdate(localtime=True)
  789. msg["Message-ID"] = make_msgid(domain=auth.email.split('@')[-1])
  790. msg["MIME-Version"] = "1.0"
  791. msg["X-Mailer"] = "Python-Client-v1.0"
  792. if content_type.lower() == "html":
  793. msg.set_content("") # 占位
  794. msg.add_alternative(content, subtype="html")
  795. else:
  796. msg.set_content(content)
  797. # 2. 执行发送
  798. logger.info(f"[DEBUG] 准备发送邮件: ID={msg['Message-ID']}")
  799. EmailAuthorizationService.send_email_smtp(auth, msg)
  800. return f"邮件 '{subject}' 成功发送至: {send_to}"
  801. return await run_in_threadpool(_worker)
  802. @staticmethod
  803. async def send_email_bulk(
  804. auth,
  805. send_to: str,
  806. subject: str,
  807. content_type: str,
  808. content: str
  809. ):
  810. def _worker():
  811. bcc_list = [s.strip() for s in send_to.split(",") if s.strip()]
  812. msg = EmailMessage()
  813. msg["From"] = auth.email
  814. msg["To"] = bcc_list[0] if bcc_list else auth.email # Fallback
  815. msg["Subject"] = subject
  816. if content_type.lower() == "html":
  817. msg.set_content("")
  818. msg.add_alternative(content, subtype="html")
  819. else:
  820. msg.set_content(content)
  821. EmailAuthorizationService.send_email_smtp(auth, msg, bcc_list=bcc_list)
  822. return f"邮件 '{subject}' 成功发送至: {send_to}"
  823. return await run_in_threadpool(_worker)
  824. # ----------------------------------------------------------------------
  825. # 底层 SMTP 发送 (保持同步,供 Worker 调用)
  826. # ----------------------------------------------------------------------
  827. @staticmethod
  828. def send_email_smtp(auth, msg, bcc_list=None):
  829. if bcc_list is None:
  830. bcc_list = []
  831. # 这里的 connect 内部已经加了锁,是安全的
  832. mail = EmailAuthorizationService._connect_smtp_with_proxy(
  833. auth.smtp_server,
  834. auth.smtp_port,
  835. auth.proxy_host,
  836. auth.proxy_port,
  837. auth.proxy_username,
  838. auth.proxy_password,
  839. )
  840. try:
  841. mail.login(auth.email, auth.authorization_code)
  842. recipients = bcc_list if bcc_list else [msg["To"]]
  843. mail.send_message(
  844. msg,
  845. from_addr=auth.email,
  846. to_addrs=recipients
  847. )
  848. logger.info(f"[DEBUG] 开始同步到已发送文件夹...")
  849. EmailAuthorizationService._append_to_sent(auth, msg)
  850. finally:
  851. mail.quit()
  852. @staticmethod
  853. def _extract_body(msg, only_text: bool = True) -> str:
  854. # 纯 CPU 计算,不需要 async,保留原样
  855. body_parts = []
  856. if msg.is_multipart():
  857. for part in msg.walk():
  858. ctype = part.get_content_type()
  859. if only_text and ctype != "text/plain":
  860. continue
  861. if not only_text and ctype not in ["text/plain", "text/html"]:
  862. continue
  863. if part.get("Content-Disposition"):
  864. continue
  865. charset = part.get_content_charset() or "utf-8"
  866. try:
  867. payload = part.get_payload(decode=True)
  868. if payload:
  869. text = payload.decode(charset, errors="ignore")
  870. body_parts.append(text)
  871. except Exception:
  872. continue
  873. else:
  874. charset = msg.get_content_charset() or "utf-8"
  875. payload = msg.get_payload(decode=True)
  876. if payload:
  877. body_parts.append(payload.decode(charset, errors="ignore"))
  878. body = "\n".join(body_parts)
  879. return re.sub(r"\s+", " ", body.strip())