| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051 |
- # app/services/telegram_service.py
- import aiohttp
- from app.core.biz_exception import BizLogicError
- from app.core.config import settings
- from app.schemas.telegram import TelegramIn, TelegramNoTokenIn
class TelegramService:
    """Push text messages to Telegram chats via the Bot API ``sendMessage`` call.

    Both public entry points differ only in where the bot token comes from
    (the request payload vs. application settings); the HTTP exchange itself
    lives in :meth:`_send_message` so it is implemented exactly once.
    """

    # Base URL of the Telegram Bot API; "/bot<token>/<method>" is appended.
    _API_BASE = "https://api.telegram.org"
    # Value passed as aiohttp.ClientTimeout(total=...), in seconds.
    _TIMEOUT_SECONDS = 10

    @staticmethod
    async def _send_message(api_token, chat_id, message) -> None:
        """POST one ``sendMessage`` request and verify the HTTP status.

        Args:
            api_token: Bot token interpolated into the request URL.
            chat_id: Forwarded unchanged as the ``chat_id`` field.
            message: Forwarded unchanged as the ``text`` field. It is sent
                with ``parse_mode`` set to ``"HTML"``.

        Raises:
            BizLogicError: If the response status is anything other than 200.
                The message carries the status code and the response body.

        Note:
            Exceptions raised by aiohttp itself (connection failures,
            timeouts) are not caught here and propagate to the caller.
        """
        url = f"{TelegramService._API_BASE}/bot{api_token}/sendMessage"
        body = {
            "chat_id": chat_id,
            "text": message,
            "parse_mode": "HTML",
        }
        timeout = aiohttp.ClientTimeout(total=TelegramService._TIMEOUT_SECONDS)

        # A fresh session per call keeps the helper stateless; both context
        # managers guarantee the connection is released on every exit path.
        async with aiohttp.ClientSession(timeout=timeout) as client:
            async with client.post(url, json=body) as resp:
                if resp.status == 200:
                    return
                # Read the body only on failure so it can be reported.
                text = await resp.text()
                raise BizLogicError(
                    f"Telegram push failed: {resp.status}, {text}"
                )

    @staticmethod
    async def push_text(payload: TelegramIn) -> None:
        """Send ``payload.message`` to ``payload.chat_id`` using ``payload.api_token``.

        Raises:
            BizLogicError: If ``payload.api_token`` is empty/missing, or if
                Telegram responds with a non-200 status.
        """
        if not payload.api_token:
            raise BizLogicError("Telegram api_token missing")

        await TelegramService._send_message(
            payload.api_token, payload.chat_id, payload.message
        )

    @staticmethod
    async def push_text_no_token(payload: TelegramNoTokenIn) -> None:
        """Send ``payload.message`` to ``payload.chat_id`` using the configured token.

        The bot token is read from ``settings.telegram_api_token`` instead of
        the payload.

        Raises:
            BizLogicError: If ``settings.telegram_api_token`` is empty/missing,
                or if Telegram responds with a non-200 status.
        """
        api_token = settings.telegram_api_token
        if not api_token:
            raise BizLogicError("Telegram api_token missing in env")

        await TelegramService._send_message(
            api_token, payload.chat_id, payload.message
        )
|