From 8c6d04e8d320a1be051556d469fee806240cc25f Mon Sep 17 00:00:00 2001 From: Tom Hacohen Date: Sun, 4 Sep 2022 13:51:17 -0400 Subject: [PATCH] Replace aioredis with redis-py aioredis has been merged into redis-py and will no longer be maintained as a separate project. --- etebase_server/fastapi/redis.py | 7 +++--- etebase_server/fastapi/routers/websocket.py | 26 ++++++++++++--------- requirements.in/base.txt | 2 +- requirements.txt | 15 ++++++++---- 4 files changed, 30 insertions(+), 20 deletions(-) diff --git a/etebase_server/fastapi/redis.py b/etebase_server/fastapi/redis.py index c4d697c..b4f7a04 100644 --- a/etebase_server/fastapi/redis.py +++ b/etebase_server/fastapi/redis.py @@ -1,5 +1,5 @@ import typing as t -import aioredis +from redis import asyncio as aioredis from etebase_server.django import app_settings @@ -12,12 +12,11 @@ class RedisWrapper: async def setup(self): if self.redis_uri is not None: - self.redis = await aioredis.create_redis_pool(self.redis_uri) + self.redis = await aioredis.from_url(self.redis_uri) async def close(self): if hasattr(self, "redis"): - self.redis.close() - await self.redis.wait_closed() + await self.redis.close() @property def is_active(self): diff --git a/etebase_server/fastapi/routers/websocket.py b/etebase_server/fastapi/routers/websocket.py index 019f58f..443ee06 100644 --- a/etebase_server/fastapi/routers/websocket.py +++ b/etebase_server/fastapi/routers/websocket.py @@ -1,7 +1,8 @@ import asyncio import typing as t -import aioredis +from redis import asyncio as aioredis +from redis.exceptions import ConnectionError from asgiref.sync import sync_to_async from django.db.models import QuerySet from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect, status @@ -51,7 +52,7 @@ async def get_ticket( uid = nacl.encoding.URLSafeBase64Encoder.encode(nacl.utils.random(32)) ticket_model = TicketInner(user=user.id, req=ticket_request) ticket_raw = msgpack_encode(ticket_model.dict()) - await redisw.redis.set(uid, ticket_raw, expire=TICKET_VALIDITY_SECONDS * 1000) + await redisw.redis.set(uid, ticket_raw, ex=TICKET_VALIDITY_SECONDS * 1000) return TicketOut(ticket=uid) @@ -103,9 +104,9 @@ async def send_item_updates( async def redis_connector(websocket: WebSocket, ticket_model: TicketInner, user: UserType, stoken: t.Optional[str]): async def producer_handler(r: aioredis.Redis, ws: WebSocket): + pubsub = r.pubsub() channel_name = f"col.{ticket_model.req.collection}" - (channel,) = await r.psubscribe(channel_name) - assert isinstance(channel, aioredis.Channel) + await pubsub.subscribe(channel_name) # Send missing items if we are not up to date queryset: QuerySet[models.Collection] = get_collection_queryset(user) @@ -117,12 +118,20 @@ async def redis_connector(websocket: WebSocket, ticket_model: TicketInner, user: return await send_item_updates(websocket, collection, user, stoken) + async def handle_message(): + msg = await pubsub.get_message(ignore_subscribe_messages=True, timeout=20) + message_raw = t.cast(t.Optional[t.Tuple[str, bytes]], msg) + if message_raw: + _, message = message_raw + await ws.send_bytes(message) + try: while True: # We wait on the websocket so we fail if web sockets fail or get data receive = asyncio.create_task(websocket.receive()) done, pending = await asyncio.wait( - {receive, channel.wait_message()}, return_when=asyncio.FIRST_COMPLETED + {receive, handle_message()}, + return_when=asyncio.FIRST_COMPLETED, ) for task in pending: task.cancel() @@ -131,12 +140,7 @@ async def redis_connector(websocket: WebSocket, ticket_model: TicketInner, user: await websocket.close(code=status.WS_1008_POLICY_VIOLATION) return - message_raw = t.cast(t.Optional[t.Tuple[str, bytes]], await channel.get()) - if message_raw: - _, message = message_raw - await ws.send_bytes(message) - - except aioredis.errors.ConnectionClosedError: + except ConnectionError: await websocket.close(code=status.WS_1012_SERVICE_RESTART) except WebSocketDisconnect: pass diff --git a/requirements.in/base.txt b/requirements.in/base.txt index e253c5e..6410748 100644 --- a/requirements.in/base.txt +++ b/requirements.in/base.txt @@ -5,4 +5,4 @@ fastapi typing_extensions uvicorn[standard] aiofiles -aioredis +redis>=4.2.0rc1 diff --git a/requirements.txt b/requirements.txt index 71085cf..f81eb35 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,8 +6,6 @@ # aiofiles==0.8.0 # via -r requirements.in/base.txt -aioredis==2.0.1 - # via -r requirements.in/base.txt anyio==3.5.0 # via # starlette @@ -17,11 +15,13 @@ asgiref==3.5.0 # django # uvicorn async-timeout==4.0.2 - # via aioredis + # via redis cffi==1.15.0 # via pynacl click==8.0.4 # via uvicorn +deprecated==1.2.13 + # via redis django==3.2.13 # via -r requirements.in/base.txt fastapi==0.75.0 @@ -34,18 +34,24 @@ idna==3.3 # via anyio msgpack==1.0.3 # via -r requirements.in/base.txt +packaging==21.3 + # via redis pycparser==2.21 # via cffi pydantic==1.9.0 # via fastapi pynacl==1.5.0 # via -r requirements.in/base.txt +pyparsing==3.0.9 + # via packaging python-dotenv==0.19.2 # via uvicorn pytz==2022.1 # via django pyyaml==6.0 # via uvicorn +redis==4.3.4 + # via -r requirements.in/base.txt sniffio==1.2.0 # via anyio sqlparse==0.4.2 @@ -55,7 +61,6 @@ starlette==0.17.1 typing-extensions==4.1.1 # via # -r requirements.in/base.txt - # aioredis # pydantic uvicorn[standard]==0.17.6 # via -r requirements.in/base.txt @@ -65,3 +70,5 @@ watchgod==0.8.1 # via uvicorn websockets==10.2 # via uvicorn +wrapt==1.14.1 + # via deprecated