telegram_service.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. # app/services/telegram_service.py
  2. import httpx
  3. from app.core.biz_exception import BizLogicError
  4. from app.schemas.telegram import TelegramIn
  5. def _parse_slots_summary(availability_data):
  6. """
  7. availability:
  8. [
  9. { "date": "2025-01-05", "times": [] },
  10. { "date": "2025-01-06", "times": [{"time": "09:30"}] }
  11. ]
  12. """
  13. if not availability_data:
  14. return "No specific slots data."
  15. if not isinstance(availability_data, list):
  16. return str(availability_data)
  17. summaries = []
  18. # 限制显示天数,防止消息过长
  19. display_limit = 4
  20. for day in availability_data[:display_limit]:
  21. date_str = day.get("date")
  22. times = day.get("times", [])
  23. if not times:
  24. # 修改点:即使没有具体时间,只要有日期条目,也显示为可用
  25. summaries.append(f"{date_str}")
  26. else:
  27. # 有具体时间的处理
  28. time_list = [t.get("time", "") for t in times[:5]]
  29. time_text = ", ".join(filter(None, time_list)) # 过滤空时间
  30. if len(times) > 5:
  31. time_text += f" (+{len(times) - 5})"
  32. summaries.append(f"{date_str}: {time_text}")
  33. if len(availability_data) > display_limit:
  34. summaries.append(f"(+{len(availability_data) - display_limit} more days)")
  35. return " | ".join(summaries)
  36. def _get_display_meta(slot_snapshot):
  37. """
  38. Return dynamic header logic based on status.
  39. Goal: Put the most important info in the notification preview.
  40. """
  41. date_str = slot_snapshot.get("earliest_date")
  42. if slot_snapshot.get("availability_status") == 'Available':
  43. # Notification Preview: "🟢 UK London: 15 Jan 2024"
  44. emoji = "🟢"
  45. # If available, the DATE is the headline
  46. headline = f'{slot_snapshot.get("country")}, {slot_snapshot.get("city")}: {date_str}'
  47. color = "info" # Green for WeChat
  48. elif slot_snapshot.get("availability_status") == 'Waitlist':
  49. emoji = "🟡"
  50. headline = f'Waitlist: {slot_snapshot.get("country")}, {slot_snapshot.get("city")}'
  51. color = "warning" # Orange for WeChat
  52. else:
  53. emoji = "🔴"
  54. headline = f'No Slots: {slot_snapshot.get("country")}, {slot_snapshot.get("city")}'
  55. color = "comment" # Grey for WeChat
  56. return emoji, headline, color, date_str
  57. class TelegramService:
  58. @staticmethod
  59. async def push_to_telegram(payload: TelegramIn):
  60. url = f"https://api.telegram.org/bot{payload.api_token}/sendMessage"
  61. body = {
  62. "chat_id": payload.chat_id,
  63. "text": payload.message,
  64. "parse_mode": "HTML",
  65. }
  66. async with httpx.AsyncClient(timeout=10) as client:
  67. resp = await client.post(url, json=body)
  68. if resp.status_code != 200:
  69. raise BizLogicError(
  70. f"Telegram push failed: {resp.status_code}, {resp.text}"
  71. )
  72. async def push_slot_snapshot(api_token: str, chat_id: str, slot_snapshot) -> str:
  73. emoji, headline, _, date_str = _get_display_meta(slot_snapshot)
  74. # 解析详情
  75. slots_summary = _parse_slots_summary(slot_snapshot.get("availability"))
  76. # 处理可能的 None 值
  77. website = slot_snapshot.get("website") or "#" # 如果没有网址,给一个空锚点
  78. # 格式化更新时间
  79. checked_at = slot_snapshot.get("snapshot_at", "")
  80. if "T" in str(updated_at):
  81. checked_at = str(checked_at).replace("T", " ")
  82. TEMPLATE = (
  83. "{emoji} <b>{headline}</b>\n"
  84. "──────────────────\n"
  85. "🛂 <b>Visa:</b> {visa_type}\n"
  86. "📅 <b>Earliest:</b> <code>{date_str}</code>\n"
  87. "📊 <b>Slots:</b> {slots_summary}\n"
  88. "──────────────────\n"
  89. "🔗 <a href='{website}'><b>Book Now ➜</b></a>\n\n"
  90. "🕒 <i>Checked at {checked_at}</i>"
  91. )
  92. content = TEMPLATE.format_map({
  93. "emoji": emoji,
  94. "headline": headline,
  95. "visa_type": slot_snapshot.get("visa_type", "N/A"),
  96. "date_str": date_str,
  97. "slots_summary": slots_summary,
  98. "website": website,
  99. "checked_at": checked_at,
  100. })
  101. url = f"https://api.telegram.org/bot{api_token}/sendMessage"
  102. body = {
  103. "chat_id": chat_id,
  104. "text": content,
  105. "parse_mode": "HTML",
  106. }
  107. async with httpx.AsyncClient(timeout=10) as client:
  108. resp = await client.post(url, json=body)
  109. if resp.status_code != 200:
  110. raise BizLogicError(
  111. f"Telegram push failed: {resp.status_code}, {resp.text}"
  112. )