| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- import httpx
- from typing import Dict, Any
- from app.core.biz_exception import BizLogicError
- from app.schemas.wechat import WechatIn
- class WechatService:
-
- @staticmethod
- async def _send_webhook(api_token: str, payload: Dict[str, Any]):
- """
- 内部私有方法:发送 HTTP 请求到企业微信 Webhook
- """
- url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={api_token}"
- try:
- async with httpx.AsyncClient(timeout=10) as client:
- response = await client.post(url, json=payload)
- except httpx.RequestError as e:
- raise BizLogicError(f"Wechat push request error: {e}")
- if response.status_code != 200:
- raise BizLogicError(f"Wechat push failed, http_status={response.status_code}")
- data = response.json()
- if data.get("errcode") != 0:
- raise BizLogicError(
- f"Wechat push failed, errcode={data.get('errcode')}, errmsg={data.get('errmsg')}"
- )
- return True
- @staticmethod
- async def push_to_wechat(payload: WechatIn):
- """
- [保留原函数兼容] 发送纯文本消息
- """
- body = {
- "msgtype": "text",
- "text": {
- "content": payload.message
- # "mentioned_mobile_list":["13800006789","@all"] # 可选:如需艾特所有人
- }
- }
- return await WechatService._send_webhook(payload.api_token, body)
- @staticmethod
- async def push_markdown(api_token: str, content: str):
- """
- 发送 Markdown 消息 (支持标题、加粗、链接、颜色)
- """
- body = {
- "msgtype": "markdown",
- "markdown": {
- "content": content
- }
- }
- return await WechatService._send_webhook(api_token, body)
- @staticmethod
- async def push_payment_template(api_token: str, data: Dict[str, Any]):
- """
- 专门用于发送【支付确认】的模板消息
- 适配之前的业务逻辑,将其转换为 Webhook 支持的 Markdown 格式
-
- Args:
- api_token: Webhook Key
- data: 包含 order_id, amount, currency, user_email, confirm_url 等字段的字典
- """
-
- # 1. 格式化金额
- amount_fen = data.get('amount', 0)
- amount_str = f"{amount_fen / 100:,.2f}"
- currency = data.get('currency', 'CNY')
-
- # 2. 构造 Markdown 内容
- # <font color="warning"> 红色/橙色
- # <font color="info"> 绿色
- # <font color="comment"> 灰色
- markdown_content = f"""**💰 新增待确认支付**
- > 订单号:<font color="comment">{data.get('order_id', 'N/A')}</font>
- > 用户:<font color="comment">{data.get('user_email', 'Unknown')}</font>
- > 金额:<font color="warning">{amount_str} {currency}</font>
- > 渠道:{data.get('provider', 'Manual')}
- > 时间:{data.get('time_str', '')}
- 请核实资金到账情况。
- 👉 [点击此处进行系统确认]({data.get('confirm_url')})"""
- return await WechatService.push_markdown(api_token, markdown_content)
|