Advertisement
Guest User

Untitled

a guest
Dec 15th, 2017
58
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 5.52 KB | None | 0 0
  1. package bgu.spl.a2;
  2.  
  3. import java.util.HashMap;
  4. import java.util.LinkedList;
  5. import java.util.Map;
  6. import java.util.Queue;
  7. import java.util.concurrent.atomic.*;
  8.  
  9. /**
  10.  * represents an actor thread pool - to understand what this class does please
  11.  * refer to your assignment.
  12.  *
  13.  * Note for implementors: you may add methods and synchronize any of the
  14.  * existing methods in this class *BUT* you must be able to explain why the
  15.  * synchronization is needed. In addition, the methods you add can only be
  16.  * private, protected or package protected - in other words, no new public
  17.  * methods
  18.  */
  19. public class ActorThreadPool {
  20.     //we keep three maps, each map key is the actor Id
  21.     //the first map keeps the queue of actions for each actor
  22.     //the second map keeps the private state of each actor
  23.     //the third map keeps atomic boolean variable which represents if the actor's queue is currently taken.
  24.     private HashMap<String,Queue<Action<?>>> actorsQueues;
  25.     private HashMap<String,PrivateState> actorsByPs;
  26.     private HashMap<String,AtomicBoolean> actorIsTaken;
  27.    
  28.     private LinkedList<String> actorsList;
  29.     private int numOfThreads;
  30.     private Thread[] workers;
  31.     private boolean hasStarted = false;
  32.     private VersionMonitor monitor;
  33.     private AtomicInteger counter;
  34.    
  35.  
  36.     /**
  37.      * creates a {@link ActorThreadPool} which has nthreads. Note, threads
  38.      * should not get started until calling to the {@link #start()} method.
  39.      *
  40.      * Implementors note: you may not add other constructors to this class nor
  41.      * you allowed to add any other parameter to this constructor - changing
  42.      * this may cause automatic tests to fail..
  43.      *
  44.      * @param nthreads
  45.      *            the number of threads that should be started by this thread
  46.      *            pool
  47.      */
  48.     public ActorThreadPool(int nthreads) {
  49.         // TODO: replace method body with real implementation
  50.         this.numOfThreads=nthreads;
  51.         workers= new Thread[numOfThreads];
  52.         actorsQueues=new HashMap<String,Queue<Action<?>>>();
  53.         actorsByPs = new HashMap<String,PrivateState>();
  54.         actorIsTaken = new HashMap<String,AtomicBoolean>();
  55.         actorsList= new LinkedList<>();
  56.         monitor = new VersionMonitor();
  57.         counter.set(0);
  58.     }
  59.    
  60.     /**
  61.      * getter for actors
  62.      * @return actors
  63.      */
  64.     public Map<String, PrivateState> getActors(){
  65.         // TODO: replace method body with real implementation
  66.         return actorsByPs;
  67.     }
  68.    
  69.     /**
  70.      * getter for actor's private state
  71.      * @param actorId actor's id
  72.      * @return actor's private state
  73.      */
  74.     public PrivateState getPrivateState(String actorId){
  75.         // TODO: replace method body with real implementation
  76.         return actorsByPs.get(actorId);
  77.     }
  78.  
  79.     /**
  80.      * submits an action into an actor to be executed by a thread belongs to
  81.      * this thread pool
  82.      *
  83.      * @param action
  84.      *            the action to execute
  85.      * @param actorId
  86.      *            corresponding actor's id
  87.      * @param actorState
  88.      *            actor's private state (actor's information)
  89.      */
  90.     public synchronized void submit(Action<?> action, String actorId, PrivateState actorState) {
  91.         // TODO: replace method body with real implementation
  92.         if (actorsByPs.containsKey(actorId)){
  93.             actorsQueues.get(actorId).add(action);
  94.         }
  95.         else{
  96.             Queue<Action<?>> toInsert=new LinkedList<Action<?>>();
  97.             toInsert.add(action);
  98.             actorsQueues.put(actorId,toInsert);
  99.             actorsByPs.put(actorId, actorState);
  100.             actorIsTaken.put(actorId,new AtomicBoolean());
  101.             actorsList.add(actorId);
  102.         }
  103.        
  104.         monitor.inc();
  105.     }
  106.  
  107.     /**
  108.      * closes the thread pool - this method interrupts all the threads and waits
  109.      * for them to stop - it is returns *only* when there are no live threads in
  110.      * the queue.
  111.      *
  112.      * after calling this method - one should not use the queue anymore.
  113.      *
  114.      * @throws InterruptedException
  115.      *             if the thread that shut down the threads is interrupted
  116.      */
  117.     public void shutdown() throws InterruptedException {
  118.         // TODO: replace method body with real implementation
  119.         hasStarted=false;
  120.     }
  121.  
  122.     /**
  123.      * start the threads belongs to this thread pool
  124.      */
  125.     public void start() {
  126.         // TODO: replace method body with real implementation
  127.         hasStarted=true;
  128.        
  129.         /**each thread is going to get the next mission:
  130.         while no one used the shutdown action,
  131.         if the counter(which counts how many actions has been implemented until now)
  132.         is lower than the monitorVersion(which represents how many actions has been inserted)
  133.         look over the actors queue's for actions to implement
  134.         if the counter has reached the monitor version, it means the threads implemented all the actions given
  135.         and now they will sleep until a new action is inserted to the pool
  136.         **/
  137.         for (int i=0;i<workers.length;i++){
  138.             workers[i]=new Thread(()->{
  139.                 while(hasStarted){
  140.                     while(counter.get()<monitor.getVersion()){
  141.                         for(int j=0;j<actorsList.size();j++){
  142.                             String currentActorId = actorsList.get(j);
  143.                                 if (!(actorsQueues.get(currentActorId).isEmpty()))
  144.                                     if ((actorIsTaken.get(currentActorId).equals(false))){
  145.                                         if(actorIsTaken.get(currentActorId).compareAndSet(false, true)){
  146.                                             Action<?> toExec = actorsQueues.get(actorsList.get(j)).remove();
  147.                                             toExec.handle(this , actorsList.get(j), actorsByPs.get(actorsList.get(j)));
  148.                                             counter.incrementAndGet();
  149.                                             actorIsTaken.get(currentActorId).compareAndSet(true, false);
  150.                                         }
  151.                                     }
  152.                             }
  153.                     }//while2
  154.                     try {
  155.                         monitor.await(monitor.getVersion());
  156.                     } catch (Exception e) {
  157.                     }
  158.                 }//while1
  159.             });
  160.         }//for
  161.        
  162.         for(int i=0;i<workers.length;i++)
  163.             workers[i].start();
  164.     }
  165.  
  166. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement