Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package raft
- //
- // this is an outline of the API that raft must expose to
- // the service (or tester). see comments below for
- // each of these functions for more details.
- //
- // rf = Make(...)
- // create a new Raft server.
- // rf.Start(command interface{}) (index, term, isleader)
- // start agreement on a new log entry
- // rf.GetState() (term, isLeader)
- // ask a Raft for its current term, and whether it thinks it is leader
- // ApplyMsg
- // each time a new entry is committed to the log, each Raft peer
- // should send an ApplyMsg to the service (or tester)
- // in the same server.
- //
- import (
- "math/rand"
- "sync"
- "sync/atomic"
- "time"
- "../labrpc"
- )
const (
	// Raft peer roles (the values a ServerStatus may take).
	Follower  = 0
	Candidate = 1
	Leader    = 2

	// Election timeouts are drawn uniformly from
	// [MinElectionTimeout, MaxElectionTimeout) milliseconds.
	MinElectionTimeout = 500
	MaxElectionTimeout = 750

	// How many heartbeat rounds per second a leader broadcasts.
	MaxHeartBeatsPerSecond = 10

	// Conversion factor: 1 millisecond = 1e6 nanoseconds.
	MillisecondToNanoSecond = 1000000
)

// ServerStatus is the role this peer currently plays:
// Follower, Candidate, or Leader.
type ServerStatus int
- // import "bytes"
- // import "../labgob"
//
// as each Raft peer becomes aware that successive log entries are
// committed, the peer should send an ApplyMsg to the service (or
// tester) on the same server, via the applyCh passed to Make(). set
// CommandValid to true to indicate that the ApplyMsg contains a newly
// committed log entry.
//
// in Lab 3 you'll want to send other kinds of messages (e.g.,
// snapshots) on the applyCh; at that point you can add fields to
// ApplyMsg, but set CommandValid to false for these other uses.
//
type ApplyMsg struct {
	CommandValid bool        // true if this message carries a newly committed log entry
	Command      interface{} // the committed command itself
	CommandIndex int         // log index at which the command was committed
}
// log entries; each entry contains command
// for state machine, and term when entry
// was received by leader (first index is 1)
type LogEntry struct {
	Command     interface{} // state-machine command carried by this entry
	CommandTerm int         // term when entry was received by leader
}
//
// A Go object implementing a single Raft peer.
//
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// Persistent state (should survive restarts; see Figure 2)
	currentTerm int        // latest term this peer has seen
	votedFor    int        // candidate ID voted for in currentTerm (-1 means none)
	logEntries  []LogEntry // the replicated log

	// Volatile state on all servers
	commitIndex int // index of the highest log entry known to be committed
	lastApplied int // index of the highest log entry applied locally

	// Volatile state only if this server is a leader
	nextIndex  []int // per-peer index of the next log entry to send
	matchIndex []int // per-peer index of the highest entry known replicated

	serverStatus     ServerStatus // Either leader, follower or candidate
	nextElectionTime time.Time    // when an election fires unless a heartbeat postpones it

	// Your data here (2A, 2B, 2C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.
}
- // return currentTerm and whether this server
- // believes it is the leader.
- func (rf *Raft) GetState() (int, bool) {
- var term int
- var isleader bool
- rf.mu.Lock()
- term = rf.currentTerm
- isleader = rf.serverStatus == Leader
- rf.mu.Unlock()
- // Your code here (2A).
- return term, isleader
- }
//
// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
// see paper's Figure 2 for a description of what should be persistent.
//
// Currently a stub: nothing is persisted yet (lab part 2C).
func (rf *Raft) persist() {
	// Your code here (2C).
	// Example:
	// w := new(bytes.Buffer)
	// e := labgob.NewEncoder(w)
	// e.Encode(rf.xxx)
	// e.Encode(rf.yyy)
	// data := w.Bytes()
	// rf.persister.SaveRaftState(data)
}
- //
- // restore previously persisted state.
- //
- func (rf *Raft) readPersist(data []byte) {
- if data == nil || len(data) < 1 { // bootstrap without any state?
- return
- }
- // Your code here (2C).
- // Example:
- // r := bytes.NewBuffer(data)
- // d := labgob.NewDecoder(r)
- // var xxx
- // var yyy
- // if d.Decode(&xxx) != nil ||
- // d.Decode(&yyy) != nil {
- // error...
- // } else {
- // rf.xxx = xxx
- // rf.yyy = yyy
- // }
- }
// AppendEntriesArgs carries a leader's replication/heartbeat request.
type AppendEntriesArgs struct {
	Term         int        // Leader's term
	ID           int        // leader ID
	PrevLogIndex int        // Index of log entry immediately preceding the new ones
	PrevLogTerm  int        // Term of the PrevLogIndex entry
	Entries      []LogEntry // entries to append; empty for a heartbeat
	LeaderCommit int        // Leader's commit index
}
// AppendEntriesReply is the follower's response to AppendEntries.
type AppendEntriesReply struct {
	Term    int  // current term for leader to update itself
	Success bool // true if follower contained entry matching prevLogIndex and prevLogTerm
}
- func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
- rf.mu.Lock()
- if args.Term > rf.currentTerm {
- rf.currentTerm = args.Term
- rf.serverStatus = Follower
- rf.votedFor = -1
- }
- // Assuming everything goes well, before network partition and this is just heartbeat:
- if len(args.Entries) == 0 {
- // heartbeat
- // Reset election timeout here
- rf.SetNextElectionTime()
- // TODO: Confirm this use of election term to demote a leader
- }
- reply.Term = rf.currentTerm // TODO: Try this at the top
- rf.mu.Unlock()
- // TODO: reply.Success =
- }
//
// example RequestVote RPC arguments structure.
// field names must start with capital letters!
//
type RequestVoteArgs struct {
	Term         int // Candidate's term
	ID           int // ID of candidate requesting vote
	LastLogIndex int // Index of candidate's last log entry
	LastLogTerm  int // Term of candidate's last log entry
	// Your data here (2A, 2B).
}
//
// example RequestVote RPC reply structure.
// field names must start with capital letters!
//
type RequestVoteReply struct {
	// Your data here (2A).
	Term        int  // currentTerm for candidate to update itself
	VoteGranted bool // true if this peer granted its vote
}
- //
- // example RequestVote RPC handler.
- //
- func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
- // Your code here (2A, 2B).
- rf.mu.Lock()
- defer rf.mu.Unlock()
- if args.Term < rf.currentTerm {
- reply.VoteGranted = false
- } else if args.Term > rf.currentTerm {
- reply.VoteGranted = true
- rf.votedFor = args.ID
- rf.currentTerm = args.Term
- rf.serverStatus = Follower
- } else {
- if rf.votedFor == -1 || rf.votedFor == args.ID {
- reply.VoteGranted = true
- } else {
- reply.VoteGranted = false
- }
- }
- reply.Term = rf.currentTerm
- }
- //
- // example code to send a RequestVote RPC to a server.
- // server is the index of the target server in rf.peers[].
- // expects RPC arguments in args.
- // fills in *reply with RPC reply, so caller should
- // pass &reply.
- // the types of the args and reply passed to Call() must be
- // the same as the types of the arguments declared in the
- // handler function (including whether they are pointers).
- //
- // The labrpc package simulates a lossy network, in which servers
- // may be unreachable, and in which requests and replies may be lost.
- // Call() sends a request and waits for a reply. If a reply arrives
- // within a timeout interval, Call() returns true; otherwise
- // Call() returns false. Thus Call() may not return for a while.
- // A false return can be caused by a dead server, a live server that
- // can't be reached, a lost request, or a lost reply.
- //
- // Call() is guaranteed to return (perhaps after a delay) *except* if the
- // handler function on the server side does not return. Thus there
- // is no need to implement your own timeouts around Call().
- //
- // look at the comments in ../labrpc/labrpc.go for more details.
- //
- // if you're having trouble getting RPC to work, check that you've
- // capitalized all field names in structs passed over RPC, and
- // that the caller passes the address of the reply struct with &, not
- // the struct itself.
- //
- func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
- ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
- return ok
- }
- func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
- ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
- return ok
- }
- //
- // the service using Raft (e.g. a k/v server) wants to start
- // agreement on the next command to be appended to Raft's log. if this
- // server isn't the leader, returns false. otherwise start the
- // agreement and return immediately. there is no guarantee that this
- // command will ever be committed to the Raft log, since the leader
- // may fail or lose an election. even if the Raft instance has been killed,
- // this function should return gracefully.
- //
- // the first return value is the index that the command will appear at
- // if it's ever committed. the second return value is the current
- // term. the third return value is true if this server believes it is
- // the leader.
- //
- func (rf *Raft) Start(command interface{}) (int, int, bool) {
- index := -1
- term := -1
- isLeader := true
- // Your code here (2B).
- return index, term, isLeader
- }
//
// the tester calls Kill() when a Raft instance won't
// be needed again. for your convenience, we supply
// code to set rf.dead (without needing a lock),
// and a killed() method to test rf.dead in
// long-running loops. you can also add your own
// code to Kill(). you're not required to do anything
// about this, but it may be convenient (for example)
// to suppress debug output from a Kill()ed instance.
//
func (rf *Raft) Kill() {
	// Atomic store so concurrent killed() readers need no lock.
	atomic.StoreInt32(&rf.dead, 1)
	// Your code here, if desired.
}
- func (rf *Raft) killed() bool {
- z := atomic.LoadInt32(&rf.dead)
- return z == 1
- }
- // Returns a random number from 300 ms - 500 ms to be
- // used as an election timeout.
- func (rf *Raft) SetNextElectionTime() {
- rand.Seed(time.Now().UnixNano())
- var sleepDuration time.Duration = time.Duration((MinElectionTimeout + rand.Intn(MaxElectionTimeout-MinElectionTimeout)) * MillisecondToNanoSecond)
- rf.nextElectionTime = time.Now().Add(sleepDuration)
- }
- //
- // the service or tester wants to create a Raft server. the ports
- // of all the Raft servers (including this one) are in peers[]. this
- // server's port is peers[me]. all the servers' peers[] arrays
- // have the same order. persister is a place for this server to
- // save its persistent state, and also initially holds the most
- // recent saved state, if any. applyCh is a channel on which the
- // tester or service expects Raft to send ApplyMsg messages.
- // Make() must return quickly, so it should start goroutines
- // for any long-running work.
- //
- func Make(peers []*labrpc.ClientEnd, me int,
- persister *Persister, applyCh chan ApplyMsg) *Raft {
- rf := &Raft{}
- rf.mu.Lock()
- rf.peers = peers
- rf.persister = persister
- rf.me = me
- rf.serverStatus = Follower
- rf.SetNextElectionTime()
- // Your initialization code here (2A, 2B, 2C).
- // initialize from state persisted before a crash
- rf.readPersist(persister.ReadRaftState())
- rf.mu.Unlock()
- go rf.AdvanceElectionTimeout()
- return rf
- }
- func (rf *Raft) AdvanceElectionTimeout() {
- for {
- rf.mu.Lock()
- if !time.Now().Before(rf.nextElectionTime) {
- // fmt.Printf("Become candidate %v\n", rf.me)
- rf.mu.Unlock()
- selected := rf.BeginElection() // try to become a leader
- if selected {
- // fmt.Printf("Leader %v at term %v", rf.me, rf.currentTerm)
- rf.BecomeLeader()
- } else {
- rf.mu.Lock()
- rf.serverStatus = Follower
- rf.mu.Unlock()
- }
- } else {
- // If election time is ahead, sleep until election time
- if rf.nextElectionTime.After(time.Now()) {
- sleepDuration := rf.nextElectionTime.Sub(time.Now())
- rf.mu.Unlock()
- time.Sleep(sleepDuration)
- } else {
- rf.mu.Unlock()
- }
- }
- }
- }
- func (rf *Raft) BecomeLeader() {
- rf.mu.Lock()
- rf.serverStatus = Leader
- rf.mu.Unlock()
- rf.SendHeartBeats()
- }
- func (rf *Raft) SendHeartBeats() {
- for {
- rf.mu.Lock()
- if rf.serverStatus == Leader {
- currentTerm := rf.currentTerm
- rf.mu.Unlock()
- for i := range rf.peers {
- go func(server int) {
- if server != rf.me {
- args := AppendEntriesArgs{currentTerm, rf.me, 0, 0, []LogEntry{}, 0} // TODO: See how to fix values initialized as 0
- reply := AppendEntriesReply{}
- ok := rf.peers[server].Call("Raft.AppendEntries", &args, &reply)
- if ok {
- rf.mu.Lock()
- if reply.Term > rf.currentTerm {
- rf.currentTerm = reply.Term
- rf.serverStatus = Follower
- rf.votedFor = -1
- }
- rf.mu.Unlock()
- }
- }
- }(i)
- }
- } else {
- rf.mu.Unlock()
- break
- }
- sleepTime := (time.Second / MaxHeartBeatsPerSecond)
- time.Sleep(sleepTime)
- }
- }
- func (rf *Raft) BeginElection() bool {
- var mutex = &sync.Mutex{}
- rf.mu.Lock()
- rf.currentTerm++
- rf.serverStatus = Candidate
- // Vote for itself here
- mutex.Lock()
- yesVotesCount := 1
- recievedVotesCount := 1
- mutex.Unlock()
- rf.votedFor = rf.me
- // Reset election term
- rf.SetNextElectionTime()
- nextElectionTime := rf.nextElectionTime
- currentTerm := rf.currentTerm
- rf.mu.Unlock()
- requestVoteArgs := RequestVoteArgs{currentTerm, rf.me, 0, 0}
- for peer := range rf.peers {
- if peer != rf.me {
- go func(server int) {
- requestVoteReply := RequestVoteReply{}
- ok := rf.sendRequestVote(server, &requestVoteArgs, &requestVoteReply)
- if ok {
- mutex.Lock()
- recievedVotesCount++
- if requestVoteReply.VoteGranted {
- yesVotesCount++
- }
- if requestVoteReply.Term > currentTerm {
- rf.mu.Lock()
- rf.currentTerm = requestVoteReply.Term
- rf.mu.Unlock()
- }
- mutex.Unlock()
- }
- }(peer)
- }
- }
- for {
- if !time.Now().Before(nextElectionTime) {
- // fmt.Printf("Election has timed out in %v\n", rf.me)
- return false
- }
- // If there is a winner or we have recieved all the votes we need
- mutex.Lock()
- if yesVotesCount > (len(rf.peers)/2) || recievedVotesCount == len(rf.peers) {
- if !time.Now().Before(nextElectionTime) {
- // fmt.Printf("Election has timed out in %v\n", rf.me)
- return false
- }
- if yesVotesCount > (len(rf.peers)/2) && rf.nextElectionTime == nextElectionTime && time.Now().Before(nextElectionTime) && rf.currentTerm == currentTerm {
- // fmt.Printf("Yes votes count %v and no of peers %v\n", yesVotesCount, len(rf.peers)/2)
- return true
- }
- if yesVotesCount < len(rf.peers)/2 {
- // fmt.Println("votes not enough")
- return false
- }
- return false
- }
- time.Sleep(time.Millisecond)
- mutex.Unlock()
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement