| 123456789101112131415161718192021222324252627282930313233343536373839404142 |
- import httpx
- from app.core.biz_exception import BizLogicError
- from app.schemas.wechat import WechatIn
- class WechatService:
- @staticmethod
- async def push_to_wechat(payload: WechatIn):
- """
- 企业微信 WebHook 推送(Async 版)
- WebHook 格式:
- https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=YOUR_KEY
- """
- url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={payload.api_token}"
- body = {
- "msgtype": "text",
- "text": {
- "content": payload.message
- }
- }
- try:
- async with httpx.AsyncClient(timeout=10) as client:
- response = await client.post(url, json=body)
- except httpx.RequestError as e:
- raise BizLogicError(f"Wechat push request error: {e}")
- if response.status_code != 200:
- raise BizLogicError(
- f"Wechat push failed, http_status={response.status_code}"
- )
- data = response.json()
- if data.get("errcode") != 0:
- raise BizLogicError(
- f"Wechat push failed, errcode={data.get('errcode')}, errmsg={data.get('errmsg')}"
- )
- return True
|