telegram_service.py 726 B

1234567891011121314151617181920212223242526
  1. # app/services/telegram_service.py
  2. import httpx
  3. from app.core.biz_exception import BizLogicError
  4. from app.schemas.telegram import TelegramIn
  5. class TelegramService:
  6. @staticmethod
  7. async def push_to_telegram(payload: TelegramIn):
  8. url = f"https://api.telegram.org/bot{payload.api_token}/sendMessage"
  9. body = {
  10. "chat_id": payload.chat_id,
  11. "text": payload.message,
  12. "parse_mode": "HTML",
  13. }
  14. async with httpx.AsyncClient(timeout=10) as client:
  15. resp = await client.post(url, json=body)
  16. if resp.status_code != 200:
  17. raise BizLogicError(
  18. f"Telegram push failed: {resp.status_code}, {resp.text}"
  19. )