Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #pragma once
#include <compare>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <variant>
#include <vector>

#include <seastar/include/seastar/core/future.hh>
#include <seastar/include/seastar/core/lowres_clock.hh>
#include <seastar/include/seastar/core/reactor.hh>
#include <seastar/include/seastar/net/socket_defs.hh>
- namespace raft {
- struct replica {
- seastar::socket_address id;
- };
- struct group_leader {
- replica node;
- bool current_node() const;
- };
- class term {
- uint32_t _val;
- public:
- term() = default;
- explicit term(uint32_t val) noexcept
- : _val(val) {
- }
- uint32_t value() const {
- return _val;
- }
- friend std::strong_ordering operator<=>(const term&, const term&);
- };
- class log_offset {
- uint64_t _pos;
- public:
- log_offset() = default;
- explicit log_offset(uint32_t pos) noexcept
- : _pos(pos) {
- }
- uint64_t position() const {
- return _pos;
- }
- friend std::strong_ordering operator<=>(const log_offset&, const log_offset&);
- };
- struct log_index {
- term term;
- seastar::shard_id leader_shard;
- log_offset offset;
- };
// A clock whose interface exposes a logical and a physical component type and
// hands out opaque time points.
class hybrid_clock {
public:
    using logical_component = uint32_t;
    using physical_component = uint32_t;
    // An opaque timestamp produced by the clock.
    class time_point {
        // NOTE(review): presumably packs the physical and logical components into
        // one 64-bit value -- confirm against the implementation.
        // Zero-initialized so a default-constructed time point is never indeterminate.
        uint64_t _rep = 0;
    public:
        // Equality comparison; declared here, defined elsewhere.
        // This was previously spelled `std::weak_equality operator<=>`, but the
        // equality-only comparison categories were removed from C++20 before it was
        // published, so the declaration could not compile. In C++20 `!=` is
        // synthesized from this operator.
        friend bool operator==(const time_point&, const time_point&);
    };
    // Feeds a logical component into the clock.
    // NOTE(review): presumably the value observed from a remote node -- confirm.
    void update(logical_component);
    // Returns a time point from the clock.
    time_point now();
};
// Identifier of a Raft group, wrapped in its own type. Supports equality
// comparison only.
class group_id {
    uint64_t _id;
public:
    // Wraps the given raw identifier.
    explicit group_id(uint64_t id) noexcept
        : _id(id) {
    }
    // Two group ids are equal when their raw identifiers are equal.
    // This was previously spelled `std::strong_equality operator<=>`, but the
    // equality-only comparison categories were removed from C++20 before it was
    // published, so the declaration could not compile. In C++20 `!=` is
    // synthesized from this operator.
    friend bool operator==(const group_id& lhs, const group_id& rhs) noexcept {
        return lhs._id == rhs._id;
    }
};
- struct group {
- group_id id;
- std::vector<replica> replicas;
- };
- namespace provider {
- // Sends Raft messages between members of a group. The module is not specific to a Raft group.
- class rpc {
- public:
- struct entry {
- fragmented_temporary_buffer data;
- log_index index;
- };
- struct success { };
- struct stale_leader {
- term current_term;
- };
- struct log_index_mismatch {
- log_index current_log_index;
- };
- struct prev_term_indexes {
- std::vector<log_index> indexes;
- };
- struct got_vote_tag { };
- using got_vote = bool_class<got_vote_tag>;
- // Sends the append entries payload, containing the following:
- // struct append_entries_payload {
- // term leader_term;
- // replica leader;
- // uint32_t shard;
- // log_index leader_commit_index;
- // log_index prev_log_index;
- // term prev_log_term;
- // entry entries[];
- // };
- // The follower either successfully processes the RPC, or updates a stale leader with
- // the current term, or sends the last index it has on its log for the current term (if
- // doesn't match the index of the first entry), or returns the log indexes of all leader
- // shards for the last term for which it received entries.
- virtual future<std::variant<success, stale_leader, log_index_mismatch, prev_term_indexes>> append_entries(
- replica destination,
- term leader_term,
- log_index leader_commit_index,
- log_index prev_log_index,
- std::vector<entry>) = 0;
- // Sends the request vote payload, containing the following:
- // struct request_vote_payload {
- // term candidate_term;
- // replica candidate;
- // log_index last_index;
- // };
- // The candidate either receives the vote or not, or it is updated with the current term of the group.
- virtual future<std::variant<got_vote, stale_leader>> request_vote(
- std::vector<replica> peers,
- term candidate_term,
- log_index last_index) = 0;
- };
- // Sends heartbeats on behalf of all groups of this node, and across all shards
- // Should exist only on one shard.
- class heartbeats {
- public:
- // The callback receives the election timeout to use. That timeout can be biased
- // by how many groups the current node is already a leader of.
- using on_leader_timeout = std::function<future<group_leader>(const group&, seastar::lowres_clock::time_point)>;
- protected:
- on_leader_timeout _on_leader_timeout;
- public:
- heartbeats(on_leader_timeout cb)
- : _on_leader_timeout(std::move(cb)) {
- }
- // Registers a new Raft group, for which leader election should be triggered.
- virtual future<group_leader> register_group(const group&) = 0;
- // Stop sending heartbeats for this group, as leadership has been relinquished.
- virtual future<> relinquish_leadership(const group&) = 0;
- // Sets the commit index for a particular group. If the current node is the leader,
- // then that commit index is sent in heartbeat messages.
- virtual future<> set_commit_index(const group&, log_index) = 0;
- // Unregisters a Raft group.
- virtual future<> unregister_group(const group&) = 0;
- };
- // State-machine for a particular Raft group.
- class state_machine {
- // Transfers the contents of the state machine to the specified replica.
- virtual future<> transfer_to(replica) = 0;
- // Applies the specified entry to the state machine.
- virtual future<> apply(fragmented_temporary_buffer) = 0;
- };
- // Stores the persistent Raft state belonging to a Raft group.
- class state {
- // Registers the replica for which the current node voted, for the specified term.
- // Should replicate that information across shards. When the future resolves, the
- // information should be persisted on disk.
- virtual future<> register_vote(term, replica) = 0;
- // Registers the current commit index for the current leader shard. When the future resolves,
- // the information is not guaranteed to have been persisted.
- virtual future<> register_commit_index_relaxed(log_index) = 0;
- // Registers the previous information of the discarded log for the current leader shard.
- // When the future resolves, the information should be persisted on disk.
- virtual future<> register_log_compaction(log_index, std::vector<replica> config) = 0;
- };
- // Implements the persistent Raft log for a particular Raft group.
- class log {
- // Appends the specified entry for the current shard, at the specified term.
- // When the future resolves, the entry should be persisted in stable storage.
- virtual future<log_index> append(
- term,
- size_t,
- std::function<void(fragmented_temporary_buffer::ostream&)>) = 0;
- // The persisted tail of the log for the current shard, used to include in the append_entries RPC.
- virtual log_index tail() const = 0;
- // The tail of the log for all shards of a term, used to bring up a follower to date.
- virtual std::vector<log_index> tail_for_all_shards_of(term) = 0;
- // Discards a prefix of the log, to support log compaction.
- virtual future<> discard_prefix(log_index) = 0;
- // Discards a suffix of the log, when support removing entries from an outdated follower.
- virtual future<> discard_suffix(log_index) = 0;
- // Reads a set of entries in the specified range.
- using process_entry = std::function<future<>(const fragmented_temporary_buffer&)>;
- virtual future<> read(log_index, log_index, process_entry) = 0;
- };
- } // namespace provider
- // The base type of Raft entries. There can be multiple types of entries managed
- // by the same Raft group, each with different requirements.
- class entry {
- using type = unsigned;
- // The type of entry, useful to retrieve associated types (e.g., a deserializer).
- virtual type entry_type() const = 0;
- // Serializes the entry to the specified buffer.
- virtual void write(fragmented_temporary_buffer::ostream&) = 0;
- // Called when the entry has been replicated across the set of replicas of the group.
- // Side-effects should be idempotent, as the callback can be called multiple times (e.g.,
- // when replaying the log).
- virtual future<> on_replicated() = 0;
- // Sets the timestamp
- virtual void set_timestamp(hybrid_clock::time_point) = 0;
- };
- // Deserializes entries of a particular type.
- class entry_deserializer {
- virtual entry read(const fragmented_temporary_buffer&) = 0;
- };
- // Exposes the Raft protocol to external consumers, for a particular group.
- // Consumes the heartbeat, rpc, state machine, state and log providers.
- class protocol {
- public:
- // Specifies whether the current node is the leader of this Raft instance.
- virtual bool is_leader() const = 0;
- // Returns the leader, useful to forward requests to it.
- virtual group_leader leader() const = 0;
- // Replicates the specified entry across the Raft group. Returns when the entry has been committed.
- virtual future<> replicate(entry) = 0;
- // When the group's configuration changes. Calls to this function are ordered w.r.t. calls to replicate().
- virtual future<> update_configuration(std::vector<replica>) = 0;
- };
- } // namespace raft
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement