import httpx from app.core.biz_exception import BizLogicError from app.schemas.wechat import WechatIn class WechatService: @staticmethod async def push_to_wechat(payload: WechatIn): """ 企业微信 WebHook 推送(Async 版) WebHook 格式: https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=YOUR_KEY """ url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={payload.api_token}" body = { "msgtype": "text", "text": { "content": payload.message } } try: async with httpx.AsyncClient(timeout=10) as client: response = await client.post(url, json=body) except httpx.RequestError as e: raise BizLogicError(f"Wechat push request error: {e}") if response.status_code != 200: raise BizLogicError( f"Wechat push failed, http_status={response.status_code}" ) data = response.json() if data.get("errcode") != 0: raise BizLogicError( f"Wechat push failed, errcode={data.get('errcode')}, errmsg={data.get('errmsg')}" ) return True