import collections
import concurrent.futures
import types
import threading

# Per-thread storage holding the active `CallableContext` for that thread.
_tls = threading.local()

# Bit set in `code.co_flags` for generator functions (CPython's CO_GENERATOR).
_CO_GENERATOR = 0x20


class CallableContext:
    '''Represents a context, such as a thread or process, in which callables
    may be executed.

    A callable context differs from a thread or process pool in that
    execution always occurs within the context regardless of the source of
    the callable.

    A common usage is to ensure that `concurrent.futures.Future` objects
    invoke their callback in the same context it was started from::

        executor = ThreadPoolExecutor(max_workers=5)
        future = executor.submit(perform_calculation)

        # Must capture `context` outside of the callback
        context = awaiters.CallableContext.get_current()
        future.add_done_callback(lambda f: context.submit(print, f.result()))

    If no context has been set up for the calling thread, `get_current`
    creates, stores and returns a default `CallableContext` whose `submit`
    and `post` simply run the callable immediately.

    A context may be manually set up by using a subclass of
    `CallableContext` such as `ThreadContext`::

        context = awaiters.ThreadContext()

        def main():
            # This function runs within the context of the current thread
            ctxt = awaiters.CallableContext.get_current()
            # `ctxt is context` is True when called from this thread.

            f = executor.submit(awaiters.CallableContext.get_current)
            # `f.result() is context` is False: the worker thread has its
            # own thread-local (default) context.

        context.run(main)

    Instances can be used as context managers: ``with context:`` installs
    the instance as the current context and restores the previous one on
    exit. Only one previous context is remembered per instance, so nesting
    ``with`` blocks on the *same* instance is not supported.
    '''

    @staticmethod
    def get_current():
        '''Returns the current callable context for the calling thread.

        This should be captured and passed to an asynchronous operation so
        that it can use the `submit` or `post` methods to execute code in
        the original context.

        If no context has been set on this thread yet, a default
        `CallableContext` is created, stored as the current context and
        returned. ``None`` is returned only if ``None`` was explicitly
        installed with `set_current`.
        '''
        try:
            context = _tls.current_context
        except AttributeError:
            # First access on this thread: lazily install the default.
            _tls.current_context = context = CallableContext()
        return context

    @staticmethod
    def set_current(context):
        '''Sets the current context and returns the previous one.'''
        old_context = CallableContext.get_current()
        _tls.current_context = context
        return old_context

    def __enter__(self):
        self.__previous_context = CallableContext.set_current(self)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        CallableContext.set_current(self.__previous_context)
        self.__previous_context = None

    def submit(self, callable, *args, **kwargs):
        '''Adds a callable to invoke within a context. This method does not
        return until the callable has been executed. Any exceptions will be
        raised in the calling context.

        This method returns the value returned by the callable.

        The default implementation executes the callable immediately.
        '''
        return callable(*args, **kwargs)

    def post(self, callable, *args, **kwargs):
        '''Adds a callable to invoke within a context. This method returns
        immediately and any exceptions will be raised in the target context.

        This method has no return value.

        The default implementation executes the callable immediately.
        '''
        callable(*args, **kwargs)


class _Awaiter:
    '''Implements the callback behavior of functions wrapped with `async_`.

    An instance drives `generator` one step at a time. Every value the
    generator yields is expected to be a future; the awaiter registers
    itself as that future's done-callback and, when called back, sends the
    future's result into the generator. The generator's return value (or
    any exception it raises) is stored on `final_future`.
    '''

    def __init__(self, generator, final_future, target_context=None):
        self.generator = generator
        self.final_future = final_future
        # Context each step should run in; None means "wherever the awaited
        # future happens to invoke its callback".
        self.target_context = target_context
        # Returns False if `final_future` was cancelled before starting, in
        # which case the generator is never advanced.
        if self.final_future.set_running_or_notify_cancel():
            self._step(None)

    def __call__(self, prev_future):
        '''Done-callback for the awaited future: runs the next step, either
        directly or by posting it to the target context.'''
        # `is None` rather than truthiness: a context subclass may define
        # `__len__`/`__bool__` and still be a valid target.
        if (self.target_context is None
                or self.target_context is CallableContext.get_current()):
            return self._step(prev_future)
        self.target_context.post(self._step, prev_future)

    def _step(self, prev_future):
        '''Advances the generator using the outcome of `prev_future`.

        `prev_future` is None for the very first step.
        '''
        if prev_future is not None:
            try:
                ex = prev_future.exception()
            except concurrent.futures.CancelledError as cancelled:
                # `exception()` raises (rather than returns) when the
                # awaited future was cancelled. Without this handler the
                # error would escape the callback and `final_future` would
                # never complete.
                ex = cancelled
            if ex is not None:
                self.final_future.set_exception(ex)
                return
            result = prev_future.result()
        else:
            result = None

        try:
            next_future = self.generator.send(result)
            next_future.add_done_callback(self)
        except StopIteration as si:
            # Generator finished; `value` carries its return value (None
            # when it returned without one).
            self.final_future.set_result(si.value)
        except BaseException as ex:
            # Any failure inside the generator (or a yielded object that is
            # not a future) completes the final future exceptionally.
            self.final_future.set_exception(ex)


class _AsyncCallable:
    '''Callable wrapper produced by the `async_` decorator.

    Calling it invokes the wrapped function and returns a
    `concurrent.futures.Future`. Also acts as a descriptor so that wrapped
    functions defined in a class body bind to instances like methods.
    '''

    def __init__(self, fn, synchronized):
        self.fn = fn
        self.synchronized = synchronized

    def __call__(self, *args, **kwargs):
        final_future = concurrent.futures.Future()
        fn = self.fn
        is_generator_function = (
            isinstance(fn, types.FunctionType)
            and fn.__code__.co_flags & _CO_GENERATOR
        )
        if not is_generator_function:
            # Not a generator: run synchronously and wrap the result.
            final_future.set_result(fn(*args, **kwargs))
        elif self.synchronized:
            # Continue in the original context
            _Awaiter(fn(*args, **kwargs), final_future,
                     CallableContext.get_current())
        else:
            # Continue in any context
            _Awaiter(fn(*args, **kwargs), final_future, None)
        return final_future

    def __get__(self, inst, ctx):
        # Accessed on the class itself: `types.MethodType` rejects a None
        # instance, so hand back the unbound callable instead.
        if inst is None:
            return self
        return types.MethodType(self, inst)


def async_(synchronized=True):
    '''Decorator to wrap a generator as an asynchronous function returning a
    `concurrent.futures.Future` object.

    When called, the generator will execute up to the first yield
    statement. The yielded value must be a `concurrent.futures.Future`
    object, which may be obtained from any source. When the future
    completes, its result is sent into the generator.

    For example::

        from concurrent.futures import ThreadPoolExecutor
        from urllib.request import urlopen

        executor = ThreadPoolExecutor(max_workers=5)

        def load_url(url):
            return urlopen(url).read()

        @async_
        def get_image_async(url):
            buffer = yield executor.submit(load_url, url)
            return Image(buffer)

        def main(image_uri):
            img_future = get_image_async(image_uri)
            # perform other tasks while the image is downloading
            img = img_future.result()

        main("http://www.python.org/images/python-logo.gif")

    If `synchronized` is set to True, each step of the generator occurs in
    the same `CallableContext` as the function was originally called.

    The decorator may be applied bare (``@async_``) or with an argument
    (``@async_(synchronized=False)``).
    '''
    if synchronized is True:
        return lambda fn: _AsyncCallable(fn, True)
    elif synchronized is False:
        return lambda fn: _AsyncCallable(fn, False)
    else:
        # Bare `@async_` usage: `synchronized` is actually the function.
        return _AsyncCallable(synchronized, True)


# `async` has been a reserved keyword since Python 3.7, so the decorator can no
# longer be defined with `def async(...)`. Keep it reachable under its
# historical name for callers that look it up dynamically, e.g.
# `getattr(awaiters, 'async')`.
globals()['async'] = async_