Don't like ads? PRO users don't see any ads ;-)
Guest

Untitled

By: a guest on Jul 10th, 2012  |  syntax: None  |  size: 2.68 KB  |  hits: 13  |  expires: Never
download  |  raw  |  embed  |  report abuse  |  print
Text below is selected. Please press Ctrl+C to copy to your clipboard. (⌘+C on Mac)
  1. diff --git a/kombu/transport/pyredis.py b/kombu/transport/pyredis.py
  2. index 0ed90e0..80f96ec 100644
  3. --- a/kombu/transport/pyredis.py
  4. +++ b/kombu/transport/pyredis.py
  5. @@ -227,41 +227,41 @@ class Channel(virtual.Channel):
  6.              Implies ``no_ack=True``
  7.  
  8.          """
  9. -        item = self.client.rpop(queue)
  10. +        item = self._avail_client.rpop(queue)
  11.          if item:
  12.              return deserialize(item)
  13.          raise Empty()
  14.  
  15.      def _size(self, queue):
  16. -        return self.client.llen(queue)
  17. +        return self._avail_client.llen(queue)
  18.  
  19.      def _put(self, queue, message, **kwargs):
  20.          """Deliver message."""
  21. -        self.client.lpush(queue, serialize(message))
  22. +        self._avail_client.lpush(queue, serialize(message))
  23.  
  24.      def _put_fanout(self, exchange, message, **kwargs):
  25.          """Deliver fanout message."""
  26. -        self.client.publish(exchange, serialize(message))
  27. +        self._avail_client.publish(exchange, serialize(message))
  28.  
  29.      def _queue_bind(self, exchange, routing_key, pattern, queue):
  30.          if self.typeof(exchange).type == "fanout":
  31.              # Mark exchange as fanout.
  32.              self._fanout_queues[queue] = exchange
  33. -        self.client.sadd(self.keyprefix_queue % (exchange, ),
  34. -                          self.sep.join([routing_key or "",
  35. -                                        pattern or "",
  36. -                                        queue or ""]))
  37. +        self._avail_client.sadd(self.keyprefix_queue % (exchange, ),
  38. +                            self.sep.join([routing_key or "",
  39. +                                           pattern or "",
  40. +                                           queue or ""]))
  41.  
  42.      def _has_queue(self, queue, **kwargs):
  43. -        return self.client.exists(queue)
  44. +        return self._avail_client.exists(queue)
  45.  
  46.      def get_table(self, exchange):
  47.          return [tuple(val.split(self.sep))
  48. -                    for val in self.client.smembers(
  49. +                    for val in self._avail_client.smembers(
  50.                              self.keyprefix_queue % exchange)]
  51.  
  52.      def _purge(self, queue):
  53. -        size, _ = self.client.pipeline().llen(queue) \
  54. +        size, _ = self._avail_client.pipeline().llen(queue) \
  55.                                          .delete(queue).execute()
  56.          return size
  57.  
  58. @@ -305,6 +305,12 @@ class Channel(virtual.Channel):
  59.          from redis import exceptions
  60.          return exceptions.ResponseError
  61.  
  62. +    @property
  63. +    def _avail_client(self):
  64. +        if self._in_poll:
  65. +            return self._create_client()
  66. +        return self.client
  67. +
  68.      @cached_property
  69.      def client(self):
  70.          return self._create_client()