| 123456789101112131415161718192021222324252627282930313233343536 |
- import json
- from typing import Optional
- from redis.asyncio import Redis
- async def redis_qpush(
- redis_client: Redis,
- qname: str,
- data: dict,
- max_len: int = 30,
- ):
- """
- 向队列右侧推入数据,并限制队列最大长度(Async 版)
- """
- data_string = json.dumps(data)
- pipe = redis_client.pipeline(transaction=True)
- pipe.rpush(qname, data_string)
- pipe.ltrim(qname, -max_len, -1)
- await pipe.execute()
- async def redis_qpop(
- redis_client: Redis,
- qname: str,
- timeout: int = 5,
- ) -> Optional[dict]:
- """
- 从队列左侧阻塞弹出数据(Async 版)
- """
- message = await redis_client.blpop(qname, timeout=timeout)
- if message is None:
- return None
- _, message_string = message
- return json.loads(message_string)
|