emails_service.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. import re
  2. import asyncio
  3. from sqlalchemy.ext.asyncio import AsyncSession
  4. from sqlalchemy import select, func
  5. from app.core.logger import logger
  6. from app.schemas.order_event import VasOrderEventCreate
  7. from app.services.order_event_service import OrderEventService
  8. from app.services.notification_service import NotificationService
  9. from app.models.order import VasOrder
  10. from app.models.user import VasUser
  11. from app.models.emails import VasEmail
  12. from app.schemas.emails import VasEmailCreate
  13. class EmailsService:
  14. # 关键词黑名单:匹配到这些词的邮件不会创建订单事件,也不会通知用户
  15. SUBJECT_BLACKLIST = [
  16. "验证码",
  17. "动态码",
  18. "verification code",
  19. "otp",
  20. "one-time password",
  21. "security code",
  22. "dynamic code",
  23. "one time password"
  24. ]
  25. SENDER_BLACKLIST = [
  26. "etsy",
  27. "dsw",
  28. ]
  29. @staticmethod
  30. def _is_subject_blacklisted(subject: str) -> bool:
  31. if not subject: return False
  32. lower = subject.lower()
  33. return any(kw.lower() in lower for kw in EmailsService.SUBJECT_BLACKLIST)
  34. @staticmethod
  35. def _is_sender_blacklisted(sender: str) -> bool:
  36. if not sender: return False
  37. lower = sender.lower()
  38. return any(kw.lower() in lower for kw in EmailsService.SENDER_BLACKLIST)
  39. @staticmethod
  40. def _summarize(text: str, limit: int = 120) -> str:
  41. if not text: return ""
  42. # 将多个空白字符替换为一个空格
  43. clean = re.sub(r"\s+", " ", text).strip()
  44. return clean[:limit] + ("..." if len(clean) > limit else "")
  45. @staticmethod
  46. async def create(db: AsyncSession, data: VasEmailCreate) -> VasEmail:
  47. # 1. 检查 UID 是否已存在 (幂等性)
  48. stmt = select(VasEmail).where(VasEmail.uid == data.uid)
  49. existing = (await db.execute(stmt)).scalar_one_or_none()
  50. if existing:
  51. return existing
  52. # 2. 存入邮件记录
  53. rec = VasEmail(
  54. uid=data.uid,
  55. subject=data.subject,
  56. sender=data.sender,
  57. recipient=data.recipient,
  58. receive_time=data.receive_time,
  59. body_text=data.body_text,
  60. )
  61. db.add(rec)
  62. # 3. 拦截、匹配订单并入库通知 (此时还在同一个事务中)
  63. await EmailsService._intercept_and_notify(db, rec)
  64. # 4. 统一提交
  65. await db.commit()
  66. await db.refresh(rec)
  67. return rec
  68. @staticmethod
  69. async def get_max_uid(db: AsyncSession) -> int:
  70. stmt = select(func.max(VasEmail.uid))
  71. return await db.scalar(stmt) or 0
  72. @staticmethod
  73. async def _intercept_and_notify(db: AsyncSession, rec: VasEmail) -> None:
  74. """
  75. 拦截处理逻辑:
  76. 1. 黑名单过滤
  77. 2. 创建订单事件 (OrderEventService)
  78. 3. 获取订单关联的用户联系方式
  79. 4. post_message 到通知表
  80. """
  81. if EmailsService._is_subject_blacklisted(rec.subject or ""):
  82. logger.info(f"Email {rec.uid} skipped: subject '{rec.subject}' is in blacklist.")
  83. return
  84. if EmailsService._is_sender_blacklisted(rec.sender or ""):
  85. logger.info(f"Email {rec.uid} skipped: subject '{rec.sender}' is in blacklist.")
  86. return
  87. # 1. 尝试创建订单事件 (OrderEventService 内部会根据 rec.recipient 匹配订单)
  88. try:
  89. event = await OrderEventService.create(
  90. db,
  91. VasOrderEventCreate(
  92. order_no=None, # 由 OrderEventService 内部填充
  93. alias_email=rec.recipient,
  94. event_title=rec.subject or "Email Update",
  95. event_message=rec.body_text,
  96. email_uid=rec.uid,
  97. event_time=None,
  98. ),
  99. )
  100. except Exception as e:
  101. logger.warning(f"Failed to create order event for email {rec.uid}: {e}")
  102. return
  103. # 2. 如果邮件没能关联到任何活跃订单,则不通知
  104. if not event or not event.order_no:
  105. logger.info(f"Email {rec.uid} logged, but no matching active order found for {rec.recipient}.")
  106. return
  107. # 3. 获取订单及用户信息
  108. stmt = select(VasOrder).where(VasOrder.id == event.order_no)
  109. order = (await db.execute(stmt)).scalar_one_or_none()
  110. if not order:
  111. return
  112. user = await db.get(VasUser, order.user_id)
  113. user_inputs = order.user_inputs or {}
  114. # 优先级:数据库用户字段 > 订单输入的联系方式
  115. email_to = (user.email if user else None) or user_inputs.get("email")
  116. # phone = (user.phone if user else None) or user_inputs.get("phone")
  117. # country_code = str(user_inputs.get("phone_country_code") or "")
  118. # 4. 构造通知内容
  119. summary = EmailsService._summarize(rec.body_text or "")
  120. # --- 邮件转发任务 ---
  121. if email_to:
  122. subject = f"Update: {rec.subject or 'Visa Appointment Progress'}"
  123. await NotificationService.post_message(
  124. db=db,
  125. channel="email",
  126. payload={
  127. "template_id": "order_event_update",
  128. "receiver": email_to,
  129. "payload": {
  130. "subject": subject,
  131. "summary": summary,
  132. "order_no": order.id,
  133. },
  134. },
  135. )
  136. # --- WhatsApp 推送任务 ---
  137. # if phone:
  138. # # 确保国家代码不带 +
  139. # code = str(user_inputs.get("phone_country_code") or "").replace("+", "")
  140. # # 去掉手机号开头的 0
  141. # clean_phone = str(phone).lstrip('0')
  142. # # 组合纯数字部分
  143. # digits = f"{code}{clean_phone}"
  144. # # 再次利用正则确保只剩数字(以防 code 包含特殊字符)
  145. # digits = re.sub(r"\D", "", digits)
  146. # if "@lid" in str(phone):
  147. # chat_id = phone
  148. # elif digits:
  149. # # 使用不带 + 的纯数字拼接后缀
  150. # chat_id = f"{digits}@c.us"
  151. # else:
  152. # return
  153. # message = (
  154. # f"🔔 *Progress Update*\n\n"
  155. # f"We received an update for your appointment:\n"
  156. # f"_{summary}_\n\n"
  157. # f"Please log in to your dashboard or check your email for full details."
  158. # )
  159. # await NotificationService.post_message(
  160. # db=db,
  161. # channel="whatsapp",
  162. # payload={
  163. # "template_id": "order_event_update",
  164. # "receiver": chat_id,
  165. # "payload": {
  166. # "message": message,
  167. # "order_no": order.id,
  168. # },
  169. # },
  170. # )
  171. logger.info(f"Notification tasks queued for Email {rec.uid} (Order: {order.id})")