| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586 |
- # app/services/sms_service.py
- import json
- from typing import List
- from redis.asyncio import Redis
- from app.schemas.sms import ShortMessageDetail
- async def save_short_message(
- redis_client: Redis,
- phone: str,
- message: str,
- received_at: str,
- max_ttl: int
- ) -> ShortMessageDetail:
- """
- 将短信保存到 Redis(异步版)
- key: sms:{phone}
- value: JSON 数组(最多保留最近 20 条)
- """
- key = f"sms:{phone}"
- # 1️⃣ 读取已有短信
- existing_data = await redis_client.get(key)
- if existing_data:
- messages = json.loads(existing_data)
- else:
- messages = []
- # 2️⃣ 添加新短信
- new_msg = ShortMessageDetail(
- phone=phone,
- message=message,
- received_at=received_at
- )
- messages.append(new_msg.dict())
- # 3️⃣ 保留最近 20 条
- messages = messages[-20:]
- # 4️⃣ 写回 Redis(重置 TTL)
- await redis_client.setex(
- key,
- max_ttl,
- json.dumps(messages)
- )
- return new_msg
- async def query_short_message(
- redis_client: Redis,
- phone: str,
- keyword: str = None,
- sent_at: str = None
- ) -> List[ShortMessageDetail]:
- """
- 从 Redis 查询短信(异步版)
- 支持关键字 / 时间过滤
- """
- key = f"sms:{phone}"
- existing_data = await redis_client.get(key)
- if not existing_data:
- return []
- messages = [
- ShortMessageDetail(**m)
- for m in json.loads(existing_data)
- ]
- # 关键字过滤
- if keyword:
- messages = [
- m for m in messages
- if keyword in m.message
- ]
- # 时间过滤(字符串比较,ISO8601 安全)
- if sent_at:
- messages = [
- m for m in messages
- if m.received_at >= sent_at
- ]
- return messages
|