Guest User

Untitled

a guest
Apr 20th, 2018
61
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.78 KB | None | 0 0
  1. import threading
  2. from unix_threading import _pthread_cond
  3.  
  4. class NativeEvent(object):
  5. def __init__(self):
  6. self.__cond = _pthread_cond.Condition()
  7. self.__flag = False
  8.  
  9. def isSet(self):
  10. return self.__flag
  11.  
  12. def set(self):
  13. self.__cond.acquire()
  14. try:
  15. self.__flag = True
  16. self.__cond.notifyAll()
  17. finally:
  18. self.__cond.release()
  19.  
  20. def clear(self):
  21. self.__cond.acquire()
  22. try:
  23. self.__flag = False
  24. finally:
  25. self.__cond.release()
  26.  
  27. def wait(self, timeout=None):
  28. self.__cond.acquire()
  29. try:
  30. if not self.__flag:
  31. self.__cond.wait(timeout)
  32. finally:
  33. self.__cond.release()
Add Comment
Please, Sign In to add comment