Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from threading import Event, Thread, Lock
- from Queue import Queue
- from reentrantbarrier import Barrier
- """
- This module represents a device.
- Computer Systems Architecture Course
- Assignment 1
- March 2019
- """
class Device(object):
    """
    Class that represents a device.

    A device holds its sensor readings, the scripts assigned to it, the
    per-location locks it knows about and the barrier it waits on at the end
    of a timepoint. The scripts are executed by the DeviceThread created and
    started in the constructor.
    """

    def __init__(self, device_id, sensor_data, supervisor):
        """
        Constructor. Also creates and starts this device's DeviceThread.

        @type device_id: Integer
        @param device_id: the unique id of this node; between 0 and N-1

        @type sensor_data: mapping from location (Integer) to data (Float)
        @param sensor_data: the data measured by this device; get_data and
            set_data look it up and update it by location

        @type supervisor: Supervisor
        @param supervisor: the testing infrastructure's control and validation
            component
        """
        self.device_id = device_id
        self.sensor_data = sensor_data
        self.supervisor = supervisor
        self.script_received = Event()
        self.scripts = []
        self.timepoint_done = Event()
        # location -> Lock; the same Lock object is stored in every device.
        self.location_locks = {}
        # Created by device 0 in setup_devices and handed to the others.
        self.barrier = None
        # True when the device thread should dispatch the stored scripts.
        self.ready_to_get_script = False
        # List of all devices; filled in by setup_devices.
        self.all_devices = None
        self.thread = DeviceThread(self)
        self.thread.start()

    def __str__(self):
        """
        Pretty prints this device.

        @rtype: String
        @return: a string containing the id of this device
        """
        return "Device %d" % self.device_id

    def set_barrier(self, barrier):
        """
        Setter for barrier.

        @type barrier: Barrier
        @param barrier: the barrier this device waits on
        """
        self.barrier = barrier

    def broadcast_barrier(self, devices):
        """
        Hands this device's barrier to every device whose id is not 0.
        Device 0 uses this to spread the barrier it created.

        @type devices: List of Device
        @param devices: the devices that should receive the barrier
        """
        for device in devices:
            if device.device_id != 0:
                device.set_barrier(self.barrier)

    def setup_devices(self, devices):
        """
        Setup the devices before simulation begins.

        Every device remembers the full device list; device 0 additionally
        creates the barrier (sized to the number of devices) and broadcasts it.

        @type devices: List of Device
        @param devices: list containing all devices
        """
        self.all_devices = devices
        if self.device_id == 0:
            self.barrier = Barrier(len(devices))
            self.broadcast_barrier(devices)

    def assign_script(self, script, location):
        """
        Provide a script for the device to execute.

        @type script: Script
        @param script: the script to execute from now on at each timepoint;
            None if the current timepoint has ended

        @type location: Integer
        @param location: the location for which the script is interested in
        """
        if script is None:
            # No more scripts for this timepoint: wake the device thread.
            self.timepoint_done.set()
            return

        # Reuse the lock already known for this location, or create one.
        lock = self.location_locks.get(location)
        if lock is None:
            lock = Lock()
            self.location_locks[location] = lock

        # A script was received, so the device thread has work to dispatch.
        self.ready_to_get_script = True

        # Store the very same lock in every device so that they all
        # serialise on it. all_devices is still None if setup_devices has
        # not been called yet; in that case only this device knows the lock.
        for device in self.all_devices or ():
            device.location_locks[location] = lock

        self.scripts.append((script, location))
        self.script_received.set()

    def get_data(self, location):
        """
        Returns the pollution value this device has for the given location.

        @type location: Integer
        @param location: a location for which obtain the data

        @rtype: Float
        @return: the pollution value, or None if this device has no data for
            the location
        """
        return self.sensor_data[location] if location in self.sensor_data else None

    def set_data(self, location, data):
        """
        Sets the pollution value stored by this device for the given location.
        Locations this device has no data for are left untouched.

        @type location: Integer
        @param location: a location for which to set the data

        @type data: Float
        @param data: the pollution value
        """
        if location in self.sensor_data:
            self.sensor_data[location] = data

    def shutdown(self):
        """
        Instructs the device to shutdown (terminate all threads). This method
        is invoked by the tester. This method must block until all the threads
        started by this device terminate.
        """
        self.thread.join()
class DeviceThread(Thread):
    """
    Class that implements the device's worker thread.

    The thread itself only coordinates: for every timepoint it pushes the
    device's scripts into a bounded queue, from which a fixed set of worker
    threads pull and execute them.
    """

    def __init__(self, device):
        """
        Constructor. Creates the job queue and creates and starts the workers.

        @type device: Device
        @param device: the device which owns this thread
        """
        Thread.__init__(self, name="Device Thread %d" % device.device_id)
        self.device = device
        self.thread_count = 8
        # Bounded queue: put() blocks while thread_count jobs are pending.
        self.pool = Queue(self.thread_count)
        self.threads = []
        self.create_workers(self.thread_count)
        self.start_workers()

    def create_workers(self, count):
        """
        Creates count worker threads (not yet started) whose target is
        execute_script and appends them to self.threads.

        @type count: Integer
        @param count: the number of worker threads to create
        """
        for _ in range(count):
            self.threads.append(Thread(target=self.execute_script))

    def start_workers(self):
        """
        Starts every thread stored in self.threads.
        """
        for thread in self.threads:
            thread.start()

    def execute_script(self):
        """
        Worker loop: takes (neighbours, script, location) jobs from the queue
        and executes them until the (None, None, None) sentinel arrives.
        """
        while True:
            neighbours, script, location = self.pool.get()
            try:
                if neighbours is None and script is None and location is None:
                    # Sentinel job: this worker is done.
                    break
                self._run_script(neighbours, script, location)
            finally:
                # Always account for the job, even if the script raised;
                # otherwise pool.join() would block forever.
                self.pool.task_done()

    def _run_script(self, neighbours, script, location):
        """
        Runs one script for one location while holding that location's lock.

        @type neighbours: List of Device
        @param neighbours: the devices whose data is read and updated

        @type script: Script
        @param script: object whose run(data) computes the new value

        @type location: Integer
        @param location: the location the script is interested in
        """
        own_device = self.device
        # "with" releases the lock even when the script raises.
        with own_device.location_locks[location]:
            script_data = []
            # Collect data from the current neighbours; our own device is
            # skipped here so that its value is only added once, below.
            for device in neighbours:
                if device.device_id != own_device.device_id:
                    data = device.get_data(location)
                    if data is not None:
                        script_data.append(data)
            # Add our data, if any.
            data = own_device.get_data(location)
            if data is not None:
                script_data.append(data)

            if script_data:
                result = script.run(script_data)
                # Publish the result to the neighbours and to ourselves.
                for device in neighbours:
                    device.set_data(location, result)
                own_device.set_data(location, result)

    def run(self):
        """
        Main loop of the device thread: one iteration per timepoint, until the
        supervisor returns None instead of a neighbourhood. Afterwards the
        workers are stopped and joined.
        """
        device = self.device
        while True:
            # Get the current neighbourhood.
            neighbours = device.supervisor.get_neighbours()
            if neighbours is None:
                break

            # Wait until all scripts for this timepoint have been assigned.
            device.timepoint_done.wait()
            if device.ready_to_get_script:
                for (script, location) in device.scripts:
                    self.pool.put((neighbours, script, location))
            # Re-arm for the next timepoint.
            device.timepoint_done.clear()
            device.ready_to_get_script = True

            # Wait for the workers to finish, then for all the other devices.
            self.pool.join()
            device.barrier.wait()

        # Shutdown: drain the queue, send one sentinel per worker, join them.
        self.pool.join()
        for _ in range(self.thread_count):
            self.pool.put((None, None, None))
        for thread in self.threads:
            thread.join()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement