# app/services/telegram_service.py import aiohttp from app.core.biz_exception import BizLogicError from app.schemas.telegram import TelegramIn class TelegramService: @staticmethod async def push_text(payload: TelegramIn): url = f"https://api.telegram.org/bot{payload.api_token}/sendMessage" body = { "chat_id": payload.chat_id, "text": payload.message, "parse_mode": "HTML", } timeout = aiohttp.ClientTimeout(total=10) async with aiohttp.ClientSession(timeout=timeout) as client: async with client.post(url, json=body) as resp: if resp.status != 200: text = await resp.text() raise BizLogicError( f"Telegram push failed: {resp.status}, {text}" )