import aiohttp from typing import Optional from app.core.biz_exception import BizLogicError class WhatsappService: @staticmethod async def send_text( api_base_url: str, session: str, chat_id: str, message: str, api_key: Optional[str] = None, ): if not api_base_url: raise BizLogicError("Whatsapp api_base_url not configured") if not chat_id: raise BizLogicError("Whatsapp chat_id missing") headers = {} if api_key: headers["X-Api-Key"] = api_key read_url = f"{api_base_url}/api/{session}/chats/{chat_id}/messages/read" send_url = f"{api_base_url}/api/sendText" timeout = aiohttp.ClientTimeout(total=10) try: async with aiohttp.ClientSession(timeout=timeout) as session_client: async with session_client.post(read_url, json={}, headers=headers) as read_resp: if read_resp.status >= 300: raise BizLogicError(f"Whatsapp read failed, http_status={read_resp.status}") payload = { "session": session, "chatId": chat_id, "text": message, } async with session_client.post(send_url, json=payload, headers=headers) as send_resp: if send_resp.status >= 300: raise BizLogicError(f"Whatsapp push failed, http_status={send_resp.status}") except aiohttp.ClientError as e: raise BizLogicError(f"Whatsapp push request error: {e}") return True