Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import threading
- import queue
def call_threadily(func, *args, **kwargs):
    """Run ``func(*args, **kwargs)`` in a new thread and wait for it to finish.

    Args:
        func: The callable to execute in the worker thread.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func`` returned.

    Raises:
        Exception: With the message "Thread raised an exception!" if ``func``
            raised anything; the original exception is chained as
            ``__cause__`` so both tracebacks are shown.
    """
    result = queue.Queue(1)

    def f():
        try:
            result.put(("success", func(*args, **kwargs)))
        # Catch BaseException, not just Exception: this is the thread
        # boundary, and anything that escapes here (e.g. SystemExit) would
        # leave the queue empty and the caller with nothing to report.
        except BaseException as e:
            result.put(("failure", e))

    t = threading.Thread(target=f)
    t.start()
    t.join()

    # The worker has finished, so its single result must already be queued.
    # get_nowait() fails fast instead of blocking forever if it is not, and
    # unlike an ``assert`` it is not stripped under ``python -O``.
    state, value = result.get_nowait()
    if state == "success":
        return value
    raise Exception("Thread raised an exception!") from value
def oops():
    """Always fail with ZeroDivisionError; used to demonstrate error propagation."""
    numerator, denominator = 1, 0
    return numerator / denominator
# Demo: oops() fails inside the worker thread, so this call raises and the
# print is never reached; the resulting traceback is recorded below.
demo_result = call_threadily(oops)
print(demo_result)
- """
- result:
- Traceback (most recent call last):
- File "C:\Users\Kevin\Desktop\test.py", line 7, in f
- result.put(("success", func(*args, **kwargs)))
- File "C:\Users\Kevin\Desktop\test.py", line 22, in oops
- return 1/0
- ZeroDivisionError: division by zero
- The above exception was the direct cause of the following exception:
- Traceback (most recent call last):
- File "C:\Users\Kevin\Desktop\test.py", line 24, in <module>
- print(call_threadily(oops))
- File "C:\Users\Kevin\Desktop\test.py", line 19, in call_threadily
- raise Exception("Thread raised an exception!") from value
- Exception: Thread raised an exception!
- """
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement