Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import multiprocessing
- from multiprocessing import Pipe, Lock
- from helper import Message
- import time
- import math
- class Worker(multiprocessing.Process):
- # TODO : add desc
- def __init__(self, id, clock, lock, input_pipe, output_pipes, n):
- multiprocessing.Process.__init__(self)
- self.id = id # process id
- self.c = clock # process interior logical clock
- self.lock = lock # shared Lock object for critical snippet execution
- self.input = input_pipe # input pipe end in which we send a message
- self.outputs = output_pipes # all output pipes ends from which we read messages
- self.queue = [] # query of messages for each process
- self.num = n # number of processes
- def run(self):
- """
- Overriden process running method.
- :return: None
- """
- for num in range (0, 2):
- # generate a demand for work
- demand = Message("DEMAND", self.id, self.c)
- for i in range(1, self.num):
- self.input.send(demand)
- time.sleep(0.2)
- # see if we have received any demands from other processes
- for out in self.outputs:
- try:
- msg = out.recv() # Read from the output pipe
- if msg is not None:
- self.queue.append(msg)
- except EOFError:
- break
- time.sleep(0.2)
- # check all DEMANDS, update local logical closk and send RESPONSE messages
- # if their clock value is lower than the local value
- for q in self.queue:
- self.c = max(self.c, q.get_clock()) + 1
- if self.c > q.get_clock():
- response = Message("RESPONSE", self.id, q.get_clock())
- self.input.send(response)
- time.sleep(0.2)
- # collect all RESPONSES and update personal clock self.C
- num_responses = 0
- while num_responses < 1:
- for out in self.outputs:
- try:
- msg = out.recv() # Read from the output pipe
- if msg is not None:
- self.c = max(self.c, msg.get_clock()) + 1
- num_responses += 1
- except EOFError:
- break
- time.sleep(0.2)
- # the dirty work is done here
- self.lock.acquire()
- print ("Process", self.id, ": ulazak u K.O. %s. put." % num)
- time.sleep(2)
- print ("Process", self.id, ": obavljanje posla u K.O.")
- for message in self.queue:
- response = Message("RESPONSE", self.id, message.get_proc_id())
- self.input.send(response)
- self.queue = []
- self.lock.release()
- print ("Proces %s zavrsava sa radom." % self.id)
- if __name__ == '__main__':
- lock = Lock()
- out1, in1 = Pipe()
- out2, in2 = Pipe()
- out3, in3 = Pipe()
- w1 = Worker(1, 12, lock, in1, [out2], 3)
- w2 = Worker(2, 7, lock, in2, [out1], 3)
- w3 = Worker(3, 20, lock, in3, [out1, out2], 3)
- w1.start()
- w2.start()
- w3.start()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement