wechat_service.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. import httpx
  2. from typing import Dict, Any, List, Union
  3. from datetime import datetime
  4. from app.core.biz_exception import BizLogicError
  5. from app.schemas.wechat import WechatIn
  6. def _parse_slots_summary(availability_data):
  7. """
  8. availability:
  9. [
  10. { "date": "2025-01-05", "times": [] },
  11. { "date": "2025-01-06", "times": [{"time": "09:30"}] }
  12. ]
  13. """
  14. if not availability_data:
  15. return "No specific slots data."
  16. if not isinstance(availability_data, list):
  17. return str(availability_data)
  18. summaries = []
  19. # 限制显示天数,防止消息过长
  20. display_limit = 4
  21. for day in availability_data[:display_limit]:
  22. date_str = day.get("date")
  23. times = day.get("times", [])
  24. if not times:
  25. # 修改点:即使没有具体时间,只要有日期条目,也显示为可用
  26. summaries.append(f"{date_str}")
  27. else:
  28. # 有具体时间的处理
  29. time_list = [t.get("time", "") for t in times[:5]]
  30. time_text = ", ".join(filter(None, time_list)) # 过滤空时间
  31. if len(times) > 5:
  32. time_text += f" (+{len(times) - 5})"
  33. summaries.append(f"{date_str}: {time_text}")
  34. if len(availability_data) > display_limit:
  35. summaries.append(f"(+{len(availability_data) - display_limit} more days)")
  36. return " | ".join(summaries)
  37. def _get_display_meta(slot_snapshot):
  38. """
  39. Return dynamic header logic based on status.
  40. """
  41. # 修复:确保这里也调用增强后的日期格式化
  42. date_str = slot_snapshot.get("earliest_date")
  43. status = slot_snapshot.get("availability_status")
  44. country = slot_snapshot.get("country", "Unknown")
  45. city = slot_snapshot.get("city", "Unknown")
  46. if status == 'Available':
  47. emoji = "🟢"
  48. headline = f'{country}, {city}: {date_str}'
  49. color = "info" # Green
  50. elif status == 'Waitlist':
  51. emoji = "🟡"
  52. headline = f'Waitlist: {country}, {city}'
  53. color = "warning" # Orange
  54. else:
  55. emoji = "🔴"
  56. headline = f'No Slots: {country}, {city}'
  57. color = "comment" # Grey
  58. return emoji, headline, color, date_str
  59. class WechatService:
  60. @staticmethod
  61. async def _send_webhook(api_token: str, payload: Dict[str, Any]):
  62. """内部私有方法:发送 HTTP 请求到企业微信 Webhook"""
  63. url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={api_token}"
  64. try:
  65. async with httpx.AsyncClient(timeout=10) as client:
  66. response = await client.post(url, json=payload)
  67. except httpx.RequestError as e:
  68. raise BizLogicError(f"Wechat push request error: {e}")
  69. if response.status_code != 200:
  70. raise BizLogicError(f"Wechat push failed, http_status={response.status_code}")
  71. data = response.json()
  72. if data.get("errcode") != 0:
  73. raise BizLogicError(
  74. f"Wechat push failed, errcode={data.get('errcode')}, errmsg={data.get('errmsg')}"
  75. )
  76. return True
  77. @staticmethod
  78. async def push_markdown(api_token: str, content: str):
  79. """发送 Markdown 消息"""
  80. body = {
  81. "msgtype": "markdown",
  82. "markdown": {
  83. "content": content
  84. }
  85. }
  86. return await WechatService._send_webhook(api_token, body)
  87. @staticmethod
  88. async def push_slot_snapshot(api_token: str, slot_snapshot: Dict[str, Any]):
  89. # 获取元数据
  90. emoji, headline, color, date_str = _get_display_meta(slot_snapshot)
  91. # 解析详情
  92. slots_summary = _parse_slots_summary(slot_snapshot.get("availability"))
  93. # 处理可能的 None 值
  94. website = slot_snapshot.get("website") or "#" # 如果没有网址,给一个空锚点
  95. # 格式化更新时间
  96. updated_at = slot_snapshot.get("snapshot_at", "")
  97. if "T" in str(updated_at):
  98. updated_at = str(updated_at).replace("T", " ")
  99. TEMPLATE = (
  100. "# {emoji} {headline}\n"
  101. "\n"
  102. "👉 [Book now]({website})\n"
  103. )
  104. markdown_content = TEMPLATE.format_map({
  105. "emoji": emoji,
  106. "headline": headline,
  107. "visa_type": slot_snapshot.get("visa_type", "N/A"),
  108. "color": color,
  109. "date_str": date_str,
  110. "slots_summary": slots_summary,
  111. "website": website,
  112. "updated_at": updated_at,
  113. })
  114. return await WechatService.push_markdown(api_token, markdown_content)