import collections
import concurrent.futures
import types
import threading
_tls = threading.local()
class CallableContext:
'''Represents a context, such as a thread or process, in which callables
may be executed. An awaiter context differs from a thread or process pool
in that execution always occurs within the context regardless of the
source of the callable.
A common usage is to ensure that `concurrent.futures.Future` objects invoke
their callback in the same context it was started from::
executor = ThreadPoolExecutor(max_workers=5)
future = executor.submit(perform_calculation)
# Must capture `context` outside of the handler
context = awaiters.CallableContext.get_current()
future.add_done_handler(lambda f: context.submit(print, f.result()))
If no context has been set up, `get_current` will return None.
A context may be manually set up by using a subclass of `CallableContext`
such as `ThreadContext`::
context = awaiters.ThreadContext()
def main():
# This method is within the context of the current thread
ctxt = awaiters.CallableContext.get_current()
# ctxt is context == True when called from this thread.
f = executor.submit(lambda: awaiters.CallableContext.get_current())
# f.result() is None == true when called from another thread
context.run(main)
'''
@staticmethod
def get_current():
'''Returns the current callable context if one exists. This should be
captured and passed to an asynchronous operation so that it can use the
`submit` or `post` methods to execute code in the original context.
If this returns ``None``, there is no context available.
'''
try:
context = _tls.current_context
except AttributeError:
_tls.current_context = context = CallableContext()
return context
@staticmethod
def set_current(context):
'''Sets the current context and returns the previous one.'''
old_context = CallableContext.get_current()
_tls.current_context = context
return old_context
def __enter__(self):
self.__previous_context = CallableContext.set_current(self)
return self
def __exit__(self, exc_type, exc_value, traceback):
CallableContext.set_current(self.__previous_context)
self.__previous_context = None
def submit(self, callable, *args, **kwargs):
'''Adds a callable to invoke within a context. This method does not
return until the callable has been executed. Any exceptions will be
raised in the calling context.
This method returns the value returned by the callable.
The default implementation executes the callable immediately.
'''
return callable(*args, **kwargs)
def post(self, callable, *args, **kwargs):
'''Adds a callable to invoke within a context. This method returns
immediately and any exceptions will be raised in the target context.
This method has no return value.
The default implementation executes the callable immediately.
'''
callable(*args, **kwargs)
class _Awaiter:
'''Implements the callback behavior of functions wrapped with `async`.
'''
def __init__(self, generator, final_future, target_context=None):
self.generator = generator
self.final_future = final_future
self.target_context = target_context
if self.final_future.set_running_or_notify_cancel():
self._step(None)
def __call__(self, prev_future):
if not self.target_context or self.target_context is CallableContext.get_current():
return self._step(prev_future)
self.target_context.post(self._step, prev_future)
def _step(self, prev_future):
if prev_future:
ex = prev_future.exception()
if ex:
self.final_future.set_exception(ex)
return
result = prev_future.result()
else:
result = None
try:
next_future = self.generator.send(result)
next_future.add_done_callback(self)
except StopIteration as si:
try:
result = si.args[0]
except IndexError:
result = None
self.final_future.set_result(result)
except BaseException as ex:
self.final_future.set_exception(ex)
class _AsyncCallable:
def __init__(self, fn, synchronized):
self.fn = fn
self.synchronized = synchronized
def __call__(self, *args, **kwargs):
final_future = concurrent.futures.Future()
if not (isinstance(self.fn, types.FunctionType) and self.fn.__code__.co_flags & 0x20):
# Not a generator
final_future.set_result(self.fn(*args, **kwargs))
elif self.synchronized:
# Continue in the original context
_Awaiter(self.fn(*args, **kwargs), final_future, CallableContext.get_current())
else:
# Continue in any context
_Awaiter(self.fn(*args, **kwargs), final_future, None)
return final_future
def __get__(self, inst, ctx):
return types.MethodType(self, inst)
def async(synchronized=True):
'''Decorator to wrap a generator as an asynchronous function returning a
`concurrent.futures.Future` object.
When called, the generator will execute up to the first yield statement.
The yielded value must be a `concurrent.futures.Future` object, which may
be obtained from any source. When the future completes, its result is sent
into the generator. For example::
from concurrent.futures import ThreadPoolExecutor
from urllib.request import urlopen
executor = ThreadPoolExecutor(max_workers=5)
def load_url(url):
return urlopen(_url).read()
@async
def get_image_async(url):
buffer = yield executor.submit(load_url, url)
return Image(buffer)
def main(image_uri):
img_future = get_image_async(image_uri)
# perform other tasks while the image is downloading
img = img_future.result()
main("http://www.python.org/images/python-logo.gif")
If `synchronized` is set to True, each step of the generator occurs in the
same `CallableContext` as the function was originally called.
'''
if synchronized is True:
return lambda fn: _AsyncCallable(fn, True)
elif synchronized is False:
return lambda fn: _AsyncCallable(fn, False)
else:
return _AsyncCallable(synchronized, True)