Advertisement
Guest User

Untitled

a guest
Apr 20th, 2019
90
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.31 KB | None | 0 0
  1. """Thread handling functions.
  2.  
  3. Sample usage:
  4.  
  5. >>> @threaded
  6. >>> def such_long():
  7. >>> time.sleep(2)
  8. >>> return 'much wiki'
  9.  
  10. >>> @threaded
  11. >>> def much_slow():
  12. >>> time.sleep(1)
  13. >>> return 'such spin'
  14.  
  15. >>> @threaded
  16. >>> def nope():
  17. >>> raise NotImplementedError
  18.  
  19. >>> join(
  20. >>> such_long(),
  21. >>> much_slow()
  22. >>> )
  23. ('much wiki', 'such spin')
  24.  
  25. >>> ops = (such_long, much_slow)
  26. >>> join(op() for op in ops)
  27. ('much wiki', 'such spin')
  28.  
  29. >>> ops = (such_long, much_slow)
  30. >>> join(op() for op in ops)
  31. ('much wiki', 'such spin')
  32.  
  33. >>> join(
  34. >>> such_long(),
  35. >>> nope(),
  36. >>> much_slow()
  37. >>> )
  38. Traceback (most recent call last):
  39. ...
  40. NotImplementedError
  41. """
  42.  
  43. import queue
  44. import threading
  45.  
  46.  
  47. def join(*threads):
  48. """Wait for all passed `@threaded` functions return values.
  49.  
  50. If any exception was raised, it is automatically re-raised. Passed
  51. functions are always joined.
  52. """
  53. exc = None
  54.  
  55. if len(threads) == 0: # pragma: nocover
  56. return []
  57. elif len(threads) == 1:
  58. if hasattr(threads[0], "__iter__"):
  59. threads = list(threads[0])
  60.  
  61. for thread in threads:
  62. thread.join()
  63.  
  64. if not thread.e.empty():
  65. exc = thread.e.get()
  66.  
  67. if exc:
  68. raise exc
  69.  
  70. return [thread.q.get() for thread in threads]
  71.  
  72.  
  73. def threaded(f, daemon=False):
  74. """Declare a function as a thread.
  75.  
  76. This high-level implementation relies on `threading.Thread` but adds 2
  77. queues for safe operations:
  78.  
  79. - `q`: Thread's return value (after it finishes)
  80. - `e`: Thread's raised exception
  81.  
  82. These queues are mutually exclusive and since some operations on queues are
  83. blocking, some time should be spent on the documentation. Alternatively, a
  84. high-level `join()` implementation is provided.
  85. """
  86.  
  87. def wrapped_f(q, e, *args, **kwargs):
  88. try:
  89. q.put(f(*args, **kwargs))
  90. except Exception as exc:
  91. e.put(exc)
  92.  
  93. def wrap(*args, **kwargs):
  94. q = queue.Queue()
  95. e = queue.Queue()
  96.  
  97. t = threading.Thread(
  98. target=wrapped_f, args=(q, e) + args, kwargs=kwargs
  99. )
  100.  
  101. t.daemon = daemon
  102. t.start()
  103.  
  104. t.q = q
  105. t.e = e
  106.  
  107. return t
  108.  
  109. return wrap
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement