Advertisement
Guest User

Python async contexts

a guest
Oct 15th, 2012
157
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 6.85 KB | None | 0 0
import collections
import concurrent.futures
import inspect
import threading
import types

  6. _tls = threading.local()
  7.  
  8. class CallableContext:
  9.     '''Represents a context, such as a thread or process, in which callables
  10.    may be executed. An awaiter context differs from a thread or process pool
  11.    in that execution always occurs within the context regardless of the
  12.    source of the callable.
  13.  
  14.    A common usage is to ensure that `concurrent.futures.Future` objects invoke
  15.    their callback in the same context it was started from::
  16.        executor = ThreadPoolExecutor(max_workers=5)
  17.        future = executor.submit(perform_calculation)
  18.        
  19.        # Must capture `context` outside of the handler
  20.        context = awaiters.CallableContext.get_current()
  21.        future.add_done_handler(lambda f: context.submit(print, f.result()))
  22.  
  23.    If no context has been set up, `get_current` will return None.
  24.  
  25.    A context may be manually set up by using a subclass of `CallableContext`
  26.    such as `ThreadContext`::
  27.        context = awaiters.ThreadContext()
  28.  
  29.        def main():
  30.            # This method is within the context of the current thread
  31.  
  32.            ctxt = awaiters.CallableContext.get_current()
  33.            # ctxt is context == True when called from this thread.
  34.  
  35.            f = executor.submit(lambda: awaiters.CallableContext.get_current())
  36.            # f.result() is None == true when called from another thread
  37.  
  38.        context.run(main)
  39.    '''
  40.  
  41.     @staticmethod
  42.     def get_current():
  43.         '''Returns the current callable context if one exists. This should be
  44.        captured and passed to an asynchronous operation so that it can use the
  45.        `submit` or `post` methods to execute code in the original context.
  46.  
  47.        If this returns ``None``, there is no context available.
  48.        '''
  49.         try:
  50.             context = _tls.current_context
  51.         except AttributeError:
  52.             _tls.current_context = context = CallableContext()
  53.        
  54.         return context
  55.  
  56.     @staticmethod
  57.     def set_current(context):
  58.         '''Sets the current context and returns the previous one.'''
  59.         old_context = CallableContext.get_current()
  60.         _tls.current_context = context
  61.         return old_context
  62.  
  63.     def __enter__(self):
  64.         self.__previous_context = CallableContext.set_current(self)
  65.         return self
  66.  
  67.     def __exit__(self, exc_type, exc_value, traceback):
  68.         CallableContext.set_current(self.__previous_context)
  69.         self.__previous_context = None
  70.  
  71.     def submit(self, callable, *args, **kwargs):
  72.         '''Adds a callable to invoke within a context. This method does not
  73.        return until the callable has been executed. Any exceptions will be
  74.        raised in the calling context.
  75.  
  76.        This method returns the value returned by the callable.
  77.  
  78.        The default implementation executes the callable immediately.
  79.        '''
  80.         return callable(*args, **kwargs)
  81.  
  82.     def post(self, callable, *args, **kwargs):
  83.         '''Adds a callable to invoke within a context. This method returns
  84.        immediately and any exceptions will be raised in the target context.
  85.  
  86.        This method has no return value.
  87.  
  88.        The default implementation executes the callable immediately.
  89.        '''
  90.         callable(*args, **kwargs)
  91.  
  92.  
  93. class _Awaiter:
  94.     '''Implements the callback behavior of functions wrapped with `async`.
  95.    '''
  96.     def __init__(self, generator, final_future, target_context=None):
  97.         self.generator = generator
  98.         self.final_future = final_future
  99.         self.target_context = target_context
  100.         if self.final_future.set_running_or_notify_cancel():
  101.             self._step(None)
  102.    
  103.     def __call__(self, prev_future):
  104.         if not self.target_context or self.target_context is CallableContext.get_current():
  105.             return self._step(prev_future)
  106.  
  107.         self.target_context.post(self._step, prev_future)
  108.        
  109.     def _step(self, prev_future):
  110.         if prev_future:
  111.             ex = prev_future.exception()
  112.             if ex:
  113.                 self.final_future.set_exception(ex)
  114.                 return
  115.             result = prev_future.result()
  116.         else:
  117.             result = None
  118.  
  119.         try:
  120.             next_future = self.generator.send(result)
  121.             next_future.add_done_callback(self)
  122.         except StopIteration as si:
  123.             try:
  124.                 result = si.args[0]
  125.             except IndexError:
  126.                 result = None
  127.             self.final_future.set_result(result)
  128.         except BaseException as ex:
  129.             self.final_future.set_exception(ex)
  130.  
  131.  
  132. class _AsyncCallable:
  133.     def __init__(self, fn, synchronized):
  134.         self.fn = fn
  135.         self.synchronized = synchronized
  136.    
  137.     def __call__(self, *args, **kwargs):
  138.         final_future = concurrent.futures.Future()
  139.  
  140.         if not (isinstance(self.fn, types.FunctionType) and self.fn.__code__.co_flags & 0x20):
  141.             # Not a generator
  142.             final_future.set_result(self.fn(*args, **kwargs))
  143.         elif self.synchronized:
  144.             # Continue in the original context
  145.             _Awaiter(self.fn(*args, **kwargs), final_future, CallableContext.get_current())
  146.         else:
  147.             # Continue in any context
  148.             _Awaiter(self.fn(*args, **kwargs), final_future, None)
  149.  
  150.         return final_future
  151.        
  152.     def __get__(self, inst, ctx):
  153.         return types.MethodType(self, inst)
  154.  
  155. def async(synchronized=True):
  156.     '''Decorator to wrap a generator as an asynchronous function returning a
  157.    `concurrent.futures.Future` object.
  158.  
  159.    When called, the generator will execute up to the first yield statement.
  160.    The yielded value must be a `concurrent.futures.Future` object, which may
  161.    be obtained from any source. When the future completes, its result is sent
  162.    into the generator. For example::
  163.        from concurrent.futures import ThreadPoolExecutor
  164.        from urllib.request import urlopen
  165.        
  166.        executor = ThreadPoolExecutor(max_workers=5)
  167.  
  168.        def load_url(url):
  169.            return urlopen(_url).read()
  170.  
  171.        @async
  172.        def get_image_async(url):
  173.            buffer = yield executor.submit(load_url, url)
  174.            return Image(buffer)
  175.  
  176.        def main(image_uri):
  177.            img_future = get_image_async(image_uri)
  178.            # perform other tasks while the image is downloading
  179.            img = img_future.result()
  180.  
  181.        main("http://www.python.org/images/python-logo.gif")
  182.  
  183.  
  184.    If `synchronized` is set to True, each step of the generator occurs in the
  185.    same `CallableContext` as the function was originally called.
  186.    '''
  187.     if synchronized is True:
  188.         return lambda fn: _AsyncCallable(fn, True)
  189.     elif synchronized is False:
  190.         return lambda fn: _AsyncCallable(fn, False)
  191.     else:
  192.         return _AsyncCallable(synchronized, True)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement