| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
import asyncio
import re
from typing import ClassVar, List

from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.logger import logger
from app.models.emails import VasEmail
from app.models.order import VasOrder
from app.models.user import VasUser
from app.schemas.emails import VasEmailCreate
from app.schemas.order_event import VasOrderEventCreate
from app.services.notification_service import NotificationService
from app.services.order_event_service import OrderEventService


class EmailsService:
    """Store inbound emails and turn them into order events and notifications.

    Every method is static; callers supply the ``AsyncSession`` to work with.
    """

    # Keyword blacklists: an email that matches any entry creates no order
    # event and does not notify the user. Matching is a case-insensitive
    # substring test (see ``_contains_keyword``).
    SUBJECT_BLACKLIST: ClassVar[List[str]] = [
        "验证码",
        "动态码",
        "verification code",
        "otp",
        "one-time password",
        "security code",
        "dynamic code",
        "one time password",
    ]

    SENDER_BLACKLIST: ClassVar[List[str]] = [
        "etsy",
        "dsw",
    ]

    @staticmethod
    def _contains_keyword(text: str, keywords: List[str]) -> bool:
        """Return True if any keyword occurs in *text*, ignoring case.

        An empty or ``None`` *text* never matches.
        """
        if not text:
            return False
        haystack = text.lower()
        return any(keyword.lower() in haystack for keyword in keywords)

    @staticmethod
    def _is_subject_blacklisted(subject: str) -> bool:
        """Return True if *subject* contains a ``SUBJECT_BLACKLIST`` keyword."""
        return EmailsService._contains_keyword(
            subject, EmailsService.SUBJECT_BLACKLIST
        )

    @staticmethod
    def _is_sender_blacklisted(sender: str) -> bool:
        """Return True if *sender* contains a ``SENDER_BLACKLIST`` keyword."""
        return EmailsService._contains_keyword(
            sender, EmailsService.SENDER_BLACKLIST
        )

    @staticmethod
    def _summarize(text: str, limit: int = 120) -> str:
        """Collapse whitespace in *text* and truncate it to *limit* characters.

        Returns ``""`` for empty input. When the cleaned text is longer than
        *limit*, the first *limit* characters are kept and ``"..."`` is
        appended.
        """
        if not text:
            return ""
        # Replace every run of whitespace with a single space.
        clean = re.sub(r"\s+", " ", text).strip()
        if len(clean) <= limit:
            return clean
        return clean[:limit] + "..."

    @staticmethod
    async def create(db: AsyncSession, data: VasEmailCreate) -> VasEmail:
        """Persist an email and run the order-event / notification pipeline.

        The call is idempotent on ``data.uid``: if a record with that UID is
        already stored it is returned unchanged and nothing else happens.
        Otherwise the record is added, ``_intercept_and_notify`` runs before
        the commit, and the session is committed once at the end.

        Returns:
            The stored ``VasEmail`` (existing or newly created and refreshed).
        """
        # 1. Idempotency: return the existing record if this UID is known.
        stmt = select(VasEmail).where(VasEmail.uid == data.uid)
        existing = (await db.execute(stmt)).scalar_one_or_none()
        if existing:
            return existing

        # 2. Stage the email record.
        rec = VasEmail(
            uid=data.uid,
            subject=data.subject,
            sender=data.sender,
            recipient=data.recipient,
            receive_time=data.receive_time,
            body_text=data.body_text,
        )
        db.add(rec)

        # 3. Filter, match an order and enqueue notifications before commit.
        await EmailsService._intercept_and_notify(db, rec)

        # 4. Single commit for everything staged above.
        await db.commit()
        await db.refresh(rec)

        return rec

    @staticmethod
    async def get_max_uid(db: AsyncSession) -> int:
        """Return the largest stored ``VasEmail.uid``, or 0 if there is none."""
        stmt = select(func.max(VasEmail.uid))
        return (await db.scalar(stmt)) or 0

    @staticmethod
    async def _intercept_and_notify(db: AsyncSession, rec: VasEmail) -> None:
        """Create an order event for *rec* and queue a user notification.

        Steps:
          1. Skip the email if its subject or sender is blacklisted.
          2. Create an order event through ``OrderEventService.create``.
          3. Load the order and its user to find a contact email.
          4. Queue an email notification through
             ``NotificationService.post_message``.

        This method never commits; the caller owns the transaction. A failure
        while creating the order event is logged and swallowed so the email
        record itself can still be stored.
        """
        if EmailsService._is_subject_blacklisted(rec.subject or ""):
            logger.info(f"Email {rec.uid} skipped: subject '{rec.subject}' is in blacklist.")
            return

        if EmailsService._is_sender_blacklisted(rec.sender or ""):
            # Fixed: this branch used to report the sender as a "subject".
            logger.info(f"Email {rec.uid} skipped: sender '{rec.sender}' is in blacklist.")
            return

        # 1. Try to create the order event. Per the original author's note,
        #    OrderEventService matches the order from the recipient alias and
        #    fills in order_no itself.
        # NOTE(review): if this fails on a database error the session may need
        # a rollback before the caller's commit can succeed - confirm how
        # OrderEventService.create handles its own failures.
        try:
            event = await OrderEventService.create(
                db,
                VasOrderEventCreate(
                    order_no=None,  # filled in by OrderEventService
                    alias_email=rec.recipient,
                    event_title=rec.subject or "Email Update",
                    event_message=rec.body_text,
                    email_uid=rec.uid,
                    event_time=None,
                ),
            )
        except Exception as e:
            logger.warning(f"Failed to create order event for email {rec.uid}: {e}")
            return

        # 2. No order was matched: keep the email but notify nobody.
        if not event or not event.order_no:
            logger.info(f"Email {rec.uid} logged, but no matching active order found for {rec.recipient}.")
            return

        # 3. Load the order and its user.
        stmt = select(VasOrder).where(VasOrder.id == event.order_no)
        order = (await db.execute(stmt)).scalar_one_or_none()
        if not order:
            # Previously a silent return; surface the inconsistency instead.
            logger.warning(
                f"Email {rec.uid}: order event references order {event.order_no}, but no such order was found."
            )
            return

        user = await db.get(VasUser, order.user_id)
        user_inputs = order.user_inputs or {}
        # Priority: contact stored on the user row, then the order's inputs.
        email_to = (user.email if user else None) or user_inputs.get("email")

        # 4. Build the notification content.
        summary = EmailsService._summarize(rec.body_text or "")
        queued = False

        # --- Email forwarding task ---
        if email_to:
            subject = f"Update: {rec.subject or 'Visa Appointment Progress'}"
            await NotificationService.post_message(
                db=db,
                channel="email",
                payload={
                    "template_id": "order_event_update",
                    "receiver": email_to,
                    "payload": {
                        "subject": subject,
                        "summary": summary,
                        "order_no": order.id,
                    },
                },
            )
            queued = True

        # --- WhatsApp push task (currently disabled; kept for reference) ---
        # phone = (user.phone if user else None) or user_inputs.get("phone")
        # if phone:
        #     # Country code without the leading "+".
        #     code = str(user_inputs.get("phone_country_code") or "").replace("+", "")
        #     # Drop leading zeros from the local number.
        #     clean_phone = str(phone).lstrip('0')
        #
        #     # Join the numeric parts, then keep digits only in case the
        #     # country code contains other characters.
        #     digits = f"{code}{clean_phone}"
        #     digits = re.sub(r"\D", "", digits)
        #     if "@lid" in str(phone):
        #         chat_id = phone
        #     elif digits:
        #         # Plain digits (no "+") followed by the chat suffix.
        #         chat_id = f"{digits}@c.us"
        #     else:
        #         return
        #     message = (
        #         f"🔔 *Progress Update*\n\n"
        #         f"We received an update for your appointment:\n"
        #         f"_{summary}_\n\n"
        #         f"Please log in to your dashboard or check your email for full details."
        #     )
        #     await NotificationService.post_message(
        #         db=db,
        #         channel="whatsapp",
        #         payload={
        #             "template_id": "order_event_update",
        #             "receiver": chat_id,
        #             "payload": {
        #                 "message": message,
        #                 "order_no": order.id,
        #             },
        #         },
        #     )
        #     queued = True

        if queued:
            logger.info(f"Notification tasks queued for Email {rec.uid} (Order: {order.id})")
        else:
            # Fixed: the "queued" message used to be logged even when no
            # recipient was available and nothing had been queued.
            logger.info(f"Email {rec.uid}: no notification recipient found for order {order.id}; nothing queued.")
|