Advertisement
Guest User

Untitled

a guest
Feb 15th, 2016
74
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.09 KB | None | 0 0
  1. -module(amqp_example).
  2. %% API
  3. -export([start/1,start/0,stop/0,init/2,create/0,sync/0,test/0]).
  4. %% gen_server callbacks
  5. -export([init/1, handle_call/3, handle_cast/2, handle_info/2,terminate/2, code_change/3]).
  6.  
  7. -behaviour(gen_server).
  8. -include_lib("amqp_client/include/amqp_client.hrl").
  9. %%-compile([export_all]).
  10.  
  11. -include("amqprec.hrl").
  12.  
  13. -define(PARAM,[]).
  14. %%=========================================== API
  15. sync()->
  16. make:all([load]).
  17. % A sync/1 function can compile and reload all the modules pertaining to the project within the folder.
  18.  
  19.  
  20. start() ->
  21. case whereis(amqp_example) of
  22. undefined ->
  23. gen_server:start(?MODULE, {?PARAM}, []);
  24. Amqp_PId ->
  25. io:format("Amqp:~p is already running on this node.~n",[Amqp_PId])
  26. end.
  27.  
  28. stop()->
  29. case whereis(amqp_example) of
  30. undefined ->
  31. io:format("Amqp cannot be stopped, it is not online~n");
  32. Amqp_PId ->
  33. gen_server:cast(Amqp_PId,{stop,normal})
  34. end.
  35. %The stop/0 function first checks whether a polis process is online. If there is an online polis process running on the node, then the stop function sends a signal to it requesting it to stop.
  36.  
  37. start(Start_Parameters) ->
  38. gen_server:start(?MODULE, Start_Parameters, []).
  39. init(Pid,InitState)->
  40. gen_server:cast(Pid,{init,InitState}).
  41. %The start/0 first checks whether a polis process has already been spawned, by checking if one is registered. If it's not, then the start/1 function starts up the neuroevolutionary platform.
  42.  
  43. init(Params) ->
  44. process_flag(trap_exit,true),
  45. register(amqp_example,self()),
  46. %% Start a network connection
  47. {ok, Connection} = amqp_connection:start(#amqp_params_network{virtual_host = <<"fx">>, username= <<"fx">>, password= <<"fx">> }),
  48. State = #amqpstate{connection = Connection, pid = self()},
  49. %{ok,Pid_Sink} = amqp_fx_mnesia_sink:start_link(),
  50. %, {amqp_direct_consumer, [self()]}
  51. %% Open a channel on the connection
  52. {ok, Channel} = amqp_connection:open_channel(Connection, {amqp_fx_mnesia_sink, [State]}),
  53.  
  54. %% Declare a queue
  55. QDecl = #'queue.declare'{queue = <<"live.quote">>, durable=true, auto_delete=false, exclusive=false},
  56. #'queue.declare_ok'{queue = Q}
  57. = amqp_channel:call(Channel, QDecl),
  58.  
  59. %%amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q,
  60. %% no_ack = true}, self()),
  61.  
  62. #'basic.consume_ok'{consumer_tag = Tag} =
  63. amqp_channel:call(Channel, #'basic.consume'{queue = Q, no_ack = true}), %% the caller is the subscriber
  64. State2 = State#amqpstate{channel = Channel, tag = Tag},
  65. io:format(" [*] Initialized. ~n"),
  66. {ok,State2}.
  67.  
  68. handle_call({stop,normal},_From, State)->
  69. {stop, normal, State};
  70. handle_call({stop,shutdown},_From,State)->
  71. {stop, shutdown, State}.
  72. %At this point polis only accepts a get_scape call, to which it replies with the Pid or undefined message, and the two standard {stop,normal} and {stop,shutdown} calls.
  73.  
  74. handle_cast({init,InitState},_State)->
  75. {noreply,InitState};
  76. handle_cast({stop,normal},State)->
  77. {stop, normal,State};
  78. handle_cast({stop,shutdown},State)->
  79. {stop, shutdown, State}.
  80. %At this point polis allows only for 3 standard casts: {init,InitState}, {stop,normal} and {stop,shutdown}.
  81.  
  82.  
  83. handle_info(_Info, State) ->
  84.  
  85. io:format("******** handle_info:~p~n",[_Info]),
  86.  
  87. {noreply, State}.
  88. %The standard, still unused handle_info/2 function.
  89.  
  90. terminate(Reason, S = #amqpstate{connection = Connection, channel = Channel}) ->
  91. % Tag = element(3, S),
  92.  
  93. %amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = Tag}),
  94.  
  95. %% Close the channel
  96. amqp_channel:close(Channel),
  97. %% Close the connection
  98. amqp_connection:close(Connection),
  99.  
  100. io:format("******** Amqp is now offline, terminated with reason:~p~n",[Reason]),
  101. ok.
  102.  
  103. code_change(_OldVsn, State, _Extra) ->
  104. {ok, State}.
  105.  
  106.  
  107. create()->
  108. mnesia:create_table(marketdata1,[{disc_copies, [node()]},{type,ordered_set},{attributes, record_info(fields,marketdata1)}]).
  109.  
  110. receive_not_empty(Channel, Get) ->
  111. {#'basic.get_ok'{delivery_tag = Tag}, #amqp_msg{payload = Body}}
  112. = amqp_channel:call(Channel, Get),
  113. %% Get the message back from the queue
  114. case Body of
  115. <<>> ->
  116. %% Ack the message
  117. amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
  118. receive_not_empty(Channel, Get);
  119. _ -> {Tag,Body}
  120. end.
  121.  
  122. convert(Element, Index) ->
  123. case lists:member(Index,[9,11,14,18,21]) of
  124. true ->
  125. list_to_integer(binary_to_list(Element));
  126. false ->
  127. case lists:member(Index,[3,4,5,6,7,8,10,13,16,17,20,23]) of
  128. true -> list_to_float(binary_to_list(Element));
  129. false -> Element
  130. end
  131. end.
  132.  
  133. set_record(Mkd_Record, [], Index) ->
  134. Mkd_Record;
  135.  
  136. set_record(Mkd_Record, [ListElement|Rest], Index) ->
  137. NewValue = convert(ListElement, Index),
  138. T = setelement(Index+2, Mkd_Record, NewValue),
  139. %%io:format("TradeSignal:~p~n", [T]),
  140. set_record(T, Rest, Index+1).
  141.  
  142. test() ->
  143. %% Start a network connection
  144. {ok, Connection1} = amqp_connection:start(#amqp_params_network{virtual_host = <<"fx">>, username= <<"fx">>, password= <<"fx">> }),
  145. %% Open a channel on the connection
  146. {ok, Channel1} = amqp_connection:open_channel(Connection1),
  147.  
  148. %% Declare a queue
  149. QDecl = #'queue.declare'{queue = <<"live.quote2">>, durable=true, auto_delete=false, exclusive=false},
  150. #'queue.declare_ok'{queue = Q}
  151. = amqp_channel:call(Channel1, QDecl),
  152.  
  153. %% Declare a queue
  154. QDecl2 = #'queue.declare'{queue = <<"test.quote">>, durable=true, auto_delete=false, exclusive=false},
  155. #'queue.declare_ok'{queue = Q2}
  156. = amqp_channel:call(Channel1, QDecl2),
  157.  
  158. Declare = #'exchange.declare'{exchange = <<"test.feed">>, type= <<"fanout">>, durable=true },
  159. #'exchange.declare_ok'{} = amqp_channel:call(Channel1, Declare),
  160.  
  161.  
  162. Binding = #'queue.bind'{queue = Q2,
  163. exchange = <<"test.feed">>},
  164. #'queue.bind_ok'{} = amqp_channel:call(Channel1, Binding),
  165.  
  166.  
  167. Get = #'basic.get'{queue = Q},
  168.  
  169. {T , Pl } = receive_not_empty(Channel1, Get),
  170. %%[First | M1data] = binary:split(Pl, <<";">>, [global]),
  171.  
  172. %%Rec = set_record(#marketdata1{id = now()},M1data, 1),
  173. %%mnesia:dirty_write(Rec),
  174.  
  175. %%Pl = Content#amqp_msg.payload,
  176. %% Do something with the message payload
  177. %% (some work here)
  178. io:format("TradeSignal:~s~n", [Pl]),
  179.  
  180. amqp_channel:cast(Channel1, #'basic.ack'{delivery_tag = T}),
  181. %% Publish a message
  182. Publish = #'basic.publish'{exchange = <<"test.feed">>},
  183. amqp_channel:cast(Channel1, Publish, #amqp_msg{payload = Pl}),
  184.  
  185.  
  186. %% Close the channel
  187. amqp_channel:close(Channel1),
  188. %% Close the connection
  189. amqp_connection:close(Connection1).
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement