redis_utils.py 731 B

12345678910111213141516171819202122
  1. import json
  2. import pytz
  3. import resource
  4. from redis.asyncio import Redis
  5. from datetime import datetime, timedelta
  6. def redis_qpush(redis_client, qname: str, data: dict, max_len: int = 30):
  7. """向队列右侧推入数据,并限制队列最大长度"""
  8. data_string = json.dumps(data)
  9. pipe = redis_client.pipeline()
  10. pipe.rpush(qname, data_string)
  11. pipe.ltrim(qname, -max_len, -1) # 只保留右侧 max_len 个元素
  12. pipe.execute()
  13. def redis_qpop(redis_client, qname:str, timeout: int = 5):
  14. message = redis_client.blpop(qname, timeout=timeout)
  15. if message is None:
  16. return None # 队列为空,直接返回
  17. message_string = message[1]
  18. data = json.loads(message_string)
  19. return data