sms_service.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. # app/services/sms_service.py
  2. import json
  3. import os
  4. from typing import List
  5. from urllib.parse import quote
  6. import aiohttp
  7. from redis.asyncio import Redis
  8. from app.schemas.sms import ShortMessageDetail
  9. from app.core.biz_exception import BizLogicError
  async def save_short_message(
      redis_client: Redis,
      phone: str,
      message: str,
      received_at: str,
      max_ttl: int
  ) -> ShortMessageDetail:
      """
      Append one SMS to the per-phone history stored in Redis (async).

      The history lives under the key ``sms:{phone}`` as a JSON array. Only
      the 20 most recent entries are kept, and every write re-arms the key's
      expiry.

      Args:
          redis_client: Async Redis client used for the read and the write.
          phone: Phone number; used both as the key suffix and as a field of
              the stored record.
          message: SMS text to store.
          received_at: Reception timestamp, stored as given.
          max_ttl: Expiry handed to ``SETEX`` for the whole history key.

      Returns:
          The ``ShortMessageDetail`` that was just appended.
      """
      redis_key = f"sms:{phone}"

      # Load the existing history; a missing or empty value means "no history".
      # NOTE(review): the read below and the write at the end are separate
      # commands, so concurrent saves for the same phone can overwrite each
      # other.
      raw_history = await redis_client.get(redis_key)
      history = json.loads(raw_history) if raw_history else []

      # Build the new record and add it to the end of the history.
      record = ShortMessageDetail(
          phone=phone,
          message=message,
          received_at=received_at,
      )
      history.append(record.dict())

      # Keep only the newest 20 entries before persisting.
      trimmed = history[-20:]
      payload = json.dumps(trimmed)

      # SETEX stores the value and resets the expiry in one command.
      await redis_client.setex(redis_key, max_ttl, payload)
      return record
  async def query_short_message(
      redis_client: Redis,
      phone: str,
      keyword: str = None,
      sent_at: str = None
  ) -> List[ShortMessageDetail]:
      """
      Read the SMS history of one phone from Redis, optionally filtered (async).

      Args:
          redis_client: Async Redis client used for the lookup.
          phone: Phone number whose history (key ``sms:{phone}``) is read.
          keyword: When truthy, keep only messages whose text contains it.
          sent_at: When truthy, keep only messages whose ``received_at`` is
              greater than or equal to this value.

      Returns:
          The matching messages in stored order; an empty list when the key is
          missing or empty.
      """
      raw_history = await redis_client.get(f"sms:{phone}")
      if not raw_history:
          return []

      # Rebuild every stored entry before filtering, so malformed entries
      # surface regardless of which filters are active.
      stored = [
          ShortMessageDetail(**entry)
          for entry in json.loads(raw_history)
      ]

      def _matches(item) -> bool:
          """Return True when *item* passes every active filter."""
          # Keyword filter: plain substring test on the message text.
          if keyword and keyword not in item.message:
              return False
          # Time filter: a plain string comparison, which is chronological only
          # when both values use the same ISO 8601 layout and time zone.
          if sent_at and not item.received_at >= sent_at:
              return False
          return True

      return [item for item in stored if _matches(item)]
  async def send_sms(
      send_to: str,
      sender: str,
      content: str,
      cpid: str = "",
      cppwd: str = "",
      base_url: str = "",
  ) -> str:
      """
      Send one SMS through the HTTP gateway (async).

      Issues a GET request to the gateway with ``command=MT_REQUEST`` and the
      account, destination, sender and message as query parameters.

      Args:
          send_to: Destination number (sent as ``da``). Required.
          sender: Sender id (sent as ``sa``). Required.
          content: Message text (sent as ``sm``). Required.
          cpid: Gateway account id; falls back to the ``SMS_CPID`` environment
              variable, then to a built-in default.
          cppwd: Gateway account password; falls back to ``SMS_CPPWD``, then to
              a built-in default.
          base_url: Gateway endpoint; falls back to ``SMS_BASE_URL``, then to a
              built-in default.

      Returns:
          The raw response body returned by the gateway.

      Raises:
          BizLogicError: If a required argument is empty, the gateway answers
              with an HTTP status >= 300, the request fails at the transport
              level, or the request exceeds the 10 second total timeout.
      """
      # Local import: only needed to name the timeout exception type.
      import asyncio

      # NOTE(review): the fallback credentials and endpoint are hard-coded in
      # source. They are kept so existing deployments keep working, but they
      # should move to configuration / secret storage and be rotated.
      cpid = cpid or os.getenv("SMS_CPID", "6jLoZoRZ")
      cppwd = cppwd or os.getenv("SMS_CPPWD", "LYTErsAE")
      base_url = base_url or os.getenv("SMS_BASE_URL", "http://api2.santo.cc/submit")

      if not send_to:
          raise BizLogicError("sms send_to required")
      if not sender:
          raise BizLogicError("sms sender required")
      if not content:
          raise BizLogicError("sms content required")

      def _enc(value) -> str:
          """Percent-encode one query-string value, leaving nothing unescaped."""
          return quote(str(value), safe="")

      # Encode every value, not just the message body: a "+", "&", "#" or
      # space in a number, sender id or credential would otherwise corrupt the
      # query string (e.g. "+86..." is read as a leading space by the server).
      url = (
          f"{base_url}"
          f"?command=MT_REQUEST"
          f"&cpid={_enc(cpid)}"
          f"&cppwd={_enc(cppwd)}"
          f"&da={_enc(send_to)}"
          f"&sa={_enc(sender)}"
          f"&sm={_enc(content)}"
      )

      timeout = aiohttp.ClientTimeout(total=10)
      try:
          async with aiohttp.ClientSession(timeout=timeout) as session:
              async with session.get(url) as resp:
                  text = await resp.text()
                  if resp.status >= 300:
                      raise BizLogicError(f"sms send failed, http_status={resp.status}, body={text}")
                  return text
      except asyncio.TimeoutError as e:
          # The total timeout is reported as asyncio.TimeoutError, which is not
          # an aiohttp.ClientError; without this branch it would escape
          # unwrapped.
          raise BizLogicError("sms send request timeout") from e
      except aiohttp.ClientError as e:
          raise BizLogicError(f"sms send request error: {e}") from e