redis_utils.py 811 B

123456789101112131415161718192021222324252627282930313233343536
  1. import json
  2. from typing import Optional
  3. from redis.asyncio import Redis
  4. async def redis_qpush(
  5. redis_client: Redis,
  6. qname: str,
  7. data: dict,
  8. max_len: int = 30,
  9. ):
  10. """
  11. 向队列右侧推入数据,并限制队列最大长度(Async 版)
  12. """
  13. data_string = json.dumps(data)
  14. pipe = redis_client.pipeline(transaction=True)
  15. pipe.rpush(qname, data_string)
  16. pipe.ltrim(qname, -max_len, -1)
  17. await pipe.execute()
  18. async def redis_qpop(
  19. redis_client: Redis,
  20. qname: str,
  21. timeout: int = 5,
  22. ) -> Optional[dict]:
  23. """
  24. 从队列左侧阻塞弹出数据(Async 版)
  25. """
  26. message = await redis_client.blpop(qname, timeout=timeout)
  27. if message is None:
  28. return None
  29. _, message_string = message
  30. return json.loads(message_string)