import re import json from datetime import datetime, timedelta from typing import List, Optional from decimal import Decimal from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from app.core.biz_exception import NotFoundError, BizLogicError from app.models.order import VasOrder from app.models.vas_task import VasTask from app.models.product_routing import VasProductRouting from app.models.payment_event import VasPaymentEvent from app.models.payment import VasPayment from app.models.payment_qr import VasPaymentQR from app.schemas.webhook import SMSHelperWebhookPayload, PaymentWebhookOut class WebhookService: # ========================================================= # 内部方法:创建 Task(幂等) # ========================================================= @staticmethod async def _create_task_if_not_exists( db: AsyncSession, order: VasOrder, ) -> List[VasTask]: # 1. 查询路由配置 stmt = select(VasProductRouting).where( VasProductRouting.product_id == order.product_id, VasProductRouting.is_active == 1, ) result = await db.execute(stmt) routings = result.scalars().all() if not routings: return [] created_tasks: List[VasTask] = [] # --- 别名邮箱生成逻辑开始 --- # 提取原始邮箱用户名 user_inputs = order.user_inputs or {} original_email = str(user_inputs.get("email", "user")).strip() base_user = original_email.split('@')[0] if '@' in original_email else original_email if not base_user: base_user = "user" # 计算稳定的域名索引 (基于原始用户名) DOMAINS = [ "gmail-app.com", "outlooksearch.com", "hotmails.vip", "gmail365.cc", "ymails.top", "teamymail.cfd" ] h = 0 for ch in base_user: h = (31 * h + ord(ch)) & 0xFFFFFFFF if h & 0x80000000: # 模拟 Java 有符号 32 位 int h = -((~h + 1) & 0xFFFFFFFF) domain_index = abs(h) % len(DOMAINS) selected_domain = DOMAINS[domain_index] # 提取 Order ID 后两位作为唯一后缀 order_id_str = str(order.id) suffix = order_id_str[-2:] if len(order_id_str) >= 2 else order_id_str # 组合最终别名 final_alias_email = f"{base_user}{suffix}@{selected_domain}" # --- 别名邮箱生成逻辑结束 --- for routing in routings: # 幂等检查 exists_stmt = select(VasTask).where( VasTask.order_id == order.id, VasTask.routing_key == routing.routing_key, VasTask.status.in_(["pending", "grabbed", "running", "completed"]), ) exists_result = await db.execute(exists_stmt) exists = exists_result.scalar_one_or_none() if exists: continue # 构造任务配置,合并路由默认配置与动态生成的 alias_email # 如果 routing.config 已经有内容,我们覆盖其中的关键字段 task_config = (routing.config or {}).copy() task_config.update({ "alias_email": final_alias_email, "include_today": True, "include_tomorrow": True, "time_filter": ["AM", "PM"], "exclude_dates": [], "allowed_weekdays": [1, 2, 3, 4, 5], "select": "random" }) task = VasTask( order_id=order.id, routing_key=routing.routing_key, script_version=routing.script_version, priority=routing.priority, status="pending", user_inputs=order.user_inputs, config=task_config, # 使用新生成的配置 attempt_count=0, notify_count=0, expire_at=datetime.utcnow() + timedelta(days=60), created_at=datetime.utcnow(), ) db.add(task) await db.flush() await db.refresh(task) created_tasks.append(task) return created_tasks # ========================================================= # SMSHelper 微信 / 支付宝 收款 webhook # ========================================================= @staticmethod async def smshelper_payment_webhook( db: AsyncSession, payload: SMSHelperWebhookPayload, ) -> Optional[PaymentWebhookOut]: title = payload.title content = payload.content if "微信" in title: provider = "wechat" elif "支付宝" in title: provider = "alipay" else: raise BizLogicError("Unknown payment provider") device_match = re.search(r"【(.+?)】", content) device_id = device_match.group(1) if device_match else None amount_match = re.search(r"¥([\d.]+)", content) if not amount_match: raise BizLogicError("Amount not found in webhook content") amount_yuan = Decimal(amount_match.group(1)) amount_cent = int(amount_yuan * 100) # 修正点:实例化 Event 时字段名需匹配模型 event = VasPaymentEvent( provider=provider, event_type="payment_received", title=title, content=content, parsed_amount=amount_cent, parsed_currency="CNY", parsed_device=device_id, raw_payload=payload.dict(), # 对应模型 raw_payload status="received", ) db.add(event) await db.commit() await db.refresh(event) # ---------- 查找 QR ---------- qr_stmt = select(VasPaymentQR).where( VasPaymentQR.provider == provider, VasPaymentQR.device == device_id, VasPaymentQR.is_active == 1, ) qr_result = await db.execute(qr_stmt) payment_qr = qr_result.scalar_one_or_none() if not payment_qr: event.status = "failed" event.error_message = "QR not found" await db.commit() raise BizLogicError("QR not found") # ---------- 查找 payment ---------- pay_stmt = ( select(VasPayment) .where( VasPayment.provider == provider, VasPayment.amount == amount_cent, VasPayment.qr_id == payment_qr.id, VasPayment.status == "pending", ) .order_by(VasPayment.created_at.desc()) ) pay_result = await db.execute(pay_stmt) payment = pay_result.scalar_one_or_none() if not payment: event.status = "failed" event.error_message = "No matching pending payment" await db.commit() raise BizLogicError("Payment not found") if payment.status in ("succeeded", "late_paid"): event.status = "duplicate" event.matched_payment_id = payment.id event.matched_order_id = payment.order_id await db.commit() return None now = datetime.utcnow() payment.status = "late_paid" if payment.expire_at and now > payment.expire_at else "succeeded" payment.provider_payload = { "title": title, "content": content, "device_id": device_id, "received_at": now.isoformat(), } order_stmt = select(VasOrder).where(VasOrder.id == payment.order_id) order_result = await db.execute(order_stmt) order = order_result.scalar_one_or_none() if order and order.status != "paid": order.status = "paid" await WebhookService._create_task_if_not_exists(db, order) event.status = "applied" event.matched_payment_id = payment.id event.matched_order_id = payment.order_id await db.commit() await db.refresh(payment) return PaymentWebhookOut( status=True, order_id=order.id, user_id=order.user_id, payment_id=payment.id, notify=True, ) # ========================================================= # Stripe webhook # ========================================================= @staticmethod async def stripe_payment_webhook( db: AsyncSession, event: dict, ) -> Optional[PaymentWebhookOut]: event_id = event["id"] event_type = event["type"] data = event["data"]["object"] existed_stmt = select(VasPaymentEvent).where( VasPaymentEvent.provider == "stripe", VasPaymentEvent.event_id == event_id, ) existed_result = await db.execute(existed_stmt) if existed_result.scalar_one_or_none(): return None if event_type != "checkout.session.completed": db.add( VasPaymentEvent( provider="stripe", event_id=event_id, event_type=event_type, raw_payload=event, # 修正:payload -> raw_payload created_at=datetime.utcnow(), status="ignored" # 补充状态,虽有默认值但明确更好 ) ) await db.commit() return None metadata = data.get("metadata", {}) payment_id = metadata.get("payment_id") order_id = metadata.get("order_id") if not payment_id or not order_id: raise BizLogicError("Missing payment_id or order_id in metadata") pay_stmt = select(VasPayment).where(VasPayment.id == int(payment_id)) pay_result = await db.execute(pay_stmt) payment = pay_result.scalar_one_or_none() if not payment: raise NotFoundError("Payment not found") if payment.status == "succeeded": db.add( VasPaymentEvent( provider="stripe", event_id=event_id, event_type=event_type, matched_payment_id=payment.id, # 修正:payment_id -> matched_payment_id raw_payload=event, # 补充:记录 raw_payload status="duplicate", # 补充:记录状态 created_at=datetime.utcnow(), ) ) await db.commit() return None paid_amount = data["amount_total"] paid_currency = data["currency"].upper() if paid_amount != payment.amount or paid_currency != payment.currency: raise BizLogicError( f"Amount mismatch, expected {payment.amount} {payment.currency}, " f"got {paid_amount} {paid_currency}" ) now = datetime.utcnow() payment.status = "late_paid" if payment.expire_at and now > payment.expire_at else "succeeded" payment.provider_payload = event payment.updated_at = now order_stmt = select(VasOrder).where(VasOrder.id == order_id) order_result = await db.execute(order_stmt) order = order_result.scalar_one_or_none() if order and order.status != "paid": order.status = "paid" order.updated_at = now await WebhookService._create_task_if_not_exists(db, order) db.add( VasPaymentEvent( provider="stripe", event_id=event_id, event_type=event_type, matched_payment_id=payment.id, # 修正:payment_id -> matched_payment_id matched_order_id=order_id, # 修正:order_id -> matched_order_id raw_payload=event, # 修正:payload -> raw_payload status="applied", # 明确状态 created_at=now, ) ) await db.commit() await db.refresh(payment) return PaymentWebhookOut( status=True, order_id=order.id, user_id=order.user_id, payment_id=payment.id, notify=True, )