SHARE
TWEET

Untitled

a guest Dec 13th, 2018 58 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. package edu.duke.raft;
  2. import java.util.ArrayList;
  3. import java.util.Arrays;
  4. import java.util.LinkedList;
  5. import java.util.Timer;
  6.  
  7. // QUESTIONS:
  8. // How often to send append requests?
  9. // How often to check for append requests?
  10. // Do we automatically increment leader term if a log entry is higher? ( i think yes... is there a case where this is bad?)
  11.  
  12. public class LeaderMode extends RaftMode {
  13.  
  14.     final private int HEART_TIMER_ID = 3;
  15.     final private int APPEND_POLL_ID = 4;
  16.     private Timer appendTimer;
  17.     private int POLL_TIME = 30;
  18.     private Timer heartTimer;
  19.     private int[] nextIndex;
  20.     private boolean[] serverLogMatched;
  21.     private int prevLogTerm;
  22.     private int prevLogIndex;
  23.     private ArrayList<LinkedList<Entry>> entryIndex;
  24.  
  25.     public void go () {
  26.         synchronized (mLock) {
  27.  
  28.             // Initialize necessary data
  29.             int term = mConfig.getCurrentTerm();
  30.             prevLogTerm = mLog.getLastTerm();
  31.             prevLogIndex = mLog.getLastIndex();
  32.             int numServers = mConfig.getNumServers();
  33.             System.out.println ("S" + mID + "." + term + ": switched to leader mode.");
  34.            
  35.             // Initialize entries
  36.             serverLogMatched = new boolean[numServers + 1];
  37.             nextIndex = new int[numServers + 1];
  38.             entryIndex = new ArrayList<LinkedList<Entry>>();
  39.             for (int i = 0; i < numServers + 1; i++) {
  40.                 entryIndex.add(new LinkedList<Entry>());
  41.                 nextIndex[i] = prevLogIndex + 1;
  42.             }
  43.             for (int i = 1; i <= numServers; i++) {
  44.                 serverLogMatched[i] = false;
  45.                 if (i == mID) {
  46.                     serverLogMatched[i] = true;
  47.                 }
  48.             }
  49.  
  50.             //FIXME If log entry term is initialized to be higher than our leader term, do we automatically increment the leader term?
  51.             Entry lastEntry = mLog.getEntry(prevLogIndex);
  52.             if (lastEntry.term >= term) {
  53.                 System.out.println ("S" + mID + "." + term + ": term incremented to match log -> " + (lastEntry.term + 1));
  54.                 mConfig.setCurrentTerm(lastEntry.term + 1, mID);
  55.                 term = mConfig.getCurrentTerm();
  56.             }
  57.            
  58.             //XXX First should be heartbeat, if failed response, then decrement
  59. //          lastEntryList.addFirst(lastEntry);
  60.            
  61.            
  62.             // Initialize RaftResponses
  63.             RaftResponses.setTerm(term);
  64.             RaftResponses.clearAppendResponses(term);
  65.  
  66.             // Send out last entry to everyone
  67.             // Schedule timer to poll appendResponses
  68.             for (int i = 1; i <= numServers; i++) {
  69.                 if (!serverLogMatched[i]) {
  70.                     Entry[] entryToSend = entryIndex.get(i).toArray(new Entry[entryIndex.get(i).size()]);
  71.                     remoteAppendEntries(i, term, mID, prevLogIndex, prevLogTerm, entryToSend, mCommitIndex);
  72.                     System.out.println("S" + mID + "." + term + ": sent out append request to S" + i);
  73.                 }
  74.             }
  75.             //schedule timer
  76.             appendTimer = scheduleTimer(POLL_TIME, APPEND_POLL_ID);
  77.             heartTimer = scheduleTimer(HEARTBEAT_INTERVAL, HEART_TIMER_ID);
  78.  
  79.  
  80.             // Send heartbeat
  81.             //      System.out.println("S" + mID + "." + term + ": sent out heartbeat");
  82.             //      for (int i = 1; i <= mConfig.getNumServers(); i++) {
  83.             //          remoteAppendEntries(i, term, mID, lastIndex, lastTerm, null, mCommitIndex);
  84.             //      }
  85.             //      System.out.println("heartbeat timer set to " + HEARTBEAT_INTERVAL);
  86.             //      heartTimer = scheduleTimer(HEARTBEAT_INTERVAL, HEART_TIMER_ID);
  87.  
  88.         }
  89.     }
  90.  
  91.     // @param candidate’s term
  92.     // @param candidate requesting vote
  93.     // @param index of candidate’s last log entry
  94.     // @param term of candidate’s last log entry
  95.     // @return 0, if server votes for candidate; otherwise, server's
  96.     // current term
  97.     public int requestVote (int candidateTerm,
  98.             int candidateID,
  99.             int lastLogIndex,
  100.             int lastLogTerm) {
  101.         synchronized (mLock) {
  102.             int term = mConfig.getCurrentTerm ();
  103.             int vote = 0;
  104.  
  105.  
  106.             System.out.println("S" + mID + "." + term + "(leader): got vote request from " + "S" + candidateID + "." + candidateTerm);
  107.            
  108.             // If greater, vote immediately and switch to followerMode
  109.             // 1. Their term must be greater to recognize them
  110.             // 2. Other's last log term must be >= to vote for them
  111.             // 3. Last Log Index must be >= to vote for them
  112.             if (candidateTerm > term) {
  113.                 //Checking that last lost term for the candidate is higher than the server's
  114.                 //If higher, vote for candidate
  115.                 //else if, check that last log index is higher or equal
  116.                
  117.                 if (lastLogTerm < mLog.getLastTerm()) {
  118.                     System.out.println("I am on last log term");
  119.                     System.out.println("S" + mID + "." + term + ": did not cast log term vote for S" + candidateID + "." + lastLogTerm);
  120.                     vote = term;
  121.                 } else if (lastLogTerm == mLog.getLastTerm() && lastLogIndex < mLog.getLastIndex()){
  122.                     System.out.println("I am on last log index");
  123.                     System.out.println("S" + mID + "." + term + ": did not cast log index vote for S" + candidateID + "." + lastLogIndex);
  124.                     vote = term;
  125.                 }
  126.                
  127.                 // Is candidate qualified?
  128.                 if (vote == term) {
  129.                     mConfig.setCurrentTerm(candidateTerm, mID);
  130.                     term = mConfig.getCurrentTerm();
  131.                     vote = term;
  132.                     System.out.println("S" + mID + "." + term + ": casted vote for self");
  133.                 } else {
  134.                     mConfig.setCurrentTerm(candidateTerm, candidateID);
  135.                     term = mConfig.getCurrentTerm();
  136.                     System.out.println("S" + mID + "." + term + ": casted vote for S" + candidateID + "." + candidateTerm);
  137.                     appendTimer.cancel();
  138.                     heartTimer.cancel();
  139.                     RaftServerImpl.setMode(new FollowerMode());
  140.                 }
  141.                 return vote;
  142.             }
  143.             return term;
  144.         }
  145.     }
  146.  
  147.  
  148.     // @param leader’s term
  149.     // @param current leader
  150.     // @param index of log entry before entries to append
  151.     // @param term of log entry before entries to append
  152.     // @param entries to append (in order of 0 to append.length-1)
  153.     // @param index of highest committed entry
  154.     // @return 0, if server appended entries; otherwise, server's
  155.     // current term
  156.     public int appendEntries (int leaderTerm,
  157.             int leaderID,
  158.             int prevLogIndex,
  159.             int prevLogTerm,
  160.             Entry[] entries,
  161.             int leaderCommit) {
  162.         synchronized (mLock) {
  163.             int term = mConfig.getCurrentTerm ();
  164.             int result = term;
  165.  
  166.             // TODO: if receiving an append request from higher term, switch to follower mode
  167.             System.out.println("S" + mID + "." + term + " (leader): append request received");
  168.             if (leaderTerm >= term) {
  169.  
  170.                 // update term if higher
  171.                 if (leaderTerm > term) {
  172.                     mConfig.setCurrentTerm(leaderTerm, leaderID);          
  173.                 }
  174.                 // cancel all timers
  175.                 appendTimer.cancel();
  176.                 heartTimer.cancel();
  177.  
  178.                 //switch to follower
  179.                 RaftServerImpl.setMode(new FollowerMode());
  180.             }
  181.            
  182.             // return -1 because otherwise leader will think you did something, when you haven't
  183.             return -1;
  184.         }
  185.     }
  186.  
  187.     // @param id of the timer that timed out
  188.     public void handleTimeout (int timerID) {
  189.         synchronized (mLock) {
  190.  
  191.             // If no append responses for a while, send an empty one.
  192.             if (timerID == HEART_TIMER_ID) {
  193.                 int term = mConfig.getCurrentTerm();
  194.                 for (int i = 1; i <= mConfig.getNumServers(); i++) {
  195.                     if(serverLogMatched[i] && i != mID){
  196.                         System.out.println("S" + mID + "." + term + ": sent out heartbeat to S" + i);
  197.                         remoteAppendEntries(i, term, mID, prevLogIndex, prevLogTerm, entryIndex.get(i).toArray(new Entry[0]), mCommitIndex);
  198.                     }
  199.                 }
  200.                 heartTimer.cancel();
  201.                 heartTimer = scheduleTimer(HEARTBEAT_INTERVAL, HEART_TIMER_ID);
  202.  
  203.                 //polling the append responses
  204.             } else if (timerID == APPEND_POLL_ID) {
  205.                 int term = mConfig.getCurrentTerm();
  206.                 int[] appendResponses = RaftResponses.getAppendResponses(term);
  207.                 if (appendResponses == null) {
  208.                     System.err.println("APPEND RESPONSES IS NULL IN LEADER");
  209.                     appendTimer.cancel();
  210.                     appendTimer = scheduleTimer(POLL_TIME, APPEND_POLL_ID);
  211.                     return;
  212.                 }
  213.                 for (int i = 1; i <= mConfig.getNumServers(); i++) {
  214.                     //XXX I don't think this needs to be here.
  215. //                  if(!updated[i]){
  216.                         // if rejected, decrease appendIndex of the server, set it back to default, add an earlier entry
  217.                    
  218.                     System.out.println("AR = " + Arrays.toString(appendResponses));
  219.                         if (appendResponses[i] > 0) {
  220.                            
  221.                             serverLogMatched[i] = false;
  222.                             // decrease nextIndex of server
  223.                             System.out.println("nextIndex[" + i + "] updated to " + (nextIndex[i]-1));
  224.                             nextIndex[i] -= 1;
  225.  
  226.                             // update the Entry requestfor that server
  227.                             entryIndex.get(i).addFirst(mLog.getEntry(nextIndex[i]));
  228.                             System.out.println("S" + mID + "." + term + " (leader): updated entryIndex["+ i + "] = " +  entryIndex.get(i));
  229.                            
  230.                            
  231.                             //set back to default
  232.                             RaftResponses.setAppendResponse(i, -1, term);
  233.  
  234.  
  235.                             // append request with new entries list, one index back.
  236.                             // prevTerm
  237.                             Entry[] updatedEntries = entryIndex.get(i).toArray(new Entry[entryIndex.get(i).size()]);
  238.                             int updatedTerm = nextIndex[i] == 0 ? 0 : mLog.getEntry(nextIndex[i] - 1).term;
  239.                             remoteAppendEntries(i, term, mID, nextIndex[i] - 1, updatedTerm, updatedEntries, mCommitIndex);
  240.                             System.out.println("S" + mID + "." + term + ": sent out append request to S" + i);
  241.                         } else if (appendResponses[i] == 0) {
  242.                             // if was successfully updated, update the boolean array, empty their entry list
  243.                             serverLogMatched[i] = true;
  244.                             nextIndex[i] = prevLogIndex;
  245.                             entryIndex.get(i).clear();
  246.                         } else {
  247.                             // Response is -1, Nothing has happened, server isn't responding maybe, so send request again
  248.                             if (!serverLogMatched[i]) {
  249.                                 Entry[] updatedEntries = entryIndex.get(i).toArray(new Entry[entryIndex.get(i).size()]);
  250.                                 int updatedTerm = nextIndex[i] == 0 ? 0 : mLog.getEntry(nextIndex[i] - 1).term;
  251.                                 remoteAppendEntries(i, term, mID, nextIndex[i] - 1, updatedTerm, updatedEntries, mCommitIndex);
  252.                                 System.out.println("S" + mID + "." + term + ": sent out append request to S" + i);
  253.                             }
  254.                         }
  255. //                  }
  256.                 }
  257.                 appendTimer.cancel();
  258.                 appendTimer = scheduleTimer(POLL_TIME, APPEND_POLL_ID);
  259.             }
  260.         }
  261.     }
  262. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top