import httpx from typing import Dict, Any, List, Union from datetime import datetime from app.core.biz_exception import BizLogicError from app.schemas.wechat import WechatIn def _parse_slots_summary(availability_data): """ availability: [ { "date": "2025-01-05", "times": [] }, { "date": "2025-01-06", "times": [{"time": "09:30"}] } ] """ if not availability_data: return "No specific slots data." if not isinstance(availability_data, list): return str(availability_data) summaries = [] # 限制显示天数,防止消息过长 display_limit = 4 for day in availability_data[:display_limit]: date_str = day.get("date") times = day.get("times", []) if not times: # 修改点:即使没有具体时间,只要有日期条目,也显示为可用 summaries.append(f"{date_str}") else: # 有具体时间的处理 time_list = [t.get("time", "") for t in times[:5]] time_text = ", ".join(filter(None, time_list)) # 过滤空时间 if len(times) > 5: time_text += f" (+{len(times) - 5})" summaries.append(f"{date_str}: {time_text}") if len(availability_data) > display_limit: summaries.append(f"(+{len(availability_data) - display_limit} more days)") return " | ".join(summaries) def _get_display_meta(slot_snapshot): """ Return dynamic header logic based on status. """ # 修复:确保这里也调用增强后的日期格式化 date_str = slot_snapshot.get("earliest_date") status = slot_snapshot.get("availability_status") country = slot_snapshot.get("country", "Unknown") city = slot_snapshot.get("city", "Unknown") if status == 'Available': emoji = "🟢" headline = f'{country}, {city}: {date_str}' color = "info" # Green elif status == 'Waitlist': emoji = "🟡" headline = f'Waitlist: {country}, {city}' color = "warning" # Orange else: emoji = "🔴" headline = f'No Slots: {country}, {city}' color = "comment" # Grey return emoji, headline, color, date_str class WechatService: @staticmethod async def _send_webhook(api_token: str, payload: Dict[str, Any]): """内部私有方法:发送 HTTP 请求到企业微信 Webhook""" url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={api_token}" try: async with httpx.AsyncClient(timeout=10) as client: response = await client.post(url, json=payload) except httpx.RequestError as e: raise BizLogicError(f"Wechat push request error: {e}") if response.status_code != 200: raise BizLogicError(f"Wechat push failed, http_status={response.status_code}") data = response.json() if data.get("errcode") != 0: raise BizLogicError( f"Wechat push failed, errcode={data.get('errcode')}, errmsg={data.get('errmsg')}" ) return True @staticmethod async def push_markdown(api_token: str, content: str): """发送 Markdown 消息""" body = { "msgtype": "markdown", "markdown": { "content": content } } return await WechatService._send_webhook(api_token, body) @staticmethod async def push_slot_snapshot(api_token: str, slot_snapshot: Dict[str, Any]): # 获取元数据 emoji, headline, color, date_str = _get_display_meta(slot_snapshot) # 解析详情 slots_summary = _parse_slots_summary(slot_snapshot.get("availability")) # 处理可能的 None 值 website = slot_snapshot.get("website") or "#" # 如果没有网址,给一个空锚点 # 格式化更新时间 updated_at = slot_snapshot.get("snapshot_at", "") if "T" in str(updated_at): updated_at = str(updated_at).replace("T", " ") TEMPLATE = ( "# {emoji} {headline}\n" "\n" "👉 [Book now]({website})\n" ) markdown_content = TEMPLATE.format_map({ "emoji": emoji, "headline": headline, "visa_type": slot_snapshot.get("visa_type", "N/A"), "color": color, "date_str": date_str, "slots_summary": slots_summary, "website": website, "updated_at": updated_at, }) return await WechatService.push_markdown(api_token, markdown_content)