Advertisement
Guest User

Untitled

a guest
Dec 13th, 2018
92
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.40 KB | None | 0 0
  1. from __future__ import print_function
  2. import sys
  3. try:
  4. import queue
  5. except ImportError:
  6. import Queue as queue
  7. import Pyro4.core
  8. import threading
  9. import time
  10.  
  11. Pyro4.config.SERIALIZER = 'pickle'
  12. Pyro4.config.SERIALIZERS_ACCEPTED.add('pickle')
  13. Pyro4.config.COMMTIMEOUT = 30
  14.  
  15. class Master(object):
  16. def __init__(self, other, role):
  17. self.client_registered = {}
  18. self.workqueue = queue.Queue()
  19. self.items_in_work = []
  20. self.resultqueue = {}
  21. self.other = other
  22. self.role = role
  23. self.lock = threading.Lock()
  24. self.cnt = 0
  25.  
  26. @Pyro4.expose
  27. def registerClient(self):
  28. self.lock.acquire()
  29. self.resultqueue[self.cnt] = queue.Queue()
  30. self.cnt += 1
  31. new_id = self.cnt - 1
  32. self.lock.release()
  33. print("Client registered, id:", new_id)
  34. return new_id
  35.  
  36. @Pyro4.expose
  37. def putWorkReserved(self, item, id):
  38. item.id = id
  39. self.workqueue.put(item)
  40.  
  41. @Pyro4.expose
  42. def putWork(self, item, id):
  43. try:
  44. self.other.putWorkReserved(item, id)
  45. except:
  46. pass
  47. item.id = id
  48. self.workqueue.put(item)
  49.  
  50. @Pyro4.expose
  51. def getWorkReserved(self, timeout = 5):
  52. item = self.workqueue.get(timeout = timeout)
  53. item.start = time.time()
  54. self.items_in_work.append(item)
  55. return item
  56.  
  57. @Pyro4.expose
  58. def getWork(self, timeout = 5):
  59. item = self.workqueue.get(timeout = timeout)
  60. item.start = time.time()
  61. self.items_in_work.append(item)
  62. try:
  63. self.other.getWorkReserved(timeout)
  64. except:
  65. pass
  66. return item
  67.  
  68. @Pyro4.expose
  69. def putResultReserved(self, item):
  70. print("Finished in", time.time() - item.start, "seconds")
  71. self.resultqueue[item.id].put(item)
  72.  
  73. @Pyro4.expose
  74. def putResult(self, item):
  75. print("Finished in", time.time() - item.start, "seconds")
  76. self.resultqueue[item.id].put(item)
  77. try:
  78. self.other.putResultReserved(item)
  79. except:
  80. pass
  81.  
  82. @Pyro4.expose
  83. def getResultReserved(self, id, timeout = 5):
  84. return self.resultqueue[id].get(timeout = timeout)
  85.  
  86. @Pyro4.expose
  87. def getResult(self, id, timeout = 5):
  88. # try:
  89. # self.other.getResultReserved(item, id, timeout)
  90. # except:
  91. # pass
  92. return self.resultqueue[id].get(timeout = timeout)
  93.  
  94. @Pyro4.expose
  95. def workQueueSize(self):
  96. return self.workqueue.qsize()
  97.  
  98. @Pyro4.expose
  99. def test(self):
  100. return True
  101.  
  102. @Pyro4.expose
  103. def check(self):
  104. if self.role == 1:
  105. return True
  106. else:
  107. try:
  108. self.other.test()
  109. return False
  110. except:
  111. self.role = 1
  112. return True
  113.  
  114. def main():
  115. # HOST:PORT
  116. address1 = str(sys.argv[1]).split(':')
  117. host = address1[0]
  118. port = int(address1[1])
  119.  
  120. address2 = str(sys.argv[2])
  121.  
  122. role = int(sys.argv[3])
  123.  
  124. daemon = Pyro4.core.Daemon(host, port)
  125. other = Pyro4.core.Proxy("PYRO:master@" + address2)
  126. master = Master(other, role)
  127.  
  128. uri = daemon.register(master, "master")
  129.  
  130. print("Master is running: " + str(uri))
  131. daemon.requestLoop()
  132.  
  133. if __name__=="__main__":
  134. main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement