wechat_service.py 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. import aiohttp
  2. from typing import Dict, Any
  3. from app.core.biz_exception import BizLogicError
  4. from app.core.config import settings
  5. class WechatService:
  6. @staticmethod
  7. async def _send_webhook(api_token: str, payload: Dict[str, Any]):
  8. """内部私有方法:发送 HTTP 请求到企业微信 Webhook"""
  9. url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={api_token}"
  10. timeout = aiohttp.ClientTimeout(total=10)
  11. try:
  12. async with aiohttp.ClientSession(timeout=timeout) as client:
  13. async with client.post(url, json=payload) as response:
  14. if response.status != 200:
  15. raise BizLogicError(f"Wechat push failed, http_status={response.status}")
  16. data = await response.json()
  17. except aiohttp.ClientError as e:
  18. raise BizLogicError(f"Wechat push request error: {e}")
  19. if data.get("errcode") != 0:
  20. raise BizLogicError(
  21. f"Wechat push failed, errcode={data.get('errcode')}, errmsg={data.get('errmsg')}"
  22. )
  23. return True
  24. @staticmethod
  25. async def push_markdown(api_token: str, content: str):
  26. """发送 Markdown 消息"""
  27. body = {
  28. "msgtype": "markdown",
  29. "markdown": {
  30. "content": content
  31. }
  32. }
  33. return await WechatService._send_webhook(api_token, body)
  34. @staticmethod
  35. async def push_text(api_token: str, content: str):
  36. """发送 Text 消息"""
  37. body = {
  38. "msgtype": "text",
  39. "text": {
  40. "content": content
  41. }
  42. }
  43. return await WechatService._send_webhook(api_token, body)
  44. @staticmethod
  45. async def push_text_no_token(content: str):
  46. api_token = settings.wechat_api_token
  47. if not api_token:
  48. raise BizLogicError("Wechat api_token missing in settings")
  49. return await WechatService.push_text(api_token, content)
  50. @staticmethod
  51. async def push_markdown_no_token(content: str):
  52. api_token = settings.wechat_api_token
  53. if not api_token:
  54. raise BizLogicError("Wechat api_token missing in settings")
  55. return await WechatService.push_markdown(api_token, content)