Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- -module(node1).
- %%
- %% Exported Functions
- %%
- -export([start/1,start/2]).
- %%
- %% Definitions
- %%
- -define(Stabilize, 3000).
- -define(Timeout, 10000).
- %%
- %% API Functions
- %%
- start(Id) ->
- start(Id, nil).
- start(Id, Peer) ->
- timer:start(),
- spawn(fun() -> init(Id, Peer) end).
- %%
- %% Local Functions
- %%
- init(Id, Peer) ->
- Predecessor = nil,
- {ok, Successor} = connect(Id, Peer),
- schedule_stabilize(),
- node(Id, Predecessor, Successor).
- connect(Id, nil) ->
- {ok, {Id, self()}};
- connect(Id, Peer) ->
- Qref = make_ref(),
- Peer ! {key, Qref, self()},
- receive
- {Qref, Skey} ->
- io:format("ok ~w~n",[Id]),
- {ok, {Skey, Peer}}
- after ?Timeout ->
- io:format("Time out ~w: no response~n",[Id])
- end.
- node(Id, Predecessor, Successor) ->
- receive
- {key, Qref, Peer} ->
- Peer ! {Qref, Id},
- node(Id, Predecessor, Successor);
- {notify, New} ->
- Pred = notify(New, Id, Predecessor),
- node(Id, Pred, Successor);
- {request, Peer} ->
- request(Peer, Predecessor),
- node(Id, Predecessor, Successor);
- {status, Pred} ->
- Succ = stabilize(Pred, Id, Successor),
- node(Id, Predecessor, Succ);
- probe ->
- create_probe(Id, Successor),
- node(Id, Predecessor, Successor);
- {probe, Id, List, T} ->
- remove_probe(Id, T, List),
- node(Id, Predecessor, Successor);
- {probe, Ref, List, T} ->
- forward_probe(Id, Ref, T, List, Successor),
- node(Id, Predecessor, Successor);
- stabilize ->
- stabilize(Successor),
- node(Id, Predecessor, Successor); %%node(Id, Predecessor, Successor, Store);
- stop ->
- ok;
- Error ->
- io:format("node ~w, strange message ~w~n", [Id, Error])
- end.
- stabilize({_, Spid}) ->
- Spid ! {request, self()}.
- stabilize(Pred, Id, Successor) ->
- {Skey, Spid} = Successor,
- case Pred of
- nil ->
- Spid ! {notify, {Id, self()}},
- Successor;
- {Id, _} ->
- Successor;
- {Skey, _} ->
- Spid ! {notify, {Id, self()}},
- Successor;
- {Xkey, Xpid} ->
- case key:between(Xkey, Id, Skey) of
- true ->
- Xpid ! {notify, {Id, self()}},
- {Xkey, Xpid};
- false ->
- Spid ! {notify, {Id, self()}},
- Successor
- end
- end.
- schedule_stabilize() ->
- timer:send_interval(?Stabilize, stabilize).
- request(Peer, Predecessor) ->
- case Predecessor of
- nil ->
- Peer ! {status, nil};
- {Pkey, Ppid} ->
- Peer ! {status, {Pkey, Ppid}}
- end.
- notify({Nkey, Npid}, Id, Predecessor) ->
- io:format("Id ~w, is notified of: {~w, ~w}, predecessor: ~w~n", [Id, Nkey, Npid, Predecessor]),
- case Predecessor of
- nil ->
- {Nkey, Npid};
- {Pkey, _} ->
- case key:between(Nkey, Pkey, Id) of
- true ->
- {Nkey, Npid};x
- false ->
- Npid ! {status, Predecessor},
- Predecessor
- end
- end.
- forward_probe(Id, Ref, T, Nodes, Successor) ->
- {_, Spid} = Successor,
- io:format("Id: ~w probe passthrough to ~w.~n",[Id, Successor]),
- Spid ! {probe, Id, [Ref | Nodes], T}.
- create_probe(Id, Successor) ->
- {_, Spid} = Successor,
- io:format("Id: ~w Probing ring to ~w .~n",[Id, Successor]),
- T = erlang:now(),
- Spid ! {probe, Id, T, []}.
- remove_probe(Id, T, Nodes) ->
- io:format("Id: ~w probe reached goal in ~w seconds~n", [Id, timer:now_diff(now(),T), Nodes]).
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement