Advertisement
Guest User

Untitled

a guest
Jun 20th, 2017
58
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Erlang 3.17 KB | None | 0 0
  -module(node1).

  %% Public API: start a node on its own, or start one that joins via a peer.
  -export([start/1, start/2]).

  %% Interval between two stabilize ticks, in milliseconds.
  -define(Stabilize, 3000).
  %% How long connect/2 waits for a peer's reply, in milliseconds.
  -define(Timeout, 10000).
  12.  
  13. %%
  14. %% API Functions
  15. %%
  %% Start a node with key Id and no peer to join through.
  %% Delegates to start/2 with `nil` as the peer.
  start(Id) -> start(Id, nil).
  18.  
  %% Start a node with key Id that joins through Peer (a pid, or `nil`).
  %% Makes sure the timer server is running, then spawns the node process
  %% and returns its pid.
  start(Id, Peer) ->
      timer:start(),
      Boot = fun() -> init(Id, Peer) end,
      spawn(Boot).
  22.  
  23. %%
  24. %% Local Functions
  25. %%
  %% Entry point of the spawned node process.
  %% Resolves the successor via connect/2 (crashing with badmatch if that
  %% does not yield {ok, _}), arms the periodic stabilize tick and enters
  %% the message loop with no known predecessor.
  init(Id, Peer) ->
      {ok, Succ} = connect(Id, Peer),
      schedule_stabilize(),
      node(Id, nil, Succ).
  31.  
  %% Determine this node's successor.
  %%
  %% connect(Id, nil)  -> {ok, {Id, self()}}: without a peer the node is
  %%                      its own successor.
  %% connect(Id, Peer) -> asks Peer for its key using a fresh reference and
  %%                      waits up to ?Timeout ms for the tagged reply.
  %%
  %% Returns {ok, {Key, Pid}} on success, or {error, timeout} if the peer
  %% does not answer in time. (Previously the timeout path returned the bare
  %% `ok` produced by io:format/2, which was indistinguishable from nothing.)
  connect(Id, nil) ->
      {ok, {Id, self()}};
  connect(Id, Peer) ->
      Qref = make_ref(),
      Peer ! {key, Qref, self()},
      receive
          {Qref, Skey} ->
              io:format("ok ~w~n", [Id]),
              {ok, {Skey, Peer}}
      after ?Timeout ->
          io:format("Time out ~w: no response~n", [Id]),
          {error, timeout}
      end.
  45.  
  %% Message loop of a ring node.
  %%
  %% Id          - this node's key.
  %% Predecessor - `nil` or {Key, Pid} of the known predecessor.
  %% Successor   - {Key, Pid} of the current successor.
  %%
  %% Handled messages:
  %%   {key, Qref, Peer}       reply to Peer with {Qref, Id}
  %%   {notify, New}           New proposes itself as predecessor
  %%   {request, Peer}         Peer asks for our predecessor
  %%   {status, Pred}          the successor reports its predecessor
  %%   probe                   start a probe around the ring
  %%   {probe, Ref, List, T}   a probe in transit; Ref =:= Id means it is ours
  %%   stabilize               periodic tick: query the successor
  %%   stop                    leave the loop, returning ok
  %% Anything else is logged and the loop continues, so a stray message
  %% no longer terminates the node.
  node(Id, Predecessor, Successor) ->
      receive
          {key, Qref, Peer} ->
              Peer ! {Qref, Id},
              node(Id, Predecessor, Successor);
          {notify, New} ->
              Pred = notify(New, Id, Predecessor),
              node(Id, Pred, Successor);
          {request, Peer} ->
              request(Peer, Predecessor),
              node(Id, Predecessor, Successor);
          {status, Pred} ->
              Succ = stabilize(Pred, Id, Successor),
              node(Id, Predecessor, Succ);
          probe ->
              create_probe(Id, Successor),
              node(Id, Predecessor, Successor);
          %% Id is already bound, so this clause only matches our own probe.
          {probe, Id, List, T} ->
              remove_probe(Id, T, List),
              node(Id, Predecessor, Successor);
          {probe, Ref, List, T} ->
              forward_probe(Id, Ref, T, List, Successor),
              node(Id, Predecessor, Successor);
          stabilize ->
              stabilize(Successor),
              node(Id, Predecessor, Successor);
          stop ->
              ok;
          Unexpected ->
              io:format("node ~w, strange message ~w~n", [Id, Unexpected]),
              node(Id, Predecessor, Successor)
      end.
  77.  
  %% Ask the successor for its current predecessor.
  %% The answer arrives asynchronously as a {status, Pred} message.
  %% Returns the message that was sent.
  stabilize({_Skey, Spid}) ->
      Query = {request, self()},
      Spid ! Query.
  80.  
  %% React to the successor's reported predecessor.
  %%
  %% Pred      - `nil` or {Key, Pid}: what the successor believes precedes it.
  %% Id        - this node's key.
  %% Successor - {Skey, Spid}: the current successor.
  %%
  %% First decides which node (if any) must be told about us and which
  %% successor to keep, then sends the notification. Returns the successor
  %% to use from now on.
  stabilize(Pred, Id, Successor) ->
      {Skey, Spid} = Successor,
      {Action, Next} =
          case Pred of
              nil ->
                  %% Successor has no predecessor yet: offer ourselves.
                  {{send, Spid}, Successor};
              {Id, _} ->
                  %% It already points back at us: nothing to do.
                  {skip, Successor};
              {Skey, _} ->
                  %% It points at itself: offer ourselves.
                  {{send, Spid}, Successor};
              {Xkey, Xpid} ->
                  case key:between(Xkey, Id, Skey) of
                      true ->
                          %% X sits between us and the successor: adopt it.
                          {{send, Xpid}, {Xkey, Xpid}};
                      false ->
                          {{send, Spid}, Successor}
                  end
          end,
      case Action of
          {send, Target} -> Target ! {notify, {Id, self()}};
          skip -> ok
      end,
      Next.
  102.  
  %% Arrange for the calling process to receive the atom `stabilize`
  %% every ?Stabilize milliseconds. Returns timer:send_interval's result.
  schedule_stabilize() ->
      timer:send_interval(?Stabilize, self(), stabilize).
  105.  
  %% Answer a {request, Peer} query by sending Peer our predecessor,
  %% wrapped as {status, Predecessor}. Predecessor is `nil` or {Key, Pid}.
  %% Returns the message that was sent.
  request(Peer, nil) ->
      Peer ! {status, nil};
  request(Peer, {_Pkey, _Ppid} = Predecessor) ->
      Peer ! {status, Predecessor}.
  113.  
  %% Handle a node proposing itself as our predecessor.
  %%
  %% {Nkey, Npid} - the proposing node.
  %% Id           - this node's key.
  %% Predecessor  - `nil` or {Key, Pid} of the current predecessor.
  %%
  %% Returns the predecessor to use from now on: the proposer if we had none
  %% or if its key lies between the current predecessor and us; otherwise the
  %% current predecessor, which is also reported back to the proposer as
  %% {status, Predecessor}.
  %%
  %% Fix: removed a stray `x` after the `true` branch that made the module
  %% fail to compile.
  notify({Nkey, Npid}, Id, Predecessor) ->
      io:format("Id ~w, is notified of: {~w, ~w}, predecessor: ~w~n", [Id, Nkey, Npid, Predecessor]),
      case Predecessor of
          nil ->
              {Nkey, Npid};
          {Pkey, _} ->
              case key:between(Nkey, Pkey, Id) of
                  true ->
                      {Nkey, Npid};
                  false ->
                      Npid ! {status, Predecessor},
                      Predecessor
              end
      end.
  128.  
  %% Pass a probe that is not ours on to the successor.
  %%
  %% Id        - this node's key.
  %% Ref       - key of the node that created the probe.
  %% T         - timestamp taken when the probe was created.
  %% Nodes     - keys of the nodes the probe has passed so far.
  %% Successor - {Key, Pid} of our successor.
  %%
  %% Fix: the probe must keep the originator's key (Ref) as its identity and
  %% record *this* node (Id) in the path. The previous code swapped the two,
  %% so the originator could never recognise its own probe coming back.
  %% Returns the message that was sent.
  forward_probe(Id, Ref, T, Nodes, Successor) ->
      {_, Spid} = Successor,
      io:format("Id: ~w probe passthrough to ~w.~n",[Id, Successor]),
      Spid ! {probe, Ref, [Id | Nodes], T}.
  133.  
  %% Launch a probe around the ring, starting at our successor.
  %%
  %% Id        - this node's key; it identifies the probe.
  %% Successor - {Key, Pid} of our successor.
  %%
  %% The message has the shape {probe, Ref, Nodes, T} that the node loop
  %% matches on: originator key, list of visited nodes (initially empty),
  %% creation timestamp.
  %%
  %% Fixes: the timestamp and the node list were sent in swapped positions;
  %% deprecated erlang:now/0 is replaced by erlang:timestamp/0, which yields
  %% the same {MegaSecs, Secs, MicroSecs} format.
  %% Returns the message that was sent.
  create_probe(Id, Successor) ->
      {_, Spid} = Successor,
      io:format("Id: ~w Probing ring to ~w .~n",[Id, Successor]),
      T = erlang:timestamp(),
      Spid ! {probe, Id, [], T}.
  139.  
  %% Report a probe that has travelled the whole ring back to its creator.
  %%
  %% Id    - this node's key.
  %% T     - {MegaSecs, Secs, MicroSecs} timestamp taken at probe creation.
  %% Nodes - keys of the nodes the probe passed through.
  %%
  %% Fixes: the format string had two directives for three arguments, which
  %% made io:format/2 raise badarg; timer:now_diff/2 yields microseconds, not
  %% seconds; deprecated now/0 is replaced by erlang:timestamp/0.
  %% Returns ok.
  remove_probe(Id, T, Nodes) ->
      Elapsed = timer:now_diff(erlang:timestamp(), T),
      io:format("Id: ~w probe reached goal in ~w microseconds, nodes: ~w~n",
                [Id, Elapsed, Nodes]).
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement