Advertisement
Guest User

Terry Jones

a guest
Dec 8th, 2009
382
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.17 KB | None | 0 0
  1. from twisted.internet import defer, task
  2.  
  3.  
  4. class DeferredPool(object):
  5.     def __init__(self, initialContents=None):
  6.         self._pool = set()
  7.         self._waiting = []
  8.         if initialContents:
  9.             for d in initialContents:
  10.                 self.add(d)
  11.  
  12.     def _fired(self, result, d):
  13.         self._pool.remove(d)
  14.         if not self._pool:
  15.             waiting, self._waiting = self._waiting, []
  16.             for waiter in waiting:
  17.                 waiter.callback(None)
  18.         return result
  19.  
  20.     def add(self, d):
  21.         d.addBoth(self._fired, d)
  22.         self._pool.add(d)
  23.         return d
  24.  
  25.     def deferUntilEmpty(self, testImmediately=True):
  26.         if testImmediately and not self._pool:
  27.             return defer.succeed(None)
  28.         else:
  29.             d = defer.Deferred()
  30.             self._waiting.append(d)
  31.             return d
  32.  
  33.  
  34. class QueueStopped(Exception):
  35.     pass
  36.  
  37.  
  38. class ResizableDispatchQueue(object):
  39.  
  40.     _sentinel = object()
  41.    
  42.     def __init__(self, func):
  43.         self._queue = defer.DeferredQueue()
  44.         self._func = func
  45.         self._pool = DeferredPool()
  46.         self._coop = task.Cooperator()
  47.         self._currentWidth = 0
  48.         self._pendingStops = 0
  49.         self._stopped = False
  50.  
  51.     def put(self, obj):
  52.         if self._stopped:
  53.             raise QueueStopped()
  54.         self._queue.put(obj)
  55.        
  56.     def pending(self):
  57.         return list(self._queue.pending)
  58.        
  59.     def stop(self):
  60.         self._stopped = True
  61.         # Flush waiters who can now never get a usable item from the queue.
  62.         while self._queue.waiting:
  63.             self._queue.put(self._sentinel)
  64.         d = self._pool.deferUntilEmpty()
  65.         d.addCallback(lambda _: self.pending())
  66.         return d
  67.    
  68.     def _call(self, obj):
  69.         if not obj is self._sentinel:
  70.             return defer.maybeDeferred(self._func, obj)
  71.    
  72.     def next(self):
  73.         if self._stopped:
  74.             raise StopIteration
  75.         elif self._pendingStops:
  76.             self._pendingStops -= 1
  77.             self._currentWidth -= 1
  78.             raise StopIteration
  79.         else:
  80.             d = self._queue.get()
  81.             d.addCallback(self._call)
  82.             return d
  83.  
  84.     def narrow(self, n=1):
  85.         self._setWidth(self.width - n)
  86.  
  87.     def widen(self, n=1):
  88.         self._setWidth(self.width + n)
  89.  
  90.     start = widen
  91.  
  92.     def _getWidth(self):
  93.         return self._currentWidth - self._pendingStops
  94.  
  95.     def _setWidth(self, width):
  96.         targetWidth = self._currentWidth - self._pendingStops
  97.         extra = width - targetWidth
  98.         if extra > 0:
  99.             # Make ourselves wider.
  100.             delta = extra - self._pendingStops
  101.             if delta >= 0:
  102.                 self._pendingStops = 0
  103.                 for i in xrange(delta):
  104.                     self._pool.add(self._coop.coiterate(self))
  105.                 self._currentWidth += delta
  106.             else:
  107.                 self._pendingStops -= extra
  108.         elif extra < 0:
  109.             # Make ourselves narrower.
  110.             self._pendingStops -= extra
  111.  
  112.     width = property(_getWidth, _setWidth)
  113.  
  114.     def setWidth(self, width):
  115.         self.width = width
  116.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement