| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- import httpx
- from typing import Dict, Any, List, Union
- from datetime import datetime
- from app.core.biz_exception import BizLogicError
- from app.schemas.wechat import WechatIn
- def _parse_slots_summary(availability_data):
- """
- availability:
- [
- { "date": "2025-01-05", "times": [] },
- { "date": "2025-01-06", "times": [{"time": "09:30"}] }
- ]
- """
- if not availability_data:
- return "No specific slots data."
-
- if not isinstance(availability_data, list):
- return str(availability_data)
-
- summaries = []
- # 限制显示天数,防止消息过长
- display_limit = 4
-
- for day in availability_data[:display_limit]:
- date_str = day.get("date")
-
- times = day.get("times", [])
-
- if not times:
- # 修改点:即使没有具体时间,只要有日期条目,也显示为可用
- summaries.append(f"{date_str}")
- else:
- # 有具体时间的处理
- time_list = [t.get("time", "") for t in times[:5]]
- time_text = ", ".join(filter(None, time_list)) # 过滤空时间
- if len(times) > 5:
- time_text += f" (+{len(times) - 5})"
- summaries.append(f"{date_str}: {time_text}")
-
- if len(availability_data) > display_limit:
- summaries.append(f"(+{len(availability_data) - display_limit} more days)")
-
- return " | ".join(summaries)
- def _get_display_meta(slot_snapshot):
- """
- Return dynamic header logic based on status.
- """
- # 修复:确保这里也调用增强后的日期格式化
- date_str = slot_snapshot.get("earliest_date")
-
- status = slot_snapshot.get("availability_status")
- country = slot_snapshot.get("country", "Unknown")
- city = slot_snapshot.get("city", "Unknown")
- if status == 'Available':
- emoji = "🟢"
- headline = f'{country}, {city}: {date_str}'
- color = "info" # Green
- elif status == 'Waitlist':
- emoji = "🟡"
- headline = f'Waitlist: {country}, {city}'
- color = "warning" # Orange
- else:
- emoji = "🔴"
- headline = f'No Slots: {country}, {city}'
- color = "comment" # Grey
-
- return emoji, headline, color, date_str
class WechatService:
    """Static helpers that push messages to a WeCom (Enterprise WeChat) webhook."""

    # Webhook endpoint; the robot key is supplied as the ``key`` query parameter.
    _WEBHOOK_URL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send"

    # Markdown body of a slot notification: headline plus a booking link.
    _SLOT_TEMPLATE = (
        "# {emoji} {headline}\n"
        "\n"
        "👉 [Book now]({website})\n"
    )

    @staticmethod
    async def _send_webhook(api_token: str, payload: Dict[str, Any]) -> bool:
        """POST ``payload`` as JSON to the webhook identified by ``api_token``.

        Args:
            api_token: Webhook key, sent as the ``key`` query parameter.
            payload: JSON-serialisable message body.

        Returns:
            ``True`` when the endpoint answers HTTP 200 with ``errcode == 0``.

        Raises:
            BizLogicError: On a transport error, a non-200 status, a body that
                is not a JSON object, or a non-zero ``errcode``.
        """
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                # Passing the key via ``params`` lets httpx URL-encode it
                # instead of splicing it into the URL string unescaped.
                response = await client.post(
                    WechatService._WEBHOOK_URL,
                    params={"key": api_token},
                    json=payload,
                )
        except httpx.RequestError as e:
            raise BizLogicError(f"Wechat push request error: {e}") from e

        if response.status_code != 200:
            raise BizLogicError(f"Wechat push failed, http_status={response.status_code}")

        try:
            data = response.json()
        except ValueError as e:
            # A 200 response with a non-JSON body (e.g. a proxy error page)
            # must surface as a business error, not a raw decode error.
            raise BizLogicError("Wechat push failed, response body is not valid JSON") from e

        if not isinstance(data, dict):
            raise BizLogicError("Wechat push failed, unexpected response body")

        if data.get("errcode") != 0:
            raise BizLogicError(
                f"Wechat push failed, errcode={data.get('errcode')}, errmsg={data.get('errmsg')}"
            )
        return True

    @staticmethod
    async def push_markdown(api_token: str, content: str) -> bool:
        """Send ``content`` as a ``markdown`` message.

        Returns:
            ``True`` on success; failures raise ``BizLogicError`` from
            ``_send_webhook``.
        """
        body = {
            "msgtype": "markdown",
            "markdown": {
                "content": content,
            },
        }
        return await WechatService._send_webhook(api_token, body)

    @staticmethod
    async def push_slot_snapshot(api_token: str, slot_snapshot: Dict[str, Any]) -> bool:
        """Render a slot snapshot as a markdown notification and push it.

        The message consists of a status headline (from ``_get_display_meta``)
        and a "Book now" link. Only values that the template actually renders
        are computed.

        Args:
            api_token: Webhook key.
            slot_snapshot: Snapshot mapping; ``website`` is read here, the
                status/location keys are read by ``_get_display_meta``.

        Returns:
            ``True`` on success; failures raise ``BizLogicError``.
        """
        emoji, headline, _color, _date_str = _get_display_meta(slot_snapshot)

        # Fall back to an empty anchor when no website is provided (or it is None).
        website = slot_snapshot.get("website") or "#"

        markdown_content = WechatService._SLOT_TEMPLATE.format(
            emoji=emoji,
            headline=headline,
            website=website,
        )
        return await WechatService.push_markdown(api_token, markdown_content)
|