Advertisement
Guest User

Untitled

a guest
Oct 10th, 2019
97
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.86 KB | None | 0 0
  1. import contextvars
  2. import functools
  3. from concurrent.futures.thread import ThreadPoolExecutor
  4.  
  5. from aiogram.utils.callback_data import CallbackData
  6.  
  7. import uvloop
  8. from typing import Callable
  9.  
  10. loop = uvloop.new_event_loop()
  11.  
  12. _executor = ThreadPoolExecutor()
  13.  
  14. reaction_cd = CallbackData('rctn', 'r')
  15. settings_cd = CallbackData('settings', 'set')
  16. lang_cd = CallbackData('lang', 'lang')
  17. page_cd = CallbackData('page', 'page')
  18. word_cd = CallbackData('word', 'word')
  19.  
  20.  
  21. def g(name):
  22. print(name)
  23. return name
  24.  
  25.  
  26. def aiowrap(func: Callable) -> object:
  27. @functools.wraps(func)
  28. def wrapping(*args, **kwargs):
  29. new_func = functools.partial(func, *args, **kwargs)
  30. ctx = contextvars.copy_context()
  31. ctx_func = functools.partial(ctx.run, new_func)
  32. return g(loop.run_in_executor(_executor, ctx_func))
  33.  
  34. return wrapping
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement