wechat_service.py 858 B

1234567891011121314151617181920212223242526
  1. import json
  2. import time
  3. import requests
  4. from typing import List
  5. from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
  6. from app.schemas.wechat import WechatIn
  7. class WechatService:
  8. def push_to_wechat(payload: WechatIn):
  9. """
  10. 企业微信 WebHook 格式:
  11. https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=YOUR_KEY
  12. """
  13. url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={payload.api_token}"
  14. payload = {"msgtype": "text", "text": {"content": payload.message}}
  15. response = requests.post(url, json=payload, timeout=10)
  16. data = response.json()
  17. if response.status_code != 200 or data.get("errcode") != 0:
  18. # logger.error(f"企业微信推送失败: {response.text}")
  19. raise BizLogicError("Wechat push failed")