Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- defmodule Rabbit do
- use Connection
- import AMQP.Core
- require Logger
- @exchange_name "exchange_name"
- @content_type "application/msgpack"
- def publish(routing_key, payload) do
- Connection.cast(:rabbit, {:publish, routing_key, payload})
- end
- def start_link(connection_options \\ [], options \\ []) do
- Connection.start_link(__MODULE__, connection_options, options)
- end
- def init(opts) do
- Process.flag(:trap_exit, true)
- state = %{conn: nil, opts: opts, channel: nil, queue: :queue.new()}
- {:connect, :init, state}
- end
- def close(conn), do: Connection.call(conn, :close)
- def stop(conn), do: GenServer.stop(conn)
- def connect(_, state) do
- with {:ok, conn} <- open(state.opts),
- true <- Process.link(conn),
- {:ok, channel} <- :amqp_connection.open_channel(conn),
- true <- Process.link(channel),
- :ok <- declare_direct_exchange(channel),
- :ok <- dequeue(state.queue, channel) do
- Logger.info(fn -> "Connected to RabbitMQ" end)
- {:ok, %{state | conn: conn, channel: channel, queue: :queue.new()}}
- else
- {:error_dequeue, reason, queue} ->
- Logger.error("Dequeue error: #{inspect(reason)}")
- network_recovery_interval = Keyword.get(state.opts, :network_recovery_interval)
- {:backoff, network_recovery_interval, %{state | queue: queue}}
- {:error, reason} ->
- Logger.error("Connection error: #{inspect(reason)}")
- state = rotate_hosts(state)
- Logger.info(fn -> "Rotated hosts: #{inspect(state.opts)}" end)
- network_recovery_interval = Keyword.get(state.opts, :network_recovery_interval)
- {:backoff, network_recovery_interval, state}
- end
- end
- defp dequeue(queue, channel) do
- case :queue.out(queue) do
- {{:value, {routing_key, payload}}, new_queue} ->
- case publish_message(channel, routing_key, payload) do
- :ok -> dequeue(new_queue, channel)
- {:error, error} -> {:error_dequeue, error, queue}
- end
- {:empty, _} ->
- :ok
- end
- end
- defp declare_direct_exchange(channel) do
- exchange_declare =
- exchange_declare(
- exchange: @exchange_name,
- type: "direct",
- passive: false,
- durable: true,
- auto_delete: false,
- internal: false,
- nowait: false,
- arguments: []
- )
- exchange_declare_ok() = :amqp_channel.call(channel, exchange_declare)
- :ok
- end
- def disconnect(info, state) do
- Logger.warn(fn -> "Disconnected: #{inspect(info)}" end)
- close_rabbit_channel_and_connection(state)
- {:connect, :reconnect, %{state | conn: nil, channel: nil}}
- end
- defp rotate_hosts(state) do
- [host | hosts] = Keyword.get(state.opts, :hosts)
- opts = Keyword.put(state.opts, :hosts, hosts ++ [host])
- %{state | opts: opts}
- end
- defp publish_message(channel, routing_key, payload) do
- basic_publish =
- basic_publish(
- exchange: @exchange_name,
- routing_key: routing_key
- )
- p_basic =
- p_basic(
- content_type: @content_type,
- delivery_mode: 2 # persistent
- )
- case :amqp_channel.call(channel, basic_publish, amqp_msg(props: p_basic, payload: payload)) do
- :ok -> :ok
- error -> {:error, error}
- end
- end
- def handle_cast({:publish, routing_key, payload}, %{conn: nil, queue: queue} = state) do
- new_queue = :queue.in({routing_key, payload}, queue)
- {:noreply, %{state | queue: new_queue}}
- end
- def handle_cast({:publish, routing_key, payload}, %{channel: channel} = state) do
- publish_message(channel, routing_key, payload)
- {:noreply, state}
- end
- def handle_call(:close, from, state) do
- {:disconnect, {:close, from}, state}
- end
- # conn process is terminated
- def handle_info({:EXIT, from, reason}, %{conn: from} = state) do
- {:disconnect, {:error, reason}, state}
- end
- # channel process is terminated
- def handle_info({:EXIT, from, reason}, %{channel: from} = state) do
- {:disconnect, {:error, reason}, state}
- end
- def handle_info(info, state) do
- Logger.warn(fn -> "handle_info: #{inspect(info)}, state: #{inspect(state)}" end)
- {:noreply, state}
- end
- def terminate(_reason, state) do
- close_rabbit_channel_and_connection(state)
- end
- def close_rabbit_channel_and_connection(state) do
- if Process.alive?(state.channel), do: :amqp_channel.close(state.channel)
- if Process.alive?(state.conn), do: :amqp_connection.close(state.conn)
- end
- def open(options) when is_list(options) do
- host = options
- |> Keyword.get(:hosts, ["localhost"])
- |> List.first
- |> to_charlist
- Logger.info(fn -> "Connecting to RabbitMQ host: `#{host}` with: #{inspect(options)}" end)
- amqp_params =
- amqp_params_network(
- username: Keyword.get(options, :username, "guest"),
- password: Keyword.get(options, :password, "guest"),
- virtual_host: Keyword.get(options, :virtual_host, "/"),
- host: host,
- port: Keyword.get(options, :port, :undefined),
- heartbeat: Keyword.get(options, :heartbeat, 0),
- connection_timeout: Keyword.get(options, :connection_timeout, :infinity),
- auth_mechanisms:
- Keyword.get(options, :auth_mechanisms, [
- &:amqp_auth_mechanisms.plain/3,
- &:amqp_auth_mechanisms.amqplain/3
- ])
- )
- case :amqp_connection.start(amqp_params) do
- {:ok, pid} -> {:ok, pid}
- error -> error
- end
- end
- end
Add Comment
Please, Sign In to add comment