| 1234567891011121314151617181920212223242526 |
- import json
- import time
- import requests
- from typing import List
- from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
- from app.schemas.wechat import WechatIn
class WechatService:
    """Push text messages to a WeCom (Enterprise WeChat) group-robot webhook."""

    @staticmethod
    def push_to_wechat(payload: WechatIn) -> None:
        """Send ``payload.message`` as a text message through a WeCom webhook.

        WeCom webhook URL format:
        https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=YOUR_KEY

        Args:
            payload: Supplies ``api_token`` (used as the webhook ``key``) and
                ``message`` (used as the text content).

        Raises:
            BizLogicError: If the HTTP status is not 200, the response body is
                not a JSON object, or its ``errcode`` field is not 0.

        Transport-level errors raised by ``requests.post`` (timeouts,
        connection failures) are not caught here and propagate unchanged.
        """
        url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={payload.api_token}"
        # Use a separate name for the request body so the input `payload`
        # is not shadowed.
        body = {"msgtype": "text", "text": {"content": payload.message}}
        response = requests.post(url, json=body, timeout=10)

        # Check the status before decoding: an error page (e.g. an HTML 502)
        # is not JSON and must still surface as a BizLogicError.
        if response.status_code != 200:
            raise BizLogicError("Wechat push failed")

        try:
            data = response.json()
        except ValueError as err:
            # requests' JSON decode errors are ValueError subclasses.
            raise BizLogicError("Wechat push failed") from err

        # A successful push is a JSON object whose errcode is exactly 0.
        if not isinstance(data, dict) or data.get("errcode") != 0:
            # NOTE(review): no logger is wired in here; consider logging
            # response.text once one is available.
            raise BizLogicError("Wechat push failed")
-
-
|