Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- -module(node2).
- %%
- %% Exported Functions
- %%
- -export([start/1,start/2]).
- %%
- %% Definitions
- %%
- -define(Stabilize, 1000).
- -define(Timeout, 10000).
- %%
- %% API Functions
- %%
%% Starts the first node of a new ring: there is no peer to join, so the
%% node is started with `nil` as its peer. The timer server is started
%% first. Returns whatever start/2 returns.
start(Id) ->
    timer:start(),
    NoPeer = nil,
    start(Id, NoPeer).
%% Spawns a new node process with identifier Id. Peer is either `nil`
%% or the pid of a running node to connect to (see connect/2).
%% Returns the pid of the spawned process.
start(Id, Peer) ->
    Boot = fun() -> init(Id, Peer) end,
    spawn(Boot).
- %%
- %% Local Functions
- %%
%% Entry point of a freshly spawned node process. Resolves the initial
%% successor through connect/2, arms the periodic stabilize timer, and
%% enters the message loop with no predecessor and a new store.
%% Crashes with badmatch if connect/2 does not return {ok, Successor}.
init(Id, Peer) ->
    {ok, Successor} = connect(Id, Peer),
    schedule_stabilize(),
    Store = storage:create(),
    node(Id, nil, Successor, Store).
%% Determines the initial successor of a node.
%%
%% connect(Id, nil)  - no peer given: the node is its own successor.
%% connect(Id, Peer) - asks Peer for its key with a {key, Qref, self()}
%%                     request and waits up to ?Timeout ms for the
%%                     matching {Qref, Skey} reply.
%%
%% Returns {ok, {Key, Pid}} on success, or {error, timeout} when the
%% peer does not answer in time. The timeout case used to return the
%% bare `ok` produced by io:format/2, which made the caller fail with
%% an uninformative {badmatch, ok}.
connect(Id, nil) ->
    {ok, {Id, self()}};
connect(Id, Peer) ->
    Qref = make_ref(),
    Peer ! {key, Qref, self()},
    receive
        {Qref, Skey} ->
            io:format("ok ~w~n", [Id]),
            {ok, {Skey, Peer}}
    after ?Timeout ->
        io:format("Time out ~w: no response~n", [Id]),
        {error, timeout}
    end.
%% Message loop of a ring node.
%%
%% State: Id (this node's key), Predecessor (`nil` or {Key, Pid}),
%% Successor ({Key, Pid}) and Store (the local key/value storage).
%% Every received message yields the next {Predecessor, Successor, Store}
%% triple, after which the loop continues with that state.
node(Id, Predecessor, Successor, Store) ->
    %% State returned by every clause that leaves the node untouched.
    Same = {Predecessor, Successor, Store},
    {NextPred, NextSucc, NextStore} =
        receive
            %% A peer asks for our key.
            {key, Qref, Peer} ->
                Peer ! {Qref, Id},
                Same;
            %% A node proposes itself as our predecessor.
            {notify, New} ->
                {Pred, Store2} = notify(New, Id, Predecessor, Store),
                {Pred, Successor, Store2};
            %% A peer asks who our predecessor is.
            {request, Peer} ->
                request(Peer, Predecessor),
                Same;
            %% Our successor reports its predecessor.
            {status, Pred} ->
                {Predecessor, stabilize(Pred, Id, Successor), Store};
            %% Periodic tick from the stabilize timer.
            stabilize ->
                stabilize(Successor),
                Same;
            print ->
                io:format("node ~w, pred ~w, succ ~w~n", [{Id,self()},Predecessor, Successor]),
                Same;
            showcase ->
                io:format("~nStorage: ~w~n",[Store]),
                Same;
            %% Start a probe around the ring from this node.
            probe ->
                io:format("node ~w, probe received~n", [Id]),
                create_probe(Id, Successor),
                Same;
            %% Our own probe came back.
            {probe, Start, TStart, Nodes} when Start == Id ->
                remove_probe(Id, TStart, Nodes),
                Same;
            %% Somebody else's probe: pass it on.
            {probe, Start, TStart, Nodes} ->
                forward_probe(Id, Start, TStart, Nodes, Successor),
                Same;
            %% Elements handed over to us by another node.
            {handover, Elements} ->
                {Predecessor, Successor, storage:merge(Store, Elements)};
            {add, Key, Value, Qref, Client} ->
                Added = add(Key, Value, Qref, Client, Id, Predecessor, Successor, Store),
                {Predecessor, Successor, Added};
            {lookup, Key, Qref, Client} ->
                lookup(Key, Qref, Client, Id, Predecessor, Successor, Store),
                Same;
            %% Short form: look the key up with this node as the client.
            {lookup, Key} ->
                self() ! {lookup, Key, make_ref(), self()},
                Same;
            %% Any other two-element tuple is treated as a lookup reply.
            {_Qref, false} ->
                io:format("Id: ~w, key not found.~n",[Id]),
                Same;
            {_Qref, Result} ->
                io:format("Id: ~w, key found and delivered: ~w~n",[Id, Result]),
                Same;
            %% Drain anything else so the mailbox does not fill up.
            Error ->
                io:format("node ~w, strange message ~w~n", [Id, Error]),
                Same
        end,
    node(Id, NextPred, NextSucc, NextStore).
%% Handles an {add, Key, Value, Qref, Client} request.
%%
%% When key:between(Key, Pkey, Id) is true the pair is stored locally and
%% Client is acknowledged with {Qref, ok}; the updated store is returned.
%% Otherwise the request is forwarded unchanged to the successor and the
%% store is returned untouched.
%% NOTE(review): presumably key:between/3 tests whether Key falls in the
%% ring interval owned by this node - confirm against the key module.
add(Key, Value, Qref, Client, Id, {Pkey, _Ppid}, {_Skey, Spid}, Store) ->
    Responsible = key:between(Key, Pkey, Id),
    case Responsible of
        false ->
            Spid ! {add, Key, Value, Qref, Client},
            Store;
        true ->
            Client ! {Qref, ok},
            storage:add(Store, Key, Value)
    end.
%% Handles a {lookup, Key, Qref, Client} request.
%%
%% When key:between(Key, Pkey, Id) is true the result of
%% storage:lookup(Key, Store) is sent to Client as {Qref, Result};
%% otherwise the request is forwarded unchanged to the successor.
lookup(Key, Qref, Client, Id, {Pkey, _Ppid}, Successor, Store) ->
    case key:between(Key, Pkey, Id) of
        false ->
            {_Skey, Spid} = Successor,
            Spid ! {lookup, Key, Qref, Client};
        true ->
            Client ! {Qref, storage:lookup(Key, Store)}
    end.
%% Periodic stabilization step: asks the successor for its current
%% predecessor by sending it {request, self()}. The answer arrives later
%% as a {status, Pred} message.
stabilize({_Skey, Spid}) ->
    Me = self(),
    Spid ! {request, Me}.
%% Processes the predecessor Pred reported by our successor and returns
%% the successor this node should use from now on.
%%
%%  * nil            - successor has no predecessor: notify it about us.
%%  * {Id, _}        - we already are its predecessor: nothing to do.
%%  * {Skey, _}      - successor points at itself: notify it about us.
%%  * {Xkey, Xpid}   - another node; if key:between(Xkey, Id, Skey) is
%%                     true that node becomes our successor and is
%%                     notified, otherwise the current successor is
%%                     notified and kept.
stabilize(Pred, Id, Successor) ->
    {Skey, Spid} = Successor,
    %% Announce ourselves as a predecessor candidate to the given pid.
    Announce = fun(Pid) -> Pid ! {notify, {Id, self()}} end,
    case Pred of
        nil ->
            Announce(Spid),
            Successor;
        %% Must stay ahead of the {Skey, _} clause: in a one-node ring
        %% Id and Skey are the same key.
        {Id, _} ->
            Successor;
        {Skey, _} ->
            Announce(Spid),
            Successor;
        {Xkey, Xpid} ->
            case key:between(Xkey, Id, Skey) of
                false ->
                    Announce(Spid),
                    Successor;
                true ->
                    Announce(Xpid),
                    {Xkey, Xpid}
            end
    end.
%% Arms a repeating timer that delivers the atom `stabilize` to the
%% calling process every ?Stabilize milliseconds.
%% timer:send_interval/2 targets self(), exactly like the three-argument
%% form called with self().
schedule_stabilize() ->
    timer:send_interval(?Stabilize, stabilize).
%% Answers a {request, Peer} message by reporting our current
%% predecessor to Peer as {status, Predecessor}. The predecessor is
%% either `nil` or a {Key, Pid} pair.
request(Peer, nil) ->
    Peer ! {status, nil};
request(Peer, {Pkey, Ppid}) ->
    Peer ! {status, {Pkey, Ppid}}.
%% Handles a {notify, {Nkey, Npid}} message: a node proposes itself as
%% our predecessor. Returns {NewPredecessor, NewStore}.
%%
%% The candidate is accepted when we have no predecessor yet, or when
%% key:between(Nkey, Pkey, Id) is true. On acceptance the store is split
%% through handover/3 and the part we keep is returned. Otherwise the
%% candidate is told about our actual predecessor with a {status, _}
%% message and nothing changes.
notify({Nkey, Npid} = Candidate, Id, Predecessor, Store) ->
    Accept = fun() -> {Candidate, handover(Store, Nkey, Npid)} end,
    case Predecessor of
        nil ->
            Accept();
        {Pkey, _} ->
            case key:between(Nkey, Pkey, Id) of
                false ->
                    Npid ! {status, Predecessor},
                    {Predecessor, Store};
                true ->
                    Accept()
            end
    end.
%% Splits the store at Nkey with storage:split/2, sends the second part
%% to Npid as {handover, Elements} and returns the first part, which
%% this node keeps.
handover(Store, Nkey, Npid) ->
    {Kept, HandedOver} = storage:split(Nkey, Store),
    Npid ! {handover, HandedOver},
    Kept.
%% Passes a probe that was started by another node on to our successor,
%% prepending our own Id to the list of visited nodes. Returns the probe
%% message that was sent.
forward_probe(Id, Start, TStart, Nodes, Successor) ->
    {_Skey, Spid} = Successor,
    Visited = [Id | Nodes],
    io:format("Id: ~w probe passthrough to ~w.~n", [Id, Successor]),
    Probe = {probe, Start, TStart, Visited},
    Spid ! Probe.
%% Starts a probe around the ring: sends {probe, Start, Timestamp, []}
%% to the successor, where Start is this node's Id and Timestamp is the
%% send time as a {MegaSecs, Secs, MicroSecs} tuple. Returns the probe
%% message that was sent.
%%
%% Fixes: io:format/2 needs a list of arguments, so Start is wrapped in
%% a list (passing the bare Id raised badarg); the deprecated
%% erlang:now/0 is replaced by erlang:timestamp/0, which yields the same
%% tuple format and therefore still works with timer:now_diff/2.
create_probe(Start, Successor) ->
    {_Skey, Spid} = Successor,
    io:format("Id: ~w Probing ring.~n", [Start]),
    Spid ! {probe, Start, erlang:timestamp(), []}.
%% Reports a probe that has travelled the whole ring and returned to its
%% originator. T is the {MegaSecs, Secs, MicroSecs} timestamp taken when
%% the probe was created; Nodes is the list of node Ids it passed.
%% Prints the round-trip time and the route, and returns ok.
%%
%% Fixes: timer:now_diff/2 returns microseconds, so the value is divided
%% by 1000000 to match the "seconds" label (dividing by 1000 printed
%% milliseconds); the deprecated now/0 is replaced by erlang:timestamp/0.
remove_probe(Id, T, Nodes) ->
    ElapsedMicros = timer:now_diff(erlang:timestamp(), T),
    io:format("Id: ~w probe reached goal in ~w seconds~nRoute: ~w~n",
              [Id, ElapsedMicros / 1000000, Nodes]).
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement