# app/services/sms_service.py import json from typing import List from redis.asyncio import Redis from app.schemas.sms import ShortMessageDetail async def save_short_message( redis_client: Redis, phone: str, message: str, received_at: str, max_ttl: int ) -> ShortMessageDetail: """ 将短信保存到 Redis(异步版) key: sms:{phone} value: JSON 数组(最多保留最近 20 条) """ key = f"sms:{phone}" # 1️⃣ 读取已有短信 existing_data = await redis_client.get(key) if existing_data: messages = json.loads(existing_data) else: messages = [] # 2️⃣ 添加新短信 new_msg = ShortMessageDetail( phone=phone, message=message, received_at=received_at ) messages.append(new_msg.dict()) # 3️⃣ 保留最近 20 条 messages = messages[-20:] # 4️⃣ 写回 Redis(重置 TTL) await redis_client.setex( key, max_ttl, json.dumps(messages) ) return new_msg async def query_short_message( redis_client: Redis, phone: str, keyword: str = None, sent_at: str = None ) -> List[ShortMessageDetail]: """ 从 Redis 查询短信(异步版) 支持关键字 / 时间过滤 """ key = f"sms:{phone}" existing_data = await redis_client.get(key) if not existing_data: return [] messages = [ ShortMessageDetail(**m) for m in json.loads(existing_data) ] # 关键字过滤 if keyword: messages = [ m for m in messages if keyword in m.message ] # 时间过滤(字符串比较,ISO8601 安全) if sent_at: messages = [ m for m in messages if m.received_at >= sent_at ] return messages