Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- -module(node2).
- -export([start/1, start/2]).
- -define(Stabilize, 5000).
- -define(Timeout, 10000).
- -include("debug.hrl").
- start(Id) ->
- start(Id, nil).
- start(Id, Peer) ->
- timer:start(),
- spawn(fun() -> init(Id, Peer) end).
- init(Id, Peer) ->
- Predecessor = nil,
- {ok, Successor} = connect(Id, Peer),
- schedule_stabilize(),
- Store = storage:create(),
- node(Id, Predecessor, Successor, Store).
- connect(Id, nil) ->
- ?DEBUG("Node(~w): conenct to self~n",[Id]),
- {ok, {Id, self()}};
- connect(Id, Peer) ->
- ?DEBUG("Node(~w): connect to ~w~n",[Id, Peer]),
- Qref = make_ref(),
- Peer ! {key, Qref, self()},
- receive
- {Qref, Skey} ->
- ?DEBUG("Node(~w): got key ~w , sending notification.~n",[Id, Skey]),
- Peer ! {notify, {Id, self()}},
- {ok, {Skey, Peer}}
- after ?Timeout ->
- io:format("Time out: no response~n")
- end.
- node(Id, Predecessor, Successor, Store) ->
- ?DEBUG("Node(~w): Successor is ~w, Predecessor is ~w~n",[Id, Successor, Predecessor]),
- receive
- {key, Qref, Peer} ->
- %% Todo, search recursiveley in the ring?
- Peer ! {Qref, Id},
- node(Id, Predecessor, Successor, Store);
- {notify, New} ->
- {Nkey, _} = New,
- ?DEBUG("Node(~w): Got a notification from ~w~n",[Id, Nkey]),
- {Pred, NStore} = notify(New, Id, Predecessor, Store),
- node(Id, Pred, Successor, NStore);
- {request, Peer} ->
- request(Peer, Predecessor),
- node(Id, Predecessor, Successor, Store);
- {status, Pred} ->
- Succ = stabilize(Pred, Id, Successor),
- node(Id, Predecessor, Succ, Store);
- stabilize ->
- if
- (Predecessor /= nil) ->
- {Key,_} = Predecessor;
- true ->
- Key = nil
- end,
- ?DEBUG("Node(~w): Sending reqest to predecessor ~w~n",[Id, Key]),
- stabilize(Successor),
- node(Id, Predecessor, Successor, Store);
- %node(Id, Predecessor, Successor, Store);
- probe ->
- create_probe(Id, Successor),
- node(Id, Predecessor, Successor, Store);
- {probe, Id, Nodes, T} ->
- remove_probe(Id, T, Nodes),
- node(Id, Predecessor, Successor, Store);
- {probe, Ref, Nodes, T} ->
- forward_probe(Ref, Id, T, Nodes, Successor),
- node(Id, Predecessor, Successor, Store);
- storage_probe ->
- create_storage_probe(Id, Successor, Store),
- node(Id, Predecessor, Successor, Store);
- {storage_probe, Id, Nodes, T} ->
- remove_storage_probe(Id, T, Nodes),
- node(Id, Predecessor, Successor, Store);
- {storage_probe, Ref, Nodes, T} ->
- forward_storage_probe(Ref, Id, T, Nodes, Successor, Store),
- node(Id, Predecessor, Successor, Store);
- {add, Key, Value, Qref, Client} ->
- Added = add(Key, Value, Qref, Client,
- Id, Predecessor, Successor, Store),
- node(Id, Predecessor, Successor, Added);
- {lookup, Key, Qref, Client} ->
- lookup(Key, Qref, Client, Id, Predecessor, Successor, Store),
- node(Id, Predecessor, Successor, Store);
- {handover, Elements} ->
- ?H_DEBUG("Node({~w,~w): Received handover: ~w ~n", [Id, self(), Elements]),
- Merged = storage:merge(Store, Elements),
- node(Id, Predecessor, Successor, Merged);
- Error ->
- io:format("Node(~w) strange message: ~w~n", [Id, Error]),
- node(Id, Predecessor, Successor, Store)
- end.
- add(Key, Value, Qref, Client, Id, {Pkey, _}, {_, Spid}, Store) ->
- case key:between(Key, Pkey, Id) of
- true ->
- Client ! {Qref, ok},
- storage:add(Key, Value, Store);
- false ->
- %% Pass request to successor.
- Spid ! {add, Key, Value, Qref, Client},
- Store
- end.
- lookup(Key, Qref, Client, Id, {Pkey, _}, Successor, Store) ->
- case key:between(Key, Pkey, Id) of
- true ->
- Result = storage:lookup(Key, Store),
- Client ! {Qref, Result};
- false ->
- {_, Spid} = Successor,
- Spid ! {lookup, Key, Qref, Client}
- end.
- handover(Store, Nkey, Npid, Id) when Id > Nkey->
- ?H_DEBUG("Handover: Nkey: ~w, Npid: ~w~n", [Nkey, Npid]),
- {Leave, Keep} = storage:split(Nkey, Store),
- Npid ! {handover, Leave},
- Keep;
- handover(Store, Nkey, Npid, Id) when Id < Nkey->
- ?H_DEBUG("Handover: Nkey: ~w, Npid: ~w~n", [Nkey, Npid]),
- {Keep, Leave} = storage:split(Id, Store),
- Npid ! {handover, Leave},
- Keep.
- create_probe(Id, {_, Spid}) ->
- Spid ! {probe, Id, [{Id, self()}], now()}.
- remove_probe(Id, T, Nodes) ->
- Time = timer:now_diff(now(), T),
- io:format("Node(~w): time=~.2fms, nodes=~w~n", [Id, Time/1000, Nodes]).
- forward_probe(Ref, Id, T, Nodes, {_, Spid}) ->
- ?DEBUG("Node(~w): Forwardning probe. ~n", [Id]),
- Spid ! {probe, Ref, [{Id, self()} | Nodes], T}.
- create_storage_probe(Id, {_, Spid}, Store) ->
- Spid ! {storage_probe, Id, [{Id, Store}], now()}.
- remove_storage_probe(Id, T, Nodes) ->
- Time = timer:now_diff(now(), T),
- io:format("Node(~w): time=~.2fms, stores=~w~n", [Id, Time/1000, Nodes]).
- forward_storage_probe(Ref, Id, T, Nodes, {_, Spid}, Store) ->
- ?DEBUG("Node(~w): Forwardning probe. ~n", [Id]),
- Spid ! {storage_probe, Ref, [{Id, Store} | Nodes], T}.
- % Send status request to sucessor.
- % The status response then triggers stabilize in this node.
- stabilize({_, Spid}) ->
- Spid ! {request, self()}.
- stabilize(Pred, Id, Successor) ->
- {Skey, Spid} = Successor,
- case Pred of
- nil ->
- ?DEBUG("Node(~w): Our sucessors pred. pointer is nil, inform ~w.~n", [Id, Skey]),
- % Inform our successor of our existence
- Spid ! {notify, {Id, self()}},
- Successor;
- {Id, _} ->
- ?DEBUG("Node(~w): do nothing.~n", [Id]),
- % We're the predecessor of our successor, don't do annyting.
- Successor;
- {Skey, _} ->
- ?DEBUG("Node(~w): Succesor node ~w is the predecessor of itself.~n", [Id, Skey]),
- % Our succesors predecessor is itself, we should notify our successor of our existence.
- Spid ! {notify, {Id, self()}},
- Successor;
- {Xkey, Xpid} -> % Our successors predecessor is another node.
- case key:between(Xkey, Id, Skey) of
- true ->
- ?DEBUG("Node(~w): Adopting ~w as new successor.~n", [Id, Xkey]),
- % Adopt the node as out successor and run stabilization again.
- stabilize(Pred, Id, {Xkey, Xpid});
- false ->
- ?DEBUG("Node(~w): Adopting ~w as new successor and sending notification.~n", [Id, Xkey]),
- % Set the sucessors predecessor as our new successor and inform it of our existence.
- Xpid ! {notify, {Id, self()}},
- {Xkey, Xpid}
- end
- end.
- schedule_stabilize() ->
- timer:send_interval(?Stabilize, self(), stabilize).
- request(Peer, Predecessor) ->
- case Predecessor of
- nil ->
- Peer ! {status, nil};
- {Pkey, Ppid} ->
- Peer ! {status, {Pkey, Ppid}}
- end.
- %% We got a notification from someone that claims to be our proper predecessor.
- notify({Nkey, Npid}, Id, Predecessor, Store) ->
- case Predecessor of
- nil ->
- %% Our predecessor is nil.
- %% Add the node as our predecessor.
- ?DEBUG("Node(~w): Predeccessor is nil, adding ~w as predecessor.~n", [Id, Nkey]),
- Keep = handover(Store, Nkey, Npid, Id),
- {{Nkey, Npid}, Keep};
- {Pkey, _} ->
- case key:between(Nkey, Pkey, Id) of
- true ->
- %% Add the node as our predecessor
- ?DEBUG("Node(~w): Added node ~w as new predecessor.~n", [Id, Nkey]),
- Keep = handover(Store, Nkey, Npid, Id),
- {{Nkey, Npid}, Keep};
- false ->
- ?DEBUG("Node(~w): Notification from ~w denied.~n", [Id, Nkey]),
- %% Reqest denied, send stabilize to speed things up.
- Npid ! stabilize,
- {Predecessor, Store}
- end
- end.
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement