wechat_service.py 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. import httpx
  2. from app.core.biz_exception import BizLogicError
  3. from app.schemas.wechat import WechatIn
  4. class WechatService:
  5. @staticmethod
  6. async def push_to_wechat(payload: WechatIn):
  7. """
  8. 企业微信 WebHook 推送(Async 版)
  9. WebHook 格式:
  10. https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=YOUR_KEY
  11. """
  12. url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={payload.api_token}"
  13. body = {
  14. "msgtype": "text",
  15. "text": {
  16. "content": payload.message
  17. }
  18. }
  19. try:
  20. async with httpx.AsyncClient(timeout=10) as client:
  21. response = await client.post(url, json=body)
  22. except httpx.RequestError as e:
  23. raise BizLogicError(f"Wechat push request error: {e}")
  24. if response.status_code != 200:
  25. raise BizLogicError(
  26. f"Wechat push failed, http_status={response.status_code}"
  27. )
  28. data = response.json()
  29. if data.get("errcode") != 0:
  30. raise BizLogicError(
  31. f"Wechat push failed, errcode={data.get('errcode')}, errmsg={data.get('errmsg')}"
  32. )
  33. return True