telegram_service.py 847 B

1234567891011121314151617181920212223242526
  1. # app/services/telegram_service.py
  2. import aiohttp
  3. from app.core.biz_exception import BizLogicError
  4. from app.schemas.telegram import TelegramIn
  5. class TelegramService:
  6. @staticmethod
  7. async def push_text(payload: TelegramIn):
  8. url = f"https://api.telegram.org/bot{payload.api_token}/sendMessage"
  9. body = {
  10. "chat_id": payload.chat_id,
  11. "text": payload.message,
  12. "parse_mode": "HTML",
  13. }
  14. timeout = aiohttp.ClientTimeout(total=10)
  15. async with aiohttp.ClientSession(timeout=timeout) as client:
  16. async with client.post(url, json=body) as resp:
  17. if resp.status != 200:
  18. text = await resp.text()
  19. raise BizLogicError(
  20. f"Telegram push failed: {resp.status}, {text}"
  21. )