import httpx
from typing import Dict, Any, List, Union
from datetime import datetime
from app.core.biz_exception import BizLogicError
from app.schemas.wechat import WechatIn
def _parse_slots_summary(availability_data):
"""
availability:
[
{ "date": "2025-01-05", "times": [] },
{ "date": "2025-01-06", "times": [{"time": "09:30"}] }
]
"""
if not availability_data:
return "No specific slots data."
if not isinstance(availability_data, list):
return str(availability_data)
summaries = []
# 限制显示天数,防止消息过长
display_limit = 4
for day in availability_data[:display_limit]:
date_str = day.get("date")
times = day.get("times", [])
if not times:
# 修改点:即使没有具体时间,只要有日期条目,也显示为可用
summaries.append(f"{date_str}")
else:
# 有具体时间的处理
time_list = [t.get("time", "") for t in times[:5]]
time_text = ", ".join(filter(None, time_list)) # 过滤空时间
if len(times) > 5:
time_text += f" (+{len(times) - 5})"
summaries.append(f"{date_str}: {time_text}")
if len(availability_data) > display_limit:
summaries.append(f"(+{len(availability_data) - display_limit} more days)")
return " | ".join(summaries)
def _get_display_meta(slot_snapshot):
"""
Return dynamic header logic based on status.
"""
# 修复:确保这里也调用增强后的日期格式化
date_str = slot_snapshot.get("earliest_date")
status = slot_snapshot.get("availability_status")
country = slot_snapshot.get("country", "Unknown")
city = slot_snapshot.get("city", "Unknown")
if status == 'Available':
emoji = "🟢"
headline = f'{country}, {city}: {date_str}'
color = "info" # Green
elif status == 'Waitlist':
emoji = "🟡"
headline = f'Waitlist: {country}, {city}'
color = "warning" # Orange
else:
emoji = "🔴"
headline = f'No Slots: {country}, {city}'
color = "comment" # Grey
return emoji, headline, color, date_str
class WechatService:
    """Sends messages to an Enterprise WeChat (WeCom) webhook."""

    # Webhook endpoint; the token is supplied as the ``key`` query parameter.
    _WEBHOOK_URL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send"
    # Timeout handed to ``httpx.AsyncClient``.
    _REQUEST_TIMEOUT = 10

    # Markdown layout of a slot-snapshot notification.
    _SLOT_TEMPLATE = (
        "# {emoji} {headline}\n"
        "> **Visa Type**: {visa_type}\n"
        "> **Earliest**: {date_str}\n"
        "> **Details**: {slots_summary}\n"
        "\n"
        "👉 [Tap to Book Appointment]({website})\n"
        "\n"
        "Updated: {updated_at}"
    )

    @staticmethod
    async def _send_webhook(api_token: str, payload: Dict[str, Any]) -> bool:
        """Internal helper: POST ``payload`` as JSON to the webhook.

        Args:
            api_token: Webhook key, sent as the ``key`` query parameter.
            payload: JSON-serialisable message body.

        Returns:
            ``True`` when the webhook answers HTTP 200 with ``errcode == 0``.

        Raises:
            BizLogicError: On a transport error, a non-200 status, a response
                body that is not a JSON object, or a non-zero ``errcode``.
        """
        try:
            async with httpx.AsyncClient(timeout=WechatService._REQUEST_TIMEOUT) as client:
                # Pass the key via ``params`` so it is URL-encoded instead of
                # being spliced into the URL verbatim.
                response = await client.post(
                    WechatService._WEBHOOK_URL,
                    params={"key": api_token},
                    json=payload,
                )
        except httpx.RequestError as e:
            raise BizLogicError(f"Wechat push request error: {e}") from e

        if response.status_code != 200:
            raise BizLogicError(f"Wechat push failed, http_status={response.status_code}")

        # A 200 response with a non-JSON body (e.g. an HTML error page) must
        # surface as the same business error, not as a raw ValueError.
        try:
            data = response.json()
        except ValueError as e:
            raise BizLogicError("Wechat push failed, response body is not valid JSON") from e
        if not isinstance(data, dict):
            raise BizLogicError("Wechat push failed, unexpected response body")

        if data.get("errcode") != 0:
            raise BizLogicError(
                f"Wechat push failed, errcode={data.get('errcode')}, errmsg={data.get('errmsg')}"
            )
        return True

    @staticmethod
    async def push_markdown(api_token: str, content: str) -> bool:
        """Send a Markdown message.

        Args:
            api_token: Webhook key.
            content: Markdown text of the message.

        Returns:
            ``True`` on success.

        Raises:
            BizLogicError: Propagated from ``_send_webhook``.
        """
        body = {
            "msgtype": "markdown",
            "markdown": {
                "content": content,
            },
        }
        return await WechatService._send_webhook(api_token, body)

    @staticmethod
    async def push_slot_snapshot(api_token: str, slot_snapshot: Dict[str, Any]) -> bool:
        """Render a slot snapshot as Markdown and push it.

        Args:
            api_token: Webhook key.
            slot_snapshot: Snapshot mapping; the keys read here are
                ``availability``, ``website``, ``snapshot_at`` and
                ``visa_type``, plus whatever ``_get_display_meta`` reads.

        Returns:
            ``True`` on success.

        Raises:
            BizLogicError: Propagated from ``_send_webhook``.
        """
        # Header metadata. The colour is returned by the helper but the
        # template does not reference it, so it is intentionally unused.
        emoji, headline, _color, date_str = _get_display_meta(slot_snapshot)

        # Per-day details.
        slots_summary = _parse_slots_summary(slot_snapshot.get("availability"))

        # Null-safe fallbacks: a key that is present but null must not be
        # rendered as the literal text "None".
        website = slot_snapshot.get("website") or "#"  # empty anchor when there is no URL
        visa_type = slot_snapshot.get("visa_type") or "N/A"
        earliest = date_str or "N/A"

        # Show "date time" instead of "dateTtime". Only the first "T" is
        # replaced so any later "T" in the value is left alone.
        updated_at = str(slot_snapshot.get("snapshot_at") or "").replace("T", " ", 1)

        markdown_content = WechatService._SLOT_TEMPLATE.format_map({
            "emoji": emoji,
            "headline": headline,
            "visa_type": visa_type,
            "date_str": earliest,
            "slots_summary": slots_summary,
            "website": website,
            "updated_at": updated_at,
        })
        return await WechatService.push_markdown(api_token, markdown_content)