Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import collections
import concurrent.futures
import inspect
import threading
import types
# Per-thread storage. The ``current_context`` attribute holds the context that
# is active on the thread reading it.
_tls = threading.local()


class CallableContext:
    '''Represents a context, such as a thread or process, in which callables
    may be executed. A callable context differs from a thread or process pool
    in that execution always occurs within the context regardless of the
    source of the callable.

    A common usage is to ensure that `concurrent.futures.Future` objects invoke
    their callback in the same context it was started from::

        executor = ThreadPoolExecutor(max_workers=5)
        future = executor.submit(perform_calculation)
        # Must capture `context` outside of the callback
        context = awaiters.CallableContext.get_current()
        future.add_done_callback(lambda f: context.submit(print, f.result()))

    If no context has been set up on the calling thread, `get_current`
    installs and returns a default `CallableContext` for that thread. The
    default implementation of `submit` and `post` simply runs the callable
    immediately, wherever it is called from.

    A context may be made current for a block of code by using it as a context
    manager; the previously current context is restored on exit, and the same
    instance may be entered again while it is already active::

        context = awaiters.CallableContext()
        with context:
            # awaiters.CallableContext.get_current() is context
            ...

    Subclasses are expected to override `submit` and `post` to marshal the
    callable into the execution context they represent.
    '''

    @staticmethod
    def get_current():
        '''Returns the callable context that is current for the calling thread.

        This should be captured and passed to an asynchronous operation so
        that it can use the `submit` or `post` methods to execute code in the
        original context.

        The first call on a thread that has no context yet creates a default
        `CallableContext`, stores it as that thread's current context and
        returns it. ``None`` is returned only if the current context was
        explicitly cleared with ``set_current(None)``.
        '''
        try:
            return _tls.current_context
        except AttributeError:
            # Nothing stored for this thread yet: install the default context
            # so repeated calls keep returning the same object.
            context = _tls.current_context = CallableContext()
            return context

    @staticmethod
    def set_current(context):
        '''Sets the current context for the calling thread and returns the
        previous one.'''
        previous = CallableContext.get_current()
        _tls.current_context = context
        return previous

    def __enter__(self):
        '''Makes this context current and remembers the one it replaced.'''
        previous = CallableContext.set_current(self)
        # A stack (rather than a single slot) keeps nested ``with`` blocks on
        # the same instance from overwriting the context to restore. It is
        # created lazily because subclasses may not call ``__init__`` here.
        try:
            self.__previous_contexts.append(previous)
        except AttributeError:
            self.__previous_contexts = [previous]
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        '''Restores the context that was current when the matching
        `__enter__` ran. Exceptions are never suppressed.'''
        CallableContext.set_current(self.__previous_contexts.pop())

    # NOTE: the parameter name ``callable`` shadows the builtin, but it is part
    # of the public signature and is kept for compatibility.
    def submit(self, callable, *args, **kwargs):
        '''Adds a callable to invoke within a context. This method does not
        return until the callable has been executed. Any exceptions will be
        raised in the calling context.

        This method returns the value returned by the callable.

        The default implementation executes the callable immediately.
        '''
        return callable(*args, **kwargs)

    def post(self, callable, *args, **kwargs):
        '''Adds a callable to invoke within a context. This method returns
        immediately and any exceptions will be raised in the target context.

        This method has no return value.

        The default implementation executes the callable immediately.
        '''
        callable(*args, **kwargs)
class _Awaiter:
    '''Implements the callback behavior of functions wrapped with `async`.

    An instance drives a generator that yields `concurrent.futures.Future`
    objects. It registers itself as the done-callback of each yielded future,
    feeds the outcome back into the generator, and finally publishes the
    generator's return value (or the exception that escaped it) on
    `final_future`.
    '''

    def __init__(self, generator, final_future, target_context=None):
        '''Stores the state and immediately runs the generator up to its first
        yield, unless `final_future` has already been cancelled.

        `target_context` is the `CallableContext` in which later steps must
        run, or ``None`` to run each step wherever the awaited future invokes
        its callback.
        '''
        self.generator = generator
        self.final_future = final_future
        self.target_context = target_context
        if self.final_future.set_running_or_notify_cancel():
            self._step(None)

    def __call__(self, prev_future):
        '''Done-callback for an awaited future: runs the next step, marshalled
        into `target_context` when one was requested and it is not already the
        current context.'''
        # Compare against None explicitly so that a context object which
        # happens to be falsy is still honoured.
        if (self.target_context is None
                or self.target_context is CallableContext.get_current()):
            return self._step(prev_future)
        self.target_context.post(self._step, prev_future)

    def _step(self, prev_future):
        '''Resumes the generator with the outcome of `prev_future` (``None``
        for the very first step) and subscribes to the next yielded future.'''
        try:
            if prev_future is None:
                next_future = self.generator.send(None)
            else:
                try:
                    value = prev_future.result()
                except BaseException as failure:
                    # Covers both a failed and a cancelled future. Throwing
                    # into the generator lets it handle the error and run its
                    # cleanup; if it does not, the exception propagates out of
                    # throw() and is reported on final_future below.
                    next_future = self.generator.throw(failure)
                else:
                    next_future = self.generator.send(value)
            next_future.add_done_callback(self)
        except StopIteration as finished:
            # The generator returned: its return value is the final result.
            self.final_future.set_result(finished.value)
        except BaseException as error:
            self.final_future.set_exception(error)
class _AsyncCallable:
    '''Callable wrapper that turns a function into one returning a
    `concurrent.futures.Future`.

    Generator functions are driven step by step by `_Awaiter`; any other
    callable is invoked directly and its outcome stored on the returned
    future.
    '''

    def __init__(self, fn, synchronized):
        '''`fn` is the wrapped callable. `synchronized` selects whether the
        steps of a generator continue in the context that was current when the
        wrapper was called (True) or in whatever context the awaited futures
        complete in (False).'''
        self.fn = fn
        self.synchronized = synchronized

    def __call__(self, *args, **kwargs):
        '''Invokes the wrapped callable and returns the future representing
        its eventual result.'''
        final_future = concurrent.futures.Future()

        if not inspect.isgeneratorfunction(self.fn):
            # Not a generator: run it now, but report the outcome through the
            # future so callers see failures the same way as for generators.
            try:
                result = self.fn(*args, **kwargs)
            except Exception as error:
                final_future.set_exception(error)
            else:
                final_future.set_result(result)
            return final_future

        # Continue in the original context when synchronized, otherwise in
        # any context.
        if self.synchronized:
            target_context = CallableContext.get_current()
        else:
            target_context = None
        _Awaiter(self.fn(*args, **kwargs), final_future, target_context)
        return final_future

    def __get__(self, inst, ctx):
        '''Descriptor hook so a wrapped function defined in a class body binds
        to the instance like an ordinary method.'''
        if inst is None:
            # Accessed on the class itself: there is nothing to bind to.
            return self
        return types.MethodType(self, inst)
def async_(synchronized=True):
    '''Decorator to wrap a generator as an asynchronous function returning a
    `concurrent.futures.Future` object.

    ``async`` became a reserved keyword in Python 3.7, so the decorator is
    defined as ``async_``. It is still published in the module namespace under
    its historical name ``async`` for code that looks it up dynamically.

    When called, the generator will execute up to the first yield statement.
    The yielded value must be a `concurrent.futures.Future` object, which may
    be obtained from any source. When the future completes, its result is sent
    into the generator. For example::

        from concurrent.futures import ThreadPoolExecutor
        from urllib.request import urlopen

        executor = ThreadPoolExecutor(max_workers=5)

        def load_url(url):
            return urlopen(url).read()

        @async_
        def get_image_async(url):
            buffer = yield executor.submit(load_url, url)
            return Image(buffer)

        def main(image_uri):
            img_future = get_image_async(image_uri)
            # perform other tasks while the image is downloading
            img = img_future.result()

        main("http://www.python.org/images/python-logo.gif")

    The decorator may be applied bare (``@async_``) or called with the
    `synchronized` flag (``@async_(synchronized=False)``). If `synchronized`
    is True, each step of the generator occurs in the same `CallableContext`
    as the function was originally called in.

    Raises `TypeError` if the argument is neither a bool nor a callable.
    '''
    if synchronized is True or synchronized is False:
        # Called with the flag: hand back the actual decorator.
        return lambda fn: _AsyncCallable(fn, synchronized)
    if not callable(synchronized):
        raise TypeError(
            'async_ expects a bool or the function to wrap, got %r'
            % (synchronized,))
    # Applied bare: the single argument is the function being decorated.
    return _AsyncCallable(synchronized, True)


# ``def async`` is a syntax error on Python 3.7+, so the legacy name can only
# be registered through the module dictionary.
globals()['async'] = async_
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement