Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package bgu.spl.a2;
- import java.util.HashMap;
- import java.util.LinkedList;
- import java.util.Map;
- import java.util.Queue;
- import java.util.concurrent.atomic.*;
- /**
- * represents an actor thread pool - to understand what this class does please
- * refer to your assignment.
- *
- * Note for implementors: you may add methods and synchronize any of the
- * existing methods in this class *BUT* you must be able to explain why the
- * synchronization is needed. In addition, the methods you add can only be
- * private, protected or package protected - in other words, no new public
- * methods
- */
- public class ActorThreadPool {
- //we keep three maps, each map key is the actor Id
- //the first map keeps the queue of actions for each actor
- //the second map keeps the private state of each actor
- //the third map keeps atomic boolean variable which represents if the actor's queue is currently taken.
- private HashMap<String,Queue<Action<?>>> actorsQueues;
- private HashMap<String,PrivateState> actorsByPs;
- private HashMap<String,AtomicBoolean> actorIsTaken;
- private LinkedList<String> actorsList;
- private int numOfThreads;
- private Thread[] workers;
- private boolean hasStarted = false;
- private VersionMonitor monitor;
- private AtomicInteger counter;
- /**
- * creates a {@link ActorThreadPool} which has nthreads. Note, threads
- * should not get started until calling to the {@link #start()} method.
- *
- * Implementors note: you may not add other constructors to this class nor
- * you allowed to add any other parameter to this constructor - changing
- * this may cause automatic tests to fail..
- *
- * @param nthreads
- * the number of threads that should be started by this thread
- * pool
- */
- public ActorThreadPool(int nthreads) {
- // TODO: replace method body with real implementation
- this.numOfThreads=nthreads;
- workers= new Thread[numOfThreads];
- actorsQueues=new HashMap<String,Queue<Action<?>>>();
- actorsByPs = new HashMap<String,PrivateState>();
- actorIsTaken = new HashMap<String,AtomicBoolean>();
- actorsList= new LinkedList<>();
- monitor = new VersionMonitor();
- counter.set(0);
- }
- /**
- * getter for actors
- * @return actors
- */
- public Map<String, PrivateState> getActors(){
- // TODO: replace method body with real implementation
- return actorsByPs;
- }
- /**
- * getter for actor's private state
- * @param actorId actor's id
- * @return actor's private state
- */
- public PrivateState getPrivateState(String actorId){
- // TODO: replace method body with real implementation
- return actorsByPs.get(actorId);
- }
- /**
- * submits an action into an actor to be executed by a thread belongs to
- * this thread pool
- *
- * @param action
- * the action to execute
- * @param actorId
- * corresponding actor's id
- * @param actorState
- * actor's private state (actor's information)
- */
- public synchronized void submit(Action<?> action, String actorId, PrivateState actorState) {
- // TODO: replace method body with real implementation
- if (actorsByPs.containsKey(actorId)){
- actorsQueues.get(actorId).add(action);
- }
- else{
- Queue<Action<?>> toInsert=new LinkedList<Action<?>>();
- toInsert.add(action);
- actorsQueues.put(actorId,toInsert);
- actorsByPs.put(actorId, actorState);
- actorIsTaken.put(actorId,new AtomicBoolean());
- actorsList.add(actorId);
- }
- monitor.inc();
- }
- /**
- * closes the thread pool - this method interrupts all the threads and waits
- * for them to stop - it is returns *only* when there are no live threads in
- * the queue.
- *
- * after calling this method - one should not use the queue anymore.
- *
- * @throws InterruptedException
- * if the thread that shut down the threads is interrupted
- */
- public void shutdown() throws InterruptedException {
- // TODO: replace method body with real implementation
- hasStarted=false;
- }
- /**
- * start the threads belongs to this thread pool
- */
- public void start() {
- // TODO: replace method body with real implementation
- hasStarted=true;
- /**each thread is going to get the next mission:
- while no one used the shutdown action,
- if the counter(which counts how many actions has been implemented until now)
- is lower than the monitorVersion(which represents how many actions has been inserted)
- look over the actors queue's for actions to implement
- if the counter has reached the monitor version, it means the threads implemented all the actions given
- and now they will sleep until a new action is inserted to the pool
- **/
- for (int i=0;i<workers.length;i++){
- workers[i]=new Thread(()->{
- while(hasStarted){
- while(counter.get()<monitor.getVersion()){
- for(int j=0;j<actorsList.size();j++){
- String currentActorId = actorsList.get(j);
- if (!(actorsQueues.get(currentActorId).isEmpty()))
- if ((actorIsTaken.get(currentActorId).equals(false))){
- if(actorIsTaken.get(currentActorId).compareAndSet(false, true)){
- Action<?> toExec = actorsQueues.get(actorsList.get(j)).remove();
- toExec.handle(this , actorsList.get(j), actorsByPs.get(actorsList.get(j)));
- counter.incrementAndGet();
- actorIsTaken.get(currentActorId).compareAndSet(true, false);
- }
- }
- }
- }//while2
- try {
- monitor.await(monitor.getVersion());
- } catch (Exception e) {
- }
- }//while1
- });
- }//for
- for(int i=0;i<workers.length;i++)
- workers[i].start();
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement