Description
Version: 4.2.2
Platform: python 3.8 on windows
Description: Running a pub-sub task in a loop requires me to manually call pubsub.connection.connect() after a disconnect.
So:
I'm new to python redis, having previously used hiredis and written clients for UnrealEngine using it.
So, I was pleasantly surprised at how the client typically handles reconnections for you, in case there are disconnects.
 However, this does not happen with the PubSub client.
Please note that I am using the redis.asyncio module here.
There is a retry mechanism in PubSub._execute, which calls PubSub._disconnect_raise_connect in case of failure.
 However, that function only calls connect() in case of a TimeoutError.
If there is a disconnect for any other reason, for example because the connection to the server disappears for a few seconds,
 the connection stays in a disconnected state.
In my tests, I killed and restarted the server, and PubSub.connection stayed in an is_connected == False state, with every read operation resulting in a ConnectionError("Connection closed by server.").
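The behavior described above can be modeled with a small, self-contained sketch. This is not the redis-py source: `FakeConnection` and `disconnect_raise_connect` are hypothetical stand-ins, and Python's built-in `TimeoutError` and `ConnectionError` stand in for the redis exceptions of the same names. It only illustrates why a handler that reconnects solely on timeouts leaves the connection closed after a server restart.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a connection object (not the redis-py class)."""

    def __init__(self):
        self.is_connected = True

    async def connect(self):
        self.is_connected = True

    async def disconnect(self):
        self.is_connected = False


async def disconnect_raise_connect(conn, error):
    """Model of the failure handler as described in this report:
    always disconnect, but reconnect only when the failure was a timeout."""
    await conn.disconnect()
    if not isinstance(error, TimeoutError):
        raise error  # any other error propagates, connection stays closed
    await conn.connect()


async def state_after(error):
    """Run the handler once and report whether the connection is still up."""
    conn = FakeConnection()
    try:
        await disconnect_raise_connect(conn, error)
    except ConnectionError:
        pass  # mirrors a caller that swallows ConnectionError
    return conn.is_connected


timeout_result = asyncio.run(state_after(TimeoutError("read timed out")))
closed_result = asyncio.run(
    state_after(ConnectionError("Connection closed by server."))
)
print(timeout_result, closed_result)
```

In this model the timeout case ends up connected again, while the closed-by-server case stays disconnected, which matches the is_connected == False state observed in the tests.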
Previously the loop looked something like:

    async def loop(self):
        await self.perform_any_subscriptions()
        while True:
            try:
                await self.loop_step()
            except redis.ConnectionError:
                pass

    async def loop_step(self):
        async for message in self.pubsub.listen():
            await self.dispatch_message(message)
            break

If the connection becomes disconnected, listen() will simply raise ConnectionError("Connection closed by server.") over and over.
What I had to do, as a workaround, was to do something like this, for my listening loop:
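
    async def loop(self):
        while True:
            # call_with_retry is reached through the connection's retry object
            await self.pubsub.connection.retry.call_with_retry(
                self.loop_step, self.reconnect_if_closed
            )

    async def reconnect_if_closed(self, error):
        # Failure callback: reconnect only if the connection actually dropped.
        conn = self.pubsub.connection
        if isinstance(error, redis.ConnectionError) and conn and not conn.is_connected:
            await conn.connect()

(This uses call_with_retry as a handy catch-all system.)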
I'm not sure if this is expected, or if it is an error, but from the documentation I'm reading, it seems like PubSub ought to be re-usable across disconnects, automatically reconnecting any previous subscriptions, etc.
UPDATE:
After PR #2148, the async PubSub class now has a connect() method, which can be used to remedy
 this. The top loop becomes:

    async def loop(self):
        await self.perform_any_subscriptions()
        while True:
            try:
                await self.pubsub.connect()
                await self.loop_step()
            except redis.ConnectionError:
                pass

Previously it was not possible to reconnect a PubSub object after a ConnectionError without issuing a "subscribe()" or "unsubscribe()".
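To see the reconnecting loop work without a Redis server, here is a minimal sketch built on a stub. `FakePubSub` and `run_loop` are hypothetical and only imitate the connect()/listen() behavior described in this issue. The short sleep between attempts is an addition that is not in the loop above; it is there so the loop does not spin while the server is down.

```python
import asyncio


class FakePubSub:
    """Hypothetical stand-in for the async PubSub object (not redis-py)."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.connected = False
        self.connect_calls = 0

    async def connect(self):
        self.connect_calls += 1
        self.connected = True

    async def listen(self):
        if not self.connected:
            raise ConnectionError("Connection closed by server.")
        while self.messages:
            item = self.messages.pop(0)
            if item is None:  # None simulates the server going away
                self.connected = False
                raise ConnectionError("Connection closed by server.")
            yield item


async def run_loop(pubsub, retry_delay=0.01):
    """Reconnect before each listening pass, as in the updated loop."""
    received = []
    while pubsub.messages:  # a real loop would use `while True`
        try:
            await pubsub.connect()
            async for message in pubsub.listen():
                received.append(message)
        except ConnectionError:
            # Assumption: pause briefly instead of retrying immediately.
            await asyncio.sleep(retry_delay)
    return received


pubsub = FakePubSub(["a", None, "b", "c"])
received = asyncio.run(run_loop(pubsub))
print(received, pubsub.connect_calls)
```

The stub drops the connection after the first message; the loop catches the error, calls connect() again, and picks up the remaining messages.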