Advertisement
Guest User

Untitled

a guest
Jun 20th, 2017
58
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Erlang 5.15 KB | None | 0 0
  1. -module(node2).
  2. %%
  3. %% Exported Functions
  4. %%
  5. -export([start/1,start/2]).
  6.  
  7. %%
  8. %% Definitions
  9. %%
  10. -define(Stabilize, 1000).
  11. -define(Timeout, 10000).
  12.  
  13. %%
  14. %% API Functions
  15. %%
  16. start(Id) ->
  17.     timer:start(),
  18.     start(Id, nil).
  19.  
  20. start(Id, Peer) ->
  21.     spawn(fun() -> init(Id, Peer) end).
  22.  
  23. %%
  24. %% Local Functions
  25. %%
  26. init(Id, Peer) ->
  27.     Predecessor = nil,
  28.     {ok, Successor} = connect(Id, Peer),
  29.     schedule_stabilize(),
  30.     node(Id, Predecessor, Successor, storage:create()).
  31.  
  32. connect(Id, nil) ->
  33.     {ok, {Id, self()}};
  34. connect(Id, Peer) ->
  35.     Qref = make_ref(),
  36.     Peer ! {key, Qref, self()},
  37.     receive
  38.         {Qref, Skey} ->
  39.             io:format("ok ~w~n",[Id]),
  40.             {ok, {Skey, Peer}}
  41.         after ?Timeout ->
  42.             io:format("Time out ~w: no response~n",[Id])
  43.     end.
  44.  
  45. node(Id, Predecessor, Successor, Store) ->
  46.     receive
  47.         {key, Qref, Peer} ->
  48.             Peer ! {Qref, Id},
  49.             node(Id, Predecessor, Successor, Store);
  50.         {notify, New} ->
  51.             {Pred, Store2} = notify(New, Id, Predecessor, Store),
  52.             node(Id, Pred, Successor, Store2);
  53.         {request, Peer} ->
  54.             request(Peer, Predecessor),
  55.             node(Id, Predecessor, Successor, Store);
  56.         {status, Pred} ->
  57.             Succ = stabilize(Pred, Id, Successor),
  58.             node(Id, Predecessor, Succ, Store);
  59.         stabilize ->
  60.             stabilize(Successor),
  61.             node(Id, Predecessor, Successor, Store); %%node(Id, Predecessor, Successor, Store);
  62.         print ->
  63.             io:format("node ~w, pred ~w, succ ~w~n", [{Id,self()},Predecessor, Successor]),
  64.             node(Id, Predecessor, Successor, Store);
  65.         showcase ->
  66.             io:format("~nStorage: ~w~n",[Store]),
  67.             node(Id, Predecessor, Successor, Store);
  68.         probe ->
  69.             io:format("node ~w, probe received~n", [Id]),
  70.             create_probe(Id, Successor),
  71.             node(Id, Predecessor, Successor, Store);
  72.         {probe, Start, TStart, Nodes} when Start == Id ->
  73.             remove_probe(Id, TStart, Nodes),
  74.             node(Id, Predecessor, Successor, Store);
  75.         {probe, Start, TStart, Nodes} ->
  76.             forward_probe(Id, Start, TStart, Nodes, Successor),
  77.             node(Id, Predecessor, Successor, Store);
  78.         {handover, Elements} ->
  79.             Merged = storage:merge(Store, Elements),
  80.             node(Id, Predecessor, Successor, Merged);
  81.         {add, Key, Value, Qref, Client} ->
  82.             Added = add(Key, Value, Qref, Client, Id, Predecessor, Successor, Store),
  83.             node(Id, Predecessor, Successor, Added);
  84.         {lookup, Key, Qref, Client} ->
  85.             lookup(Key, Qref, Client, Id, Predecessor, Successor, Store),
  86.             node(Id, Predecessor, Successor, Store);
  87.         {lookup, Key} ->
  88.             self() ! {lookup, Key, make_ref(), self()},
  89.             node(Id, Predecessor, Successor, Store);
  90.         {_Qref, Result} ->
  91.             case Result of
  92.                 false ->
  93.                     io:format("Id: ~w, key not found.~n",[Id]);
  94.                 _Else ->
  95.                     io:format("Id: ~w, key found and delivered: ~w~n",[Id, Result])
  96.             end,
  97.             node(Id, Predecessor, Successor, Store);
  98.         Error ->
  99.             io:format("node ~w, strange message ~w~n", [Id, Error]),
  100.             node(Id, Predecessor, Successor, Store)
  101.     end.
  102.  
  103. add(Key, Value, Qref, Client, Id, {Pkey, _}, {_, Spid}, Store) ->
  104.     case key:between(Key, Pkey, Id) of
  105.         true ->
  106.             Client ! {Qref, ok},
  107.             %io:format("Id: ~w, added key: ~w~n", [Id, Key]),
  108.             storage:add(Store, Key, Value);
  109.         false ->
  110.             %io:format("Id: ~w, forwarded key: ~w~n",[Id, Key]),
  111.             Spid ! {add, Key, Value, Qref, Client},
  112.             Store
  113.     end.
  114.  
  115. lookup(Key, Qref, Client, Id, {Pkey, _}, Successor, Store) ->
  116.     case key:between(Key, Pkey, Id) of
  117.         true ->
  118.             Result = storage:lookup(Key, Store),
  119.             Client ! {Qref, Result};
  120.         false ->
  121.             {_, Spid} = Successor,
  122.             Spid ! {lookup, Key, Qref, Client}
  123.     end.
  124.  
  125. stabilize({_, Spid}) ->
  126.     Spid ! {request, self()}.
  127.  
  128. stabilize(Pred, Id, Successor) ->
  129.     {Skey, Spid} = Successor,
  130.     case Pred of
  131.         nil ->
  132.             Spid ! {notify, {Id, self()}},
  133.             Successor;
  134.         {Id, _} ->
  135.             Successor;
  136.         {Skey, _} ->
  137.             Spid ! {notify, {Id, self()}},
  138.             Successor;
  139.         {Xkey, Xpid} ->
  140.             case key:between(Xkey, Id, Skey) of
  141.                 true ->
  142.                     Xpid ! {notify, {Id, self()}},
  143.                     {Xkey, Xpid};
  144.                 false ->
  145.                     Spid ! {notify, {Id, self()}},
  146.                     Successor
  147.             end
  148.     end.
  149.  
  150. schedule_stabilize() ->
  151.     timer:send_interval(?Stabilize, self(), stabilize).
  152.  
  153. request(Peer, Predecessor) ->
  154.     case Predecessor of
  155.         nil ->
  156.             Peer ! {status, nil};
  157.         {Pkey, Ppid} ->
  158.             Peer ! {status, {Pkey, Ppid}}
  159.     end.
  160.  
  161. notify({Nkey, Npid}, Id, Predecessor, Store) ->
  162.     case Predecessor of
  163.         nil ->
  164.             Keep = handover(Store, Nkey, Npid),
  165.             {{Nkey, Npid}, Keep};
  166.         {Pkey, _} ->
  167.             case key:between(Nkey, Pkey, Id) of
  168.                 true ->
  169.                     Keep = handover(Store, Nkey, Npid),
  170.                     {{Nkey, Npid}, Keep};                  
  171.                 false ->
  172.                     Npid ! {status, Predecessor},
  173.                     {Predecessor, Store}
  174.             end
  175.     end.
  176.  
  177. handover(Store, Nkey, Npid) ->
  178.     {Keep, Leave} = storage:split(Nkey, Store),
  179.     Npid ! {handover, Leave},
  180.     Keep.
  181.  
  182. forward_probe(Id, Start, TStart, Nodes, Successor) ->
  183.     {_Skey, Spid} = Successor,
  184.     io:format("Id: ~w probe passthrough to ~w.~n",[Id, Successor]),
  185.     Spid ! {probe, Start, TStart, [Id|Nodes]}.
  186.  
  187. create_probe(Start, Successor) ->
  188.     {_Skey, Spid} = Successor,
  189.     io:format("Id: ~w Probing ring.~n", Start),
  190.     Spid ! {probe, Start, erlang:now(), []}.
  191.  
  192. remove_probe(Id, T, Nodes) ->
  193.     io:format("Id: ~w probe reached goal in ~w seconds~nRoute: ~w~n", [Id, timer:now_diff(now(),T)/1000, Nodes]).
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement