Guest User

Untitled

a guest
Jan 23rd, 2018
99
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.35 KB | None | 0 0
  1. defmodule Rabbit do
  2. use Connection
  3. import AMQP.Core
  4. require Logger
  5.  
  6. @exchange_name "exchange_name"
  7. @content_type "application/msgpack"
  8.  
  9. def publish(routing_key, payload) do
  10. Connection.cast(:rabbit, {:publish, routing_key, payload})
  11. end
  12.  
  13. def start_link(connection_options \\ [], options \\ []) do
  14. Connection.start_link(__MODULE__, connection_options, options)
  15. end
  16.  
  17. def init(opts) do
  18. Process.flag(:trap_exit, true)
  19. state = %{conn: nil, opts: opts, channel: nil, queue: :queue.new()}
  20. {:connect, :init, state}
  21. end
  22.  
  23. def close(conn), do: Connection.call(conn, :close)
  24. def stop(conn), do: GenServer.stop(conn)
  25.  
  26. def connect(_, state) do
  27. with {:ok, conn} <- open(state.opts),
  28. true <- Process.link(conn),
  29. {:ok, channel} <- :amqp_connection.open_channel(conn),
  30. true <- Process.link(channel),
  31. :ok <- declare_direct_exchange(channel),
  32. :ok <- dequeue(state.queue, channel) do
  33. Logger.info(fn -> "Connected to RabbitMQ" end)
  34. {:ok, %{state | conn: conn, channel: channel, queue: :queue.new()}}
  35. else
  36. {:error_dequeue, reason, queue} ->
  37. Logger.error("Dequeue error: #{inspect(reason)}")
  38. network_recovery_interval = Keyword.get(state.opts, :network_recovery_interval)
  39. {:backoff, network_recovery_interval, %{state | queue: queue}}
  40.  
  41. {:error, reason} ->
  42. Logger.error("Connection error: #{inspect(reason)}")
  43. state = rotate_hosts(state)
  44. Logger.info(fn -> "Rotated hosts: #{inspect(state.opts)}" end)
  45. network_recovery_interval = Keyword.get(state.opts, :network_recovery_interval)
  46. {:backoff, network_recovery_interval, state}
  47. end
  48. end
  49.  
  50. defp dequeue(queue, channel) do
  51. case :queue.out(queue) do
  52. {{:value, {routing_key, payload}}, new_queue} ->
  53. case publish_message(channel, routing_key, payload) do
  54. :ok -> dequeue(new_queue, channel)
  55. {:error, error} -> {:error_dequeue, error, queue}
  56. end
  57.  
  58. {:empty, _} ->
  59. :ok
  60. end
  61. end
  62.  
  63. defp declare_direct_exchange(channel) do
  64. exchange_declare =
  65. exchange_declare(
  66. exchange: @exchange_name,
  67. type: "direct",
  68. passive: false,
  69. durable: true,
  70. auto_delete: false,
  71. internal: false,
  72. nowait: false,
  73. arguments: []
  74. )
  75.  
  76. exchange_declare_ok() = :amqp_channel.call(channel, exchange_declare)
  77. :ok
  78. end
  79.  
  80. def disconnect(info, state) do
  81. Logger.warn(fn -> "Disconnected: #{inspect(info)}" end)
  82. close_rabbit_channel_and_connection(state)
  83. {:connect, :reconnect, %{state | conn: nil, channel: nil}}
  84. end
  85.  
  86. defp rotate_hosts(state) do
  87. [host | hosts] = Keyword.get(state.opts, :hosts)
  88. opts = Keyword.put(state.opts, :hosts, hosts ++ [host])
  89. %{state | opts: opts}
  90. end
  91.  
  92. defp publish_message(channel, routing_key, payload) do
  93. basic_publish =
  94. basic_publish(
  95. exchange: @exchange_name,
  96. routing_key: routing_key
  97. )
  98.  
  99. p_basic =
  100. p_basic(
  101. content_type: @content_type,
  102. delivery_mode: 2 # persistent
  103. )
  104.  
  105. case :amqp_channel.call(channel, basic_publish, amqp_msg(props: p_basic, payload: payload)) do
  106. :ok -> :ok
  107. error -> {:error, error}
  108. end
  109. end
  110.  
  111. def handle_cast({:publish, routing_key, payload}, %{conn: nil, queue: queue} = state) do
  112. new_queue = :queue.in({routing_key, payload}, queue)
  113.  
  114. {:noreply, %{state | queue: new_queue}}
  115. end
  116.  
  117. def handle_cast({:publish, routing_key, payload}, %{channel: channel} = state) do
  118. publish_message(channel, routing_key, payload)
  119.  
  120. {:noreply, state}
  121. end
  122.  
  123. def handle_call(:close, from, state) do
  124. {:disconnect, {:close, from}, state}
  125. end
  126.  
  127. # conn process is terminated
  128. def handle_info({:EXIT, from, reason}, %{conn: from} = state) do
  129. {:disconnect, {:error, reason}, state}
  130. end
  131.  
  132. # channel process is terminated
  133. def handle_info({:EXIT, from, reason}, %{channel: from} = state) do
  134. {:disconnect, {:error, reason}, state}
  135. end
  136.  
  137. def handle_info(info, state) do
  138. Logger.warn(fn -> "handle_info: #{inspect(info)}, state: #{inspect(state)}" end)
  139.  
  140. {:noreply, state}
  141. end
  142.  
  143. def terminate(_reason, state) do
  144. close_rabbit_channel_and_connection(state)
  145. end
  146.  
  147. def close_rabbit_channel_and_connection(state) do
  148. if Process.alive?(state.channel), do: :amqp_channel.close(state.channel)
  149. if Process.alive?(state.conn), do: :amqp_connection.close(state.conn)
  150. end
  151.  
  152. def open(options) when is_list(options) do
  153. host = options
  154. |> Keyword.get(:hosts, ["localhost"])
  155. |> List.first
  156. |> to_charlist
  157.  
  158. Logger.info(fn -> "Connecting to RabbitMQ host: `#{host}` with: #{inspect(options)}" end)
  159.  
  160. amqp_params =
  161. amqp_params_network(
  162. username: Keyword.get(options, :username, "guest"),
  163. password: Keyword.get(options, :password, "guest"),
  164. virtual_host: Keyword.get(options, :virtual_host, "/"),
  165. host: host,
  166. port: Keyword.get(options, :port, :undefined),
  167. heartbeat: Keyword.get(options, :heartbeat, 0),
  168. connection_timeout: Keyword.get(options, :connection_timeout, :infinity),
  169. auth_mechanisms:
  170. Keyword.get(options, :auth_mechanisms, [
  171. &:amqp_auth_mechanisms.plain/3,
  172. &:amqp_auth_mechanisms.amqplain/3
  173. ])
  174. )
  175.  
  176. case :amqp_connection.start(amqp_params) do
  177. {:ok, pid} -> {:ok, pid}
  178. error -> error
  179. end
  180. end
  181. end
Add Comment
Please, Sign In to add comment