import json
import time
from typing import List

import requests

from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
from app.schemas.wechat import WechatIn


class WechatService:
    """Push text notifications through a WeCom (Enterprise WeChat) webhook."""

    # Webhook endpoint; the per-robot key is supplied as the ``key`` query parameter.
    WEBHOOK_URL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send"

    @staticmethod
    def push_to_wechat(payload: WechatIn):
        """Send ``payload.message`` as a text message to a WeCom webhook.

        WeCom webhook format:
        https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=YOUR_KEY

        Args:
            payload: Request data; ``payload.api_token`` is used as the webhook
                key and ``payload.message`` as the text content.

        Raises:
            BizLogicError: If the request cannot be completed, the HTTP status
                is not 200, the response body is not JSON, or the returned
                ``errcode`` is not 0.
        """
        body = {"msgtype": "text", "text": {"content": payload.message}}

        try:
            # Pass the key via ``params`` so it is URL-encoded rather than
            # spliced verbatim into the query string.
            response = requests.post(
                WechatService.WEBHOOK_URL,
                params={"key": payload.api_token},
                json=body,
                timeout=10,
            )
        except requests.RequestException as exc:
            # Timeouts / connection errors are reported as a business failure
            # instead of leaking transport-level exceptions to callers.
            raise BizLogicError("Wechat push failed") from exc

        # Check the status first: error responses may not carry a JSON body.
        if response.status_code != 200:
            raise BizLogicError("Wechat push failed")

        try:
            data = response.json()
        except ValueError as exc:
            # requests' JSON decode errors are ValueError subclasses.
            raise BizLogicError("Wechat push failed") from exc

        if not isinstance(data, dict) or data.get("errcode") != 0:
            raise BizLogicError("Wechat push failed")