Advertisement
Guest User

Untitled

a guest
Dec 13th, 2018
82
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 9.77 KB | None | 0 0
  1. package edu.duke.raft;
  2. import java.util.ArrayList;
  3. import java.util.Arrays;
  4. import java.util.LinkedList;
  5. import java.util.Timer;
  6.  
  7. // QUESTIONS:
  8. // How often to send append requests?
  9. // How often to check for append requests?
  10. // Do we automatically increment leader term if a log entry is higher? ( i think yes... is there a case where this is bad?)
  11.  
  12. public class LeaderMode extends RaftMode {
  13.  
  14. final private int HEART_TIMER_ID = 3;
  15. final private int APPEND_POLL_ID = 4;
  16. private Timer appendTimer;
  17. private int POLL_TIME = 30;
  18. private Timer heartTimer;
  19. private int[] nextIndex;
  20. private boolean[] serverLogMatched;
  21. private int prevLogTerm;
  22. private int prevLogIndex;
  23. private ArrayList<LinkedList<Entry>> entryIndex;
  24.  
  25. public void go () {
  26. synchronized (mLock) {
  27.  
  28. // Initialize necessary data
  29. int term = mConfig.getCurrentTerm();
  30. prevLogTerm = mLog.getLastTerm();
  31. prevLogIndex = mLog.getLastIndex();
  32. int numServers = mConfig.getNumServers();
  33. System.out.println ("S" + mID + "." + term + ": switched to leader mode.");
  34.  
  35. // Initialize entries
  36. serverLogMatched = new boolean[numServers + 1];
  37. nextIndex = new int[numServers + 1];
  38. entryIndex = new ArrayList<LinkedList<Entry>>();
  39. for (int i = 0; i < numServers + 1; i++) {
  40. entryIndex.add(new LinkedList<Entry>());
  41. nextIndex[i] = prevLogIndex + 1;
  42. }
  43. for (int i = 1; i <= numServers; i++) {
  44. serverLogMatched[i] = false;
  45. if (i == mID) {
  46. serverLogMatched[i] = true;
  47. }
  48. }
  49.  
  50. //FIXME If log entry term is initialized to be higher than our leader term, do we automatically increment the leader term?
  51. Entry lastEntry = mLog.getEntry(prevLogIndex);
  52. if (lastEntry.term >= term) {
  53. System.out.println ("S" + mID + "." + term + ": term incremented to match log -> " + (lastEntry.term + 1));
  54. mConfig.setCurrentTerm(lastEntry.term + 1, mID);
  55. term = mConfig.getCurrentTerm();
  56. }
  57.  
  58. //XXX First should be heartbeat, if failed response, then decrement
  59. // lastEntryList.addFirst(lastEntry);
  60.  
  61.  
  62. // Initialize RaftResponses
  63. RaftResponses.setTerm(term);
  64. RaftResponses.clearAppendResponses(term);
  65.  
  66. // Send out last entry to everyone
  67. // Schedule timer to poll appendResponses
  68. for (int i = 1; i <= numServers; i++) {
  69. if (!serverLogMatched[i]) {
  70. Entry[] entryToSend = entryIndex.get(i).toArray(new Entry[entryIndex.get(i).size()]);
  71. remoteAppendEntries(i, term, mID, prevLogIndex, prevLogTerm, entryToSend, mCommitIndex);
  72. System.out.println("S" + mID + "." + term + ": sent out append request to S" + i);
  73. }
  74. }
  75. //schedule timer
  76. appendTimer = scheduleTimer(POLL_TIME, APPEND_POLL_ID);
  77. heartTimer = scheduleTimer(HEARTBEAT_INTERVAL, HEART_TIMER_ID);
  78.  
  79.  
  80. // Send heartbeat
  81. // System.out.println("S" + mID + "." + term + ": sent out heartbeat");
  82. // for (int i = 1; i <= mConfig.getNumServers(); i++) {
  83. // remoteAppendEntries(i, term, mID, lastIndex, lastTerm, null, mCommitIndex);
  84. // }
  85. // System.out.println("heartbeat timer set to " + HEARTBEAT_INTERVAL);
  86. // heartTimer = scheduleTimer(HEARTBEAT_INTERVAL, HEART_TIMER_ID);
  87.  
  88. }
  89. }
  90.  
  91. // @param candidate’s term
  92. // @param candidate requesting vote
  93. // @param index of candidate’s last log entry
  94. // @param term of candidate’s last log entry
  95. // @return 0, if server votes for candidate; otherwise, server's
  96. // current term
  97. public int requestVote (int candidateTerm,
  98. int candidateID,
  99. int lastLogIndex,
  100. int lastLogTerm) {
  101. synchronized (mLock) {
  102. int term = mConfig.getCurrentTerm ();
  103. int vote = 0;
  104.  
  105.  
  106. System.out.println("S" + mID + "." + term + "(leader): got vote request from " + "S" + candidateID + "." + candidateTerm);
  107.  
  108. // If greater, vote immediately and switch to followerMode
  109. // 1. Their term must be greater to recognize them
  110. // 2. Other's last log term must be >= to vote for them
  111. // 3. Last Log Index must be >= to vote for them
  112. if (candidateTerm > term) {
  113. //Checking that last lost term for the candidate is higher than the server's
  114. //If higher, vote for candidate
  115. //else if, check that last log index is higher or equal
  116.  
  117. if (lastLogTerm < mLog.getLastTerm()) {
  118. System.out.println("I am on last log term");
  119. System.out.println("S" + mID + "." + term + ": did not cast log term vote for S" + candidateID + "." + lastLogTerm);
  120. vote = term;
  121. } else if (lastLogTerm == mLog.getLastTerm() && lastLogIndex < mLog.getLastIndex()){
  122. System.out.println("I am on last log index");
  123. System.out.println("S" + mID + "." + term + ": did not cast log index vote for S" + candidateID + "." + lastLogIndex);
  124. vote = term;
  125. }
  126.  
  127. // Is candidate qualified?
  128. if (vote == term) {
  129. mConfig.setCurrentTerm(candidateTerm, mID);
  130. term = mConfig.getCurrentTerm();
  131. vote = term;
  132. System.out.println("S" + mID + "." + term + ": casted vote for self");
  133. } else {
  134. mConfig.setCurrentTerm(candidateTerm, candidateID);
  135. term = mConfig.getCurrentTerm();
  136. System.out.println("S" + mID + "." + term + ": casted vote for S" + candidateID + "." + candidateTerm);
  137. appendTimer.cancel();
  138. heartTimer.cancel();
  139. RaftServerImpl.setMode(new FollowerMode());
  140. }
  141. return vote;
  142. }
  143. return term;
  144. }
  145. }
  146.  
  147.  
  148. // @param leader’s term
  149. // @param current leader
  150. // @param index of log entry before entries to append
  151. // @param term of log entry before entries to append
  152. // @param entries to append (in order of 0 to append.length-1)
  153. // @param index of highest committed entry
  154. // @return 0, if server appended entries; otherwise, server's
  155. // current term
  156. public int appendEntries (int leaderTerm,
  157. int leaderID,
  158. int prevLogIndex,
  159. int prevLogTerm,
  160. Entry[] entries,
  161. int leaderCommit) {
  162. synchronized (mLock) {
  163. int term = mConfig.getCurrentTerm ();
  164. int result = term;
  165.  
  166. // TODO: if receiving an append request from higher term, switch to follower mode
  167. System.out.println("S" + mID + "." + term + " (leader): append request received");
  168. if (leaderTerm >= term) {
  169.  
  170. // update term if higher
  171. if (leaderTerm > term) {
  172. mConfig.setCurrentTerm(leaderTerm, leaderID);
  173. }
  174. // cancel all timers
  175. appendTimer.cancel();
  176. heartTimer.cancel();
  177.  
  178. //switch to follower
  179. RaftServerImpl.setMode(new FollowerMode());
  180. }
  181.  
  182. // return -1 because otherwise leader will think you did something, when you haven't
  183. return -1;
  184. }
  185. }
  186.  
  187. // @param id of the timer that timed out
  188. public void handleTimeout (int timerID) {
  189. synchronized (mLock) {
  190.  
  191. // If no append responses for a while, send an empty one.
  192. if (timerID == HEART_TIMER_ID) {
  193. int term = mConfig.getCurrentTerm();
  194. for (int i = 1; i <= mConfig.getNumServers(); i++) {
  195. if(serverLogMatched[i] && i != mID){
  196. System.out.println("S" + mID + "." + term + ": sent out heartbeat to S" + i);
  197. remoteAppendEntries(i, term, mID, prevLogIndex, prevLogTerm, entryIndex.get(i).toArray(new Entry[0]), mCommitIndex);
  198. }
  199. }
  200. heartTimer.cancel();
  201. heartTimer = scheduleTimer(HEARTBEAT_INTERVAL, HEART_TIMER_ID);
  202.  
  203. //polling the append responses
  204. } else if (timerID == APPEND_POLL_ID) {
  205. int term = mConfig.getCurrentTerm();
  206. int[] appendResponses = RaftResponses.getAppendResponses(term);
  207. if (appendResponses == null) {
  208. System.err.println("APPEND RESPONSES IS NULL IN LEADER");
  209. appendTimer.cancel();
  210. appendTimer = scheduleTimer(POLL_TIME, APPEND_POLL_ID);
  211. return;
  212. }
  213. for (int i = 1; i <= mConfig.getNumServers(); i++) {
  214. //XXX I don't think this needs to be here.
  215. // if(!updated[i]){
  216. // if rejected, decrease appendIndex of the server, set it back to default, add an earlier entry
  217.  
  218. System.out.println("AR = " + Arrays.toString(appendResponses));
  219. if (appendResponses[i] > 0) {
  220.  
  221. serverLogMatched[i] = false;
  222. // decrease nextIndex of server
  223. System.out.println("nextIndex[" + i + "] updated to " + (nextIndex[i]-1));
  224. nextIndex[i] -= 1;
  225.  
  226. // update the Entry requestfor that server
  227. entryIndex.get(i).addFirst(mLog.getEntry(nextIndex[i]));
  228. System.out.println("S" + mID + "." + term + " (leader): updated entryIndex["+ i + "] = " + entryIndex.get(i));
  229.  
  230.  
  231. //set back to default
  232. RaftResponses.setAppendResponse(i, -1, term);
  233.  
  234.  
  235. // append request with new entries list, one index back.
  236. // prevTerm
  237. Entry[] updatedEntries = entryIndex.get(i).toArray(new Entry[entryIndex.get(i).size()]);
  238. int updatedTerm = nextIndex[i] == 0 ? 0 : mLog.getEntry(nextIndex[i] - 1).term;
  239. remoteAppendEntries(i, term, mID, nextIndex[i] - 1, updatedTerm, updatedEntries, mCommitIndex);
  240. System.out.println("S" + mID + "." + term + ": sent out append request to S" + i);
  241. } else if (appendResponses[i] == 0) {
  242. // if was successfully updated, update the boolean array, empty their entry list
  243. serverLogMatched[i] = true;
  244. nextIndex[i] = prevLogIndex;
  245. entryIndex.get(i).clear();
  246. } else {
  247. // Response is -1, Nothing has happened, server isn't responding maybe, so send request again
  248. if (!serverLogMatched[i]) {
  249. Entry[] updatedEntries = entryIndex.get(i).toArray(new Entry[entryIndex.get(i).size()]);
  250. int updatedTerm = nextIndex[i] == 0 ? 0 : mLog.getEntry(nextIndex[i] - 1).term;
  251. remoteAppendEntries(i, term, mID, nextIndex[i] - 1, updatedTerm, updatedEntries, mCommitIndex);
  252. System.out.println("S" + mID + "." + term + ": sent out append request to S" + i);
  253. }
  254. }
  255. // }
  256. }
  257. appendTimer.cancel();
  258. appendTimer = scheduleTimer(POLL_TIME, APPEND_POLL_ID);
  259. }
  260. }
  261. }
  262. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement