import json
import re
from datetime import datetime
from decimal import Decimal, InvalidOperation
from typing import List, Optional

from sqlalchemy.orm import Session

from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
from app.models.order import VasOrder
from app.models.vas_task import VasTask
from app.models.product import VasProduct
from app.models.product_routing import VasProductRouting
from app.models.payment_event import VasPaymentEvent
from app.models.payment import VasPayment
from app.models.payment_qr import VasPaymentQR
from app.schemas.webhook import SMSHelperWebhookPayload, PaymentWebhookOut


class WebhookService:
    """Handlers for inbound payment webhooks (SMS-helper notifications and Stripe)."""

    @staticmethod
    def create_task_if_not_exists(
        db: Session,
        order: VasOrder,
        routing_key: str,
        script_version: str,
        config: dict,
    ):
        """Return the task for (order, routing_key, script_version), creating it if absent.

        A newly created task is only added to the session; the caller is
        responsible for committing.

        Args:
            db: Active SQLAlchemy session.
            order: Order the task belongs to; its ``user_inputs`` are copied
                onto the new task.
            routing_key: Routing key stored on the task.
            script_version: Script version stored on the task.
            config: Configuration stored on the task.

        Returns:
            The existing ``VasTask`` if one matches, otherwise the new
            pending task.
        """
        existing = (
            db.query(VasTask)
            .filter(
                VasTask.order_id == order.id,
                VasTask.routing_key == routing_key,
                VasTask.script_version == script_version,
            )
            .first()
        )
        if existing:
            # Idempotency hit: reuse the task instead of creating a duplicate.
            return existing

        task = VasTask(
            order_id=order.id,
            routing_key=routing_key,
            script_version=script_version,
            config=config,
            user_inputs=order.user_inputs,
            status="pending",
            priority=10,
            expire_at=None,
        )
        db.add(task)
        return task

    @staticmethod
    def smshelper_payment_webhook(db: Session, payload: SMSHelperWebhookPayload):
        """Handle a payment notification forwarded by the SMS helper.

        Example webhook payload:
            title=【微信】微信支付
            content=【SM-E5260】个人收款码到账¥0.01

        The provider is derived from ``title``; the device id (first
        ``【…】`` group) and the amount (after ``¥``) are parsed from
        ``content``. The notification is stored as a ``VasPaymentEvent`` and
        then matched to the newest pending payment with the same provider,
        amount (in cents) and active QR code.

        Args:
            db: Active SQLAlchemy session.
            payload: Parsed webhook payload with ``title`` and ``content``.

        Returns:
            A ``PaymentWebhookOut`` describing the matched payment/order and
            whether the user should be notified.

        Raises:
            BizLogicError: If the provider or amount cannot be parsed, or no
                matching QR code, pending payment or order exists.
        """
        title = payload.title
        content = payload.content

        webhook_out = PaymentWebhookOut()

        if "微信" in title:
            provider = "wechat"
        elif "支付宝" in title:
            provider = "alipay"
        else:
            # Previously `provider` stayed unbound and crashed further down.
            raise BizLogicError("Unsupported payment provider in webhook title")

        device_match = re.search(r"【(.+?)】", content)
        device_id = device_match.group(1) if device_match else None

        amount_match = re.search(r"¥([\d.]+)", content)
        if not amount_match:
            raise BizLogicError("Amount not found in webhook content")

        try:
            amount_yuan = Decimal(amount_match.group(1))
        except InvalidOperation as exc:
            # The pattern also matches malformed numbers such as "1.2.3".
            raise BizLogicError("Invalid amount in webhook content") from exc
        amount_cent = int(amount_yuan * 100)

        # Persist the raw notification first so failures below stay traceable.
        event = VasPaymentEvent(
            provider=provider,
            event_type="payment_received",
            title=title,
            content=content,
            parsed_amount=amount_cent,
            parsed_currency="CNY",
            parsed_device=device_id,
            raw_payload=payload.dict(),
            status="received",
        )
        db.add(event)
        db.commit()
        db.refresh(event)

        payment_qr = (
            db.query(VasPaymentQR)
            .filter(
                VasPaymentQR.provider == provider,
                VasPaymentQR.device == device_id,
                VasPaymentQR.is_active == 1,
            )
            .first()
        )
        if not payment_qr:
            event.status = "failed"
            event.error_message = "QR not found"
            db.commit()
            raise BizLogicError("QR not found")

        payment = (
            db.query(VasPayment)
            .filter(
                VasPayment.provider == provider,
                VasPayment.amount == amount_cent,
                VasPayment.qr_id == payment_qr.id,
                VasPayment.status == "pending",
            )
            .order_by(VasPayment.created_at.desc())
            .first()
        )
        if not payment:
            event.status = "failed"
            event.error_message = "No matching pending payment"
            db.commit()
            raise BizLogicError("Payment not found")

        webhook_out.payment_id = payment.id

        # NOTE(review): the query above only returns "pending" payments, so
        # this duplicate branch looks unreachable; kept as a defensive guard.
        if payment.status in ("succeeded", "late_paid"):
            event.status = "duplicate"
            event.matched_payment_id = payment.id
            event.matched_order_id = payment.order_id
            db.commit()
            webhook_out.status = True
            webhook_out.notify = False
            return webhook_out

        # Load the order before it is used (it used to be referenced before
        # assignment) and fail explicitly when it is missing.
        order = db.query(VasOrder).filter(VasOrder.id == payment.order_id).first()
        if not order:
            event.status = "failed"
            event.error_message = "Order not found"
            event.matched_payment_id = payment.id
            db.commit()
            raise BizLogicError("Order not found")

        # Naive UTC timestamp, compared directly against `payment.expire_at`.
        now = datetime.utcnow()
        if payment.expire_at and now > payment.expire_at:
            payment.status = "late_paid"
        else:
            payment.status = "succeeded"
            # NOTE(review): `_create_task_if_not_exists(db, order)` is not
            # defined in the visible part of this module, and
            # `WebhookService.create_task_if_not_exists` additionally needs
            # routing_key/script_version/config that are not available here.
            # Confirm which helper is intended, and whether late payments
            # should also create a task.
            _create_task_if_not_exists(db, order)

        # ---------- store the raw payload on the payment ----------
        payment.provider_payload = {
            "title": title,
            "content": content,
            "device_id": device_id,
            "received_at": now.isoformat(),
        }

        if order.status != "paid":
            order.status = "paid"

        event.status = "applied"
        event.matched_payment_id = payment.id
        event.matched_order_id = payment.order_id

        db.commit()
        db.refresh(payment)

        webhook_out.status = True
        webhook_out.order_id = order.id
        webhook_out.user_id = order.user_id
        webhook_out.notify = True
        return webhook_out

    @staticmethod
    def stripe_payment_webhook(db: Session, event):
        """Handle a Stripe webhook event.

        Only ``checkout.session.completed`` events change payment/order
        state; every other event type is recorded and acknowledged.

        Args:
            db: Active SQLAlchemy session.
            event: Stripe event, read via ``event["id"]``, ``event["type"]``
                and ``event["data"]["object"]``.

        Returns:
            A ``PaymentWebhookOut``; ``notify`` is True only when the payment
            was applied by this call.

        Raises:
            BizLogicError: If metadata is missing or invalid, or the paid
                amount/currency does not match the payment.
            NotFoundError: If the referenced payment or order does not exist.
        """
        event_id = event["id"]
        event_type = event["type"]
        data = event["data"]["object"]

        webhook_out = PaymentWebhookOut()

        # ---------- 1. Idempotency (event level) ----------
        existed_event = (
            db.query(VasPaymentEvent)
            .filter(VasPaymentEvent.provider == "stripe")
            .filter(VasPaymentEvent.event_id == event_id)
            .first()
        )
        if existed_event:
            webhook_out.status = True
            webhook_out.notify = False
            return webhook_out

        # ---------- 2. Only handle the events we care about ----------
        if event_type != "checkout.session.completed":
            db.add(
                VasPaymentEvent(
                    provider="stripe",
                    event_id=event_id,
                    event_type=event_type,
                    payload=event,
                    created_at=datetime.utcnow(),
                )
            )
            db.commit()
            webhook_out.status = True
            webhook_out.notify = False
            return webhook_out

        # ---------- 3. Parse metadata ----------
        metadata = data.get("metadata") or {}  # tolerate an explicit null
        payment_id = metadata.get("payment_id")
        order_id = metadata.get("order_id")

        if not payment_id or not order_id:
            raise BizLogicError("Missing payment_id or order_id in metadata")

        try:
            payment_pk = int(payment_id)
        except (TypeError, ValueError) as exc:
            raise BizLogicError("Invalid payment_id in metadata") from exc

        # ---------- 4. Look up the payment (business-level idempotency) ----------
        payment = (
            db.query(VasPayment)
            .filter(VasPayment.id == payment_pk)
            .first()
        )
        if not payment:
            raise NotFoundError("Payment not found")

        # "late_paid" is also a terminal paid state (the SMS-helper handler
        # treats it the same way), so do not apply the payment twice.
        if payment.status in ("succeeded", "late_paid"):
            db.add(
                VasPaymentEvent(
                    provider="stripe",
                    event_id=event_id,
                    event_type=event_type,
                    payload=event,
                    payment_id=payment.id,
                    created_at=datetime.utcnow(),
                )
            )
            db.commit()
            webhook_out.status = True
            webhook_out.notify = False
            return webhook_out

        # ---------- 5. Amount check ----------
        paid_amount = data["amount_total"]  # unit: cent
        paid_currency = data["currency"].upper()

        if paid_amount != payment.amount or paid_currency != payment.currency:
            raise BizLogicError(f"Amount mismatch, expected {payment.amount} {payment.currency}, got {paid_amount} {paid_currency}")

        # Load the order before it is used (it used to be referenced before
        # assignment) and fail explicitly when it is missing.
        order = db.query(VasOrder).filter(VasOrder.id == order_id).first()
        if not order:
            raise NotFoundError("Order not found")

        # ---------- 6. Check whether the payment arrived after expiry ----------
        # Naive UTC timestamp, compared directly against `payment.expire_at`.
        now = datetime.utcnow()
        if payment.expire_at and now > payment.expire_at:
            payment.status = "late_paid"
        else:
            payment.status = "succeeded"
            # NOTE(review): `_create_task_if_not_exists(db, order)` is not
            # defined in the visible part of this module, and
            # `WebhookService.create_task_if_not_exists` additionally needs
            # routing_key/script_version/config that are not available here.
            # Confirm which helper is intended, and whether late payments
            # should also create a task.
            _create_task_if_not_exists(db, order)

        payment.provider_payload = event
        payment.updated_at = now

        # ---------- 7. Update the order ----------
        if order.status != "paid":
            order.status = "paid"
            order.updated_at = now

        # ---------- 8. Record the payment event ----------
        db.add(
            VasPaymentEvent(
                provider="stripe",
                event_id=event_id,
                event_type=event_type,
                payment_id=payment.id,
                order_id=order_id,
                payload=event,
                created_at=now,
            )
        )

        db.commit()

        webhook_out.status = True
        webhook_out.order_id = order.id
        webhook_out.user_id = order.user_id
        webhook_out.notify = True
        return webhook_out