--------------------------------- MODULE RaftMongo ---------------------------------
\* This is the formal specification for the Raft consensus algorithm in MongoDB.
EXTENDS Naturals, FiniteSets, Sequences, TLC, Animation, Integers
\* The set of server IDs.
CONSTANTS Server
\* Server states.
\* Candidate is not used, but this is fine.
CONSTANTS Follower, Candidate, Leader
\* A reserved value.
CONSTANTS Nil
----
\* Global variables
\* The current term number. A single global term is shared by all servers.
VARIABLE globalCurrentTerm
----
\* The following variables are all per server (functions with domain Server).
\* The server's state (Follower, Candidate, or Leader).
VARIABLE state
\* The commit point learned by each server.
VARIABLE commitPoint
\* The current sync source of each server, if it has one.
VARIABLE syncSource
electionVars == <<globalCurrentTerm, state>>
serverVars == <<electionVars, commitPoint, syncSource>>
\* A sequence of log entries. The index into this sequence is the index of the
\* log entry. Unfortunately, the Sequences module defines Head(s) as the entry
\* with index 1, so be careful not to use that!
VARIABLE log
logVars == <<log>>
\* End of per server variables.
----
\* All variables; used for stuttering (asserting state hasn't changed).
vars == <<serverVars, logVars>>
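\* A type-correctness invariant, sketched as an assumption from how Init and the
\* actions below use each variable. The name TypeOK is hypothetical and not part
\* of the original spec. It could be listed under INVARIANT in a TLC config.
TypeOK ==
    /\ globalCurrentTerm \in Nat
    /\ state \in [Server -> {Follower, Candidate, Leader}]
    /\ commitPoint \in [Server -> [term : Nat, index : Nat]]
    /\ syncSource \in [Server -> Server \cup {Nil}]
    /\ log \in [Server -> Seq([term : Nat])]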
----
\* Helpers
\* The set of all quorums. This just calculates simple majorities, but the only
\* important property is that every quorum overlaps with every other.
Quorum == {i \in SUBSET(Server) : Cardinality(i) * 2 > Cardinality(Server)}
\* The term of the entry at position 'index' in a log, or 0 if index is 0.
GetTerm(xlog, index) == IF index = 0 THEN 0 ELSE xlog[index].term
LogTerm(i, index) == GetTerm(log[i], index)
\* The term of the last entry in a log, or 0 if the log is empty.
LastTerm(xlog) == GetTerm(xlog, Len(xlog))
\* Return the minimum value from a set, or undefined if the set is empty.
Min(s) == CHOOSE x \in s : \A y \in s : x <= y
\* Return the maximum value from a set, or undefined if the set is empty.
Max(s) == CHOOSE x \in s : \A y \in s : x >= y
----
\* Define initial values for all variables
InitServerVars == /\ globalCurrentTerm = 0
                  /\ state = [i \in Server |-> Follower]
                  /\ commitPoint = [i \in Server |-> [term |-> 0, index |-> 0]]
                  /\ syncSource = [i \in Server |-> Nil]
InitLogVars == /\ log = [i \in Server |-> << >>]
Init == /\ InitServerVars
        /\ InitLogVars
----
\* Message handlers
\* i = recipient, j = sender
\* Is log[i] a strict prefix of log[j]?
IsLogPrefix(i, j) ==
    /\ Len(log[i]) < Len(log[j])
    /\ LastTerm(log[i]) = LogTerm(j, Len(log[i]))
AppendOplog(i, j) ==
    \* /\ state[i] = Follower \* Disable primary catchup and draining
    /\ syncSource[i] = j \* Optionally enable sync source selection rules.
    /\ Len(log[i]) < Len(log[j])
    /\ LastTerm(log[i]) = LogTerm(j, Len(log[i]))
    /\ log' = [log EXCEPT ![i] = Append(log[i], log[j][Len(log[i]) + 1])]
    /\ UNCHANGED <<serverVars>>
CanRollbackOplog(i, j) ==
    /\ Len(log[i]) > 0
    /\ \* The log with later term is more up-to-date
       LastTerm(log[i]) < LastTerm(log[j])
    /\ \/ Len(log[i]) > Len(log[j])
       \* There seems to be no short-circuiting of OR clauses, so I have to
       \* specify the negative case explicitly.
       \/ /\ Len(log[i]) <= Len(log[j])
          /\ LastTerm(log[i]) /= LogTerm(j, Len(log[i]))
RollbackOplog(i, j) ==
    /\ CanRollbackOplog(i, j)
    \* Rollback 1 oplog entry
    /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |-> log[i][index2]]
       IN log' = [log EXCEPT ![i] = new]
    \* For any nodes that are currently syncing from you, clear their sync sources.
    /\ syncSource' = [s \in Server |-> IF syncSource[s] = i THEN Nil ELSE syncSource[s]]
    /\ UNCHANGED <<electionVars, commitPoint>>
\* The set of nodes that have log[me][logIndex] in their oplog.
Agree(me, logIndex) ==
    { node \in Server :
        /\ Len(log[node]) >= logIndex
        /\ LogTerm(me, logIndex) = LogTerm(node, logIndex) }
IsCommitted(me, logIndex) ==
    /\ Agree(me, logIndex) \in Quorum
    \* If we comment out the following line, a replicated log entry from an old
    \* primary will violate safety:
    \* [ P (2), S (), S ()]
    \* [ S (2), S (), P (3)]
    \* [ S (2), S (2), P (3)] !!! the log from term 2 shouldn't be considered as committed.
    /\ LogTerm(me, logIndex) = globalCurrentTerm
\* RollbackCommitted and NeverRollbackCommitted are not actions.
\* They are used for verification.
RollbackCommitted(i) ==
    \E j \in Server:
        /\ CanRollbackOplog(i, j)
        /\ IsCommitted(i, Len(log[i]))
NeverRollbackCommitted ==
    \A i \in Server: ~RollbackCommitted(i)
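\* An additional sanity invariant, sketched as an assumption. The name
\* AtMostOneLeader is hypothetical and not part of the original spec.
\* BecomePrimaryByMagic demotes every other server to Follower, and no other
\* action changes 'state', so at most one server should be Leader in any
\* reachable state.
AtMostOneLeader ==
    Cardinality({s \in Server : state[s] = Leader}) <= 1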
\* ACTION
\* i = the new primary node.
BecomePrimaryByMagic(i) ==
    LET notBehind(me, j) ==
            \/ LastTerm(log[me]) > LastTerm(log[j])
            \/ /\ LastTerm(log[me]) = LastTerm(log[j])
               /\ Len(log[me]) >= Len(log[j])
        ayeVoters(me) ==
            { index \in Server : notBehind(me, index) }
    IN /\ ayeVoters(i) \in Quorum
       /\ state' = [index \in Server |-> IF index = i THEN Leader ELSE Follower]
       /\ globalCurrentTerm' = globalCurrentTerm + 1
       /\ syncSource' = [syncSource EXCEPT ![i] = Nil] \* clear sync source.
       /\ UNCHANGED <<commitPoint, logVars>>
\* ACTION
\* Leader i receives a client request and appends a new entry, tagged with the
\* current term, to its log.
ClientWrite(i) ==
    /\ state[i] = Leader
    /\ LET entry == [term |-> globalCurrentTerm]
           newLog == Append(log[i], entry)
       IN log' = [log EXCEPT ![i] = newLog]
    /\ UNCHANGED <<serverVars>>
\* ACTION
AdvanceCommitPoint ==
    \E leader \in Server :
        /\ state[leader] = Leader
        /\ IsCommitted(leader, Len(log[leader]))
        /\ commitPoint' = [commitPoint EXCEPT ![leader] = [term |-> LastTerm(log[leader]), index |-> Len(log[leader])]]
        /\ UNCHANGED <<electionVars, logVars, syncSource>>
\* Return whether Node i's commit point is behind Node j's, comparing
\* (term, index) lexicographically, i.e. whether i can learn the commit point
\* from j.
CommitPointLessThan(i, j) ==
    \/ commitPoint[i].term < commitPoint[j].term
    \/ /\ commitPoint[i].term = commitPoint[j].term
       /\ commitPoint[i].index < commitPoint[j].index
\* ACTION
\* Node i learns the commit point from j via heartbeat.
LearnCommitPoint(i, j) ==
    /\ CommitPointLessThan(i, j)
    /\ commitPoint' = [commitPoint EXCEPT ![i] = commitPoint[j]]
    /\ UNCHANGED <<electionVars, logVars, syncSource>>
\* ACTION
\* Node i learns the commit point from j via heartbeat with term check.
LearnCommitPointWithTermCheck(i, j) ==
    /\ LastTerm(log[i]) = commitPoint[j].term
    /\ LearnCommitPoint(i, j)
\* ACTION
LearnCommitPointFromSyncSource(i, j) ==
    /\ ENABLED AppendOplog(i, j)
    /\ LearnCommitPoint(i, j)
LearnCommitPointFromSyncSourceNeverBeyondLastApplied(i, j) ==
    \* From sync source
    /\ ENABLED AppendOplog(i, j)
    /\ CommitPointLessThan(i, j)
    \* Never beyond last applied
    /\ LET myCommitPoint ==
           \* If they have the same term, commit point can be ahead.
           IF commitPoint[j].term <= LastTerm(log[i])
           THEN commitPoint[j]
           ELSE [term |-> LastTerm(log[i]), index |-> Len(log[i])]
       IN commitPoint' = [commitPoint EXCEPT ![i] = myCommitPoint]
    \* syncSource must be listed here too; otherwise syncSource' is unconstrained.
    /\ UNCHANGED <<electionVars, logVars, syncSource>>
\* ACTION
AppendEntryAndLearnCommitPointFromSyncSource(i, j) ==
    \* Append entry
    /\ syncSource[i] = j \* Optionally enable sync source selection rules.
    /\ Len(log[i]) < Len(log[j])
    /\ LastTerm(log[i]) = LogTerm(j, Len(log[i]))
    /\ log' = [log EXCEPT ![i] = Append(log[i], log[j][Len(log[i]) + 1])]
    \* Learn commit point
    /\ CommitPointLessThan(i, j)
    /\ commitPoint' = [commitPoint EXCEPT ![i] = commitPoint[j]]
    /\ UNCHANGED <<electionVars, syncSource>>
\* ACTION
\* Server i chooses server j as its new sync source.
\* i can choose j as a sync source if log[i] is a prefix of log[j] and
\* log[j] is longer than log[i].
ChooseNewSyncSource(i, j) ==
    /\ \/ IsLogPrefix(i, j)
       \* If logs are equal, allow choosing sync source if it has a newer commit point.
       \/ /\ log[i] = log[j]
          /\ commitPoint[j].index > commitPoint[i].index
    /\ state[i] = Follower \* leaders don't need to sync oplog entries.
    /\ syncSource' = [syncSource EXCEPT ![i] = j]
    /\ UNCHANGED <<electionVars, logVars, commitPoint>>
\* Does a 2 node sync source cycle exist?
SyncSourceCycleTwoNode ==
    \E s, t \in Server :
        /\ s # t
        /\ syncSource[s] = t
        /\ syncSource[t] = s
\* The set of all sequences with elements in set 's', of maximum length 'n'.
BoundedSeq(s, n) == UNION {[1..k -> s] : k \in 0..n}
\* The set of all paths in the graph that consists of edges <<s,t>> where s has t as a sync
\* source. A cycle through every server needs a path of Cardinality(Server) + 1 nodes.
SyncSourcePaths ==
    {p \in BoundedSeq(Server, Cardinality(Server) + 1) :
        \A i \in 1..(Len(p)-1) : syncSource[p[i]] = p[i+1]}
\* Is there a non-trivial path in the sync source graph from node i to node j?
\* This rules out trivial paths, i.e. those of length 1.
SyncSourcePath(i, j) ==
    \E p \in SyncSourcePaths :
        /\ Len(p) > 1
        /\ p[1] = i \* the source node.
        /\ p[Len(p)] = j \* the target node.
\* Does a general (possibly multi-node) sync source cycle exist?
SyncSourceCycle ==
    \E s \in Server : SyncSourcePath(s, s)
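\* A possible invariant form of the cycle checks above, sketched as an
\* assumption. The name NoSyncSourceCycle is hypothetical and not part of the
\* original spec. SyncSourceCycleTwoNode and SyncSourceCycle are TRUE when a
\* cycle exists, so an INVARIANT clause in a TLC config needs their negation.
NoSyncSourceCycle ==
    /\ ~SyncSourceCycleTwoNode
    /\ ~SyncSourceCycle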
----
AppendOplogAction ==
    \E i,j \in Server : AppendOplog(i, j)
RollbackOplogAction ==
    \E i,j \in Server : RollbackOplog(i, j)
BecomePrimaryByMagicAction ==
    \E i \in Server : BecomePrimaryByMagic(i)
ClientWriteAction ==
    \E i \in Server : ClientWrite(i)
LearnCommitPointAction ==
    \E i, j \in Server : LearnCommitPoint(i, j)
LearnCommitPointWithTermCheckAction ==
    \E i, j \in Server : LearnCommitPointWithTermCheck(i, j)
LearnCommitPointFromSyncSourceAction ==
    \E i, j \in Server : LearnCommitPointFromSyncSource(i, j)
LearnCommitPointFromSyncSourceNeverBeyondLastAppliedAction ==
    \E i, j \in Server : LearnCommitPointFromSyncSourceNeverBeyondLastApplied(i, j)
AppendEntryAndLearnCommitPointFromSyncSourceAction ==
    \E i, j \in Server : AppendEntryAndLearnCommitPointFromSyncSource(i, j)
ChooseNewSyncSourceAction ==
    \E i, j \in Server : ChooseNewSyncSource(i, j)
----
\* Properties to check
RollbackBeforeCommitPoint(i) ==
    /\ \E j \in Server:
        /\ CanRollbackOplog(i, j)
        /\ \/ LastTerm(log[i]) < commitPoint[i].term
           \/ /\ LastTerm(log[i]) = commitPoint[i].term
              /\ Len(log[i]) <= commitPoint[i].index
\* todo: clean up
NeverRollbackBeforeCommitPoint == \A i \in Server: ~RollbackBeforeCommitPoint(i)
\* Liveness check
\* This isn't accurate for every infinite behavior specified by Spec, but it's fine
\* for the finite behaviors whose liveness we can check with the model checker.
\* It checks that at any time, if two nodes' commit points are not the same, they
\* will eventually be the same.
\* This is checked after all possible rollback is done.
CommitPointEventuallyPropagates ==
    /\ \A i, j \in Server:
        [](commitPoint[i] # commitPoint[j] ~>
            <>(~ENABLED RollbackOplogAction => commitPoint[i] = commitPoint[j]))
----
\* Defines how the variables may transition.
Next ==
    \* --- Replication protocol
    \/ AppendOplogAction
    \/ RollbackOplogAction
    \/ BecomePrimaryByMagicAction
    \/ ClientWriteAction
    \/ ChooseNewSyncSourceAction
    \*
    \* --- Commit point learning protocol
    \/ AdvanceCommitPoint
    \* \/ LearnCommitPointAction
    \/ LearnCommitPointFromSyncSourceAction
    \* \/ AppendEntryAndLearnCommitPointFromSyncSourceAction
    \* \/ LearnCommitPointWithTermCheckAction
    \* \/ LearnCommitPointFromSyncSourceNeverBeyondLastAppliedAction
Liveness ==
    /\ SF_vars(AppendOplogAction)
    /\ SF_vars(RollbackOplogAction)
    \* A new primary should eventually write one entry.
    /\ WF_vars(\E i \in Server : LastTerm(log[i]) # globalCurrentTerm /\ ClientWrite(i))
    \* /\ WF_vars(ClientWriteAction)
    \*
    \* --- Commit point learning protocol
    /\ WF_vars(AdvanceCommitPoint)
    \* /\ WF_vars(LearnCommitPointAction)
    /\ SF_vars(LearnCommitPointFromSyncSourceAction)
    \* /\ SF_vars(AppendEntryAndLearnCommitPointFromSyncSourceAction)
    \* /\ SF_vars(LearnCommitPointWithTermCheckAction)
    \* /\ SF_vars(LearnCommitPointFromSyncSourceNeverBeyondLastAppliedAction)
\* The specification must start with the initial state and transition according
\* to Next.
Spec == Init /\ [][Next]_vars /\ Liveness
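\* A minimal state-constraint sketch for bounding TLC runs, since
\* globalCurrentTerm and the logs are otherwise unbounded. The name
\* StateConstraint and the bounds 3 and 2 are hypothetical choices, not part of
\* the original spec. The log bound of 2 mirrors MaxLogSlots in the animation
\* below. It would be used via a CONSTRAINT clause in the TLC config. Cutting
\* off behaviors this way can interact with liveness checking, so treat
\* liveness results under a constraint with care.
StateConstraint ==
    /\ globalCurrentTerm <= 3
    /\ \A s \in Server : Len(log[s]) <= 2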
\*
\*
\* TRACE ANIMATION.
\*
\*
Pick(S) == CHOOSE x \in S : TRUE
RECURSIVE SetToSeq(_)
SetToSeq(S) == IF S = {} THEN <<>>
               ELSE LET v == Pick(S) IN <<v>> \o SetToSeq(S \ {v})
Injective(f) == \A x, y \in DOMAIN f : f[x] = f[y] => x = y
ServerId == CHOOSE f \in [Server -> 1..Cardinality(Server)] : Injective(f)
\* This animation assumes 4 servers.
Base == [x |-> 50, y |-> 50]
ServerPositions ==
    1 :> [x |-> Base.x, y |-> Base.y] @@
    2 :> [x |-> Base.x + 200, y |-> Base.y + 0] @@
    3 :> [x |-> Base.x + 200, y |-> Base.y + 150] @@
    4 :> [x |-> Base.x + 0, y |-> Base.y + 150]
\* Colors to represent the state of a server.
StateColor ==
    Leader :> "green" @@
    Candidate :> "yellow" @@
    Follower :> "darkgray"
\* Generates a graphic element representing server 'sid'.
ServerElem(sid) ==
    LET pos == ServerPositions[ServerId[sid]]
        \* circleAttrs == ("stroke" :> "black" @@ "stroke-width" :> "1" @@ "fill" :> "blue")
        circleAttrs == [fill |-> StateColor[state[sid]]]
        circle == Circle(pos.x, pos.y, 12, circleAttrs) IN
    Group(<<circle>>, <<>>)
ServerElems == {Group(<<ServerElem(s)>>, <<>>) : s \in Server}
\* Linearly interpolate between two given points. 't', the interpolation factor, is
\* measured on a scale from 0 to 100 and is the weight given to p1: t = 100 yields
\* p1 and t = 0 yields p2.
Interp(p1, p2, t) ==
    [ x |-> (t * p1.x + (100-t) * p2.x) \div 100 ,
      y |-> (t * p1.y + (100-t) * p2.y) \div 100]
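\* A small worked check of Interp, added as an illustration rather than as part
\* of the original animation. With t = 5, the factor SyncSource uses for headP,
\* the result lies 95% of the way from p1 to p2: x = (5*0 + 95*100) \div 100 = 95
\* and y = (5*0 + 95*200) \div 100 = 190. The orange head is therefore drawn
\* close to the sync source's position. TLC evaluates this ASSUME when a model
\* starts.
ASSUME Interp([x |-> 0, y |-> 0], [x |-> 100, y |-> 200], 5) = [x |-> 95, y |-> 190]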
\* The position of a log slot for a server and log index.
LogEntryDims == [w |-> 22, h |-> 22]
LogPos(s, i) == [x |-> ServerPositions[ServerId[s]].x + 40,
                 y |-> ServerPositions[ServerId[s]].y + 20 - (i * (LogEntryDims.w + 3))]
\* Generate a single log entry element at index 'i' for server 'sid'.
LogEntryElem(sid, i) ==
    LET pos == LogPos(sid, i)
        entry == Rect(pos.x, pos.y, LogEntryDims.w, LogEntryDims.h, [fill |-> "lightblue"])
        term == Text((pos.x + LogEntryDims.w \div 2 - 6), (pos.y + LogEntryDims.h \div 2 + 8), ToString(log[sid][i].term), <<>>) IN
    Group(<<entry, term>>, <<>>)
LogElem(sid) ==
    LET entryElems == {LogEntryElem(sid, i) : i \in DOMAIN log[sid]} IN
    Group(SetToSeq(entryElems), <<>>)
LogElems == {LogElem(s) : s \in Server}
\* Log Slot Elements
MaxLogSlots == 2
LogSlot(sid, i) == Rect(LogPos(sid, i).x, LogPos(sid, i).y, LogEntryDims.w + 2, LogEntryDims.h, [stroke |-> (IF commitPoint[sid].index = i THEN "red" ELSE "black"), fill |-> "lightgray"])
LogSlotElem(sid) ==
    LET slotElems == {LogSlot(sid, i) : i \in 1..MaxLogSlots} IN
    Group(SetToSeq(slotElems), <<>>)
LogSlotElems == {LogSlotElem(s) : s \in Server}
SyncSource(s) ==
    LET src == ServerPositions[ServerId[s]]
        target == ServerPositions[ServerId[syncSource[s]]]
        line == Line(src.x, src.y, target.x, target.y, [stroke |-> "black"])
        headP == Interp(src, target, 5)
        head == Circle(headP.x, headP.y, 3, [fill |-> "orange"]) IN
    Group(<<line, head>>, <<>>)
SyncSourceElems == {SyncSource(i) : i \in {s \in Server : syncSource[s] # Nil}}
\*AllElems == SetToSeq(LogElems) \o SetToSeq(ServerElems)
AllElems == SetToSeq(ServerElems)
            \o SetToSeq(SyncSourceElems)
            \o SetToSeq(LogSlotElems)
            \o SetToSeq(LogElems)
View == Group(AllElems, <<>>)
===============================================================================