| 1234567891011121314151617181920212223242526 |
- # app/services/telegram_service.py
- import httpx
- from app.core.biz_exception import BizLogicError
- from app.schemas.telegram import TelegramIn
class TelegramService:
    """Helper for pushing messages through the Telegram Bot API."""

    @staticmethod
    async def push_to_telegram(payload: TelegramIn) -> None:
        """Send ``payload.message`` to ``payload.chat_id`` via ``sendMessage``.

        The message is posted as JSON with ``parse_mode`` set to ``"HTML"``,
        using the bot token from ``payload.api_token`` and an httpx timeout
        of 10.

        Args:
            payload: Carries ``api_token``, ``chat_id`` and ``message``.

        Raises:
            BizLogicError: If the request cannot be completed (timeout,
                connection failure, or another httpx transport error) or if
                Telegram answers with a status code other than 200.
        """
        url = f"https://api.telegram.org/bot{payload.api_token}/sendMessage"
        body = {
            "chat_id": payload.chat_id,
            "text": payload.message,
            "parse_mode": "HTML",
        }

        try:
            async with httpx.AsyncClient(timeout=10) as client:
                resp = await client.post(url, json=body)
        except httpx.HTTPError as exc:
            # Transport-level failures would otherwise escape as raw httpx
            # exceptions; surface them as the same domain error used for a
            # rejected request. The type name is included because httpx
            # timeout errors frequently have an empty message.
            raise BizLogicError(
                f"Telegram push failed: {type(exc).__name__}: {exc}"
            ) from exc

        if resp.status_code != 200:
            raise BizLogicError(
                f"Telegram push failed: {resp.status_code}, {resp.text}"
            )
|