import json from typing import Optional from redis.asyncio import Redis async def redis_qpush( redis_client: Redis, qname: str, data: dict, max_len: int = 30, ): """ 向队列右侧推入数据,并限制队列最大长度(Async 版) """ data_string = json.dumps(data) pipe = redis_client.pipeline(transaction=True) pipe.rpush(qname, data_string) pipe.ltrim(qname, -max_len, -1) await pipe.execute() async def redis_qpop( redis_client: Redis, qname: str, timeout: int = 5, ) -> Optional[dict]: """ 从队列左侧阻塞弹出数据(Async 版) """ message = await redis_client.blpop(qname, timeout=timeout) if message is None: return None _, message_string = message return json.loads(message_string)