Advertisement
Guest User

Untitled

a guest
Jun 19th, 2019
83
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.20 KB | None | 0 0
import random
import time

from PyQt5.QtCore import QCoreApplication
from PyQt5.QtCore import QObject
from PyQt5.QtCore import QReadWriteLock
from PyQt5.QtCore import QSemaphore
from PyQt5.QtCore import QThread
from PyQt5.QtCore import pyqtSignal
from PyQt5.QtCore import pyqtSlot
  3. class Share():
  4. data = dict()
  5. def __init__(self, key, value):
  6. Share.data[key] = value
  7. @staticmethod
  8. def get(key):
  9. return Share.data[key]
  10. @staticmethod
  11. def set(key, value):
  12. if isinstance(value, int):
  13. print("new value:", value)
  14. Share.data[key] = value
  15. else:
  16. raise Exception("Need to be an int")
  17. @staticmethod
  18. def multiply(key, value):
  19. Share.data[key] *= value
  20. class Worker(QThread):
  21. sma = QSemaphore(QThread.idealThreadCount())
  22. mutex = QReadWriteLock()
  23. end = pyqtSignal(Worker)
  24. count = 0
  25. def __init__(self, state, data, key):
  26. super().__init__()
  27. Worker.count += 1
  28. self.number = Worker.count
  29. self.state = state
  30. self.data = data
  31. self.key = key
  32. self.finished.connect(self.__del__)
  33. def __del__(self):
  34. self.quit()
  35. self.deleteLater()
  36. def run(self):
  37. self.sma.acquire()
  38. factor = random.randint(1, 10)
  39. operation = factor if self.state else 1 / factor
  40. wait = random.randint(1,10)
  41. self.mutex.lockForWrite()
  42. print("thread", self.number, "running | operation =>", operation, \
  43. "| threads aviable =>", self.sma.available())
  44. time.sleep(wait)
  45. self.data.multiply(self.key, operation)
  46. print("from", self.number, "data = ", self.data.get(self.key))
  47. self.mutex.unlock()
  48. self.end.emit(self)
  49. class Test(QObject):
  50. def __init__(self, origin):
  51. super().__init__()
  52. self.w = list()
  53. for i in range(2,15):
  54. state = random.randint(0,1)
  55. new_worker = Worker(state, origin, "a")
  56. new_worker.end.connect(self.on_end)
  57. self.w.append(new_worker)
  58. print("there is", len(self.w), "workers running")
  59. self.w[i-2].start()
  60. while len(w) != 0:
  61. time.sleep(1)
  62. print("finally, origin is", origin)
  63. @pyqtSlot(Worker)
  64. def on_end(self, worker):
  65. print("bye bye", worker.number)
  66. del self.w[worker]
  67. origin = Share("a", 1)
  68. ok = Test(origin)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement