Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from __future__ import print_function
- import sys
- try:
- import queue
- except ImportError:
- import Queue as queue
- import Pyro4.core
- import threading
- import time
- Pyro4.config.SERIALIZER = 'pickle'
- Pyro4.config.SERIALIZERS_ACCEPTED.add('pickle')
- Pyro4.config.COMMTIMEOUT = 30
- class Master(object):
- def __init__(self, other, role):
- self.client_registered = {}
- self.workqueue = queue.Queue()
- self.items_in_work = []
- self.resultqueue = {}
- self.other = other
- self.role = role
- self.lock = threading.Lock()
- self.cnt = 0
- @Pyro4.expose
- def registerClient(self):
- self.lock.acquire()
- self.resultqueue[self.cnt] = queue.Queue()
- self.cnt += 1
- new_id = self.cnt - 1
- self.lock.release()
- print("Client registered, id:", new_id)
- return new_id
- @Pyro4.expose
- def putWorkReserved(self, item, id):
- item.id = id
- self.workqueue.put(item)
- @Pyro4.expose
- def putWork(self, item, id):
- try:
- self.other.putWorkReserved(item, id)
- except:
- pass
- item.id = id
- self.workqueue.put(item)
- @Pyro4.expose
- def getWorkReserved(self, timeout = 5):
- item = self.workqueue.get(timeout = timeout)
- item.start = time.time()
- self.items_in_work.append(item)
- return item
- @Pyro4.expose
- def getWork(self, timeout = 5):
- item = self.workqueue.get(timeout = timeout)
- item.start = time.time()
- self.items_in_work.append(item)
- try:
- self.other.getWorkReserved(timeout)
- except:
- pass
- return item
- @Pyro4.expose
- def putResultReserved(self, item):
- print("Finished in", time.time() - item.start, "seconds")
- self.resultqueue[item.id].put(item)
- @Pyro4.expose
- def putResult(self, item):
- print("Finished in", time.time() - item.start, "seconds")
- self.resultqueue[item.id].put(item)
- try:
- self.other.putResultReserved(item)
- except:
- pass
- @Pyro4.expose
- def getResultReserved(self, id, timeout = 5):
- return self.resultqueue[id].get(timeout = timeout)
- @Pyro4.expose
- def getResult(self, id, timeout = 5):
- # try:
- # self.other.getResultReserved(item, id, timeout)
- # except:
- # pass
- return self.resultqueue[id].get(timeout = timeout)
- @Pyro4.expose
- def workQueueSize(self):
- return self.workqueue.qsize()
- @Pyro4.expose
- def test(self):
- return True
- @Pyro4.expose
- def check(self):
- if self.role == 1:
- return True
- else:
- try:
- self.other.test()
- return False
- except:
- self.role = 1
- return True
- def main():
- # HOST:PORT
- address1 = str(sys.argv[1]).split(':')
- host = address1[0]
- port = int(address1[1])
- address2 = str(sys.argv[2])
- role = int(sys.argv[3])
- daemon = Pyro4.core.Daemon(host, port)
- other = Pyro4.core.Proxy("PYRO:master@" + address2)
- master = Master(other, role)
- uri = daemon.register(master, "master")
- print("Master is running: " + str(uri))
- daemon.requestLoop()
- if __name__=="__main__":
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement