Advertisement
Guest User

Untitled

a guest
Oct 18th, 2017
80
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Diff 3.91 KB | None | 0 0
  1. --- a   2017-10-18 09:22:12.141678081 +0200
  2. +++ b   2017-10-18 09:22:12.177678618 +0200
  3. @@ -98,6 +98,7 @@
  4.       /* map where key is the endpoint and value is the state associated with the endpoint */
  5.       std::unordered_map<inet_address, endpoint_state> endpoint_state_map;
  6.  -    std::unordered_map<inet_address, endpoint_state> shadow_endpoint_state_map;
  7. ++    // Used for serializing changes to endpoint_state_map and running of associated change listeners.
  8.  +    endpoint_locks_map endpoint_locks;
  9.  
  10.       const std::vector<sstring> DEAD_STATES = {
  11. @@ -106,7 +107,15 @@
  12.       std::vector<inet_address> _shadow_live_endpoints;
  13.  
  14.       void run();
  15. ++    // Replicates given endpoint_state to all other shards.
  16. ++    // The state state doesn't have to be kept alive around until completes.
  17.  +    future<> replicate(inet_address, const endpoint_state&);
  18. ++    // Replicates "states" from "src" to all other shards.
  19. ++    // "src" and "states" must be kept alive until completes and must not change.
  20. ++    future<> replicate(inet_address, const std::map<application_state, versioned_value>& src, const std::vector<application_state>& states);
  21. ++    // Replicates given value to all other shards.
  22. ++    // The value must be kept alive until completes and not change.
  23. ++    future<> replicate(inet_address, application_state key, const versioned_value& value);
  24.   public:
  25.       gossiper();
  26.  
  27. @@ -576,6 +585,24 @@
  28.  +    });
  29.  +}
  30.  +
  31. ++future<> gossiper::replicate(inet_address ep, const std::map<application_state, versioned_value>& src, const std::vector<application_state>& changed) {
  32. ++    return container().invoke_on_all([ep, &src, &changed, orig = engine().cpu_id(), self = shared_from_this()] (gossiper& g) {
  33. ++        if (engine().cpu_id() != orig) {
  34. ++            for (auto&& key : changed) {
  35. ++                g.endpoint_state_map[ep].apply_application_state(key, src.at(key));
  36. ++            }
  37. ++        }
  38. ++    });
  39. ++}
  40. ++
  41. ++future<> gossiper::replicate(inet_address ep, application_state key, const versioned_value& value) {
  42. ++    return container().invoke_on_all([ep, key, &value, orig = engine().cpu_id(), self = shared_from_this()] (gossiper& g) {
  43. ++        if (engine().cpu_id() != orig) {
  44. ++            g.endpoint_state_map[ep].apply_application_state(key, value);
  45. ++        }
  46. ++    });
  47. ++}
  48. ++
  49.   future<> gossiper::advertise_removing(inet_address endpoint, utils::UUID host_id, utils::UUID local_host_id) {
  50.       return seastar::async([this, g = this->shared_from_this(), endpoint, host_id, local_host_id] {
  51.  -        auto& state = endpoint_state_map.at(endpoint);
  52. @@ -767,13 +794,7 @@
  53.  +    // would be inconsistent across shards. Changes listeners depend on state
  54.  +    // being replicated to all shards.
  55.  +    auto replicate_changes = seastar::defer([&] () noexcept {
  56. -+        container().invoke_on_all([&, self = shared_from_this()] (gossiper& g) {
  57. -+            if (engine().cpu_id() != 0) {
  58. -+                for (auto&& key : changed) {
  59. -+                    g.endpoint_state_map[addr].apply_application_state(key, remote_map.at(key));
  60. -+                }
  61. -+            }
  62. -+        }).get();
  63. ++        replicate(addr, remote_map, changed).get();
  64.  +    });
  65.  +
  66.       // we need to make two loops here, one to apply, then another to notify,
  67. @@ -892,13 +913,7 @@
  68.  +            es = gossiper.get_endpoint_state_for_endpoint_ptr(ep_addr);
  69.  +            if (es) {
  70.  +                es->add_application_state(state, value);
  71. -+
  72. -+                gossiper.container().invoke_on_all([g, state, &value, ep_addr] (auto& remote_gossiper) {
  73. -+                    if (engine().cpu_id() != 0) {
  74. -+                        remote_gossiper.endpoint_state_map[ep_addr].apply_application_state(state, value);
  75. -+                    }
  76. -+                }).get();
  77. -+
  78. ++                gossiper.replicate(ep_addr, state, value).get();
  79.                   gossiper.do_on_change_notifications(ep_addr, state, value);
  80.               }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement