- diff --git a/kombu/transport/pyredis.py b/kombu/transport/pyredis.py
- index 0ed90e0..80f96ec 100644
- --- a/kombu/transport/pyredis.py
- +++ b/kombu/transport/pyredis.py
- @@ -227,41 +227,41 @@ class Channel(virtual.Channel):
- Implies ``no_ack=True``
- """
- - item = self.client.rpop(queue)
- + item = self._avail_client.rpop(queue)
- if item:
- return deserialize(item)
- raise Empty()
- def _size(self, queue):
- - return self.client.llen(queue)
- + return self._avail_client.llen(queue)
- def _put(self, queue, message, **kwargs):
- """Deliver message."""
- - self.client.lpush(queue, serialize(message))
- + self._avail_client.lpush(queue, serialize(message))
- def _put_fanout(self, exchange, message, **kwargs):
- """Deliver fanout message."""
- - self.client.publish(exchange, serialize(message))
- + self._avail_client.publish(exchange, serialize(message))
- def _queue_bind(self, exchange, routing_key, pattern, queue):
- if self.typeof(exchange).type == "fanout":
- # Mark exchange as fanout.
- self._fanout_queues[queue] = exchange
- - self.client.sadd(self.keyprefix_queue % (exchange, ),
- - self.sep.join([routing_key or "",
- - pattern or "",
- - queue or ""]))
- + self._avail_client.sadd(self.keyprefix_queue % (exchange, ),
- + self.sep.join([routing_key or "",
- + pattern or "",
- + queue or ""]))
- def _has_queue(self, queue, **kwargs):
- - return self.client.exists(queue)
- + return self._avail_client.exists(queue)
- def get_table(self, exchange):
- return [tuple(val.split(self.sep))
- - for val in self.client.smembers(
- + for val in self._avail_client.smembers(
- self.keyprefix_queue % exchange)]
- def _purge(self, queue):
- - size, _ = self.client.pipeline().llen(queue) \
- + size, _ = self._avail_client.pipeline().llen(queue) \
- .delete(queue).execute()
- return size
- @@ -305,6 +305,12 @@ class Channel(virtual.Channel):
- from redis import exceptions
- return exceptions.ResponseError
- + @property
- + def _avail_client(self):
- + if self._in_poll:
- + return self._create_client()
- + return self.client
- +
- @cached_property
- def client(self):
- return self._create_client()