# app/services/sms_service.py
"""SMS helpers: store/query received messages in Redis and send via HTTP gateway."""

import asyncio
import json
import os
from typing import List
from typing import Optional
from urllib.parse import quote

import aiohttp
from redis.asyncio import Redis

from app.core.biz_exception import BizLogicError
from app.schemas.sms import ShortMessageDetail

# Maximum number of messages retained per phone number in Redis.
_MAX_STORED_MESSAGES = 20


async def save_short_message(
    redis_client: Redis,
    phone: str,
    message: str,
    received_at: str,
    max_ttl: int,
) -> ShortMessageDetail:
    """Append a received short message to the per-phone history in Redis.

    Storage layout:
        key:   ``sms:{phone}``
        value: a JSON array of message dicts, truncated to the most recent
               ``_MAX_STORED_MESSAGES`` (20) entries.

    Args:
        redis_client: Async Redis client used for the read and the write.
        phone: Phone number the message belongs to; used to build the key.
        message: Message text.
        received_at: Timestamp of reception, stored as given.
        max_ttl: Expiry in seconds applied to the key on every write.

    Returns:
        The ``ShortMessageDetail`` that was appended.

    Note:
        This is a read-modify-write (GET followed by SETEX) and is not
        atomic: two concurrent writers for the same phone can overwrite each
        other's update.
    """
    key = f"sms:{phone}"

    # 1. Load the existing history (empty list when the key is missing).
    existing_data = await redis_client.get(key)
    messages = json.loads(existing_data) if existing_data else []

    # 2. Append the new message.
    new_msg = ShortMessageDetail(
        phone=phone,
        message=message,
        received_at=received_at,
    )
    messages.append(new_msg.dict())

    # 3. Keep only the most recent entries.
    messages = messages[-_MAX_STORED_MESSAGES:]

    # 4. Write back; SETEX also resets the key's TTL to ``max_ttl``.
    await redis_client.setex(key, max_ttl, json.dumps(messages))

    return new_msg


async def query_short_message(
    redis_client: Redis,
    phone: str,
    keyword: Optional[str] = None,
    sent_at: Optional[str] = None,
) -> List[ShortMessageDetail]:
    """Return stored short messages for ``phone``, optionally filtered.

    Args:
        redis_client: Async Redis client used to read ``sms:{phone}``.
        phone: Phone number whose history is queried.
        keyword: When truthy, keep only messages whose text contains it.
        sent_at: When truthy, keep only messages whose ``received_at`` is
            greater than or equal to this value.

    Returns:
        The matching messages in stored order; an empty list when the key
        does not exist or holds no data.
    """
    key = f"sms:{phone}"

    existing_data = await redis_client.get(key)
    if not existing_data:
        return []

    messages = [ShortMessageDetail(**m) for m in json.loads(existing_data)]

    # Keyword filter: plain case-sensitive substring match.
    if keyword:
        messages = [m for m in messages if keyword in m.message]

    # Time filter: values are compared with ``>=`` as-is.
    # NOTE(review): presumably both sides are ISO 8601 strings in the same
    # format/offset, in which case lexicographic order equals chronological
    # order -- confirm against the producer of ``received_at``.
    if sent_at:
        messages = [m for m in messages if m.received_at >= sent_at]

    return messages


def _build_submit_url(
    base_url: str,
    cpid: str,
    cppwd: str,
    send_to: str,
    sender: str,
    content: str,
) -> str:
    """Build the gateway MT_REQUEST URL with every value percent-encoded."""

    def _enc(value: str) -> str:
        # safe="" so that "+", "&", "=", "#" and "/" inside a value can never
        # be misread as query-string syntax (e.g. "+86..." becoming " 86...").
        return quote(value, safe="")

    return (
        f"{base_url}"
        f"?command=MT_REQUEST"
        f"&cpid={_enc(cpid)}"
        f"&cppwd={_enc(cppwd)}"
        f"&da={_enc(send_to)}"
        f"&sa={_enc(sender)}"
        f"&sm={_enc(content)}"
    )


async def send_sms(
    send_to: str,
    sender: str,
    content: str,
    cpid: str = "",
    cppwd: str = "",
    base_url: str = "",
) -> str:
    """Send a short message through the HTTP SMS gateway.

    Args:
        send_to: Destination number (sent as the ``da`` parameter).
        sender: Sender identifier (sent as the ``sa`` parameter).
        content: Message text (sent as the ``sm`` parameter).
        cpid: Gateway account id; falls back to env ``SMS_CPID``.
        cppwd: Gateway account password; falls back to env ``SMS_CPPWD``.
        base_url: Gateway submit URL; falls back to env ``SMS_BASE_URL``.

    Returns:
        The raw response body returned by the gateway.

    Raises:
        BizLogicError: If a required argument is empty, the gateway answers
            with HTTP status >= 300, the request fails at the transport
            level, or the 10-second total timeout expires.
    """
    # NOTE(review): the literal fallbacks below are credentials committed to
    # source control. They are kept so existing deployments keep working, but
    # they should be rotated and supplied only through the environment.
    cpid = cpid or os.getenv("SMS_CPID", "6jLoZoRZ")
    cppwd = cppwd or os.getenv("SMS_CPPWD", "LYTErsAE")
    base_url = base_url or os.getenv("SMS_BASE_URL", "http://api2.santo.cc/submit")

    if not send_to:
        raise BizLogicError("sms send_to required")
    if not sender:
        raise BizLogicError("sms sender required")
    if not content:
        raise BizLogicError("sms content required")

    url = _build_submit_url(base_url, cpid, cppwd, send_to, sender, content)

    timeout = aiohttp.ClientTimeout(total=10)
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url) as resp:
                text = await resp.text()
                if resp.status >= 300:
                    raise BizLogicError(
                        f"sms send failed, http_status={resp.status}, body={text}"
                    )
                return text
    except aiohttp.ClientError as e:
        raise BizLogicError(f"sms send request error: {e}") from e
    except asyncio.TimeoutError as e:
        # The total-timeout expiry is raised as asyncio.TimeoutError, which is
        # not an aiohttp.ClientError; wrap it so callers see one error type.
        raise BizLogicError("sms send request timeout") from e