| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- # app/services/sms_service.py
- import json
- import os
- from typing import List
- from urllib.parse import quote
- import aiohttp
- from redis.asyncio import Redis
- from app.schemas.sms import ShortMessageDetail
- from app.core.biz_exception import BizLogicError
async def save_short_message(
    redis_client: Redis,
    phone: str,
    message: str,
    received_at: str,
    max_ttl: int,
    max_messages: int = 20,
) -> ShortMessageDetail:
    """Append one SMS to the per-phone history stored in Redis.

    The history is stored under the key ``sms:{phone}`` as a JSON array of
    message dicts. After appending, only the newest ``max_messages`` entries
    are kept and the key is rewritten with a fresh TTL.

    Args:
        redis_client: Async Redis client used for the read and the write.
        phone: Phone number the message belongs to; part of the Redis key.
        message: Text of the short message.
        received_at: Timestamp of the message, stored as given.
        max_ttl: Expiry passed to ``SETEX`` for the rewritten key.
        max_messages: Maximum number of messages retained per phone
            (defaults to 20, the previously hard-coded limit).

    Returns:
        The ``ShortMessageDetail`` that was just stored.

    Raises:
        ValueError: If ``max_messages`` is smaller than 1.

    NOTE: the GET followed by SETEX is not atomic, so two concurrent saves
    for the same phone can overwrite each other's update.
    """
    # A non-positive limit would make the ``[-max_messages:]`` slice below
    # keep the whole list (``[-0:]``) or drop the oldest entries instead of
    # capping the history, so reject it up front.
    if max_messages < 1:
        raise ValueError("max_messages must be >= 1")

    key = f"sms:{phone}"

    # 1. Load the existing history (missing/empty key -> empty list).
    existing_data = await redis_client.get(key)
    messages = json.loads(existing_data) if existing_data else []

    # 2. Append the new message.
    new_msg = ShortMessageDetail(
        phone=phone,
        message=message,
        received_at=received_at,
    )
    messages.append(new_msg.dict())

    # 3. Keep only the most recent entries.
    messages = messages[-max_messages:]

    # 4. Write back; SETEX also resets the TTL of the key.
    await redis_client.setex(key, max_ttl, json.dumps(messages))
    return new_msg
async def query_short_message(
    redis_client: Redis,
    phone: str,
    keyword: str = None,
    sent_at: str = None
) -> List[ShortMessageDetail]:
    """Return the stored short messages for ``phone``, optionally filtered.

    Reads the JSON array stored under ``sms:{phone}`` and turns each entry
    into a ``ShortMessageDetail``.

    Args:
        redis_client: Async Redis client used for the lookup.
        phone: Phone number whose history is queried.
        keyword: When truthy, keep only messages whose text contains it.
        sent_at: When truthy, keep only messages with
            ``received_at >= sent_at``.

    Returns:
        The matching messages in stored order; an empty list when the key
        is missing or empty.
    """
    raw = await redis_client.get(f"sms:{phone}")
    if not raw:
        return []

    matched = []
    for entry in json.loads(raw):
        sms = ShortMessageDetail(**entry)
        # Keyword filter: plain substring match on the message text.
        if keyword and keyword not in sms.message:
            continue
        # Time filter: the values are compared directly; the original note
        # says this is a string comparison that is safe for ISO 8601
        # timestamps.
        if sent_at and not (sms.received_at >= sent_at):
            continue
        matched.append(sms)
    return matched
async def send_sms(
    send_to: str,
    sender: str,
    content: str,
    cpid: str = "",
    cppwd: str = "",
    base_url: str = "",
) -> str:
    """Send one short message through the HTTP SMS gateway.

    Issues a GET request to ``base_url`` with the ``MT_REQUEST`` command and
    returns the gateway's response body.

    Args:
        send_to: Destination number (sent as the ``da`` parameter).
        sender: Sender id (sent as the ``sa`` parameter).
        content: Message text (sent as the ``sm`` parameter).
        cpid: Gateway account id; falls back to ``SMS_CPID`` from the
            environment, then to the built-in default.
        cppwd: Gateway password; falls back to ``SMS_CPPWD`` from the
            environment, then to the built-in default.
        base_url: Gateway endpoint; falls back to ``SMS_BASE_URL`` from the
            environment, then to the built-in default.

    Returns:
        The response body text returned by the gateway.

    Raises:
        BizLogicError: If a required argument is empty, the gateway answers
            with an HTTP status >= 300, the request times out, or the
            request fails at the transport level.
    """
    # Function-scope import keeps this block self-contained; it is needed
    # because the total-timeout expiry surfaces as asyncio.TimeoutError.
    import asyncio

    # NOTE(security): the fallback credentials below are hard-coded in source.
    # They are kept so existing deployments keep working, but they should be
    # moved to configuration/secrets and rotated.
    cpid = cpid or os.getenv("SMS_CPID", "6jLoZoRZ")
    cppwd = cppwd or os.getenv("SMS_CPPWD", "LYTErsAE")
    base_url = base_url or os.getenv("SMS_BASE_URL", "http://api2.santo.cc/submit")

    if not send_to:
        raise BizLogicError("sms send_to required")
    if not sender:
        raise BizLogicError("sms sender required")
    if not content:
        raise BizLogicError("sms content required")

    # Percent-encode every value, not just the message text: a "+" in an
    # international number would otherwise be decoded as a space, and "&" or
    # "=" inside any value would inject extra query parameters.
    params = {
        "command": "MT_REQUEST",
        "cpid": cpid,
        "cppwd": cppwd,
        "da": send_to,
        "sa": sender,
        "sm": content,
    }
    query = "&".join(
        f"{name}={quote(str(value), safe='')}" for name, value in params.items()
    )
    url = f"{base_url}?{query}"

    timeout = aiohttp.ClientTimeout(total=10)
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url) as resp:
                text = await resp.text()
                if resp.status >= 300:
                    raise BizLogicError(
                        f"sms send failed, http_status={resp.status}, body={text}"
                    )
                return text
    except asyncio.TimeoutError as e:
        # The total-timeout expiry is not an aiohttp.ClientError, so without
        # this branch it would escape as a raw TimeoutError.
        raise BizLogicError("sms send request timeout") from e
    except aiohttp.ClientError as e:
        raise BizLogicError(f"sms send request error: {e}") from e
|