Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package raft
- //
- // this is an outline of the API that raft must expose to
- // the service (or tester). see comments below for
- // each of these functions for more details.
- //
- // rf = Make(...)
- // create a new Raft server.
- // rf.Start(command interface{}) (index, term, isleader)
- // start agreement on a new log entry
- // rf.GetState() (term, isLeader)
- // ask a Raft for its current term, and whether it thinks it is leader
- // ApplyMsg
- // each time a new entry is committed to the log, each Raft peer
- // should send an ApplyMsg to the service (or tester)
- // in the same server.
- //
- import "sync"
- import "sync/atomic"
- import "../labrpc"
- import "fmt"
- // import "bytes"
- // import "../labgob"
- import "time"
- import "math/rand"
- //
- // as each Raft peer becomes aware that successive log entries are
- // committed, the peer should send an ApplyMsg to the service (or
- // tester) on the same server, via the applyCh passed to Make(). set
- // CommandValid to true to indicate that the ApplyMsg contains a newly
- // committed log entry.
- //
- // in Lab 3 you'll want to send other kinds of messages (e.g.,
- // snapshots) on the applyCh; at that point you can add fields to
- // ApplyMsg, but set CommandValid to false for these other uses.
- //
// ApplyMsg is the message each Raft peer sends on applyCh to the service
// (or tester) once a log entry is committed.
type ApplyMsg struct {
	CommandValid bool        // true when this message carries a newly committed entry; false reserved for other uses (e.g. Lab 3 snapshots)
	Command      interface{} // the committed command itself
	CommandIndex int         // log index at which Command was committed
}
// LogEntry is one entry in the replicated log: a state-machine command plus
// the term in which it was received by the leader (paper Figure 2).
type LogEntry struct {
	Term    int         // leader term when the entry was appended
	Command interface{} // opaque state-machine command
}
// serverState records which of the three Raft roles this peer currently
// holds. Exactly one flag is expected to be true at a time, though nothing
// here enforces that invariant.
type serverState struct {
	Leader    bool
	Candidate bool
	Follower  bool
}
- //
- // A Go object implementing a single Raft peer.
- //
//
// A Go object implementing a single Raft peer.
//
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()
	currentTerm int               // latest term this peer has seen (Figure 2; should be persisted in 2C)
	votedFor    int               // CandidateID voted for in currentTerm, or -1 for none
	log         []LogEntry        // the replicated log
	serverState serverState       // current role: leader / candidate / follower
	commitIndex int               // highest log index known to be committed (unused until 2B)
	lastApplied int               // highest log index applied to the state machine (unused until 2B)
	lastHeardFromLeader time.Time // stamped on each accepted heartbeat and when an election starts; drives the election timeout
	votes        []bool           // NOTE(review): only referenced by the commented-out election code below — appears unused in the live path
	numHeardFrom int              // NOTE(review): likewise only used by the commented-out code
	voteMutex    sync.Mutex       // guards the vote/finished counters in startElection
	voteCond     sync.Cond        // NOTE(review): appears unused; startElection builds its own Cond over voteMutex
	leaderSet    bool             // NOTE(review): appears unused in this file
	// Your data here (2A, 2B, 2C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.
}
- // return currentTerm and whether this server
- // believes it is the leader.
- func (rf *Raft) GetState() (int, bool) {
- rf.mu.Lock()
- defer rf.mu.Unlock()
- isLeader := rf.serverState.Leader
- return rf.currentTerm, isLeader
- }
- //
- // save Raft's persistent state to stable storage,
- // where it can later be retrieved after a crash and restart.
- // see paper's Figure 2 for a description of what should be persistent.
- //
// persist saves Raft's persistent state (currentTerm, votedFor, log per
// Figure 2) to stable storage for recovery after a crash + restart.
// Currently a stub — to be implemented in part 2C.
func (rf *Raft) persist() {
	// Your code here (2C).
	// Example:
	// w := new(bytes.Buffer)
	// e := labgob.NewEncoder(w)
	// e.Encode(rf.xxx)
	// e.Encode(rf.yyy)
	// data := w.Bytes()
	// rf.persister.SaveRaftState(data)
}
- //
- // restore previously persisted state.
- //
// readPersist restores previously persisted state from data (the bytes
// written by persist). A nil/empty buffer means a fresh bootstrap with no
// saved state. Decoding is a stub — to be implemented in part 2C.
func (rf *Raft) readPersist(data []byte) {
	if data == nil || len(data) < 1 { // bootstrap without any state?
		return
	}
	// Your code here (2C).
	// Example:
	// r := bytes.NewBuffer(data)
	// d := labgob.NewDecoder(r)
	// var xxx
	// var yyy
	// if d.Decode(&xxx) != nil ||
	// d.Decode(&yyy) != nil {
	// error...
	// } else {
	// rf.xxx = xxx
	// rf.yyy = yyy
	// }
}
- //
- // example RequestVote RPC arguments structure.
- // field names must start with capital letters!
- //
// RequestVoteArgs is the argument struct for the RequestVote RPC
// (Figure 2). Field names must be exported (capitalized) so labrpc can
// encode them.
type RequestVoteArgs struct {
	// Your data here (2A, 2B).
	Term         int // candidate's term
	CandidateID  int // candidate requesting the vote
	LastLogIndex int // index of candidate's last log entry (not yet consulted — 2B)
	LastLogTerm  int // term of candidate's last log entry (not yet consulted — 2B)
}
- //
- // example RequestVote RPC reply structure.
- // field names must start with capital letters!
- //
// RequestVoteReply is the reply struct for the RequestVote RPC (Figure 2).
// Field names must be exported (capitalized) so labrpc can encode them.
type RequestVoteReply struct {
	// Your data here (2A).
	VoteGranted bool // true iff the candidate received this peer's vote
	Term        int  // this peer's currentTerm, so the candidate can update itself
}
- //
- // example RequestVote RPC handler.
- //
- // 1. Reply false if term < currentTerm (§5.1)
- // 2. If votedFor is null or candidateId, and candidate’s log is at
- // least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)
- func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
- // Your code here (2A, 2B).
- rf.mu.Lock()
- defer rf.mu.Unlock()
- fmt.Printf("[%d] received vote request from %d. request term: %d curr term: %d\n", rf.me, args.CandidateID, args.Term, rf.currentTerm)
- if args.Term > rf.currentTerm {
- fmt.Printf("[%d] updating term %d ---> %d\n", rf.me, rf.currentTerm, args.Term)
- rf.currentTerm = args.Term
- rf.serverState.Follower = true
- rf.serverState.Leader = false
- rf.serverState.Candidate = false
- rf.votedFor = -1
- }
- reply.Term = rf.currentTerm
- if args.Term < rf.currentTerm {
- reply.VoteGranted = false
- } else if rf.votedFor == -1 {
- rf.votedFor = args.CandidateID
- reply.VoteGranted = true
- } else if rf.votedFor == args.CandidateID {
- panic("screm")
- } else {
- reply.VoteGranted = false
- }
- fmt.Printf("[%d] sent vote response to %d. term: %d voteGranted: %t\n", rf.me, args.CandidateID, reply.Term, reply.VoteGranted)
- }
- //
- // example code to send a RequestVote RPC to a server.
- // server is the index of the target server in rf.peers[].
- // expects RPC arguments in args.
- // fills in *reply with RPC reply, so caller should
- // pass &reply.
- // the types of the args and reply passed to Call() must be
- // the same as the types of the arguments declared in the
- // handler function (including whether they are pointers).
- //
- // The labrpc package simulates a lossy network, in which servers
- // may be unreachable, and in which requests and replies may be lost.
- // Call() sends a request and waits for a reply. If a reply arrives
- // within a timeout interval, Call() returns true; otherwise
- // Call() returns false. Thus Call() may not return for a while.
- // A false return can be caused by a dead server, a live server that
- // can't be reached, a lost request, or a lost reply.
- //
- // Call() is guaranteed to return (perhaps after a delay) *except* if the
- // handler function on the server side does not return. Thus there
- // is no need to implement your own timeouts around Call().
- //
- // look at the comments in ../labrpc/labrpc.go for more details.
- //
- // if you're having trouble getting RPC to work, check that you've
- // capitalized all field names in structs passed over RPC, and
- // that the caller passes the address of the reply struct with &, not
- // the struct itself.
- //
// startAgreement replicates command to the peers' logs (part 2B).
// Currently an empty stub; Start launches it on the leader but it does
// nothing yet.
func (rf *Raft) startAgreement(command interface{}) {
}
- //
- // the service using Raft (e.g. a k/v server) wants to start
- // agreement on the next command to be appended to Raft's log. if this
- // server isn't the leader, returns false. otherwise start the
- // agreement and return immediately. there is no guarantee that this
- // command will ever be committed to the Raft log, since the leader
- // may fail or lose an election. even if the Raft instance has been killed,
- // this function should return gracefully.
- //
- // the first return value is the index that the command will appear at
- // if it's ever committed. the second return value is the current
- // term. the third return value is true if this server believes it is
- // the leader.
- //
- func (rf *Raft) Start(command interface{}) (int, int, bool) {
- rf.mu.Lock()
- defer rf.mu.Unlock()
- index := len(rf.log)
- term := rf.currentTerm
- if rf.serverState.Leader {
- go rf.startAgreement(command)
- }
- return index, term, rf.serverState.Leader
- }
- //
- // the tester calls Kill() when a Raft instance won't
- // be needed again. for your convenience, we supply
- // code to set rf.dead (without needing a lock),
- // and a killed() method to test rf.dead in
- // long-running loops. you can also add your own
- // code to Kill(). you're not required to do anything
- // about this, but it may be convenient (for example)
- // to suppress debug output from a Kill()ed instance.
- //
// Kill marks this Raft instance dead. The tester calls it when the instance
// won't be needed again; long-running goroutines should poll killed() and
// stop. The flag is atomic so no lock is needed.
func (rf *Raft) Kill() {
	atomic.StoreInt32(&rf.dead, 1)
	// Your code here, if desired.
}
- func (rf *Raft) killed() bool {
- z := atomic.LoadInt32(&rf.dead)
- return z == 1
- }
- //
- // the service or tester wants to create a Raft server. the ports
- // of all the Raft servers (including this one) are in peers[]. this
- // server's port is peers[me]. all the servers' peers[] arrays
- // have the same order. persister is a place for this server to
- // save its persistent state, and also initially holds the most
- // recent saved state, if any. applyCh is a channel on which the
- // tester or service expects Raft to send ApplyMsg messages.
- // Make() must return quickly, so it should start goroutines
- // for any long-running work.
- //
- // func (rf *Raft) monitorElection() {
- // fmt.Printf("monitoring election %d \n", rf.me)
- // for {
- // // fmt.Printf("spinning %d \n", rf.me)
- // time.Sleep(10 * time.Millisecond)
- // if rf.serverState.Candidate && rf.numHeardFrom > int((0.5 * float64(len(rf.peers)))) {
- // voteCount := 0
- // for i := 0; i < len(rf.votes); i++ {
- // if rf.votes[i] {
- // voteCount += 1
- // }
- // }
- // fmt.Printf("peer: %d, votes: %d/%d\n", rf.me, voteCount, len(rf.peers))
- // if voteCount > int((0.5 * float64(len(rf.peers)))) {
- // rf.mu.Lock()
- // fmt.Printf("%d became leader\n", rf.me)
- // rf.serverState.Leader = true
- // rf.serverState.Candidate = false
- // rf.serverState.Follower = false
- // rf.votes = make([]bool, len(rf.peers))
- // rf.numHeardFrom = 0
- // rf.mu.Unlock()
- // go rf.heartBeats()
- // return
- // }
- // } else if rf.serverState.Leader || rf.serverState.Follower {
- // fmt.Printf("canceling election %d\n", rf.me)
- // rf.mu.Lock()
- // rf.votes = make([]bool, len(rf.peers))
- // rf.numHeardFrom = 0
- // rf.mu.Unlock()
- // return
- // }
- // }
- // }
- // func (rf *Raft) startElection() {
- // rf.mu.Lock()
- // rf.currentTerm += 1
- // rf.serverState.Candidate = true
- // rf.serverState.Follower = false
- // rf.votes[rf.me] = true
- // args := RequestVoteArgs{}
- // args.Term = rf.currentTerm
- // args.CandidateID = rf.me
- // // args.LastLogIndex = rf.commitIndex
- // // args.LastLogTerm = rf.log[rf.commitIndex].Term
- // // args.LastLogTerm = rf.currentTerm // TODO FIX THIS !!!! ITS WRONG
- // rf.mu.Unlock()
- // rf.mu.Lock()
- // rf.mu.Unlock()
- // replies := make([]RequestVoteReply, len(rf.peers))
- // for i := 0; i < len(rf.peers); i++ {
- // fmt.Printf("sent vote request to %d\n", i)
- // go rf.sendRequestVote(i, &args, &replies[i]) // TODO make sure pointer right
- // }
- // fmt.Println("sent vote requests")
- // go rf.monitorElection()
- // var wg sync.WaitGroup
- // for i := 0; i < 5; i++ {
- // wg.Add(1)
- // go func(x int) {
- // sendRPC(x)
- // wg.Done()
- // }(i)
- // }
- // wg.Wait()
- // count := 0
- // finished := 0
- // var mu sync.Mutex
- // cond := sync.NewCond(&mu)
- // for i := 0; i < 10; i++ {
- // go func() {
- // vote := sendRequestVote()
- // mu.Lock()
- // defer mu.Unlock()
- // if vote {
- // count++
- // }
- // finished++
- // cond.Broadcast()
- // }()
- // }
- // mu.Lock()
- // for count < 5 && finished != 10 {
- // cond.Wait()
- // }
- // if count >= 5 {
- // println("received 5+ votes!")
- // } else {
- // println("lost")
- // }
- // mu.Unlock()
- // }
// AppendEntriesRPCArgs is the argument struct for the AppendEntries RPC.
// At this stage it carries no log entries — it serves only as a heartbeat.
type AppendEntriesRPCArgs struct {
	LeaderID int // so followers know who the leader is
	Term     int // leader's term
}
// AppendEntriesRPCReply is the reply struct for the AppendEntries RPC.
type AppendEntriesRPCReply struct {
	Term int // the follower's currentTerm, so a stale leader can step down
}
- func (rf *Raft) AppendEntries(args *AppendEntriesRPCArgs, reply *AppendEntriesRPCReply) {
- rf.mu.Lock()
- defer rf.mu.Unlock()
- if args.Term >= rf.currentTerm {
- fmt.Printf("[%d] received heartbeat from correct/new term from leader %d. %d's term: %d leader's term: %d \n", rf.me, args.LeaderID, rf.me, rf.currentTerm, args.Term)
- rf.lastHeardFromLeader = time.Now()
- rf.currentTerm = args.Term
- rf.serverState.Candidate = false
- rf.serverState.Leader = false
- rf.serverState.Follower = true
- rf.votedFor = -1
- }
- reply.Term = rf.currentTerm
- fmt.Printf("[%d] received heartbeat from leader %d. %d's term: %d leader's term: %d \n", rf.me, args.LeaderID, rf.me, rf.currentTerm, args.Term)
- }
- func (rf *Raft) heartBeats() {
- for {
- if rf.killed() {
- continue
- }
- time.Sleep(120 * time.Millisecond)
- rf.mu.Lock()
- // fmt.Printf("[%d] acquired lock --- heartbeats\n", rf.me)
- curTerm := rf.currentTerm
- if rf.serverState.Leader {
- args := AppendEntriesRPCArgs{}
- args.LeaderID = rf.me
- args.Term = curTerm
- for i := 0; i < len(rf.peers); i++ {
- if i == rf.me {
- continue
- }
- fmt.Printf("[%d] send heartbeats to %d, term: %d\n", rf.me, i, curTerm)
- go func(x int) {
- reply := AppendEntriesRPCReply{}
- res := rf.peers[x].Call("Raft.AppendEntries", &args, &reply)
- if ! res{
- return
- }
- rf.mu.Lock()
- if reply.Term > rf.currentTerm {
- fmt.Printf("[%d] got heartbeat reply from %d, resetting term to %d & becoming a follower\n", rf.me, i, rf.currentTerm)
- rf.currentTerm = reply.Term
- rf.serverState.Follower = true
- rf.serverState.Candidate = false
- rf.serverState.Leader = false
- rf.votedFor = -1
- }
- rf.mu.Unlock()
- }(i)
- }
- }
- rf.mu.Unlock()
- // fmt.Printf("[%d] released lock --- heartbeats\n", rf.me)
- }
- }
- func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
- fmt.Printf("[%d] sending vote request to %d\n", rf.me, server)
- res := rf.peers[server].Call("Raft.RequestVote", args, reply)
- if !res {
- return false
- }
- fmt.Printf("[%d] received vote response from %d\n", rf.me, server)
- rf.mu.Lock()
- fmt.Printf("[%d] acquired lock --- sendRequestVote\n", rf.me)
- if rf.currentTerm == reply.Term && reply.VoteGranted {
- rf.mu.Unlock()
- fmt.Printf("[%d] released lock --- sendRequestVote\n", rf.me)
- return true
- } else if !reply.VoteGranted && rf.currentTerm == reply.Term {
- rf.mu.Unlock()
- fmt.Printf("[%d] released lock --- sendRequestVote\n", rf.me)
- return false
- } else if reply.Term > rf.currentTerm { // cancel election if the term has increased
- rf.currentTerm = reply.Term
- rf.serverState.Candidate = false
- rf.serverState.Leader = false
- rf.serverState.Follower = true
- rf.votedFor = -1
- rf.mu.Unlock()
- fmt.Printf("[%d] released lock --- sendRequestVote\n", rf.me)
- return false
- } else if reply.Term < args.Term {
- // ignore vote
- rf.mu.Unlock()
- fmt.Printf("[%d] released lock --- sendRequestVote\n", rf.me)
- panic("didnt they get my email?")
- return false
- } else {
- fmt.Printf("args: %+v\n", args)
- fmt.Printf("rf: %+v\n", rf)
- panic("weird thing happened in vote aggregation, fix")
- rf.mu.Unlock()
- fmt.Printf("[%d] released lock --- sendRequestVote\n", rf.me)
- return false
- }
- }
- func (rf *Raft) startElection() {
- rf.mu.Lock()
- fmt.Printf("[%d] acquired lock --- startElection\n", rf.me)
- rf.currentTerm += 1
- fmt.Printf("[%d] starting election! new term: %d \n", rf.me, rf.currentTerm)
- ourTerm := rf.currentTerm
- rf.serverState.Candidate = true
- rf.serverState.Follower = false
- rf.serverState.Leader = false
- rf.votedFor = rf.me
- args := RequestVoteArgs{}
- args.Term = rf.currentTerm
- args.CandidateID = rf.me
- rf.mu.Unlock()
- fmt.Printf("[%d] released lock --- startElection\n", rf.me)
- replies := make([]RequestVoteReply, len(rf.peers))
- count := 1
- finished := 0
- cond := sync.NewCond(&rf.voteMutex)
- fmt.Printf("[%d] about to send out votes\n", rf.me)
- for i := 0; i < len(rf.peers); i++ {
- if i == rf.me {
- continue
- }
- go func(x int) {
- vote := rf.sendRequestVote(x, &args, &replies[x])
- rf.voteMutex.Lock()
- if vote {
- count++
- }
- finished++
- cond.Broadcast()
- rf.voteMutex.Unlock()
- }(i)
- }
- fmt.Printf("[%d] waiting for vote results pt1\n", rf.me)
- rf.voteMutex.Lock()
- fmt.Printf("[%d] waiting for vote results UNLOCKED VOTEMUTEX\n", rf.me)
- for count <= (len(rf.peers) / 2) && finished != len(rf.peers) && rf.serverState.Candidate {
- cond.Wait()
- }
- rf.voteMutex.Unlock()
- fmt.Printf("[%d] waiting for vote results pt2\n", rf.me)
- if count > (len(rf.peers) / 2) && rf.serverState.Candidate{
- rf.mu.Lock()
- fmt.Printf("[%d] acquired lock --- election pt 2\n", rf.me)
- fmt.Printf("[%d] became leader in term %d \n", rf.me, rf.currentTerm)
- if rf.currentTerm != ourTerm {
- return
- }
- rf.serverState.Leader = true
- rf.serverState.Candidate = false
- rf.serverState.Follower = false
- rf.mu.Unlock()
- fmt.Printf("[%d] released lock --- election part 2 \n", rf.me)
- // go rf.heartBeats()
- return
- } else {
- rf.mu.Lock()
- fmt.Printf("[%d] acquired lock --- election pt 2\n", rf.me)
- rf.serverState.Leader = false
- rf.serverState.Candidate = false
- rf.serverState.Follower = true
- rf.votedFor = -1
- rf.mu.Unlock()
- fmt.Printf("[%d] released lock --- election part 2 \n", rf.me)
- }
- fmt.Printf("[%d] failed election\n", rf.me)
- }
- func (rf *Raft) electionTimeoutHandler() {
- for {
- // see if we've heard from leader within the past X ms.
- // if we haven't, start new election.
- randInterval := rand.Int31n(500 - 300) + 300
- time.Sleep(time.Duration(rand.Int31n(randInterval)) * time.Millisecond)
- if rf.killed() {
- continue
- }
- rf.mu.Lock()
- if rf.serverState.Leader { // don't start elections if you're the leader
- // if rf.serverState.Leader {
- // fmt.Printf("[%d] currently leader, not starting election\n", rf.me)
- // } else {
- // fmt.Printf("[%d] currently candidate, not starting election\n", rf.me)
- // }
- rf.mu.Unlock()
- continue
- } else if rf.serverState.Follower || rf.serverState.Candidate {
- dur := time.Since(rf.lastHeardFromLeader)
- fmt.Printf("[%d] time since last heard from leader: %s", rf.me, dur.String())
- if dur > 1000 * time.Millisecond {
- // start election, send out
- fmt.Printf("[%d] starting election!\n", rf.me)
- rf.lastHeardFromLeader = time.Now()
- go rf.startElection()
- }
- rf.mu.Unlock()
- fmt.Printf("[%d] released lock --- timeouthandler\n", rf.me)
- }
- // else {
- // panic("unclear what the server state is?")
- // }
- }
- }
- // Modify Make() to create a background goroutine that will kick off
- // leader election periodically by sending out RequestVote RPCs when it
- // hasn't heard from another peer for a while. This way a peer will learn
- // who is the leader, if there is already a leader, or become the leader itself.
- func Make(peers []*labrpc.ClientEnd, me int,
- persister *Persister, applyCh chan ApplyMsg) *Raft {
- rf := &Raft{}
- rf.peers = peers
- rf.persister = persister
- rf.me = me
- rf.serverState.Follower = true
- rf.votedFor = -1
- rf.serverState.Leader = false
- rf.serverState.Candidate = false
- rf.log = make([]LogEntry, 0)
- rf.currentTerm = 0
- // Your initialization code here (2A, 2B, 2C).
- fmt.Printf("[%d] creating server \n", me)
- go rf.heartBeats()
- go rf.electionTimeoutHandler()
- // initialize from state persisted before a crash
- rf.readPersist(persister.ReadRaftState())
- return rf
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement