Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- -module(amqp_example).
- %% API
- -export([start/1,start/0,stop/0,init/2,create/0,sync/0,test/0]).
- %% gen_server callbacks
- -export([init/1, handle_call/3, handle_cast/2, handle_info/2,terminate/2, code_change/3]).
- -behaviour(gen_server).
- -include_lib("amqp_client/include/amqp_client.hrl").
- %%-compile([export_all]).
- -include("amqprec.hrl").
- -define(PARAM,[]).
- %%=========================================== API
%% Rebuilds the project through make:all/1 with the `load' option, so that
%% modules which get recompiled are also loaded into the running node.
%% Returns whatever make:all/1 returns.
sync() ->
    MakeOptions = [load],
    make:all(MakeOptions).
%% Starts the amqp_example gen_server unless some process is already
%% registered under the name amqp_example.
%% Returns the result of gen_server:start/3 when a new server is started;
%% otherwise prints a notice and returns the result of io:format/2.
start() ->
    start_unless_registered(whereis(amqp_example)).

%% Helper for start/0: dispatches on the result of whereis/1.
start_unless_registered(undefined) ->
    gen_server:start(?MODULE, {?PARAM}, []);
start_unless_registered(Amqp_PId) ->
    io:format("Amqp:~p is already running on this node.~n", [Amqp_PId]).
%% Asks the registered amqp_example process to stop.
%% If no process is registered under that name, a notice is printed instead.
%% Otherwise a {stop, normal} cast is sent to the registered pid; the cast is
%% asynchronous, so this function does not wait for the server to terminate.
stop() ->
    stop_if_registered(whereis(amqp_example)).

%% Helper for stop/0: dispatches on the result of whereis/1.
stop_if_registered(undefined) ->
    io:format("Amqp cannot be stopped, it is not online~n");
stop_if_registered(Amqp_PId) ->
    gen_server:cast(Amqp_PId, {stop, normal}).
%% Starts an unlinked amqp_example gen_server, handing Start_Parameters to
%% init/1. Unlike start/0, no check for an already registered process is made
%% here. Returns the result of gen_server:start/3.
start(Start_Parameters) ->
    ServerOptions = [],
    gen_server:start(?MODULE, Start_Parameters, ServerOptions).
%% Sends an asynchronous {init, InitState} cast to the server Pid.
%% The matching handle_cast/2 clause replaces the server state with InitState.
init(Pid, InitState) ->
    Request = {init, InitState},
    gen_server:cast(Pid, Request).
%% Note on start/0 (defined above): it first checks whether an amqp_example process is already registered. If none is, it starts the gen_server directly with the default ?PARAM; start/1 starts one with caller-supplied parameters and performs no such check.
%% gen_server callback. The start argument is ignored.
%%
%% Steps, in order:
%%   1. trap exits and register this process as amqp_example;
%%   2. open an AMQP network connection (vhost/user/password all <<"fx">>);
%%   3. open a channel, passing {amqp_fx_mnesia_sink, [State]} as the
%%      channel's consumer specification;
%%   4. declare the durable queue <<"live.quote">>;
%%   5. start consuming from it with no_ack = true.
%%
%% Every step is asserted by pattern matching, so an unexpected reply makes
%% init/1 fail with badmatch. On success the state holds the connection, this
%% process's pid, the channel and the consumer tag.
init(_Params) ->
    process_flag(trap_exit, true),
    register(amqp_example, self()),
    NetworkParams = #amqp_params_network{virtual_host = <<"fx">>,
                                         username = <<"fx">>,
                                         password = <<"fx">>},
    {ok, Connection} = amqp_connection:start(NetworkParams),
    State = #amqpstate{connection = Connection, pid = self()},
    ConsumerSpec = {amqp_fx_mnesia_sink, [State]},
    {ok, Channel} = amqp_connection:open_channel(Connection, ConsumerSpec),
    QueueDeclare = #'queue.declare'{queue = <<"live.quote">>,
                                    durable = true,
                                    auto_delete = false,
                                    exclusive = false},
    #'queue.declare_ok'{queue = Q} = amqp_channel:call(Channel, QueueDeclare),
    Consume = #'basic.consume'{queue = Q, no_ack = true},
    #'basic.consume_ok'{consumer_tag = Tag} = amqp_channel:call(Channel, Consume),
    io:format(" [*] Initialized. ~n"),
    {ok, State#amqpstate{channel = Channel, tag = Tag}}.
%% gen_server callback for synchronous requests.
%% Only {stop, normal} and {stop, shutdown} are accepted; each stops the
%% server with that reason and leaves the state unchanged. The return value
%% carries no reply for the caller. Any other request fails with
%% function_clause.
handle_call({stop, Reason}, _From, State) when Reason =:= normal; Reason =:= shutdown ->
    {stop, Reason, State}.
%% gen_server callback for asynchronous requests. Three casts are accepted:
%%   {init, InitState} - replaces the whole server state with InitState;
%%   {stop, normal}    - stops the server with reason normal;
%%   {stop, shutdown}  - stops the server with reason shutdown.
%% Any other cast fails with function_clause.
handle_cast({init, InitState}, _State) ->
    {noreply, InitState};
handle_cast({stop, Reason}, State) when Reason =:= normal; Reason =:= shutdown ->
    {stop, Reason, State}.
%% gen_server callback for any message that is neither a call nor a cast.
%% The message is printed and otherwise ignored; the state is left unchanged.
handle_info(Info, State) ->
    io:format("******** handle_info:~p~n", [Info]),
    {noreply, State}.
%% gen_server callback run when the server stops.
%% Closes the channel first, then the connection, prints the termination
%% reason and returns ok. The state must be an #amqpstate{} record; any other
%% state fails with function_clause.
terminate(Reason, State = #amqpstate{}) ->
    amqp_channel:close(State#amqpstate.channel),
    amqp_connection:close(State#amqpstate.connection),
    io:format("******** Amqp is now offline, terminated with reason:~p~n", [Reason]),
    ok.
%% gen_server callback for hot code upgrades.
%% No state conversion is performed: the current state is kept as is.
code_change(_FromVsn, CurrentState, _UpgradeExtra) ->
    {ok, CurrentState}.
%% Creates the mnesia table marketdata1 as an ordered_set with a disc copy on
%% the local node. The columns are the fields of the marketdata1 record.
%% Returns the result of mnesia:create_table/2.
create() ->
    Fields = record_info(fields, marketdata1),
    TableOptions = [{disc_copies, [node()]},
                    {type, ordered_set},
                    {attributes, Fields}],
    mnesia:create_table(marketdata1, TableOptions).
%% Issues the request Get on Channel until a message with a non-empty payload
%% comes back, then returns {DeliveryTag, Payload} for that message.
%% Messages with an empty payload are acknowledged and skipped. The returned
%% message is NOT acknowledged here; that is left to the caller.
%% The reply must be a {#'basic.get_ok'{}, #amqp_msg{}} pair. Any other reply
%% fails with badmatch (presumably including the broker's answer for an empty
%% queue; confirm against the client documentation).
receive_not_empty(Channel, Get) ->
    {#'basic.get_ok'{delivery_tag = Tag}, #amqp_msg{payload = Body}} =
        amqp_channel:call(Channel, Get),
    keep_or_skip(Body, Tag, Channel, Get).

%% Helper for receive_not_empty/2: acknowledges and skips an empty payload,
%% otherwise hands the message back.
keep_or_skip(<<>>, Tag, Channel, Get) ->
    amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
    receive_not_empty(Channel, Get);
keep_or_skip(Body, Tag, _Channel, _Get) ->
    {Tag, Body}.
%% Converts one raw field to the Erlang type selected by its position.
%%
%%   Element - the field value; a binary for integer and float positions.
%%   Index   - the position of the field.
%%
%% Positions 9, 11, 14, 18 and 21 are parsed as integers.
%% Positions 3-8, 10, 13, 16, 17, 20 and 23 are parsed as floats.
%% Every other position returns Element unchanged.
%%
%% Float positions also accept integer-formatted text (<<"105">> gives 105.0).
%% list_to_float/1 on its own raises badarg for text without a decimal point,
%% which would crash on whole-number values. Text that is neither a float nor
%% an integer still raises badarg.
convert(Element, Index) ->
    case convert_field_type(Index) of
        integer -> list_to_integer(binary_to_list(Element));
        float -> convert_to_float(binary_to_list(Element));
        verbatim -> Element
    end.

%% Classifies a field position as integer, float or verbatim.
convert_field_type(Index) ->
    IsInteger = lists:member(Index, [9, 11, 14, 18, 21]),
    IsFloat = lists:member(Index, [3, 4, 5, 6, 7, 8, 10, 13, 16, 17, 20, 23]),
    case {IsInteger, IsFloat} of
        {true, _} -> integer;
        {false, true} -> float;
        {false, false} -> verbatim
    end.

%% Parses Text as a float, falling back to an integer widened to a float.
convert_to_float(Text) ->
    try
        list_to_float(Text)
    catch
        error:badarg -> float(list_to_integer(Text))
    end.
%% Writes a list of raw field values into the record tuple Mkd_Record.
%% The first element is converted with convert/2 using position Index and
%% stored at tuple position Index + 2; each following element uses the next
%% position. Returns the updated tuple; an empty list returns it unchanged.
%% NOTE(review): the +2 offset presumably skips the record tag and a leading
%% id field when Index starts at 1; confirm against the marketdata1 record.
set_record(Mkd_Record, Elements, Index) ->
    StoreField =
        fun(RawValue, {Record, Position}) ->
            Converted = convert(RawValue, Position),
            {setelement(Position + 2, Record, Converted), Position + 1}
        end,
    {Filled, _NextPosition} = lists:foldl(StoreField, {Mkd_Record, Index}, Elements),
    Filled.
%% Manual smoke test against a live broker (vhost/user/password all <<"fx">>).
%%
%% Steps, in order:
%%   1. open a connection and a channel;
%%   2. declare the durable queues <<"live.quote2">> and <<"test.quote">>;
%%   3. declare the durable fanout exchange <<"test.feed">> and bind
%%      <<"test.quote">> to it;
%%   4. fetch the first message with a non-empty payload from
%%      <<"live.quote2">>, print it and acknowledge it;
%%   5. publish the same payload to <<"test.feed">>;
%%   6. close the channel, then the connection.
%%
%% Every broker reply is asserted by pattern matching, so an unexpected reply
%% fails with badmatch. Returns the result of amqp_connection:close/1.
test() ->
    NetworkParams = #amqp_params_network{virtual_host = <<"fx">>,
                                         username = <<"fx">>,
                                         password = <<"fx">>},
    {ok, Connection1} = amqp_connection:start(NetworkParams),
    {ok, Channel1} = amqp_connection:open_channel(Connection1),

    %% Source queue the message is read from.
    SourceDeclare = #'queue.declare'{queue = <<"live.quote2">>,
                                     durable = true,
                                     auto_delete = false,
                                     exclusive = false},
    #'queue.declare_ok'{queue = SourceQueue} = amqp_channel:call(Channel1, SourceDeclare),

    %% Target queue, fed through the fanout exchange.
    TargetDeclare = #'queue.declare'{queue = <<"test.quote">>,
                                     durable = true,
                                     auto_delete = false,
                                     exclusive = false},
    #'queue.declare_ok'{queue = TargetQueue} = amqp_channel:call(Channel1, TargetDeclare),

    Exchange = <<"test.feed">>,
    ExchangeDeclare = #'exchange.declare'{exchange = Exchange,
                                          type = <<"fanout">>,
                                          durable = true},
    #'exchange.declare_ok'{} = amqp_channel:call(Channel1, ExchangeDeclare),
    #'queue.bind_ok'{} =
        amqp_channel:call(Channel1, #'queue.bind'{queue = TargetQueue, exchange = Exchange}),

    %% Fetch one message with a non-empty payload, show it and acknowledge it.
    {DeliveryTag, Payload} = receive_not_empty(Channel1, #'basic.get'{queue = SourceQueue}),
    io:format("TradeSignal:~s~n", [Payload]),
    amqp_channel:cast(Channel1, #'basic.ack'{delivery_tag = DeliveryTag}),

    %% Republish the payload to the exchange.
    amqp_channel:cast(Channel1,
                      #'basic.publish'{exchange = Exchange},
                      #amqp_msg{payload = Payload}),

    amqp_channel:close(Channel1),
    amqp_connection:close(Connection1).
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement