Advertisement
Guest User

Untitled

a guest
Jul 21st, 2019
78
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.93 KB | None | 0 0
  1. #pragma once
  2.  
#include <compare>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <utility>
#include <variant>
#include <vector>

#include <seastar/include/seastar/core/future.hh>
#include <seastar/include/seastar/core/lowres_clock.hh>
#include <seastar/include/seastar/core/reactor.hh>
#include <seastar/include/seastar/net/socket_defs.hh>
#include <seastar/include/seastar/util/bool_class.hh>
  11.  
  12. namespace raft {
  13.  
  14. struct replica {
  15. seastar::socket_address id;
  16. };
  17.  
  18. struct group_leader {
  19. replica node;
  20. bool current_node() const;
  21. };
  22.  
  23. class term {
  24. uint32_t _val;
  25. public:
  26. term() = default;
  27. explicit term(uint32_t val) noexcept
  28. : _val(val) {
  29. }
  30.  
  31. uint32_t value() const {
  32. return _val;
  33. }
  34.  
  35. friend std::strong_ordering operator<=>(const term&, const term&);
  36. };
  37.  
  38. class log_offset {
  39. uint64_t _pos;
  40. public:
  41. log_offset() = default;
  42. explicit log_offset(uint32_t pos) noexcept
  43. : _pos(pos) {
  44. }
  45.  
  46. uint64_t position() const {
  47. return _pos;
  48. }
  49.  
  50. friend std::strong_ordering operator<=>(const log_offset&, const log_offset&);
  51. };
  52.  
  53. struct log_index {
  54. term term;
  55. seastar::shard_id leader_shard;
  56. log_offset offset;
  57. };
  58.  
  // A hybrid clock with a physical and a logical component, used to timestamp entries.
  class hybrid_clock {
  public:
      using logical_component = uint32_t;
      using physical_component = uint32_t;

      // A timestamp produced by the clock, stored as a single 64-bit value.
      // NOTE(review): presumably the physical and logical components are packed into
      // `_rep` -- TODO confirm and document the layout.
      class time_point {
          // Initialized so a default-constructed time_point has a defined value.
          uint64_t _rep = 0;
      public:
          // The original `<=>` returned std::weak_equality, which was removed from C++20
          // (P1959R0) and no longer compiles. It only ever allowed ==/!=, so an equality
          // operator expresses the same contract. It stays declared-only (defined out of
          // line) in case equivalence is not plain bitwise equality of `_rep`.
          friend bool operator==(const time_point&, const time_point&);
      };

      // NOTE(review): takes only a logical component; presumably merges a remotely
      // observed value into the clock -- TODO confirm.
      void update(logical_component);

      // Returns the current time; non-const, so reading the clock may advance it.
      time_point now();
  };
  74.  
  75. class group_id {
  76. uint64_t _id;
  77. public:
  78. explicit group_id(uint64_t id) noexcept
  79. : _id(id) {
  80. }
  81.  
  82. friend std::strong_equality operator<=>(const group_id&, const group_id&);
  83. };
  84.  
  85. struct group {
  86. group_id id;
  87. std::vector<replica> replicas;
  88. };
  89.  
  90. namespace provider {
  91.  
  92. // Sends Raft messages between members of a group. The module is not specific to a Raft group.
  93. class rpc {
  94. public:
  95. struct entry {
  96. fragmented_temporary_buffer data;
  97. log_index index;
  98. };
  99.  
  100. struct success { };
  101.  
  102. struct stale_leader {
  103. term current_term;
  104. };
  105.  
  106. struct log_index_mismatch {
  107. log_index current_log_index;
  108. };
  109.  
  110. struct prev_term_indexes {
  111. std::vector<log_index> indexes;
  112. };
  113.  
  114. struct got_vote_tag { };
  115. using got_vote = bool_class<got_vote_tag>;
  116.  
  117. // Sends the append entries payload, containing the following:
  118. // struct append_entries_payload {
  119. // term leader_term;
  120. // replica leader;
  121. // uint32_t shard;
  122. // log_index leader_commit_index;
  123. // log_index prev_log_index;
  124. // term prev_log_term;
  125. // entry entries[];
  126. // };
  127. // The follower either successfully processes the RPC, or updates a stale leader with
  128. // the current term, or sends the last index it has on its log for the current term (if
  129. // doesn't match the index of the first entry), or returns the log indexes of all leader
  130. // shards for the last term for which it received entries.
  131. virtual future<std::variant<success, stale_leader, log_index_mismatch, prev_term_indexes>> append_entries(
  132. replica destination,
  133. term leader_term,
  134. log_index leader_commit_index,
  135. log_index prev_log_index,
  136. std::vector<entry>) = 0;
  137. // Sends the request vote payload, containing the following:
  138. // struct request_vote_payload {
  139. // term candidate_term;
  140. // replica candidate;
  141. // log_index last_index;
  142. // };
  143. // The candidate either receives the vote or not, or it is updated with the current term of the group.
  144. virtual future<std::variant<got_vote, stale_leader>> request_vote(
  145. std::vector<replica> peers,
  146. term candidate_term,
  147. log_index last_index) = 0;
  148. };
  149.  
  150. // Sends heartbeats on behalf of all groups of this node, and across all shards
  151. // Should exist only on one shard.
  152. class heartbeats {
  153. public:
  154. // The callback receives the election timeout to use. That timeout can be biased
  155. // by how many groups the current node is already a leader of.
  156. using on_leader_timeout = std::function<future<group_leader>(const group&, seastar::lowres_clock::time_point)>;
  157. protected:
  158. on_leader_timeout _on_leader_timeout;
  159. public:
  160. heartbeats(on_leader_timeout cb)
  161. : _on_leader_timeout(std::move(cb)) {
  162. }
  163. // Registers a new Raft group, for which leader election should be triggered.
  164. virtual future<group_leader> register_group(const group&) = 0;
  165. // Stop sending heartbeats for this group, as leadership has been relinquished.
  166. virtual future<> relinquish_leadership(const group&) = 0;
  167. // Sets the commit index for a particular group. If the current node is the leader,
  168. // then that commit index is sent in heartbeat messages.
  169. virtual future<> set_commit_index(const group&, log_index) = 0;
  170. // Unregisters a Raft group.
  171. virtual future<> unregister_group(const group&) = 0;
  172. };
  173.  
  174. // State-machine for a particular Raft group.
  175. class state_machine {
  176. // Transfers the contents of the state machine to the specified replica.
  177. virtual future<> transfer_to(replica) = 0;
  178. // Applies the specified entry to the state machine.
  179. virtual future<> apply(fragmented_temporary_buffer) = 0;
  180. };
  181.  
  182. // Stores the persistent Raft state belonging to a Raft group.
  183. class state {
  184. // Registers the replica for which the current node voted, for the specified term.
  185. // Should replicate that information across shards. When the future resolves, the
  186. // information should be persisted on disk.
  187. virtual future<> register_vote(term, replica) = 0;
  188. // Registers the current commit index for the current leader shard. When the future resolves,
  189. // the information is not guaranteed to have been persisted.
  190. virtual future<> register_commit_index_relaxed(log_index) = 0;
  191. // Registers the previous information of the discarded log for the current leader shard.
  192. // When the future resolves, the information should be persisted on disk.
  193. virtual future<> register_log_compaction(log_index, std::vector<replica> config) = 0;
  194. };
  195.  
  196. // Implements the persistent Raft log for a particular Raft group.
  197. class log {
  198. // Appends the specified entry for the current shard, at the specified term.
  199. // When the future resolves, the entry should be persisted in stable storage.
  200. virtual future<log_index> append(
  201. term,
  202. size_t,
  203. std::function<void(fragmented_temporary_buffer::ostream&)>) = 0;
  204. // The persisted tail of the log for the current shard, used to include in the append_entries RPC.
  205. virtual log_index tail() const = 0;
  206. // The tail of the log for all shards of a term, used to bring up a follower to date.
  207. virtual std::vector<log_index> tail_for_all_shards_of(term) = 0;
  208. // Discards a prefix of the log, to support log compaction.
  209. virtual future<> discard_prefix(log_index) = 0;
  210. // Discards a suffix of the log, when support removing entries from an outdated follower.
  211. virtual future<> discard_suffix(log_index) = 0;
  212. // Reads a set of entries in the specified range.
  213. using process_entry = std::function<future<>(const fragmented_temporary_buffer&)>;
  214. virtual future<> read(log_index, log_index, process_entry) = 0;
  215. };
  216.  
  217. } // namespace provider
  218.  
  219. // The base type of Raft entries. There can be multiple types of entries managed
  220. // by the same Raft group, each with different requirements.
  221. class entry {
  222. using type = unsigned;
  223. // The type of entry, useful to retrieve associated types (e.g., a deserializer).
  224. virtual type entry_type() const = 0;
  225. // Serializes the entry to the specified buffer.
  226. virtual void write(fragmented_temporary_buffer::ostream&) = 0;
  227. // Called when the entry has been replicated across the set of replicas of the group.
  228. // Side-effects should be idempotent, as the callback can be called multiple times (e.g.,
  229. // when replaying the log).
  230. virtual future<> on_replicated() = 0;
  231. // Sets the timestamp
  232. virtual void set_timestamp(hybrid_clock::time_point) = 0;
  233. };
  234.  
  235. // Deserializes entries of a particular type.
  236. class entry_deserializer {
  237. virtual entry read(const fragmented_temporary_buffer&) = 0;
  238. };
  239.  
  240. // Exposes the Raft protocol to external consumers, for a particular group.
  241. // Consumes the heartbeat, rpc, state machine, state and log providers.
  242. class protocol {
  243. public:
  244. // Specifies whether the current node is the leader of this Raft instance.
  245. virtual bool is_leader() const = 0;
  246. // Returns the leader, useful to forward requests to it.
  247. virtual group_leader leader() const = 0;
  248. // Replicates the specified entry across the Raft group. Returns when the entry has been committed.
  249. virtual future<> replicate(entry) = 0;
  250. // When the group's configuration changes. Calls to this function are ordered w.r.t. calls to replicate().
  251. virtual future<> update_configuration(std::vector<replica>) = 0;
  252. };
  253.  
  254. } // namespace raft
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement