Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- -module(consumer_agent).
- -behaviour(gen_fsm).
- %% public API
- -export([start_link/1, negotiation/4]).
- %% gen_fsm callbacks
- -export([init/1, handle_event/3, handle_sync_event/4, handle_info/3,
- terminate/3, code_change/4,
- % custom state names
- idle/3, idle_wait/2, negotiate/2,
- wait/2, ready/2, ready/3]).
%% FSM state data threaded through every callback of this module.
-record(state, {
    name = "",              % label printed as the prefix of log lines
    other = undefined,      % pid of the peer agent we negotiate with
    item = undefined,       % item under negotiation
    price = undefined,      % price currently on the table
    initprice = undefined,  % opening price announced by the peer
    maxprice = undefined,   % upper price limit given to negotiation/4
    monitor = undefined,    % monitor reference on the peer process
    from = undefined        % caller of negotiation/4
}).
- %%% PUBLIC API
%% @doc Start a consumer FSM linked to the calling process.
%% Name is wrapped in a one-element list and passed to init/1 as its argument.
start_link(Name) ->
    InitArgs = [Name],
    gen_fsm:start_link(?MODULE, InitArgs, []).
%% @doc Ask the consumer FSM OwnPid to negotiate Item with the FSM OtherPid,
%% paying at most MaxPrice. Synchronous call with a 30000 ms timeout.
negotiation(OwnPid, OtherPid, Item, MaxPrice) ->
    Request = {negotiate, Item, OtherPid, MaxPrice},
    gen_fsm:sync_send_event(OwnPid, Request, 30000).
- %%% CLIENT-TO-CLIENT API
%% Asynchronously ask the peer FSM to open a negotiation for Item;
%% OwnPid is included so the peer knows who is asking.
ask_negotiate(OtherPid, Item, OwnPid) ->
    Event = {ask_negotiate, Item, OwnPid},
    gen_fsm:send_event(OtherPid, Event).
%% Asynchronously send a price offer to the peer FSM.
do_offer(OtherPid, Price) ->
    Event = {do_offer, Price},
    gen_fsm:send_event(OtherPid, Event).
%% Asynchronously ask the peer FSM whether it is ready to close the deal.
are_you_ready(OtherPid) ->
    Event = are_you_ready,
    gen_fsm:send_event(OtherPid, Event).
%% Asynchronously tell the peer FSM that we are not ready yet.
not_yet(OtherPid) ->
    Event = not_yet,
    gen_fsm:send_event(OtherPid, Event).
%% Asynchronously tell the peer FSM that we are ready ('ready!' event).
am_ready(OtherPid) ->
    Event = 'ready!',
    gen_fsm:send_event(OtherPid, Event).
%% Asynchronously acknowledge to the peer FSM that we entered the ready state.
ack_trans(OtherPid) ->
    Event = ack,
    gen_fsm:send_event(OtherPid, Event).
%% Synchronously ask the peer FSM whether it can commit; returns its reply.
%% Uses the default timeout of gen_fsm:sync_send_event/2.
ask_commit(OtherPid) ->
    Request = ask_commit,
    gen_fsm:sync_send_event(OtherPid, Request).
%% Synchronously order the peer FSM to commit; returns its reply.
%% Uses the default timeout of gen_fsm:sync_send_event/2.
do_commit(OtherPid) ->
    Request = do_commit,
    gen_fsm:sync_send_event(OtherPid, Request).
%% Tell the peer FSM that the negotiation is cancelled. Sent as an
%% all-state event, so it is handled by the peer's handle_event/3.
notify_cancel(OtherPid) ->
    Event = cancel,
    gen_fsm:send_all_state_event(OtherPid, Event).
- %%% GEN_FSM API
%% gen_fsm init callback: start in state `idle` with the given name.
%%
%% start_link/1 passes the argument as the one-element list [Name]; it is
%% unpacked here so that #state.name holds the name itself rather than a
%% list wrapping it (which `~p` in commit/1 would print as ["..."]).
init([Name]) ->
    {ok, idle, #state{name = Name}}.
%% State `idle`, synchronous events.
%%
%% {negotiate, Item, OtherPid, MaxPrice}: send ask_negotiate to the peer,
%% monitor it, remember the caller in #state.from and move to `idle_wait`.
%% No reply is returned here; idle_wait/2 answers the caller later.
%% Any other event is logged and ignored; its caller is not answered.
idle({negotiate, Item, OtherPid, MaxPrice}, From, S = #state{}) ->
    ask_negotiate(OtherPid, Item, self()),
    notice(S, "asking producer ~p for a sale of item ~s", [OtherPid, Item]),
    MonitorRef = monitor(process, OtherPid),
    Waiting = S#state{
        other = OtherPid,
        monitor = MonitorRef,
        from = From,
        item = Item,
        maxprice = MaxPrice
    },
    {next_state, idle_wait, Waiting};
idle(Event, _From, Data) ->
    unexpected(Event, idle),
    {next_state, idle, Data}.
%% State `idle_wait`, asynchronous events.
%%
%% {accept_negotiate, InitPrice, OtherPid} from the peer we asked: answer
%% the pending negotiation/4 caller with ok, send a first offer of 75% of
%% maxprice and move to `negotiate`.
%% Any other event is logged and ignored.
idle_wait({accept_negotiate, InitPrice, OtherPid}, S = #state{other = OtherPid}) ->
    gen_fsm:reply(S#state.from, ok),
    notice(S, "starting negotiation, init price ~p", [InitPrice]),
    FirstOffer = S#state.maxprice * 0.75,
    do_offer(S#state.other, FirstOffer),
    Negotiating = S#state{initprice = InitPrice, price = FirstOffer},
    {next_state, negotiate, Negotiating};
idle_wait(Event, Data) ->
    unexpected(Event, idle_wait),
    {next_state, idle_wait, Data}.
%% State `negotiate`, asynchronous events.
%%
%% {do_offer, Price}: evaluate the producer's offer with
%% calculate_next_price/4:
%%   ok    - accept Price, ask the peer whether it is ready, go to `wait`;
%%   stop  - notify the peer and stop the FSM with reason `cancelled`;
%%   Value - send Value as counter-offer and stay in `negotiate`.
%% The stop case must terminate the process: this module has no `cancel`
%% state callback, so transitioning to such a state would crash on the next
%% event. `cancelled` is the same reason the handle_sync_event/4 cancel path
%% uses.
%%
%% are_you_ready: the producer accepted our price; ask it back and go to
%% `wait`.
%% Any other event is logged and ignored.
negotiate({do_offer, Price}, S = #state{}) ->
    notice(S, "Producer offering price ~p", [Price]),
    case calculate_next_price(S#state.price, Price, S#state.maxprice, S#state.initprice) of
        ok ->
            are_you_ready(S#state.other),
            {next_state, wait, S#state{price = Price}};
        stop ->
            notify_cancel(S#state.other),
            {stop, cancelled, S};
        NextPrice ->
            do_offer(S#state.other, NextPrice),
            {next_state, negotiate, S#state{price = NextPrice}}
    end;
negotiate(are_you_ready, S = #state{}) ->
    io:format("Producer ready to trade.~n"),
    notice(S, "Producer ready to sell item ~p for ~p, initial price was ~p.",
           [S#state.item, S#state.price, S#state.initprice]),
    are_you_ready(S#state.other),
    {next_state, wait, S};
negotiate(Event, Data) ->
    unexpected(Event, negotiate),
    {next_state, negotiate, Data}.
%% State `wait`, asynchronous events: both sides confirm they are ready.
%%
%% {do_offer, Price}: the producer changed its offer; record it and go back
%%                    to `negotiate`.
%% are_you_ready:     answer with 'ready!' and keep waiting.
%% not_yet:           log and keep waiting.
%% 'ready!':          answer 'ready!', send ack and move to `ready`.
%% Any other event is logged and ignored.
%%
%% The caller stored in #state.from was already answered by idle_wait/2
%% (gen_fsm:reply(From, ok)), so no further gen_fsm:reply/2 is issued here:
%% a second reply to the same From is not part of any pending call and would
%% at best be dropped, at worst land as a stray message in the caller's
%% mailbox.
wait({do_offer, Price}, S = #state{}) ->
    notice(S, "Producer offering price ~p", [Price]),
    {next_state, negotiate, S#state{price = Price}};
wait(are_you_ready, S = #state{}) ->
    am_ready(S#state.other),
    notice(S, "asked if ready, and I am. Waiting for same reply", []),
    {next_state, wait, S};
wait(not_yet, S = #state{}) ->
    notice(S, "Producer not ready yet", []),
    {next_state, wait, S};
wait('ready!', S = #state{}) ->
    am_ready(S#state.other),
    ack_trans(S#state.other),
    notice(S, "Producer is ready. Moving to ready state", []),
    {next_state, ready, S};
wait(Event, Data) ->
    unexpected(Event, wait),
    {next_state, wait, Data}.
%% State `ready`, asynchronous events.
%%
%% ack: the two agents are ordered by priority/2 on their pids. The agent
%% for which it returns true drives the commit: it calls ask_commit/1 and
%% do_commit/1 on the peer, runs commit/1 locally and stops with reason
%% normal. Any exception raised in that sequence is caught and becomes the
%% stop reason {Class, Reason}. The other agent simply stays in `ready`.
%%
%% 'ready!': ignored without logging.
%% NOTE(review): presumably a late 'ready!' from the peer's side of the
%% handshake in wait/2 -- confirm.
%%
%% Any other event is logged and ignored.
ready(ack, S = #state{other = Peer}) ->
    case priority(self(), Peer) of
        false ->
            {next_state, ready, S};
        true ->
            try
                notice(S, "asking for commit", []),
                ready_commit = ask_commit(Peer),
                notice(S, "ordering commit", []),
                ok = do_commit(Peer),
                notice(S, "committing...", []),
                commit(S),
                {stop, normal, S}
            catch
                Class:Reason ->
                    notice(S, "commit failed", []),
                    {stop, {Class, Reason}, S}
            end
    end;
ready('ready!', S) ->
    {next_state, ready, S};
ready(Event, Data) ->
    unexpected(Event, ready),
    {next_state, ready, Data}.
%% State `ready`, synchronous events (sent by the peer that drives the
%% commit).
%%
%% ask_commit: reply ready_commit and stay in `ready`.
%% do_commit:  run commit/1, reply ok and stop with reason normal.
%% Any other event is logged and ignored; its caller is not answered.
ready(ask_commit, _From, State) ->
    notice(State, "replying to ask_commit", []),
    {reply, ready_commit, ready, State};
ready(do_commit, _From, State) ->
    notice(State, "committing...", []),
    commit(State),
    {stop, normal, ok, State};
ready(Other, _From, State) ->
    unexpected(Other, ready),
    {next_state, ready, State}.
%% All-state asynchronous events.
%%
%% cancel: the peer cancelled; stop with reason other_cancelled.
%% Any other event is logged and the current state is kept.
handle_event(cancel, _StateName, State = #state{}) ->
    notice(State, "received cancel event", []),
    {stop, other_cancelled, State};
handle_event(Other, StateName, State) ->
    unexpected(Other, StateName),
    {next_state, StateName, State}.
%% All-state synchronous events.
%%
%% cancel: notify the peer (if one is known), reply ok and stop with reason
%% cancelled. Before a negotiation has started #state.other is still
%% `undefined`; sending to it would raise badarg (undefined is not a
%% registered name), so the notification is skipped in that case.
%% Any other event is logged and the state kept; its caller is not answered.
handle_sync_event(cancel, _From, _StateName, S = #state{other = undefined}) ->
    notice(S, "cancelling trade, no peer to notify", []),
    {stop, cancelled, ok, S};
handle_sync_event(cancel, _From, _StateName, S = #state{}) ->
    notify_cancel(S#state.other),
    notice(S, "cancelling trade, sending cancel event", []),
    {stop, cancelled, ok, S};
handle_sync_event(Event, _From, StateName, Data) ->
    unexpected(Event, StateName),
    {next_state, StateName, Data}.
%% Raw messages.
%%
%% A 'DOWN' message whose pid and reference match the monitored peer stops
%% the FSM with reason {other_down, Reason}. Everything else is logged and
%% the current state is kept.
handle_info({'DOWN', Ref, process, Pid, Reason}, _StateName,
            State = #state{other = Pid, monitor = Ref}) ->
    notice(State, "Producer is dead", []),
    {stop, {other_down, Reason}, State};
handle_info(Msg, StateName, State) ->
    unexpected(Msg, StateName),
    {next_state, StateName, State}.
%% Code upgrade callback: state name and state data are kept unchanged.
code_change(_OldVsn, StateName, StateData, _Extra) ->
    {ok, StateName, StateData}.
%% Termination callback: log a farewell only for a normal stop out of the
%% `ready` state; every other termination is silent.
terminate(normal, ready, State = #state{}) ->
    notice(State, "FSM leaving.", []);
terminate(_Reason, _StateName, _State) ->
    ok.
- %%% PRIVATE FUNCTIONS
%% Print a log line prefixed with the agent's name.
%% Str is an io:format/2 control string and Args its arguments.
notice(#state{name = Name}, Str, Args) ->
    Format = "~s: " ++ Str ++ "~n",
    io:format(Format, [Name | Args]).
%% Log an event that the given state does not handle.
unexpected(Msg, State) ->
    Args = [self(), Msg, State],
    io:format("~p received unknown event ~p while in state ~p~n", Args).
%% Decide which of two agents drives the commit by comparing their pids
%% with the term order: true when OwnPid is the greater one, false when it
%% is the smaller one. Equal arguments match no clause.
priority(OwnPid, OtherPid) when OwnPid > OtherPid ->
    true;
priority(OwnPid, OtherPid) when OtherPid > OwnPid ->
    false.
%% Print the summary of the completed purchase.
commit(#state{name = Name, item = Item, price = Price, initprice = InitPrice}) ->
    Format = "Transaction completed for ~p.~n"
             "Item ~s was bought for ~p, initial price was ~p~n",
    io:format(Format, [Name, Item, Price, InitPrice]).
%% Decide how the consumer reacts to the producer's offer Price.
%%
%% PrevPrice is the price previously on the table for this agent.
%% Returns:
%%   stop  - Price exceeds MaxPrice or InitPrice: abort the negotiation;
%%   ok    - Price is within 10% of Price of PrevPrice: accept it;
%%   Value - otherwise the counter-offer: PrevPrice raised by 10%, capped
%%           at MaxPrice so the consumer never bids above its own limit.
calculate_next_price(_PrevPrice, Price, MaxPrice, InitPrice)
  when Price > MaxPrice; Price > InitPrice ->
    stop;
calculate_next_price(PrevPrice, Price, MaxPrice, _InitPrice) ->
    case abs(PrevPrice - Price) < 0.1 * Price of
        true -> ok;
        false -> min(1.1 * PrevPrice, MaxPrice)
    end.
- -module(producer_agent).
- -behaviour(gen_fsm).
- %%% PUBLIC API
- -export([start_link/1, accept_negotiation/3]).
- %% gen_fsm callbacks
- -export([init/1, handle_event/3, handle_sync_event/4, handle_info/3,
- terminate/3, code_change/4,
- % custom state names
- idle/2, idle_wait/3, negotiate/2,
- wait/2, ready/2, ready/3]).
%% FSM state data threaded through every callback of this module.
-record(state, {
    name = "",              % label printed as the prefix of log lines
    other = undefined,      % pid of the peer agent we negotiate with
    item = undefined,       % item under negotiation
    price = undefined,      % price currently on the table
    initprice = undefined,  % opening price given to accept_negotiation/3
    minprice = undefined,   % lower price limit given to accept_negotiation/3
    monitor = undefined,    % monitor reference on the peer process
    from = undefined        % caller of accept_negotiation/3
}).
- %%% PUBLIC API
%% @doc Start a producer FSM linked to the calling process.
%% Name is wrapped in a one-element list and passed to init/1 as its argument.
start_link(Name) ->
    InitArgs = [Name],
    gen_fsm:start_link(?MODULE, InitArgs, []).
%% @doc Tell the producer FSM OwnPid to accept the pending negotiation,
%% opening at InitPrice and never going below MinPrice. Synchronous call
%% with the default timeout of gen_fsm:sync_send_event/2.
accept_negotiation(OwnPid, InitPrice, MinPrice) ->
    Request = {accept_negotiate, InitPrice, MinPrice},
    gen_fsm:sync_send_event(OwnPid, Request).
- %%% CLIENT-TO-CLIENT API
%% Asynchronously tell the peer FSM that its negotiation request is
%% accepted, announcing the opening price and our own pid.
accept_negotiate(OtherPid, InitPrice, OwnPid) ->
    Event = {accept_negotiate, InitPrice, OwnPid},
    gen_fsm:send_event(OtherPid, Event).
%% Asynchronously send a price offer to the peer FSM.
do_offer(OtherPid, Price) ->
    Event = {do_offer, Price},
    gen_fsm:send_event(OtherPid, Event).
%% Asynchronously ask the peer FSM whether it is ready to close the deal.
are_you_ready(OtherPid) ->
    Event = are_you_ready,
    gen_fsm:send_event(OtherPid, Event).
%% Asynchronously tell the peer FSM that we are not ready yet.
not_yet(OtherPid) ->
    Event = not_yet,
    gen_fsm:send_event(OtherPid, Event).
%% Asynchronously tell the peer FSM that we are ready ('ready!' event).
am_ready(OtherPid) ->
    Event = 'ready!',
    gen_fsm:send_event(OtherPid, Event).
%% Asynchronously acknowledge to the peer FSM that we entered the ready state.
ack_trans(OtherPid) ->
    Event = ack,
    gen_fsm:send_event(OtherPid, Event).
%% Synchronously ask the peer FSM whether it can commit; returns its reply.
%% Uses the default timeout of gen_fsm:sync_send_event/2.
ask_commit(OtherPid) ->
    Request = ask_commit,
    gen_fsm:sync_send_event(OtherPid, Request).
%% Synchronously order the peer FSM to commit; returns its reply.
%% Uses the default timeout of gen_fsm:sync_send_event/2.
do_commit(OtherPid) ->
    Request = do_commit,
    gen_fsm:sync_send_event(OtherPid, Request).
%% Tell the peer FSM that the negotiation is cancelled. Sent as an
%% all-state event, so it is handled by the peer's handle_event/3.
notify_cancel(OtherPid) ->
    Event = cancel,
    gen_fsm:send_all_state_event(OtherPid, Event).
- %%% GEN_FSM API
%% gen_fsm init callback: start in state `idle` with the given name.
%%
%% start_link/1 passes the argument as the one-element list [Name]; it is
%% unpacked here so that #state.name holds the name itself rather than a
%% list wrapping it (which `~p` in commit/1 would print as ["..."]).
init([Name]) ->
    {ok, idle, #state{name = Name}}.
%% State `idle`, asynchronous events.
%%
%% {ask_negotiate, Item, OtherPid}: a consumer wants to buy Item. Monitor
%% it, remember it as the peer and move to `idle_wait`.
%% Any other event is logged and ignored.
idle({ask_negotiate, Item, OtherPid}, S = #state{}) ->
    MonitorRef = monitor(process, OtherPid),
    notice(S, "~p asked for a sale negotiation of item ~s", [OtherPid, Item]),
    Asked = S#state{other = OtherPid, monitor = MonitorRef, item = Item},
    {next_state, idle_wait, Asked};
idle(Event, Data) ->
    unexpected(Event, idle),
    {next_state, idle, Data}.
%% State `idle_wait`, synchronous events.
%%
%% {accept_negotiate, InitPrice, MinPrice}: tell the waiting consumer that
%% the negotiation is accepted, reply ok to the caller and move to
%% `negotiate` with InitPrice as the current price. The caller's From is
%% also stored in #state.from.
%% Any other event is logged and ignored; its caller is not answered.
idle_wait({accept_negotiate, InitPrice, MinPrice}, From, S = #state{other = OtherPid}) ->
    accept_negotiate(OtherPid, InitPrice, self()),
    notice(S, "accepting negotiation with init price ~p", [InitPrice]),
    Negotiating = S#state{
        initprice = InitPrice,
        minprice = MinPrice,
        from = From,
        price = InitPrice
    },
    {reply, ok, negotiate, Negotiating};
idle_wait(Event, _From, Data) ->
    unexpected(Event, idle_wait),
    {next_state, idle_wait, Data}.
%% State `negotiate`, asynchronous events.
%%
%% {do_offer, Price}: evaluate the consumer's offer with
%% calculate_next_price/3:
%%   ok    - accept Price, ask the peer whether it is ready, go to `wait`;
%%   stop  - notify the peer and stop the FSM with reason `cancelled`;
%%   Value - send Value as counter-offer and stay in `negotiate`.
%% The stop case must terminate the process: this module has no `cancel`
%% state callback, so transitioning to such a state would crash on the next
%% event. `cancelled` is the same reason the handle_sync_event/4 cancel path
%% uses.
%%
%% are_you_ready: the consumer accepted our price; ask it back and go to
%% `wait`.
%% Any other event is logged and ignored.
negotiate({do_offer, Price}, S = #state{}) ->
    notice(S, "Consumer offering price ~p", [Price]),
    case calculate_next_price(Price, S#state.price, S#state.minprice) of
        ok ->
            are_you_ready(S#state.other),
            {next_state, wait, S#state{price = Price}};
        stop ->
            notify_cancel(S#state.other),
            {stop, cancelled, S};
        NextPrice ->
            do_offer(S#state.other, NextPrice),
            {next_state, negotiate, S#state{price = NextPrice}}
    end;
negotiate(are_you_ready, S = #state{}) ->
    notice(S, "Consumer ready to buy item ~p for ~p, initial price was ~p.",
           [S#state.item, S#state.price, S#state.initprice]),
    are_you_ready(S#state.other),
    {next_state, wait, S};
negotiate(Event, Data) ->
    unexpected(Event, negotiate),
    {next_state, negotiate, Data}.
%% State `wait`, asynchronous events: both sides confirm they are ready.
%%
%% {do_offer, Price}: the consumer changed its offer; record it and go back
%%                    to `negotiate`.
%% are_you_ready:     answer with 'ready!' and keep waiting.
%% not_yet:           log and keep waiting.
%% 'ready!':          answer 'ready!', send ack and move to `ready`.
%% Any other event is logged and ignored.
%%
%% The caller stored in #state.from was already answered by idle_wait/3
%% (it returns {reply, ok, ...}), so no further gen_fsm:reply/2 is issued
%% here: a second reply to the same From is not part of any pending call and
%% would at best be dropped, at worst land as a stray message in the
%% caller's mailbox.
wait({do_offer, Price}, S = #state{}) ->
    notice(S, "Consumer offering price ~p", [Price]),
    {next_state, negotiate, S#state{price = Price}};
wait(are_you_ready, S = #state{}) ->
    am_ready(S#state.other),
    notice(S, "asked if ready, and I am. Waiting for same reply", []),
    {next_state, wait, S};
wait(not_yet, S = #state{}) ->
    notice(S, "Consumer not ready yet", []),
    {next_state, wait, S};
wait('ready!', S = #state{}) ->
    am_ready(S#state.other),
    ack_trans(S#state.other),
    notice(S, "Consumer is ready. Moving to ready state", []),
    {next_state, ready, S};
wait(Event, Data) ->
    unexpected(Event, wait),
    {next_state, wait, Data}.
%% State `ready`, asynchronous events.
%%
%% ack: the two agents are ordered by priority/2 on their pids. The agent
%% for which it returns true drives the commit: it calls ask_commit/1 and
%% do_commit/1 on the peer, runs commit/1 locally and stops with reason
%% normal. Any exception raised in that sequence is caught and becomes the
%% stop reason {Class, Reason}. The other agent simply stays in `ready`.
%%
%% 'ready!': ignored without logging.
%% NOTE(review): presumably a late 'ready!' from the peer's side of the
%% handshake in wait/2 -- confirm.
%%
%% Any other event is logged and ignored.
ready(ack, S = #state{other = Peer}) ->
    case priority(self(), Peer) of
        false ->
            {next_state, ready, S};
        true ->
            try
                notice(S, "asking for commit", []),
                ready_commit = ask_commit(Peer),
                notice(S, "ordering commit", []),
                ok = do_commit(Peer),
                notice(S, "committing...", []),
                commit(S),
                {stop, normal, S}
            catch
                Class:Reason ->
                    notice(S, "commit failed", []),
                    {stop, {Class, Reason}, S}
            end
    end;
ready('ready!', S) ->
    {next_state, ready, S};
ready(Event, Data) ->
    unexpected(Event, ready),
    {next_state, ready, Data}.
%% State `ready`, synchronous events (sent by the peer that drives the
%% commit).
%%
%% ask_commit: reply ready_commit and stay in `ready`.
%% do_commit:  run commit/1, reply ok and stop with reason normal.
%% Any other event is logged and ignored; its caller is not answered.
ready(ask_commit, _From, State) ->
    notice(State, "replying to ask_commit", []),
    {reply, ready_commit, ready, State};
ready(do_commit, _From, State) ->
    notice(State, "committing...", []),
    commit(State),
    {stop, normal, ok, State};
ready(Other, _From, State) ->
    unexpected(Other, ready),
    {next_state, ready, State}.
%% All-state asynchronous events.
%%
%% cancel: the peer cancelled; stop with reason other_cancelled.
%% Any other event is logged and the current state is kept.
handle_event(cancel, _StateName, State = #state{}) ->
    notice(State, "received cancel event", []),
    {stop, other_cancelled, State};
handle_event(Other, StateName, State) ->
    unexpected(Other, StateName),
    {next_state, StateName, State}.
%% All-state synchronous events.
%%
%% cancel: notify the peer (if one is known), reply ok and stop with reason
%% cancelled. Before a consumer has asked for a negotiation #state.other is
%% still `undefined`; sending to it would raise badarg (undefined is not a
%% registered name), so the notification is skipped in that case.
%% Any other event is logged and the state kept; its caller is not answered.
handle_sync_event(cancel, _From, _StateName, S = #state{other = undefined}) ->
    notice(S, "cancelling trade, no peer to notify", []),
    {stop, cancelled, ok, S};
handle_sync_event(cancel, _From, _StateName, S = #state{}) ->
    notify_cancel(S#state.other),
    notice(S, "cancelling trade, sending cancel event", []),
    {stop, cancelled, ok, S};
handle_sync_event(Event, _From, StateName, Data) ->
    unexpected(Event, StateName),
    {next_state, StateName, Data}.
%% Raw messages.
%%
%% A 'DOWN' message whose pid and reference match the monitored peer stops
%% the FSM with reason {other_down, Reason}. Everything else is logged and
%% the current state is kept.
handle_info({'DOWN', Ref, process, Pid, Reason}, _StateName,
            State = #state{other = Pid, monitor = Ref}) ->
    notice(State, "Consumer is dead", []),
    {stop, {other_down, Reason}, State};
handle_info(Msg, StateName, State) ->
    unexpected(Msg, StateName),
    {next_state, StateName, State}.
%% Code upgrade callback: state name and state data are kept unchanged.
code_change(_OldVsn, StateName, StateData, _Extra) ->
    {ok, StateName, StateData}.
%% Termination callback: log a farewell only for a normal stop out of the
%% `ready` state; every other termination is silent.
terminate(normal, ready, State = #state{}) ->
    notice(State, "FSM leaving.", []);
terminate(_Reason, _StateName, _State) ->
    ok.
- %%% PRIVATE FUNCTIONS
- %% BUG
%% Print a log line prefixed with the agent's name.
%% Str is an io:format/2 control string and Args its arguments.
notice(#state{name = Name}, Str, Args) ->
    Format = "~s: " ++ Str ++ "~n",
    io:format(Format, [Name | Args]).
%% Log an event that the given state does not handle.
unexpected(Msg, State) ->
    Args = [self(), Msg, State],
    io:format("~p received unknown event ~p while in state ~p~n", Args).
%% Decide which of two agents drives the commit by comparing their pids
%% with the term order: true when OwnPid is the greater one, false when it
%% is the smaller one. Equal arguments match no clause.
priority(OwnPid, OtherPid) when OwnPid > OtherPid ->
    true;
priority(OwnPid, OtherPid) when OtherPid > OwnPid ->
    false.
%% Print the summary of the completed sale.
commit(#state{name = Name, item = Item, price = Price, initprice = InitPrice}) ->
    Format = "Transaction completed for ~p.~n"
             "Item ~s was sold for ~p, initial price was ~p~n",
    io:format(Format, [Name, Item, Price, InitPrice]).
%% Decide how the producer reacts to the consumer's offer Price.
%%
%% PrevPrice is the price previously on the table for this agent.
%% Returns:
%%   stop  - Price is below MinPrice: abort the negotiation;
%%   ok    - Price is within 10% of Price of PrevPrice: accept it;
%%   Value - otherwise the counter-offer: PrevPrice lowered by 10%, but
%%           never below MinPrice. Without the floor, e.g. an offer of 1000
%%           against PrevPrice 1100 and MinPrice 1000 would be countered
%%           with 990, i.e. under the producer's own limit.
calculate_next_price(Price, _PrevPrice, MinPrice) when Price < MinPrice ->
    stop;
calculate_next_price(Price, PrevPrice, MinPrice) ->
    case abs(PrevPrice - Price) < 0.1 * Price of
        true -> ok;
        false -> max(0.9 * PrevPrice, MinPrice)
    end.
- -module(invokes).
- -compile(export_all).
%% Demo entry point: spawn a producer process, wait for it to report the
%% pid of its FSM, spawn a consumer process pointed at that FSM, then give
%% the negotiation 8 seconds to run before returning ok.
%%
%% Only a pid is accepted as the producer's report, so an unrelated message
%% already sitting in the caller's mailbox is not mistaken for it, and the
%% wait is bounded so a producer that fails to start raises
%% producer_start_timeout instead of blocking forever.
start() ->
    Parent = self(),
    spawn(fun() -> producer(Parent) end),
    PidProducer =
        receive
            Pid when is_pid(Pid) -> Pid
        after 5000 ->
            error(producer_start_timeout)
        end,
    spawn(fun() -> consumer(PidProducer) end),
    timer:sleep(8000),
    ok.
%% Body of the demo producer process: start the producer FSM, report its
%% pid to Parent, wait one second, accept the negotiation with an opening
%% price of 1600 and a minimum of 960, then linger one more second.
producer(Parent) ->
    {ok, Agent} = producer_agent:start_link("Producer"),
    Parent ! Agent,
    io:format("Spawned producer: ~p~n", [Agent]),
    timer:sleep(1000),
    producer_agent:accept_negotiation(Agent, 1600, 960),
    timer:sleep(1000).
%% Body of the demo consumer process: start the consumer FSM, wait 100 ms,
%% ask it to negotiate "Shoes" with Producer for at most 1440, then linger
%% one more second.
consumer(Producer) ->
    {ok, Agent} = consumer_agent:start_link("Consumer"),
    io:format("Spawned consumer: ~p~n", [Agent]),
    timer:sleep(100),
    consumer_agent:negotiation(Agent, Producer, "Shoes", 1440),
    timer:sleep(1000).
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement