Advertisement
Guest User

Untitled

a guest
Jun 20th, 2017
67
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Erlang 7.96 KB | None | 0 0
  1. -module(node2).
  2. -export([start/1, start/2]).
  3. -define(Stabilize, 5000).
  4. -define(Timeout, 10000).
  5. -include("debug.hrl").
  6.  
  %% Start a node with key Id and no peer to join through; connect/2 then
  %% makes the node its own successor.
  start(Id) ->
      start(Id, nil).

  %% Start a node with key Id. Peer is the pid of a node to join through,
  %% or nil. Ensures the timer server is running, spawns the node process
  %% (unlinked) and returns its pid.
  start(Id, Peer) ->
      timer:start(),
      Boot = fun() -> init(Id, Peer) end,
      spawn(Boot).
  12.  
  %% Body of the spawned node process: obtain the successor from connect/2,
  %% arm the periodic stabilize tick, create the store via storage:create/0
  %% and enter the message loop with no predecessor (nil).
  %% Crashes with badmatch when connect/2 does not return {ok, Successor}.
  init(Id, Peer) ->
      {ok, Successor} = connect(Id, Peer),
      schedule_stabilize(),
      node(Id, nil, Successor, storage:create()).
  19.  
  %% Determine this node's initial successor.
  %%
  %% connect(Id, nil): there is no peer, so the node is its own successor.
  %% connect(Id, Peer): ask Peer for its key, announce ourselves to it with
  %% a notify message and use Peer as successor.
  %%
  %% Returns {ok, {Key, Pid}} on success, or {error, timeout} when Peer does
  %% not answer within ?Timeout milliseconds.
  connect(Id, nil) ->
      ?DEBUG("Node(~w): conenct to self~n",[Id]),
      {ok, {Id, self()}};
  connect(Id, Peer) ->
      ?DEBUG("Node(~w): connect to ~w~n",[Id, Peer]),
      %% The reference ties the reply to this particular request.
      Qref = make_ref(),
      Peer ! {key, Qref, self()},
      receive
          {Qref, Skey} ->
              ?DEBUG("Node(~w): got key ~w , sending notification.~n",[Id, Skey]),
              Peer ! {notify, {Id, self()}},
              {ok, {Skey, Peer}}
      after ?Timeout ->
          io:format("Time out: no response~n"),
          %% Return a tagged error rather than io:format's bare `ok`, so a
          %% caller matching {ok, _} fails with a reason that names the
          %% actual problem.
          {error, timeout}
      end.
  35.  
  %% Main message loop of a node process.
  %%
  %% State:
  %%   Id          - this node's key
  %%   Predecessor - nil or {Key, Pid}
  %%   Successor   - {Key, Pid}
  %%   Store       - key/value store handled through the storage module
  %%
  %% Handles one message, then recurses with the (possibly updated) state.
  %% Unrecognised messages are logged and dropped so they cannot pile up in
  %% the mailbox.
  node(Id, Predecessor, Successor, Store) ->
      ?DEBUG("Node(~w): Successor is ~w, Predecessor is ~w~n",[Id, Successor, Predecessor]),
      receive
          %% A peer asks for our key (used by connect/2).
          {key, Qref, Peer} ->
              %% Todo, search recursively in the ring?
              Peer ! {Qref, Id},
              node(Id, Predecessor, Successor, Store);

          %% Someone claims to be our predecessor; notify/4 decides and may
          %% hand part of the store over to it.
          {notify, New} ->
              {Nkey, _} = New,
              ?DEBUG("Node(~w): Got a notification from ~w~n",[Id, Nkey]),
              {Pred, NStore} = notify(New, Id, Predecessor, Store),
              node(Id, Pred, Successor, NStore);

          %% A peer asks who our predecessor is.
          {request, Peer} ->
              request(Peer, Predecessor),
              node(Id, Predecessor, Successor, Store);

          %% Reply to our own request: our successor's predecessor.
          {status, Pred} ->
              Succ = stabilize(Pred, Id, Successor),
              node(Id, Predecessor, Succ, Store);

          %% Periodic tick (or a nudge from a peer): query the successor.
          %% The log names the successor because that is where the request
          %% is actually sent.
          stabilize ->
              ?DEBUG("Node(~w): Sending request to successor ~w~n",[Id, element(1, Successor)]),
              stabilize(Successor),
              node(Id, Predecessor, Successor, Store);

          %% Ring probe: start one, finish our own, or pass on someone
          %% else's. The clause with the bound Id must come before the
          %% generic one so our own probe is recognised.
          probe ->
              create_probe(Id, Successor),
              node(Id, Predecessor, Successor, Store);
          {probe, Id, Nodes, T} ->
              remove_probe(Id, T, Nodes),
              node(Id, Predecessor, Successor, Store);
          {probe, Ref, Nodes, T} ->
              forward_probe(Ref, Id, T, Nodes, Successor),
              node(Id, Predecessor, Successor, Store);

          %% Same as the ring probe, but collecting each node's store.
          storage_probe ->
              create_storage_probe(Id, Successor, Store),
              node(Id, Predecessor, Successor, Store);
          {storage_probe, Id, Nodes, T} ->
              remove_storage_probe(Id, T, Nodes),
              node(Id, Predecessor, Successor, Store);
          {storage_probe, Ref, Nodes, T} ->
              forward_storage_probe(Ref, Id, T, Nodes, Successor, Store),
              node(Id, Predecessor, Successor, Store);

          %% Client operations on the distributed store.
          {add, Key, Value, Qref, Client} ->
              Added = add(Key, Value, Qref, Client,
                          Id, Predecessor, Successor, Store),
              node(Id, Predecessor, Successor, Added);
          {lookup, Key, Qref, Client} ->
              lookup(Key, Qref, Client, Id, Predecessor, Successor, Store),
              node(Id, Predecessor, Successor, Store);

          %% Elements handed over to us by another node (see handover/4).
          {handover, Elements} ->
              ?H_DEBUG("Node({~w,~w): Received handover: ~w ~n", [Id, self(), Elements]),
              Merged = storage:merge(Store, Elements),
              node(Id, Predecessor, Successor, Merged);

          %% Anything else: report it and carry on.
          Error ->
              io:format("Node(~w) strange message: ~w~n", [Id, Error]),
              node(Id, Predecessor, Successor, Store)
      end.
  99.  
  %% Handle an add request for Key/Value issued by Client (tagged Qref).
  %%
  %% When key:between(Key, Pkey, Id) says the key falls in our range, the
  %% pair is stored, Client is sent {Qref, ok} and the updated store is
  %% returned. Otherwise the request is passed on to the successor and the
  %% store is returned unchanged.
  %% Requires a {Key, Pid} predecessor; a nil predecessor does not match.
  add(Key, Value, Qref, Client, Id, {Pkey, _}, {_, Spid}, Store) ->
      Responsible = key:between(Key, Pkey, Id),
      case Responsible of
          false ->
              %% Not ours: let the successor deal with it.
              Spid ! {add, Key, Value, Qref, Client},
              Store;
          true ->
              Client ! {Qref, ok},
              storage:add(Key, Value, Store)
      end.
  110.  
  %% Handle a lookup request for Key issued by Client (tagged Qref).
  %%
  %% When key:between(Key, Pkey, Id) says the key falls in our range, the
  %% result of storage:lookup/2 is sent to Client as {Qref, Result}.
  %% Otherwise the request is passed on to the successor.
  %% Requires a {Key, Pid} predecessor; a nil predecessor does not match.
  lookup(Key, Qref, Client, Id, {Pkey, _}, Successor, Store) ->
      Responsible = key:between(Key, Pkey, Id),
      case Responsible of
          false ->
              {_, Spid} = Successor,
              Spid ! {lookup, Key, Qref, Client};
          true ->
              Client ! {Qref, storage:lookup(Key, Store)}
      end.
  120.  
  %% Give a newly accepted predecessor {Nkey, Npid} its share of Store and
  %% return the part this node (key Id) keeps.
  %%
  %% The store is split with storage:split/2 and the leaving part is sent
  %% to Npid as {handover, Leave}. Which half of the split result is kept
  %% depends on whether Id is above or below Nkey; the tuple order used in
  %% each clause is that of the original code (storage:split/2 is defined
  %% elsewhere -- NOTE(review): confirm its ordering and wrap-around
  %% behaviour).
  %%
  %% When Id and Nkey are equal nothing is handed over. This happens when a
  %% node notifies itself: a node started without a peer is its own
  %% successor, so its first stabilize round sends notify to itself.
  %% Without this clause that call had no matching clause and crashed the
  %% node.
  handover(Store, Nkey, Npid, Id) when Id > Nkey ->
      ?H_DEBUG("Handover: Nkey: ~w, Npid: ~w~n", [Nkey, Npid]),
      {Leave, Keep} = storage:split(Nkey, Store),
      Npid ! {handover, Leave},
      Keep;
  handover(Store, Nkey, Npid, Id) when Id < Nkey ->
      ?H_DEBUG("Handover: Nkey: ~w, Npid: ~w~n", [Nkey, Npid]),
      {Keep, Leave} = storage:split(Id, Store),
      Npid ! {handover, Leave},
      Keep;
  handover(Store, Nkey, _Npid, Id) when Id == Nkey ->
      %% Same key on both sides: keep everything.
      Store.
  131.  
  %% Launch a ring probe: send the successor a probe message tagged with
  %% our Id, carrying a node list that starts with ourselves and the time
  %% the probe was created. Returns the message that was sent.
  %%
  %% The timestamp comes from erlang:timestamp/0, which yields the same
  %% {MegaSecs, Secs, MicroSecs} tuple as the deprecated now/0 and therefore
  %% still works with timer:now_diff/2 on the receiving side.
  create_probe(Id, {_, Spid}) ->
      Spid ! {probe, Id, [{Id, self()}], erlang:timestamp()}.
  134.  
  %% Our own probe has come back: print the elapsed time in milliseconds
  %% and the list of nodes it collected. T is the creation timestamp
  %% carried by the probe, a {MegaSecs, Secs, MicroSecs} tuple.
  %%
  %% Uses erlang:timestamp/0 instead of the deprecated now/0; both produce
  %% the tuple format timer:now_diff/2 expects.
  remove_probe(Id, T, Nodes) ->
      Time = timer:now_diff(erlang:timestamp(), T),
      %% now_diff/2 yields microseconds; dividing by 1000 gives the float
      %% that the ~.2f directive requires.
      io:format("Node(~w): time=~.2fms, nodes=~w~n", [Id, Time/1000, Nodes]).
  138.  
  %% Pass on a probe started by another node (Ref is the originator's key):
  %% put ourselves at the head of the node list and send the probe to our
  %% successor with its timestamp T untouched.
  forward_probe(Ref, Id, T, Nodes, {_, Spid}) ->
      ?DEBUG("Node(~w): Forwardning probe. ~n", [Id]),
      Visited = [{Id, self()} | Nodes],
      Spid ! {probe, Ref, Visited, T}.
  142.  
  %% Launch a storage probe: like a ring probe, but each entry pairs a
  %% node's key with its store. Sends the successor a storage_probe message
  %% tagged with our Id, starting the list with our own store, and returns
  %% the message that was sent.
  %%
  %% The timestamp comes from erlang:timestamp/0, which yields the same
  %% {MegaSecs, Secs, MicroSecs} tuple as the deprecated now/0.
  create_storage_probe(Id, {_, Spid}, Store) ->
      Spid ! {storage_probe, Id, [{Id, Store}], erlang:timestamp()}.
  145.  
  %% Our own storage probe has come back: print the elapsed time in
  %% milliseconds and the collected {Key, Store} entries. T is the creation
  %% timestamp carried by the probe, a {MegaSecs, Secs, MicroSecs} tuple.
  %%
  %% Uses erlang:timestamp/0 instead of the deprecated now/0; both produce
  %% the tuple format timer:now_diff/2 expects.
  remove_storage_probe(Id, T, Nodes) ->
      Time = timer:now_diff(erlang:timestamp(), T),
      %% now_diff/2 yields microseconds; dividing by 1000 gives the float
      %% that the ~.2f directive requires.
      io:format("Node(~w): time=~.2fms, stores=~w~n", [Id, Time/1000, Nodes]).
  149.  
  %% Pass on a storage probe started by another node (Ref is the
  %% originator's key): put our own {Id, Store} entry at the head of the
  %% list and send the probe to our successor with its timestamp T
  %% untouched.
  forward_storage_probe(Ref, Id, T, Nodes, {_, Spid}, Store) ->
      ?DEBUG("Node(~w): Forwardning probe. ~n", [Id]),
      Collected = [{Id, Store} | Nodes],
      Spid ! {storage_probe, Ref, Collected, T}.
  153.  
  % Ask the successor who its predecessor is.
  % The {status, Pred} answer is what later drives stabilize/3 in this node.
  % Returns the request message that was sent.
  stabilize({_Skey, Spid}) ->
      Query = {request, self()},
      Spid ! Query.
  158.  
  %% React to the successor's answer about its predecessor and return the
  %% successor this node should use from now on.
  %%
  %%   Pred      - the successor's current predecessor: nil or {Key, Pid}
  %%   Id        - our own key
  %%   Successor - our current successor, {Skey, Spid}
  %%
  %% Cases:
  %%   nil        - the successor has no predecessor; propose ourselves.
  %%   {Id, _}    - it already points at us; nothing to do.
  %%   {Skey, _}  - it points at itself; propose ourselves.
  %%   {Xkey, _}  - another node X:
  %%                  * X lies between us and the successor: X is a closer
  %%                    successor, so adopt it and notify it of us.
  %%                  * otherwise X lies behind us, so we are the better
  %%                    predecessor for our successor: keep the successor
  %%                    and notify it. (The earlier code adopted X here,
  %%                    which points the successor link backwards round the
  %%                    ring.)
  stabilize(Pred, Id, Successor) ->
      {Skey, Spid} = Successor,
      case Pred of
          nil ->
              ?DEBUG("Node(~w): Our sucessors pred. pointer is nil, inform ~w.~n", [Id, Skey]),
              %% Inform our successor of our existence.
              Spid ! {notify, {Id, self()}},
              Successor;
          {Id, _} ->
              ?DEBUG("Node(~w): do nothing.~n", [Id]),
              %% We already are the predecessor of our successor.
              Successor;
          {Skey, _} ->
              ?DEBUG("Node(~w): Succesor node ~w is the predecessor of itself.~n", [Id, Skey]),
              %% Our successor's predecessor is itself; tell it about us.
              Spid ! {notify, {Id, self()}},
              Successor;
          {Xkey, Xpid} ->
              case key:between(Xkey, Id, Skey) of
                  true ->
                      ?DEBUG("Node(~w): Adopting ~w as new successor.~n", [Id, Xkey]),
                      %% X sits between us and our old successor: adopt it
                      %% and introduce ourselves to it.
                      Xpid ! {notify, {Id, self()}},
                      {Xkey, Xpid};
                  false ->
                      ?DEBUG("Node(~w): ~w is not between us and ~w, keeping successor and sending notification.~n", [Id, Xkey, Skey]),
                      %% X is behind us: our successor should point at us
                      %% instead, so keep it and notify it.
                      Spid ! {notify, {Id, self()}},
                      Successor
              end
      end.
  189.  
  %% Arrange for the calling process to receive the atom `stabilize` every
  %% ?Stabilize milliseconds. Returns whatever timer:send_interval/3 returns.
  schedule_stabilize() ->
      Target = self(),
      timer:send_interval(?Stabilize, Target, stabilize).
  192.  
  %% Answer a peer's request for our predecessor: send Peer a
  %% {status, Predecessor} message, where Predecessor is nil or a
  %% {Key, Pid} pair. Returns the message that was sent.
  request(Peer, Predecessor) ->
      Answer =
          case Predecessor of
              nil -> nil;
              {Pkey, Ppid} -> {Pkey, Ppid}
          end,
      Peer ! {status, Answer}.
  200.  
  201.  
  %% A node {Nkey, Npid} claims to be our proper predecessor.
  %% Returns {Predecessor, Store} to continue with.
  %%
  %% With no predecessor yet the claim is accepted. With an existing
  %% predecessor it is accepted only if key:between(Nkey, Pkey, Id) holds.
  %% On acceptance handover/4 is run for the new predecessor and the part
  %% of the store we keep is returned; on refusal the claimant is sent a
  %% stabilize message and our state is returned unchanged.
  notify({Nkey, Npid}, Id, nil, Store) ->
      %% Our predecessor is nil: take the node as our predecessor.
      ?DEBUG("Node(~w): Predeccessor is nil, adding ~w as predecessor.~n", [Id, Nkey]),
      Kept = handover(Store, Nkey, Npid, Id),
      {{Nkey, Npid}, Kept};
  notify({Nkey, Npid}, Id, {Pkey, _} = Predecessor, Store) ->
      Closer = key:between(Nkey, Pkey, Id),
      case Closer of
          false ->
              ?DEBUG("Node(~w): Notification from ~w denied.~n", [Id, Nkey]),
              %% Request denied; nudge the claimant to stabilize right away
              %% to speed things up.
              Npid ! stabilize,
              {Predecessor, Store};
          true ->
              %% Take the node as our new predecessor.
              ?DEBUG("Node(~w): Added node ~w as new predecessor.~n", [Id, Nkey]),
              Kept = handover(Store, Nkey, Npid, Id),
              {{Nkey, Npid}, Kept}
      end.
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement