telegram_service.py 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. # app/services/telegram_service.py
  2. import aiohttp
  3. from app.core.biz_exception import BizLogicError
  4. from app.core.config import settings
  5. from app.schemas.telegram import TelegramIn, TelegramNoTokenIn
  6. class TelegramService:
  7. @staticmethod
  8. async def push_text(payload: TelegramIn):
  9. if not payload.api_token:
  10. raise BizLogicError("Telegram api_token missing")
  11. url = f"https://api.telegram.org/bot{payload.api_token}/sendMessage"
  12. body = {
  13. "chat_id": payload.chat_id,
  14. "text": payload.message,
  15. "parse_mode": "HTML",
  16. }
  17. timeout = aiohttp.ClientTimeout(total=10)
  18. async with aiohttp.ClientSession(timeout=timeout) as client:
  19. async with client.post(url, json=body) as resp:
  20. if resp.status != 200:
  21. text = await resp.text()
  22. raise BizLogicError(
  23. f"Telegram push failed: {resp.status}, {text}"
  24. )
  25. @staticmethod
  26. async def push_text_no_token(payload: TelegramNoTokenIn):
  27. api_token = settings.telegram_api_token
  28. if not api_token:
  29. raise BizLogicError("Telegram api_token missing in env")
  30. url = f"https://api.telegram.org/bot{api_token}/sendMessage"
  31. body = {
  32. "chat_id": payload.chat_id,
  33. "text": payload.message,
  34. "parse_mode": "HTML",
  35. }
  36. timeout = aiohttp.ClientTimeout(total=10)
  37. async with aiohttp.ClientSession(timeout=timeout) as client:
  38. async with client.post(url, json=body) as resp:
  39. if resp.status != 200:
  40. text = await resp.text()
  41. raise BizLogicError(
  42. f"Telegram push failed: {resp.status}, {text}"
  43. )