wechat_service.py 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. import aiohttp
  2. from typing import Dict, Any
  3. from app.core.biz_exception import BizLogicError
  4. class WechatService:
  5. @staticmethod
  6. async def _send_webhook(api_token: str, payload: Dict[str, Any]):
  7. """内部私有方法:发送 HTTP 请求到企业微信 Webhook"""
  8. url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={api_token}"
  9. timeout = aiohttp.ClientTimeout(total=10)
  10. try:
  11. async with aiohttp.ClientSession(timeout=timeout) as client:
  12. async with client.post(url, json=payload) as response:
  13. if response.status != 200:
  14. raise BizLogicError(f"Wechat push failed, http_status={response.status}")
  15. data = await response.json()
  16. except aiohttp.ClientError as e:
  17. raise BizLogicError(f"Wechat push request error: {e}")
  18. if data.get("errcode") != 0:
  19. raise BizLogicError(
  20. f"Wechat push failed, errcode={data.get('errcode')}, errmsg={data.get('errmsg')}"
  21. )
  22. return True
  23. @staticmethod
  24. async def push_markdown(api_token: str, content: str):
  25. """发送 Markdown 消息"""
  26. body = {
  27. "msgtype": "markdown",
  28. "markdown": {
  29. "content": content
  30. }
  31. }
  32. return await WechatService._send_webhook(api_token, body)
  33. @staticmethod
  34. async def push_text(api_token: str, content: str):
  35. """发送 Text 消息"""
  36. body = {
  37. "msgtype": "text",
  38. "text": {
  39. "content": content
  40. }
  41. }
  42. return await WechatService._send_webhook(api_token, body)