Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- --- a 2017-10-18 09:22:12.141678081 +0200
- +++ b 2017-10-18 09:22:12.177678618 +0200
- @@ -98,6 +98,7 @@
- /* map where key is the endpoint and value is the state associated with the endpoint */
- std::unordered_map<inet_address, endpoint_state> endpoint_state_map;
- - std::unordered_map<inet_address, endpoint_state> shadow_endpoint_state_map;
- ++ // Used for serializing changes to endpoint_state_map and running of associated change listeners.
- + endpoint_locks_map endpoint_locks;
- const std::vector<sstring> DEAD_STATES = {
- @@ -106,7 +107,15 @@
- std::vector<inet_address> _shadow_live_endpoints;
- void run();
- ++ // Replicates given endpoint_state to all other shards.
- ++ // The state state doesn't have to be kept alive around until completes.
- + future<> replicate(inet_address, const endpoint_state&);
- ++ // Replicates "states" from "src" to all other shards.
- ++ // "src" and "states" must be kept alive until completes and must not change.
- ++ future<> replicate(inet_address, const std::map<application_state, versioned_value>& src, const std::vector<application_state>& states);
- ++ // Replicates given value to all other shards.
- ++ // The value must be kept alive until completes and not change.
- ++ future<> replicate(inet_address, application_state key, const versioned_value& value);
- public:
- gossiper();
- @@ -576,6 +585,24 @@
- + });
- +}
- +
- ++future<> gossiper::replicate(inet_address ep, const std::map<application_state, versioned_value>& src, const std::vector<application_state>& changed) {
- ++ return container().invoke_on_all([ep, &src, &changed, orig = engine().cpu_id(), self = shared_from_this()] (gossiper& g) {
- ++ if (engine().cpu_id() != orig) {
- ++ for (auto&& key : changed) {
- ++ g.endpoint_state_map[ep].apply_application_state(key, src.at(key));
- ++ }
- ++ }
- ++ });
- ++}
- ++
- ++future<> gossiper::replicate(inet_address ep, application_state key, const versioned_value& value) {
- ++ return container().invoke_on_all([ep, key, &value, orig = engine().cpu_id(), self = shared_from_this()] (gossiper& g) {
- ++ if (engine().cpu_id() != orig) {
- ++ g.endpoint_state_map[ep].apply_application_state(key, value);
- ++ }
- ++ });
- ++}
- ++
- future<> gossiper::advertise_removing(inet_address endpoint, utils::UUID host_id, utils::UUID local_host_id) {
- return seastar::async([this, g = this->shared_from_this(), endpoint, host_id, local_host_id] {
- - auto& state = endpoint_state_map.at(endpoint);
- @@ -767,13 +794,7 @@
- + // would be inconsistent across shards. Changes listeners depend on state
- + // being replicated to all shards.
- + auto replicate_changes = seastar::defer([&] () noexcept {
- -+ container().invoke_on_all([&, self = shared_from_this()] (gossiper& g) {
- -+ if (engine().cpu_id() != 0) {
- -+ for (auto&& key : changed) {
- -+ g.endpoint_state_map[addr].apply_application_state(key, remote_map.at(key));
- -+ }
- -+ }
- -+ }).get();
- ++ replicate(addr, remote_map, changed).get();
- + });
- +
- // we need to make two loops here, one to apply, then another to notify,
- @@ -892,13 +913,7 @@
- + es = gossiper.get_endpoint_state_for_endpoint_ptr(ep_addr);
- + if (es) {
- + es->add_application_state(state, value);
- -+
- -+ gossiper.container().invoke_on_all([g, state, &value, ep_addr] (auto& remote_gossiper) {
- -+ if (engine().cpu_id() != 0) {
- -+ remote_gossiper.endpoint_state_map[ep_addr].apply_application_state(state, value);
- -+ }
- -+ }).get();
- -+
- ++ gossiper.replicate(ep_addr, state, value).get();
- gossiper.do_on_change_notifications(ep_addr, state, value);
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement