import aiohttp from typing import Dict, Any from app.core.biz_exception import BizLogicError class WechatService: @staticmethod async def _send_webhook(api_token: str, payload: Dict[str, Any]): """内部私有方法:发送 HTTP 请求到企业微信 Webhook""" url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={api_token}" timeout = aiohttp.ClientTimeout(total=10) try: async with aiohttp.ClientSession(timeout=timeout) as client: async with client.post(url, json=payload) as response: if response.status != 200: raise BizLogicError(f"Wechat push failed, http_status={response.status}") data = await response.json() except aiohttp.ClientError as e: raise BizLogicError(f"Wechat push request error: {e}") if data.get("errcode") != 0: raise BizLogicError( f"Wechat push failed, errcode={data.get('errcode')}, errmsg={data.get('errmsg')}" ) return True @staticmethod async def push_markdown(api_token: str, content: str): """发送 Markdown 消息""" body = { "msgtype": "markdown", "markdown": { "content": content } } return await WechatService._send_webhook(api_token, body) @staticmethod async def push_text(api_token: str, content: str): """发送 Text 消息""" body = { "msgtype": "text", "text": { "content": content } } return await WechatService._send_webhook(api_token, body)