| 1234567891011121314151617181920212223242526 |
- # app/services/telegram_service.py
- import aiohttp
- from app.core.biz_exception import BizLogicError
- from app.schemas.telegram import TelegramIn
class TelegramService:
    """Helpers for delivering messages through the Telegram Bot API."""

    @staticmethod
    async def push_text(payload: TelegramIn):
        """Post ``payload.message`` to ``payload.chat_id`` via ``sendMessage``.

        The bot token is taken from ``payload.api_token`` and the message is
        sent with ``parse_mode`` set to ``HTML``. A fresh client session with
        a total timeout of 10 is opened for each call.

        Raises:
            BizLogicError: If the response status is anything other than 200.
                The error text carries the status code and the response body.

        Exceptions raised by aiohttp itself are not caught here and propagate
        to the caller.
        """
        endpoint = f"https://api.telegram.org/bot{payload.api_token}/sendMessage"
        message = {
            "chat_id": payload.chat_id,
            "text": payload.message,
            "parse_mode": "HTML",
        }

        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=10)
        ) as session, session.post(endpoint, json=message) as response:
            # Only an exact 200 counts as success.
            if response.status == 200:
                return

            # Read the body so the failure reason ends up in the error.
            detail = await response.text()
            raise BizLogicError(
                f"Telegram push failed: {response.status}, {detail}"
            )
|