Guest User

Untitled

a guest
Nov 8th, 2017
72
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.17 KB | None | 0 0
  %% RabbitMQ (AMQP) client process implemented as a gen_server.
  -module(module_name).
  -behaviour(gen_server).

  -include_lib("amqp_client/include/amqp_client.hrl").

  %% Public API
  -export([start_link/0]).

  %% gen_server callbacks
  -export([
      init/1,
      handle_call/3,
      handle_cast/2,
      handle_info/2,
      terminate/2,
      code_change/3
  ]).
  8.  
  %% Internal server state.
  %%
  %% The connection and channel fields have no explicit default, so they hold
  %% `undefined' until they are assigned; their types include `undefined' to
  %% keep the declaration truthful for Dialyzer.
  -record(state, {
      connection :: pid() | undefined,            % RabbitMQ connection process
      connection_ref :: reference() | undefined,  % monitor on the connection
      channel :: pid() | undefined,               % RabbitMQ channel process
      channel_ref :: reference() | undefined,     % monitor on the channel
      %% Delay, in milliseconds, applied before the server stops after losing
      %% RabbitMQ (it is the value handed to timer:sleep/1).
      rabbitmq_restart_timeout = 5000 :: pos_integer()
  }).
  18.  
  19. %%==========
  %% @doc Starts the server linked to the calling process and registers it
  %% locally under the module name. No init arguments or start options are
  %% passed.
  start_link() ->
      ServerName = {local, ?MODULE},
      InitArgs = [],
      StartOptions = [],
      gen_server:start_link(ServerName, ?MODULE, InitArgs, StartOptions).
  22.  
  %% @doc gen_server callback. Builds the initial state and sends a `connect'
  %% cast to this process, so the connection attempt runs in handle_cast/2
  %% instead of inside init/1.
  init(_Args) ->
      InitialState = #state{rabbitmq_restart_timeout = 5000},
      ok = gen_server:cast(self(), connect),
      {ok, InitialState}.
  26.  
  %%
  %% @doc Handles every raw message sent to the server:
  %%   - a delivery from RabbitMQ: it is acknowledged and its payload decoded;
  %%   - a 'DOWN' message for the monitored connection or channel: the server
  %%     goes through restart_me/1;
  %%   - anything else: logged and ignored.
  %% @end
  handle_info({#'basic.deliver'{delivery_tag = Tag, routing_key = _Queue},
               #amqp_msg{props = #'P_basic'{reply_to = _ReplyTo}, payload = Body}},
              #state{channel = Channel} = State) ->
      %% The delivery is acknowledged before it is decoded, so a payload that
      %% fails to decode is not redelivered.
      amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
      %% The payload comes from the network: decode with `safe' so it cannot
      %% create new atoms or funs. Only the decoding itself is protected; code
      %% in the `of' branch is not covered by the catch clause.
      try binary_to_term(Body, [safe]) of
          _Message ->
              %%
              %% _Message is your payload
              %%
              ok
      catch
          error:badarg ->
              error_logger:error_report("Cannot parse message")
      end,
      {noreply, State};

  %% The monitored connection process died.
  handle_info({'DOWN', ConnectionRef, process, Connection, _Reason},
              #state{connection = Connection, connection_ref = ConnectionRef} = State) ->
      error_logger:error_report("AMQP connection error"),
      restart_me(State);

  %% The monitored channel process died.
  handle_info({'DOWN', ChannelRef, process, Channel, _Reason},
              #state{channel = Channel, channel_ref = ChannelRef} = State) ->
      error_logger:error_report("AMQP channel error"),
      restart_me(State);

  %% Drain anything unexpected so it does not pile up in the mailbox.
  handle_info(_Info, State) ->
      error_logger:error_report("Unsupported info message"),
      {noreply, State}.
  54.  
  %% @doc gen_server callback for asynchronous requests.
  %%
  %% `connect' starts the AMQP connection, opens a channel on it, monitors
  %% both processes and stores them in the state. Any failure is logged and
  %% handed to restart_me/1. Every other cast is logged and ignored.
  handle_cast(connect, State) ->
      %% Connection parameters.
      Params = #amqp_params_network{
          host = "localhost",
          username = <<"username">>,
          password = <<"password">>,
          port = 5672,
          virtual_host = <<"vhost">>,
          heartbeat = 5 %% --- important to keep your connection alive
      },
      case amqp_connection:start(Params) of
          {ok, Connection} ->
              %% Monitor the connection so its death arrives as a 'DOWN' message.
              ConnectionRef = erlang:monitor(process, Connection),
              open_monitored_channel(State#state{
                  connection = Connection,
                  connection_ref = ConnectionRef
              });
          _ConnectionError ->
              error_logger:error_report("AMQP connection error"),
              restart_me(State)
      end;

  handle_cast(_Msg, State) ->
      error_logger:error_report("Unsupported cast message"),
      {noreply, State}.

  %% Opens a channel on the connection already stored in State and monitors it.
  %% Returns the gen_server callback result for the `connect' cast.
  open_monitored_channel(#state{connection = Connection} = State) ->
      case amqp_connection:open_channel(Connection) of
          {ok, Channel} ->
              %% Monitor the channel so its death arrives as a 'DOWN' message.
              ChannelRef = erlang:monitor(process, Channel),
              %%
              %% Here you have to subscribe to queues you want to listen
              %%
              {noreply, State#state{channel = Channel, channel_ref = ChannelRef}};
          _ChannelError ->
              error_logger:error_report("AMQP channel error"),
              %% State already carries the open connection, so terminate/2 is
              %% able to close it instead of leaking it.
              restart_me(State)
      end.
  97.  
  %% @doc gen_server callback for synchronous requests. No call is supported:
  %% each one is logged and answered with `ok', and the state is unchanged.
  handle_call(_Request, _From, State) ->
      Reply = ok,
      error_logger:error_report("Unsupported call message"),
      {reply, Reply, State}.
  101.  
  %% @doc gen_server callback run on shutdown. Closes the channel first and
  %% then the connection, when the state holds a pid for them. Always
  %% returns `ok'.
  terminate(_Reason, #state{connection = Connection, channel = Channel}) ->
      close_quietly(fun amqp_channel:close/1, Channel),
      close_quietly(fun amqp_connection:close/1, Connection),
      ok.

  %% Best-effort close of an AMQP process.
  %% The server usually stops because the connection or channel already died,
  %% so the pid kept in the state may be dead. An exit raised by the close call
  %% is deliberately ignored here so that cleanup can carry on and the server
  %% still stops with its original reason.
  close_quietly(Close, Pid) when is_pid(Pid) ->
      try Close(Pid) of
          _Result -> ok
      catch
          exit:_ -> ok
      end;
  close_quietly(_Close, _NotOpened) ->
      ok.
  112.  
  %% @doc gen_server callback for hot code upgrades. The state needs no
  %% conversion, so it is returned as it was received.
  code_change(_PreviousVersion, CurrentState, _ExtraData) ->
      {ok, CurrentState}.
  115.  
  %%
  %% Called when the client has lost its connection to RabbitMQ.
  %% Blocks for rabbitmq_restart_timeout milliseconds and then asks the
  %% gen_server to stop with reason `error'.
  restart_me(#state{rabbitmq_restart_timeout = DelayMs} = State) ->
      ok = timer:sleep(DelayMs),
      StopReason = error,
      {stop, StopReason, State}.
Add Comment
Please, Sign In to add comment