| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354 |
- import re
- import json
- from datetime import datetime, timedelta
- from typing import List, Optional
- from decimal import Decimal
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy import select
- from app.core.biz_exception import NotFoundError, BizLogicError
- from app.models.order import VasOrder
- from app.models.vas_task import VasTask
- from app.models.product_routing import VasProductRouting
- from app.models.payment_event import VasPaymentEvent
- from app.models.payment import VasPayment
- from app.models.payment_qr import VasPaymentQR
- from app.schemas.webhook import SMSHelperWebhookPayload, PaymentWebhookOut
class WebhookService:
    """Handlers for inbound payment webhooks (SMSHelper and Stripe).

    Every method is static and works on a caller-supplied ``AsyncSession``.
    A successful webhook marks the matching ``VasPayment`` as paid, moves the
    owning ``VasOrder`` to ``"paid"`` and creates one ``VasTask`` per active
    product routing.

    NOTE(review): timestamps are produced with ``datetime.utcnow()`` (naive)
    and compared directly with ``payment.expire_at``; presumably the database
    columns are naive UTC as well -- confirm before switching to tz-aware
    datetimes.
    """

    # Domains an alias e-mail address may be generated under; one is picked
    # deterministically from the hash of the user's e-mail local part.
    _ALIAS_DOMAINS = (
        "gmail-app.com",
        "outlooksearch.com",
        "hotmails.vip",
        "gmail365.cc",
        "ymails.top",
        "teamymail.cfd",
    )

    # A task in any of these states makes task creation a no-op for its
    # (order, routing_key) pair.
    _LIVE_TASK_STATUSES = ("pending", "grabbed", "running", "completed")

    # Both values are written by the handlers below once money has arrived.
    _SETTLED_PAYMENT_STATUSES = ("succeeded", "late_paid")

    # =========================================================
    # Internal helpers
    # =========================================================
    @staticmethod
    def _signed_hash32(text: str) -> int:
        """Return the 31-multiplier rolling hash of *text* as a signed 32-bit int.

        The accumulator is wrapped to 32 bits after every character and the
        final value is reinterpreted as two's complement, mimicking a Java
        ``int``.
        """
        acc = 0
        for ch in text:
            acc = (31 * acc + ord(ch)) & 0xFFFFFFFF
        return acc - 0x100000000 if acc & 0x80000000 else acc

    @staticmethod
    def _build_alias_email(user_inputs: Optional[dict], order_id: object) -> str:
        """Build the alias address ``<local part><order suffix>@<domain>``.

        * local part: text before the first ``@`` of ``user_inputs["email"]``,
          falling back to ``"user"`` when the e-mail is missing or empty;
        * order suffix: the last two characters of ``str(order_id)``;
        * domain: chosen from ``_ALIAS_DOMAINS`` by the hash of the local
          part, so the same user always lands on the same domain.
        """
        # `or "user"` also covers an explicit None, which would otherwise be
        # stringified to the literal "None".
        email = str((user_inputs or {}).get("email") or "user").strip()
        local_part = email.partition("@")[0] or "user"

        domains = WebhookService._ALIAS_DOMAINS
        domain = domains[abs(WebhookService._signed_hash32(local_part)) % len(domains)]

        # Slicing already copes with ids shorter than two characters.
        suffix = str(order_id)[-2:]
        return f"{local_part}{suffix}@{domain}"

    @staticmethod
    async def _fail_event(db: AsyncSession, event: VasPaymentEvent, message: str) -> None:
        """Mark *event* as failed with *message* and commit the session."""
        event.status = "failed"
        event.error_message = message
        await db.commit()

    # =========================================================
    # Internal: create tasks (idempotent)
    # =========================================================
    @staticmethod
    async def _create_task_if_not_exists(
        db: AsyncSession,
        order: VasOrder,
    ) -> List[VasTask]:
        """Create one pending ``VasTask`` per active routing of the order's product.

        A routing is skipped when the order already has a task with the same
        ``routing_key`` in one of ``_LIVE_TASK_STATUSES``. New tasks are
        flushed and refreshed but not committed; the caller owns the
        transaction.

        Returns:
            The tasks created by this call (empty when the product has no
            active routing or every routing already has a live task).
        """
        routing_stmt = select(VasProductRouting).where(
            VasProductRouting.product_id == order.product_id,
            VasProductRouting.is_active == 1,
        )
        routings = (await db.execute(routing_stmt)).scalars().all()
        if not routings:
            return []

        alias_email = WebhookService._build_alias_email(order.user_inputs, order.id)

        created_tasks: List[VasTask] = []
        for routing in routings:
            # Idempotency check. LIMIT 1 + first() so that pre-existing
            # duplicate rows cannot raise MultipleResultsFound here.
            exists_stmt = (
                select(VasTask)
                .where(
                    VasTask.order_id == order.id,
                    VasTask.routing_key == routing.routing_key,
                    VasTask.status.in_(WebhookService._LIVE_TASK_STATUSES),
                )
                .limit(1)
            )
            if (await db.execute(exists_stmt)).scalars().first() is not None:
                continue

            # Start from the routing's default config; the keys below always
            # override whatever the routing provided.
            task_config = {
                **(routing.config or {}),
                "alias_email": alias_email,
                "include_today": True,
                "include_tomorrow": True,
                "time_filter": ["AM", "PM"],
                "exclude_dates": [],
                "allowed_weekdays": [1, 2, 3, 4, 5],
                "select": "random",
            }

            now = datetime.utcnow()
            task = VasTask(
                order_id=order.id,
                routing_key=routing.routing_key,
                script_version=routing.script_version,
                priority=routing.priority,
                status="pending",
                user_inputs=order.user_inputs,
                config=task_config,
                attempt_count=0,
                notify_count=0,
                expire_at=now + timedelta(days=60),
                created_at=now,
            )
            db.add(task)
            # Flush + refresh so the returned task carries database-generated
            # values, and so a later routing with the same key sees this row.
            await db.flush()
            await db.refresh(task)
            created_tasks.append(task)

        return created_tasks

    # =========================================================
    # SMSHelper WeChat / Alipay payment-received webhook
    # =========================================================
    @staticmethod
    async def smshelper_payment_webhook(
        db: AsyncSession,
        payload: SMSHelperWebhookPayload,
    ) -> Optional[PaymentWebhookOut]:
        """Settle a pending payment from a forwarded WeChat/Alipay notification.

        The provider is derived from ``payload.title``; the device name
        (text inside ``【…】``) and the amount (number after ``¥``) are parsed
        from ``payload.content``. The notification is stored as a
        ``VasPaymentEvent`` and committed before any matching is attempted,
        so failed matches remain recorded.

        Matching: active QR for (provider, device) -> newest pending payment
        on that QR with the same amount in cents -> its order.

        Raises:
            BizLogicError: unknown provider, amount not parseable, QR not
                found, or no matching pending payment.
            NotFoundError: the matched payment's order does not exist.
        """
        title = payload.title
        content = payload.content

        if "微信" in title:
            provider = "wechat"
        elif "支付宝" in title:
            provider = "alipay"
        else:
            raise BizLogicError("Unknown payment provider")

        device_match = re.search(r"【(.+?)】", content)
        device_id = device_match.group(1) if device_match else None

        # Digits with an optional fractional part. A looser `[\d.]+` would
        # also swallow a trailing sentence period ("¥12.34.") and make
        # Decimal() raise.
        amount_match = re.search(r"¥(\d+(?:\.\d+)?)", content)
        if not amount_match:
            raise BizLogicError("Amount not found in webhook content")
        amount_cent = int(Decimal(amount_match.group(1)) * 100)

        # Persist the raw notification first; it is committed on its own so
        # the record survives even when matching fails below.
        event = VasPaymentEvent(
            provider=provider,
            event_type="payment_received",
            title=title,
            content=content,
            parsed_amount=amount_cent,
            parsed_currency="CNY",
            parsed_device=device_id,
            raw_payload=payload.dict(),
            status="received",
        )
        db.add(event)
        await db.commit()
        await db.refresh(event)

        # ---------- locate the QR code ----------
        qr_stmt = select(VasPaymentQR).where(
            VasPaymentQR.provider == provider,
            VasPaymentQR.device == device_id,
            VasPaymentQR.is_active == 1,
        )
        payment_qr = (await db.execute(qr_stmt)).scalar_one_or_none()
        if not payment_qr:
            await WebhookService._fail_event(db, event, "QR not found")
            raise BizLogicError("QR not found")

        # ---------- locate the payment ----------
        # Newest pending payment wins. LIMIT 1 + first() is required: several
        # pending payments may share provider/amount/QR, and
        # scalar_one_or_none() would raise MultipleResultsFound on them.
        pay_stmt = (
            select(VasPayment)
            .where(
                VasPayment.provider == provider,
                VasPayment.amount == amount_cent,
                VasPayment.qr_id == payment_qr.id,
                VasPayment.status == "pending",
            )
            .order_by(VasPayment.created_at.desc())
            .limit(1)
        )
        payment = (await db.execute(pay_stmt)).scalars().first()
        if not payment:
            await WebhookService._fail_event(db, event, "No matching pending payment")
            raise BizLogicError("Payment not found")
        # No "already settled" check here: the query above only ever returns
        # rows whose status is "pending".

        # ---------- locate the order ----------
        # Resolved before anything is mutated so that a dangling order
        # reference fails cleanly instead of crashing after the commit.
        order_stmt = select(VasOrder).where(VasOrder.id == payment.order_id)
        order = (await db.execute(order_stmt)).scalar_one_or_none()
        if order is None:
            await WebhookService._fail_event(db, event, "Order not found")
            raise NotFoundError("Order not found")

        # ---------- apply ----------
        now = datetime.utcnow()
        payment.status = (
            "late_paid" if payment.expire_at and now > payment.expire_at else "succeeded"
        )
        payment.provider_payload = {
            "title": title,
            "content": content,
            "device_id": device_id,
            "received_at": now.isoformat(),
        }
        payment.updated_at = now

        if order.status != "paid":
            order.status = "paid"
            order.updated_at = now
            await WebhookService._create_task_if_not_exists(db, order)

        event.status = "applied"
        event.matched_payment_id = payment.id
        event.matched_order_id = payment.order_id

        await db.commit()
        await db.refresh(payment)

        return PaymentWebhookOut(
            status=True,
            order_id=order.id,
            user_id=order.user_id,
            payment_id=payment.id,
            notify=True,
        )

    # =========================================================
    # Stripe webhook
    # =========================================================
    @staticmethod
    async def stripe_payment_webhook(
        db: AsyncSession,
        event: dict,
    ) -> Optional[PaymentWebhookOut]:
        """Settle a payment from a Stripe webhook event.

        Only ``checkout.session.completed`` is acted upon; any other event
        type is stored with status ``"ignored"``. An event whose ``id`` has
        already been stored is skipped. The payment is identified by
        ``metadata.payment_id`` on the event object, and
        ``metadata.order_id`` must agree with that payment's order.

        Returns:
            ``None`` for already-seen, ignored, or duplicate events; otherwise
            a ``PaymentWebhookOut`` describing the settled payment.

        Raises:
            BizLogicError: metadata missing or malformed, order mismatch, or
                paid amount/currency differing from the payment.
            NotFoundError: payment or order not found.
        """
        event_id = event["id"]
        event_type = event["type"]
        data = event["data"]["object"]

        # Idempotency: an event id that is already stored is never re-applied.
        seen_stmt = select(VasPaymentEvent).where(
            VasPaymentEvent.provider == "stripe",
            VasPaymentEvent.event_id == event_id,
        )
        if (await db.execute(seen_stmt)).scalar_one_or_none():
            return None

        if event_type != "checkout.session.completed":
            db.add(
                VasPaymentEvent(
                    provider="stripe",
                    event_id=event_id,
                    event_type=event_type,
                    raw_payload=event,
                    created_at=datetime.utcnow(),
                    status="ignored",
                )
            )
            await db.commit()
            return None

        # `or {}` also covers an explicit null metadata value.
        metadata = data.get("metadata") or {}
        payment_id = metadata.get("payment_id")
        order_id = metadata.get("order_id")
        if not payment_id or not order_id:
            raise BizLogicError("Missing payment_id or order_id in metadata")

        try:
            payment_pk = int(payment_id)
        except (TypeError, ValueError):
            raise BizLogicError("Invalid payment_id in metadata") from None

        pay_stmt = select(VasPayment).where(VasPayment.id == payment_pk)
        payment = (await db.execute(pay_stmt)).scalar_one_or_none()
        if not payment:
            raise NotFoundError("Payment not found")

        # "late_paid" is a settled state too (it is assigned further down), so
        # it must short-circuit exactly like "succeeded".
        if payment.status in WebhookService._SETTLED_PAYMENT_STATUSES:
            db.add(
                VasPaymentEvent(
                    provider="stripe",
                    event_id=event_id,
                    event_type=event_type,
                    matched_payment_id=payment.id,
                    raw_payload=event,
                    status="duplicate",
                    created_at=datetime.utcnow(),
                )
            )
            await db.commit()
            return None

        # The metadata must describe the payment's own order; compared as
        # strings because metadata values arrive as text.
        if str(payment.order_id) != str(order_id):
            raise BizLogicError("order_id in metadata does not match payment")

        paid_amount = data["amount_total"]
        paid_currency = data["currency"].upper()
        if paid_amount != payment.amount or paid_currency != payment.currency:
            raise BizLogicError(
                f"Amount mismatch, expected {payment.amount} {payment.currency}, "
                f"got {paid_amount} {paid_currency}"
            )

        # Resolve the order through the payment (correctly typed key) and
        # before any mutation, so a missing order fails cleanly.
        order_stmt = select(VasOrder).where(VasOrder.id == payment.order_id)
        order = (await db.execute(order_stmt)).scalar_one_or_none()
        if order is None:
            raise NotFoundError("Order not found")

        now = datetime.utcnow()
        payment.status = (
            "late_paid" if payment.expire_at and now > payment.expire_at else "succeeded"
        )
        payment.provider_payload = event
        payment.updated_at = now

        if order.status != "paid":
            order.status = "paid"
            order.updated_at = now
            await WebhookService._create_task_if_not_exists(db, order)

        db.add(
            VasPaymentEvent(
                provider="stripe",
                event_id=event_id,
                event_type=event_type,
                matched_payment_id=payment.id,
                matched_order_id=payment.order_id,
                raw_payload=event,
                status="applied",
                created_at=now,
            )
        )
        await db.commit()
        await db.refresh(payment)

        return PaymentWebhookOut(
            status=True,
            order_id=order.id,
            user_id=order.user_id,
            payment_id=payment.id,
            notify=True,
        )
|