# app/services/telegram_service.py
import aiohttp

from app.core.biz_exception import BizLogicError
from app.core.config import settings
from app.schemas.telegram import TelegramIn, TelegramNoTokenIn

# Base URL of the Telegram Bot API; the bot token is appended per request.
_TELEGRAM_API_BASE = "https://api.telegram.org"

# Total time budget, in seconds, for one sendMessage request.
_REQUEST_TIMEOUT_SECONDS = 10


async def _send_html_message(api_token, chat_id, text) -> None:
    """POST one ``sendMessage`` request to the Telegram Bot API.

    Shared by both public entry points of :class:`TelegramService` so the
    request shape, timeout and error handling live in exactly one place.

    Args:
        api_token: Bot token interpolated into the request URL.
        chat_id: Value sent as the ``chat_id`` field of the JSON body.
        text: Value sent as the ``text`` field of the JSON body.

    Raises:
        BizLogicError: If the response status is anything other than 200.
            The message carries the status code and the response body.

    Note:
        Transport-level failures raised by aiohttp (connection errors,
        timeouts) are not caught here and propagate to the caller.
    """
    url = f"{_TELEGRAM_API_BASE}/bot{api_token}/sendMessage"
    body = {
        "chat_id": chat_id,
        "text": text,
        # NOTE(review): with parse_mode "HTML" the Telegram Bot API docs
        # require literal <, > and & in the text to be escaped — confirm
        # that callers already do this.
        "parse_mode": "HTML",
    }
    timeout = aiohttp.ClientTimeout(total=_REQUEST_TIMEOUT_SECONDS)

    async with aiohttp.ClientSession(timeout=timeout) as client:
        async with client.post(url, json=body) as resp:
            if resp.status != 200:
                text = await resp.text()
                raise BizLogicError(
                    f"Telegram push failed: {resp.status}, {text}"
                )


class TelegramService:
    """Send text messages to Telegram chats via the Bot API."""

    @staticmethod
    async def push_text(payload: TelegramIn) -> None:
        """Send ``payload.message`` to ``payload.chat_id`` using the caller's token.

        Args:
            payload: Request carrying ``api_token``, ``chat_id`` and
                ``message``.

        Raises:
            BizLogicError: If ``payload.api_token`` is falsy, or if Telegram
                answers with a non-200 status.
        """
        if not payload.api_token:
            raise BizLogicError("Telegram api_token missing")

        await _send_html_message(
            payload.api_token, payload.chat_id, payload.message
        )

    @staticmethod
    async def push_text_no_token(payload: TelegramNoTokenIn) -> None:
        """Send ``payload.message`` to ``payload.chat_id`` using the configured token.

        The bot token is read from ``settings.telegram_api_token`` instead of
        the request payload.

        Args:
            payload: Request carrying ``chat_id`` and ``message``.

        Raises:
            BizLogicError: If ``settings.telegram_api_token`` is falsy, or if
                Telegram answers with a non-200 status.
        """
        api_token = settings.telegram_api_token
        if not api_token:
            raise BizLogicError("Telegram api_token missing in env")

        await _send_html_message(api_token, payload.chat_id, payload.message)