Advertisement
Guest User

Untitled

a guest
Feb 26th, 2020
201
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 14.12 KB | None | 0 0
  1. package raft
  2.  
  3. //
  4. // this is an outline of the API that raft must expose to
  5. // the service (or tester). see comments below for
  6. // each of these functions for more details.
  7. //
  8. // rf = Make(...)
  9. //   create a new Raft server.
  10. // rf.Start(command interface{}) (index, term, isleader)
  11. //   start agreement on a new log entry
  12. // rf.GetState() (term, isLeader)
  13. //   ask a Raft for its current term, and whether it thinks it is leader
  14. // ApplyMsg
  15. //   each time a new entry is committed to the log, each Raft peer
  16. //   should send an ApplyMsg to the service (or tester)
  17. //   in the same server.
  18. //
  19.  
  20. import (
  21.     "math/rand"
  22.     "sync"
  23.     "sync/atomic"
  24.     "time"
  25.  
  26.     "../labrpc"
  27. )
  28.  
  // Roles a peer can play; the value is kept in Raft.serverStatus.
  const (
      Follower  = iota // 0
      Candidate        // 1
      Leader           // 2
  )

  // Timing parameters. Election timeouts are expressed in milliseconds and
  // converted with MillisecondToNanoSecond; the heartbeat rate is per second.
  const (
      MinElectionTimeout      = 500
      MaxElectionTimeout      = 750
      MaxHeartBeatsPerSecond  = 10
      MillisecondToNanoSecond = 1000000 // nanoseconds in one millisecond
  )

  // ServerStatus is the type of Raft.serverStatus (Follower, Candidate or Leader).
  type ServerStatus int
  40.  
  41. // import "bytes"
  42. // import "../labgob"
  43.  
  // ApplyMsg is the message a Raft peer sends on the applyCh passed to Make()
  // to tell the service (or tester) on the same server that a log entry has
  // been committed.
  //
  // CommandValid is true when the message carries a newly committed entry.
  // Lab 3 may add fields for other kinds of messages (snapshots, for example);
  // those messages should set CommandValid to false.
  type ApplyMsg struct {
      CommandValid bool        // true if Command/CommandIndex describe a committed entry
      Command      interface{} // the committed command
      CommandIndex int         // log index of the committed command
  }
  60.  
  // LogEntry is one slot of the replicated log: a command for the state
  // machine together with the term in which the leader received it.
  // Log indices start at 1.
  type LogEntry struct {
      Command     interface{} // state-machine command
      CommandTerm int         // term when the entry was received by the leader
  }
  68.  
  69. //
  70. // A Go object implementing a single Raft peer.
  71. //
  72. type Raft struct {
  73.     mu        sync.Mutex          // Lock to protect shared access to this peer's state
  74.     peers     []*labrpc.ClientEnd // RPC end points of all peers
  75.     persister *Persister          // Object to hold this peer's persisted state
  76.     me        int                 // this peer's index into peers[]
  77.     dead      int32               // set by Kill()
  78.  
  79.     // Persistant state
  80.     currentTerm int
  81.     votedFor    int
  82.     logEntries  []LogEntry
  83.  
  84.     // Volatile state on all servers
  85.     commitIndex int
  86.     lastApplied int
  87.  
  88.     // Volatile state only if this server is a leader
  89.     nextIndex  []int
  90.     matchIndex []int
  91.  
  92.     serverStatus ServerStatus // Either leader, follower or candidate
  93.  
  94.     nextElectionTime time.Time
  95.  
  96.     // Your data here (2A, 2B, 2C).
  97.     // Look at the paper's Figure 2 for a description of what
  98.     // state a Raft server must maintain.
  99.  
  100. }
  101.  
  102. // return currentTerm and whether this server
  103. // believes it is the leader.
  104. func (rf *Raft) GetState() (int, bool) {
  105.  
  106.     var term int
  107.     var isleader bool
  108.     rf.mu.Lock()
  109.     term = rf.currentTerm
  110.     isleader = rf.serverStatus == Leader
  111.     rf.mu.Unlock()
  112.  
  113.     // Your code here (2A).
  114.     return term, isleader
  115. }
  116.  
  117. //
  118. // save Raft's persistent state to stable storage,
  119. // where it can later be retrieved after a crash and restart.
  120. // see paper's Figure 2 for a description of what should be persistent.
  121. //
  122. func (rf *Raft) persist() {
  123.     // Your code here (2C).
  124.     // Example:
  125.     // w := new(bytes.Buffer)
  126.     // e := labgob.NewEncoder(w)
  127.     // e.Encode(rf.xxx)
  128.     // e.Encode(rf.yyy)
  129.     // data := w.Bytes()
  130.     // rf.persister.SaveRaftState(data)
  131. }
  132.  
  133. //
  134. // restore previously persisted state.
  135. //
  136. func (rf *Raft) readPersist(data []byte) {
  137.     if data == nil || len(data) < 1 { // bootstrap without any state?
  138.         return
  139.     }
  140.     // Your code here (2C).
  141.     // Example:
  142.     // r := bytes.NewBuffer(data)
  143.     // d := labgob.NewDecoder(r)
  144.     // var xxx
  145.     // var yyy
  146.     // if d.Decode(&xxx) != nil ||
  147.     //    d.Decode(&yyy) != nil {
  148.     //   error...
  149.     // } else {
  150.     //   rf.xxx = xxx
  151.     //   rf.yyy = yyy
  152.     // }
  153. }
  154.  
  155. type AppendEntriesArgs struct {
  156.     Term         int // Leaders term
  157.     ID           int // leader ID
  158.     PrevLogIndex int // Index of log entry immedietly preceding the new ones
  159.     PrevLogTerm  int // Term of the PrevLogIndex entry
  160.     Entries      []LogEntry
  161.     LeaderCommit int // Leader's commit index
  162. }
  163.  
  // AppendEntriesReply is a follower's answer to an AppendEntries request.
  type AppendEntriesReply struct {
      Term    int  // responder's currentTerm, so a stale leader can update itself
      Success bool // true if the follower had an entry matching PrevLogIndex/PrevLogTerm
  }
  168.  
  169. func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
  170.     rf.mu.Lock()
  171.     if args.Term > rf.currentTerm {
  172.         rf.currentTerm = args.Term
  173.         rf.serverStatus = Follower
  174.         rf.votedFor = -1
  175.     }
  176.     // Assuming everything goes well, before network partition and this is just heartbeat:
  177.     if len(args.Entries) == 0 {
  178.         // heartbeat
  179.         // Reset election timeout here
  180.         rf.SetNextElectionTime()
  181.         // TODO: Confirm this use of election term to demote a leader
  182.     }
  183.     reply.Term = rf.currentTerm // TODO: Try this at the top
  184.     rf.mu.Unlock()
  185.     // TODO: reply.Success =
  186. }
  187.  
  // RequestVoteArgs is what a candidate sends when asking for a vote.
  // Field names are capitalized so labrpc can encode them.
  type RequestVoteArgs struct {
      Term         int // candidate's term
      ID           int // candidate's index in peers[]
      LastLogIndex int // index of the candidate's last log entry
      LastLogTerm  int // term of the candidate's last log entry
  }
  200.  
  // RequestVoteReply is a peer's answer to a vote request. Field names are
  // capitalized so labrpc can encode them.
  type RequestVoteReply struct {
      Term        int  // responder's currentTerm, so the candidate can update itself
      VoteGranted bool // true if the responder voted for the candidate
  }
  210.  
  211. //
  212. // example RequestVote RPC handler.
  213. //
  214. func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
  215.     // Your code here (2A, 2B).
  216.     rf.mu.Lock()
  217.     defer rf.mu.Unlock()
  218.     if args.Term < rf.currentTerm {
  219.         reply.VoteGranted = false
  220.     } else if args.Term > rf.currentTerm {
  221.         reply.VoteGranted = true
  222.         rf.votedFor = args.ID
  223.         rf.currentTerm = args.Term
  224.         rf.serverStatus = Follower
  225.     } else {
  226.         if rf.votedFor == -1 || rf.votedFor == args.ID {
  227.             reply.VoteGranted = true
  228.         } else {
  229.             reply.VoteGranted = false
  230.         }
  231.     }
  232.     reply.Term = rf.currentTerm
  233. }
  234.  
  235. //
  236. // example code to send a RequestVote RPC to a server.
  237. // server is the index of the target server in rf.peers[].
  238. // expects RPC arguments in args.
  239. // fills in *reply with RPC reply, so caller should
  240. // pass &reply.
  241. // the types of the args and reply passed to Call() must be
  242. // the same as the types of the arguments declared in the
  243. // handler function (including whether they are pointers).
  244. //
  245. // The labrpc package simulates a lossy network, in which servers
  246. // may be unreachable, and in which requests and replies may be lost.
  247. // Call() sends a request and waits for a reply. If a reply arrives
  248. // within a timeout interval, Call() returns true; otherwise
  249. // Call() returns false. Thus Call() may not return for a while.
  250. // A false return can be caused by a dead server, a live server that
  251. // can't be reached, a lost request, or a lost reply.
  252. //
  253. // Call() is guaranteed to return (perhaps after a delay) *except* if the
  254. // handler function on the server side does not return.  Thus there
  255. // is no need to implement your own timeouts around Call().
  256. //
  257. // look at the comments in ../labrpc/labrpc.go for more details.
  258. //
  259. // if you're having trouble getting RPC to work, check that you've
  260. // capitalized all field names in structs passed over RPC, and
  261. // that the caller passes the address of the reply struct with &, not
  262. // the struct itself.
  263. //
  264. func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
  265.     ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
  266.     return ok
  267. }
  268.  
  269. func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
  270.     ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
  271.     return ok
  272. }
  273.  
  274. //
  275. // the service using Raft (e.g. a k/v server) wants to start
  276. // agreement on the next command to be appended to Raft's log. if this
  277. // server isn't the leader, returns false. otherwise start the
  278. // agreement and return immediately. there is no guarantee that this
  279. // command will ever be committed to the Raft log, since the leader
  280. // may fail or lose an election. even if the Raft instance has been killed,
  281. // this function should return gracefully.
  282. //
  283. // the first return value is the index that the command will appear at
  284. // if it's ever committed. the second return value is the current
  285. // term. the third return value is true if this server believes it is
  286. // the leader.
  287. //
  288. func (rf *Raft) Start(command interface{}) (int, int, bool) {
  289.     index := -1
  290.     term := -1
  291.     isLeader := true
  292.  
  293.     // Your code here (2B).
  294.  
  295.     return index, term, isLeader
  296. }
  297.  
  298. //
  299. // the tester calls Kill() when a Raft instance won't
  300. // be needed again. for your convenience, we supply
  301. // code to set rf.dead (without needing a lock),
  302. // and a killed() method to test rf.dead in
  303. // long-running loops. you can also add your own
  304. // code to Kill(). you're not required to do anything
  305. // about this, but it may be convenient (for example)
  306. // to suppress debug output from a Kill()ed instance.
  307. //
  308. func (rf *Raft) Kill() {
  309.     atomic.StoreInt32(&rf.dead, 1)
  310.     // Your code here, if desired.
  311. }
  312.  
  313. func (rf *Raft) killed() bool {
  314.     z := atomic.LoadInt32(&rf.dead)
  315.     return z == 1
  316. }
  317.  
  318. // Returns a random number from 300 ms - 500 ms to be
  319. // used as an election timeout.
  320. func (rf *Raft) SetNextElectionTime() {
  321.     rand.Seed(time.Now().UnixNano())
  322.     var sleepDuration time.Duration = time.Duration((MinElectionTimeout + rand.Intn(MaxElectionTimeout-MinElectionTimeout)) * MillisecondToNanoSecond)
  323.     rf.nextElectionTime = time.Now().Add(sleepDuration)
  324. }
  325.  
  326. //
  327. // the service or tester wants to create a Raft server. the ports
  328. // of all the Raft servers (including this one) are in peers[]. this
  329. // server's port is peers[me]. all the servers' peers[] arrays
  330. // have the same order. persister is a place for this server to
  331. // save its persistent state, and also initially holds the most
  332. // recent saved state, if any. applyCh is a channel on which the
  333. // tester or service expects Raft to send ApplyMsg messages.
  334. // Make() must return quickly, so it should start goroutines
  335. // for any long-running work.
  336. //
  337. func Make(peers []*labrpc.ClientEnd, me int,
  338.     persister *Persister, applyCh chan ApplyMsg) *Raft {
  339.     rf := &Raft{}
  340.     rf.mu.Lock()
  341.     rf.peers = peers
  342.     rf.persister = persister
  343.     rf.me = me
  344.  
  345.     rf.serverStatus = Follower
  346.     rf.SetNextElectionTime()
  347.  
  348.     // Your initialization code here (2A, 2B, 2C).
  349.  
  350.     // initialize from state persisted before a crash
  351.     rf.readPersist(persister.ReadRaftState())
  352.     rf.mu.Unlock()
  353.  
  354.     go rf.AdvanceElectionTimeout()
  355.  
  356.     return rf
  357. }
  358.  
  359. func (rf *Raft) AdvanceElectionTimeout() {
  360.     for {
  361.         rf.mu.Lock()
  362.         if !time.Now().Before(rf.nextElectionTime) {
  363.             // fmt.Printf("Become candidate %v\n", rf.me)
  364.             rf.mu.Unlock()
  365.             selected := rf.BeginElection() // try to become a leader
  366.             if selected {
  367.                 // fmt.Printf("Leader %v at term %v", rf.me, rf.currentTerm)
  368.                 rf.BecomeLeader()
  369.             } else {
  370.                 rf.mu.Lock()
  371.                 rf.serverStatus = Follower
  372.                 rf.mu.Unlock()
  373.             }
  374.         } else {
  375.             // If election time is ahead, sleep until election time
  376.             if rf.nextElectionTime.After(time.Now()) {
  377.                 sleepDuration := rf.nextElectionTime.Sub(time.Now())
  378.                 rf.mu.Unlock()
  379.                 time.Sleep(sleepDuration)
  380.             } else {
  381.                 rf.mu.Unlock()
  382.             }
  383.         }
  384.     }
  385. }
  386.  
  387. func (rf *Raft) BecomeLeader() {
  388.     rf.mu.Lock()
  389.     rf.serverStatus = Leader
  390.     rf.mu.Unlock()
  391.     rf.SendHeartBeats()
  392. }
  393.  
  394. func (rf *Raft) SendHeartBeats() {
  395.     for {
  396.         rf.mu.Lock()
  397.         if rf.serverStatus == Leader {
  398.             currentTerm := rf.currentTerm
  399.             rf.mu.Unlock()
  400.             for i := range rf.peers {
  401.                 go func(server int) {
  402.                     if server != rf.me {
  403.                         args := AppendEntriesArgs{currentTerm, rf.me, 0, 0, []LogEntry{}, 0} // TODO: See how to fix values initialized as 0
  404.                         reply := AppendEntriesReply{}
  405.                         ok := rf.peers[server].Call("Raft.AppendEntries", &args, &reply)
  406.                         if ok {
  407.                             rf.mu.Lock()
  408.                             if reply.Term > rf.currentTerm {
  409.                                 rf.currentTerm = reply.Term
  410.                                 rf.serverStatus = Follower
  411.                                 rf.votedFor = -1
  412.                             }
  413.                             rf.mu.Unlock()
  414.                         }
  415.                     }
  416.                 }(i)
  417.             }
  418.         } else {
  419.             rf.mu.Unlock()
  420.             break
  421.         }
  422.         sleepTime := (time.Second / MaxHeartBeatsPerSecond)
  423.         time.Sleep(sleepTime)
  424.     }
  425. }
  426.  
  427. func (rf *Raft) BeginElection() bool {
  428.     var mutex = &sync.Mutex{}
  429.  
  430.     rf.mu.Lock()
  431.     rf.currentTerm++
  432.     rf.serverStatus = Candidate
  433.     // Vote for itself here
  434.  
  435.     mutex.Lock()
  436.     yesVotesCount := 1
  437.     recievedVotesCount := 1
  438.     mutex.Unlock()
  439.  
  440.     rf.votedFor = rf.me
  441.     // Reset election term
  442.     rf.SetNextElectionTime()
  443.     nextElectionTime := rf.nextElectionTime
  444.     currentTerm := rf.currentTerm
  445.     rf.mu.Unlock()
  446.  
  447.     requestVoteArgs := RequestVoteArgs{currentTerm, rf.me, 0, 0}
  448.     for peer := range rf.peers {
  449.         if peer != rf.me {
  450.             go func(server int) {
  451.                 requestVoteReply := RequestVoteReply{}
  452.                 ok := rf.sendRequestVote(server, &requestVoteArgs, &requestVoteReply)
  453.  
  454.                 if ok {
  455.                     mutex.Lock()
  456.                     recievedVotesCount++
  457.                     if requestVoteReply.VoteGranted {
  458.                         yesVotesCount++
  459.                     }
  460.                     if requestVoteReply.Term > currentTerm {
  461.                         rf.mu.Lock()
  462.                         rf.currentTerm = requestVoteReply.Term
  463.                         rf.mu.Unlock()
  464.                     }
  465.                     mutex.Unlock()
  466.                 }
  467.             }(peer)
  468.         }
  469.     }
  470.  
  471.     for {
  472.         if !time.Now().Before(nextElectionTime) {
  473.             // fmt.Printf("Election has timed out in %v\n", rf.me)
  474.             return false
  475.         }
  476.         // If there is a winner or we have recieved all the votes we need
  477.         mutex.Lock()
  478.         if yesVotesCount > (len(rf.peers)/2) || recievedVotesCount == len(rf.peers) {
  479.             if !time.Now().Before(nextElectionTime) {
  480.                 // fmt.Printf("Election has timed out in %v\n", rf.me)
  481.                 return false
  482.             }
  483.  
  484.             if yesVotesCount > (len(rf.peers)/2) && rf.nextElectionTime == nextElectionTime && time.Now().Before(nextElectionTime) && rf.currentTerm == currentTerm {
  485.                 // fmt.Printf("Yes votes count %v and no of peers %v\n", yesVotesCount, len(rf.peers)/2)
  486.                 return true
  487.             }
  488.  
  489.             if yesVotesCount < len(rf.peers)/2 {
  490.                 // fmt.Println("votes not enough")
  491.                 return false
  492.             }
  493.             return false
  494.         }
  495.         time.Sleep(time.Millisecond)
  496.         mutex.Unlock()
  497.     }
  498. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement