import re import asyncio from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func from app.core.logger import logger from app.schemas.order_event import VasOrderEventCreate from app.services.order_event_service import OrderEventService from app.services.notification_service import NotificationService from app.models.order import VasOrder from app.models.user import VasUser from app.models.emails import VasEmail from app.schemas.emails import VasEmailCreate class EmailsService: # 关键词黑名单:匹配到这些词的邮件不会创建订单事件,也不会通知用户 SUBJECT_BLACKLIST = [ "验证码", "动态码", "verification code", "otp", "one-time password", "security code", "dynamic code", "one time password" ] SENDER_BLACKLIST = [ "etsy", "dsw", ] @staticmethod def _is_subject_blacklisted(subject: str) -> bool: if not subject: return False lower = subject.lower() return any(kw.lower() in lower for kw in EmailsService.SUBJECT_BLACKLIST) @staticmethod def _is_sender_blacklisted(sender: str) -> bool: if not sender: return False lower = sender.lower() return any(kw.lower() in lower for kw in EmailsService.SENDER_BLACKLIST) @staticmethod def _summarize(text: str, limit: int = 120) -> str: if not text: return "" # 将多个空白字符替换为一个空格 clean = re.sub(r"\s+", " ", text).strip() return clean[:limit] + ("..." if len(clean) > limit else "") @staticmethod async def create(db: AsyncSession, data: VasEmailCreate) -> VasEmail: # 1. 检查 UID 是否已存在 (幂等性) stmt = select(VasEmail).where(VasEmail.uid == data.uid) existing = (await db.execute(stmt)).scalar_one_or_none() if existing: return existing # 2. 存入邮件记录 rec = VasEmail( uid=data.uid, subject=data.subject, sender=data.sender, recipient=data.recipient, receive_time=data.receive_time, body_text=data.body_text, ) db.add(rec) # 3. 拦截、匹配订单并入库通知 (此时还在同一个事务中) await EmailsService._intercept_and_notify(db, rec) # 4. 统一提交 await db.commit() await db.refresh(rec) return rec @staticmethod async def get_max_uid(db: AsyncSession) -> int: stmt = select(func.max(VasEmail.uid)) return await db.scalar(stmt) or 0 @staticmethod async def _intercept_and_notify(db: AsyncSession, rec: VasEmail) -> None: """ 拦截处理逻辑: 1. 黑名单过滤 2. 创建订单事件 (OrderEventService) 3. 获取订单关联的用户联系方式 4. post_message 到通知表 """ if EmailsService._is_subject_blacklisted(rec.subject or ""): logger.info(f"Email {rec.uid} skipped: subject '{rec.subject}' is in blacklist.") return if EmailsService._is_sender_blacklisted(rec.sender or ""): logger.info(f"Email {rec.uid} skipped: subject '{rec.sender}' is in blacklist.") return # 1. 尝试创建订单事件 (OrderEventService 内部会根据 rec.recipient 匹配订单) try: event = await OrderEventService.create( db, VasOrderEventCreate( order_no=None, # 由 OrderEventService 内部填充 alias_email=rec.recipient, event_title=rec.subject or "Email Update", event_message=rec.body_text, email_uid=rec.uid, event_time=None, ), ) except Exception as e: logger.warning(f"Failed to create order event for email {rec.uid}: {e}") return # 2. 如果邮件没能关联到任何活跃订单,则不通知 if not event or not event.order_no: logger.info(f"Email {rec.uid} logged, but no matching active order found for {rec.recipient}.") return # 3. 获取订单及用户信息 stmt = select(VasOrder).where(VasOrder.id == event.order_no) order = (await db.execute(stmt)).scalar_one_or_none() if not order: return user = await db.get(VasUser, order.user_id) user_inputs = order.user_inputs or {} # 优先级:数据库用户字段 > 订单输入的联系方式 email_to = (user.email if user else None) or user_inputs.get("email") # phone = (user.phone if user else None) or user_inputs.get("phone") # country_code = str(user_inputs.get("phone_country_code") or "") # 4. 构造通知内容 summary = EmailsService._summarize(rec.body_text or "") # --- 邮件转发任务 --- if email_to: subject = f"Update: {rec.subject or 'Visa Appointment Progress'}" await NotificationService.post_message( db=db, channel="email", payload={ "template_id": "order_event_update", "receiver": email_to, "payload": { "subject": subject, "summary": summary, "order_no": order.id, }, }, ) # --- WhatsApp 推送任务 --- # if phone: # # 确保国家代码不带 + # code = str(user_inputs.get("phone_country_code") or "").replace("+", "") # # 去掉手机号开头的 0 # clean_phone = str(phone).lstrip('0') # # 组合纯数字部分 # digits = f"{code}{clean_phone}" # # 再次利用正则确保只剩数字(以防 code 包含特殊字符) # digits = re.sub(r"\D", "", digits) # if "@lid" in str(phone): # chat_id = phone # elif digits: # # 使用不带 + 的纯数字拼接后缀 # chat_id = f"{digits}@c.us" # else: # return # message = ( # f"🔔 *Progress Update*\n\n" # f"We received an update for your appointment:\n" # f"_{summary}_\n\n" # f"Please log in to your dashboard or check your email for full details." # ) # await NotificationService.post_message( # db=db, # channel="whatsapp", # payload={ # "template_id": "order_event_update", # "receiver": chat_id, # "payload": { # "message": message, # "order_no": order.id, # }, # }, # ) logger.info(f"Notification tasks queued for Email {rec.uid} (Order: {order.id})")