| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263 |
- import aiohttp
- from typing import Dict, Any
- from app.core.biz_exception import BizLogicError
- from app.core.config import settings
- class WechatService:
-
- @staticmethod
- async def _send_webhook(api_token: str, payload: Dict[str, Any]):
- """内部私有方法:发送 HTTP 请求到企业微信 Webhook"""
- url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={api_token}"
- timeout = aiohttp.ClientTimeout(total=10)
- try:
- async with aiohttp.ClientSession(timeout=timeout) as client:
- async with client.post(url, json=payload) as response:
- if response.status != 200:
- raise BizLogicError(f"Wechat push failed, http_status={response.status}")
- data = await response.json()
- except aiohttp.ClientError as e:
- raise BizLogicError(f"Wechat push request error: {e}")
- if data.get("errcode") != 0:
- raise BizLogicError(
- f"Wechat push failed, errcode={data.get('errcode')}, errmsg={data.get('errmsg')}"
- )
- return True
- @staticmethod
- async def push_markdown(api_token: str, content: str):
- """发送 Markdown 消息"""
- body = {
- "msgtype": "markdown",
- "markdown": {
- "content": content
- }
- }
- return await WechatService._send_webhook(api_token, body)
- @staticmethod
- async def push_text(api_token: str, content: str):
- """发送 Text 消息"""
- body = {
- "msgtype": "text",
- "text": {
- "content": content
- }
- }
- return await WechatService._send_webhook(api_token, body)
- @staticmethod
- async def push_text_no_token(content: str):
- api_token = settings.wechat_api_token
- if not api_token:
- raise BizLogicError("Wechat api_token missing in settings")
- return await WechatService.push_text(api_token, content)
- @staticmethod
- async def push_markdown_no_token(content: str):
- api_token = settings.wechat_api_token
- if not api_token:
- raise BizLogicError("Wechat api_token missing in settings")
- return await WechatService.push_markdown(api_token, content)
|