Advertisement
Guest User

Untitled

a guest
Mar 25th, 2019
53
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.42 KB | None | 0 0
  1. from threading import Event, Thread, Lock
  2. from Queue import Queue
  3. from reentrantbarrier import Barrier
  4.  
  5. """
  6. This module represents a device.
  7.  
  8. Computer Systems Architecture Course
  9. Assignment 1
  10. March 2019
  11. """
  12.  
  class Device(object):
      """
      Class that represents a device.

      A device stores its sensor readings, collects the scripts handed to it
      through assign_script and owns one DeviceThread, which is created and
      started by the constructor.
      """

      def __init__(self, device_id, sensor_data, supervisor):
          """
          Constructor.

          @type device_id: Integer
          @param device_id: the unique id of this node; between 0 and N-1

          @type sensor_data: List of (Integer, Float)
          @param sensor_data: the (location, data) values measured by this device.
              NOTE(review): get_data/set_data index this object by location,
              so it is presumably a dict keyed by location -- confirm with
              the caller.

          @type supervisor: Supervisor
          @param supervisor: the testing infrastructure's control and validation component
          """
          self.device_id = device_id
          self.sensor_data = sensor_data
          self.supervisor = supervisor
          # Set every time a script is assigned; nothing in this class waits on it.
          self.script_received = Event()
          # (script, location) pairs accumulated by assign_script.
          self.scripts = []
          # Set when assign_script is called with script=None.
          self.timepoint_done = Event()

          # location -> Lock; the same Lock object is stored in every device.
          self.location_locks = {}
          # Installed by setup_devices (device 0) or set_barrier (all others).
          self.barrier = None
          # True once at least one script has been assigned; the device
          # thread reads and toggles it while handling a timepoint.
          self.ready_to_get_script = False
          # List of all devices, filled in by setup_devices.
          self.all_devices = None

          self.thread = DeviceThread(self)
          self.thread.start()

      def __str__(self):
          """
          Pretty prints this device.

          @rtype: String
          @return: a string containing the id of this device
          """
          return "Device %d" % self.device_id

      def set_barrier(self, barrier):
          """
          Setter for barrier

          @type barrier: Barrier
          @param barrier: the barrier this device must wait on
          """
          self.barrier = barrier

      def broadcast_barrier(self, devices):
          """
          Hands this device's barrier to every device whose id is not 0.
          Called by device 0 from setup_devices.

          @type devices: List of Device
          @param devices: list containing all devices
          """
          for device in devices:
              if device.device_id != 0:
                  device.set_barrier(self.barrier)

      def setup_devices(self, devices):
          """
          Setup the devices before simulation begins.

          Every device remembers the full device list; device 0 additionally
          creates one Barrier sized for all devices and distributes it.

          @type devices: List of Device
          @param devices: list containing all devices
          """
          self.all_devices = devices
          if self.device_id == 0:
              self.barrier = Barrier(len(devices))
              self.broadcast_barrier(devices)

      def assign_script(self, script, location):
          """
          Provide a script for the device to execute.

          @type script: Script
          @param script: the script to execute from now on at each timepoint; None if the
              current timepoint has ended

          @type location: Integer
          @param location: the location for which the script is interested in
          """
          if script is None:
              # No more scripts will arrive for the current timepoint.
              self.timepoint_done.set()
              return

          # Create the lock for this location the first time it is seen.
          if self.location_locks.get(location) is None:
              self.location_locks[location] = Lock()
          lock = self.location_locks[location]

          # A script was received, so the device thread has work to submit.
          self.ready_to_get_script = True

          # Store the very same Lock object in all devices so that they
          # serialise on it. If setup_devices has not run yet there is no
          # device list; only this device is updated instead of failing.
          for device in self.all_devices or (self,):
              device.location_locks[location] = lock

          self.scripts.append((script, location))
          self.script_received.set()

      def get_data(self, location):
          """
          Returns the pollution value this device has for the given location.

          @type location: Integer
          @param location: a location for which obtain the data

          @rtype: Float
          @return: the pollution value, or None when this device has no
              entry for the location
          """
          return self.sensor_data[location] if location in self.sensor_data else None

      def set_data(self, location, data):
          """
          Sets the pollution value stored by this device for the given location.
          Locations this device has no entry for are ignored.

          @type location: Integer
          @param location: a location for which to set the data

          @type data: Float
          @param data: the pollution value
          """
          if location in self.sensor_data:
              self.sensor_data[location] = data

      def shutdown(self):
          """
          Instructs the device to shutdown (terminate all threads). This method
          is invoked by the tester. This method must block until all the threads
          started by this device terminate.
          """
          self.thread.join()
  145.  
  146.  
  class DeviceThread(Thread):
      """
      Class that implements the device's worker thread.

      The thread itself only coordinates timepoints; the scripts are executed
      by a fixed set of helper threads that take jobs from a bounded queue.
      """

      def __init__(self, device):
          """
          Constructor. Creates the job queue and creates and starts the
          helper threads.

          @type device: Device
          @param device: the device which owns this thread
          """
          Thread.__init__(self, name="Device Thread %d" % device.device_id)
          self.device = device
          self.thread_count = 8
          # Bounded queue: put() blocks while thread_count jobs are pending.
          self.pool = Queue(self.thread_count)
          self.threads = []
          self.create_workers(self.thread_count)
          self.start_workers()

      def create_workers(self, count):
          """
          Creates count helper threads whose target is execute_script and
          appends them to self.threads. The threads are not started here.

          @type count: Integer
          @param count: number of helper threads to create
          """
          for _ in range(count):
              self.threads.append(Thread(target=self.execute_script))

      def start_workers(self):
          """
          Starts every helper thread stored in self.threads.
          """
          for thread in self.threads:
              thread.start()

      def execute_script(self):
          """
          Helper thread body: takes jobs from the queue and runs them until
          the (None, None, None) sentinel is received.

          Every job taken from the queue is acknowledged with task_done(),
          even when running it raises, so that pool.join() cannot hang on a
          failed job. The exception itself is not swallowed.
          """
          while True:
              neighbours, script, location = self.pool.get()
              try:
                  if neighbours is None and script is None and location is None:
                      # Sentinel: acknowledged by the finally clause below.
                      break
                  self._run_script(neighbours, script, location)
              finally:
                  self.pool.task_done()

      def _run_script(self, neighbours, script, location):
          """
          Runs one script for one location while holding that location's lock.

          @type neighbours: List of Device
          @param neighbours: the devices whose data is read and updated

          @type script: Script
          @param script: object whose run(list_of_values) result is stored back

          @type location: Integer
          @param location: the location the script works on
          """
          # The lock is released on every exit path, including exceptions.
          with self.device.location_locks[location]:
              script_data = []
              # collect data from current neighbours
              for device in neighbours:
                  if device.device_id != self.device.device_id:
                      data = device.get_data(location)
                      if data is not None:
                          script_data.append(data)
              # add our data, if any
              data = self.device.get_data(location)
              if data is not None:
                  script_data.append(data)

              if not script_data:
                  return

              # run script on data
              result = script.run(script_data)

              # write the result back to the neighbours and to this device
              for device in neighbours:
                  device.set_data(location, result)
              self.device.set_data(location, result)

      def run(self):
          """
          Main loop: one iteration per neighbourhood returned by the
          supervisor, ending when the supervisor returns None. Afterwards the
          helper threads are told to stop and are joined.
          """
          while True:
              # get the current neighbourhood
              neighbours = self.device.supervisor.get_neighbours()
              if neighbours is None:
                  break

              while True:
                  self.device.timepoint_done.wait()

                  if not self.device.ready_to_get_script:
                      # Scripts for this timepoint were already queued (or
                      # none were ever assigned): reset the event, re-arm
                      # the flag and leave the loop.
                      self.device.timepoint_done.clear()
                      self.device.ready_to_get_script = True
                      break

                  # Queue every stored script. The event is still set, so
                  # the next pass takes the branch above.
                  for (script, location) in self.device.scripts:
                      self.pool.put((neighbours, script, location))
                  self.device.ready_to_get_script = False

              # wait until every queued job was processed, then wait on the barrier
              self.pool.join()
              self.device.barrier.wait()

          # Shutdown: drain the queue, then send one sentinel per helper thread.
          self.pool.join()
          for _ in self.threads:
              self.pool.put((None, None, None))
          for thread in self.threads:
              thread.join()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement