emails_service.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. import re
  2. import asyncio
  3. from sqlalchemy.ext.asyncio import AsyncSession
  4. from sqlalchemy import select, func
  5. from app.core.logger import logger
  6. from app.schemas.order_event import VasOrderEventCreate
  7. from app.services.order_event_service import OrderEventService
  8. from app.services.notification_service import NotificationService
  9. from app.models.order import VasOrder
  10. from app.models.user import VasUser
  11. from app.models.emails import VasEmail
  12. from app.schemas.emails import VasEmailCreate
  13. class EmailsService:
  14. # 关键词黑名单:匹配到这些词的邮件不会创建订单事件,也不会通知用户
  15. SUBJECT_BLACKLIST = [
  16. "验证码",
  17. "动态码",
  18. "verification code",
  19. "otp",
  20. "one-time password",
  21. "security code",
  22. "dynamic code",
  23. "one time password"
  24. ]
  25. @staticmethod
  26. def _is_subject_blacklisted(subject: str) -> bool:
  27. if not subject: return False
  28. lower = subject.lower()
  29. return any(kw.lower() in lower for kw in EmailsService.SUBJECT_BLACKLIST)
  30. @staticmethod
  31. def _summarize(text: str, limit: int = 120) -> str:
  32. if not text: return ""
  33. # 将多个空白字符替换为一个空格
  34. clean = re.sub(r"\s+", " ", text).strip()
  35. return clean[:limit] + ("..." if len(clean) > limit else "")
  36. @staticmethod
  37. async def create(db: AsyncSession, data: VasEmailCreate) -> VasEmail:
  38. # 1. 检查 UID 是否已存在 (幂等性)
  39. stmt = select(VasEmail).where(VasEmail.uid == data.uid)
  40. existing = (await db.execute(stmt)).scalar_one_or_none()
  41. if existing:
  42. return existing
  43. # 2. 存入邮件记录
  44. rec = VasEmail(
  45. uid=data.uid,
  46. subject=data.subject,
  47. sender=data.sender,
  48. recipient=data.recipient,
  49. receive_time=data.receive_time,
  50. body_text=data.body_text,
  51. )
  52. db.add(rec)
  53. # 3. 拦截、匹配订单并入库通知 (此时还在同一个事务中)
  54. await EmailsService._intercept_and_notify(db, rec)
  55. # 4. 统一提交
  56. await db.commit()
  57. await db.refresh(rec)
  58. return rec
  59. @staticmethod
  60. async def get_max_uid(db: AsyncSession) -> int:
  61. stmt = select(func.max(VasEmail.uid))
  62. return await db.scalar(stmt) or 0
  63. @staticmethod
  64. async def _intercept_and_notify(db: AsyncSession, rec: VasEmail) -> None:
  65. """
  66. 拦截处理逻辑:
  67. 1. 黑名单过滤
  68. 2. 创建订单事件 (OrderEventService)
  69. 3. 获取订单关联的用户联系方式
  70. 4. post_message 到通知表
  71. """
  72. if EmailsService._is_subject_blacklisted(rec.subject or ""):
  73. logger.info(f"Email {rec.uid} skipped: subject '{rec.subject}' is in blacklist.")
  74. return
  75. # 1. 尝试创建订单事件 (OrderEventService 内部会根据 rec.recipient 匹配订单)
  76. try:
  77. event = await OrderEventService.create(
  78. db,
  79. VasOrderEventCreate(
  80. order_no=None, # 由 OrderEventService 内部填充
  81. alias_email=rec.recipient,
  82. event_title=rec.subject or "Email Update",
  83. event_message=rec.body_text,
  84. email_uid=rec.uid,
  85. event_time=None,
  86. ),
  87. )
  88. except Exception as e:
  89. logger.warning(f"Failed to create order event for email {rec.uid}: {e}")
  90. return
  91. # 2. 如果邮件没能关联到任何活跃订单,则不通知
  92. if not event or not event.order_no:
  93. logger.info(f"Email {rec.uid} logged, but no matching active order found for {rec.recipient}.")
  94. return
  95. # 3. 获取订单及用户信息
  96. stmt = select(VasOrder).where(VasOrder.id == event.order_no)
  97. order = (await db.execute(stmt)).scalar_one_or_none()
  98. if not order:
  99. return
  100. user = await db.get(VasUser, order.user_id)
  101. user_inputs = order.user_inputs or {}
  102. # 优先级:数据库用户字段 > 订单输入的联系方式
  103. email_to = (user.email if user else None) or user_inputs.get("email")
  104. phone = (user.phone if user else None) or user_inputs.get("phone")
  105. country_code = str(user_inputs.get("phone_country_code") or "")
  106. # 4. 构造通知内容
  107. summary = EmailsService._summarize(rec.body_text or "")
  108. # --- 邮件转发任务 ---
  109. if email_to:
  110. subject = f"Update: {rec.subject or 'Visa Appointment Progress'}"
  111. await NotificationService.post_message(
  112. db=db,
  113. channel="email",
  114. payload={
  115. "template_id": "order_event_update",
  116. "receiver": email_to,
  117. "payload": {
  118. "subject": subject,
  119. "summary": summary,
  120. "order_no": order.id,
  121. },
  122. },
  123. )
  124. # --- WhatsApp 推送任务 ---
  125. if phone:
  126. # 确保国家代码不带 +
  127. code = str(user_inputs.get("phone_country_code") or "").replace("+", "")
  128. # 去掉手机号开头的 0
  129. clean_phone = str(phone).lstrip('0')
  130. # 组合纯数字部分
  131. digits = f"{code}{clean_phone}"
  132. # 再次利用正则确保只剩数字(以防 code 包含特殊字符)
  133. digits = re.sub(r"\D", "", digits)
  134. if "@lid" in str(phone):
  135. chat_id = phone
  136. elif digits:
  137. # 使用不带 + 的纯数字拼接后缀
  138. chat_id = f"{digits}@c.us"
  139. else:
  140. return
  141. message = (
  142. f"🔔 *Progress Update*\n\n"
  143. f"We received an update for your appointment:\n"
  144. f"_{summary}_\n\n"
  145. f"Please log in to your dashboard or check your email for full details."
  146. )
  147. await NotificationService.post_message(
  148. db=db,
  149. channel="whatsapp",
  150. payload={
  151. "template_id": "order_event_update",
  152. "receiver": chat_id,
  153. "payload": {
  154. "message": message,
  155. "order_no": order.id,
  156. },
  157. },
  158. )
  159. logger.info(f"Notification tasks queued for Email {rec.uid} (Order: {order.id})")