Created
May 22, 2019 15:39
-
-
Save keathley/68001608877623a4f48b8138e3f353f1 to your computer and use it in GitHub Desktop.
RabbitMQ connection state machine
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule RabbitConnection do
  @moduledoc """
  RabbitMQ client connection.

  Implemented as a `:gen_statem` with two states:

    * `:disconnected` - no usable channel. Publish calls are answered with
      `{:error, :disconnected}` and a connection attempt is scheduled through
      a state timeout using a randomized exponential backoff.
    * `:connected` - a channel is open and the queues/exchange have been
      declared. Publish calls are forwarded to the exchange. When the
      monitored connection process goes down the machine returns to
      `:disconnected`.
  """

  @behaviour :gen_statem

  use AMQP

  alias AMQP.Basic

  require Logger

  @exchange "messages_exchange"
  @queue "custom_alert_queue"
  @queue_error "#{@queue}_error"

  # Base unit (in milliseconds) of the reconnect backoff.
  @retry_time 1_000

  # Upper bound for the backoff exponent. Without it the delay doubles on
  # every failed attempt forever and `:math.pow/2` eventually overflows.
  # 2^6 - 1 = 63, so a retry is never delayed by more than 63 seconds.
  @max_backoff_exponent 6

  @doc """
  Returns the name of the queue notifications are routed to.
  """
  @spec queue() :: String.t()
  def queue, do: @queue

  @impl :gen_statem
  def callback_mode do
    [:state_functions, :state_enter]
  end

  @doc """
  Supervisor child specification. `opts` is passed unchanged to `start_link/1`.
  """
  def child_spec(opts) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [opts]},
      type: :worker,
      restart: :permanent,
      shutdown: 500
    }
  end

  @doc """
  Starts the connection state machine.

  Takes a `{conn_opts, opts}` tuple:

    * `conn_opts` - keyword list merged over the defaults returned by the
      private `config/0`; the result is handed to `AMQP.Connection.open/1`.
    * `opts` - keyword list of `:gen_statem` start options. When it contains
      `:name`, the process is registered locally under that name.
  """
  @spec start_link({keyword(), keyword()}) :: :gen_statem.start_ret()
  def start_link({conn_opts, opts}) do
    config = Keyword.merge(config(), conn_opts)

    case Keyword.pop(opts, :name) do
      {nil, opts} ->
        :gen_statem.start_link(__MODULE__, config, opts)

      {name, opts} ->
        :gen_statem.start_link({:local, name}, __MODULE__, config, opts)
    end
  end

  @doc """
  JSON-encodes `notification` (in the calling process, so encoding errors
  raise in the caller) and asks `server` to publish it.

  Returns the result of the publish, or `{:error, :disconnected}` when the
  server currently has no connection.
  """
  @spec send_notification(:gen_statem.server_ref(), term()) :: :ok | {:error, term()}
  def send_notification(server, notification) do
    :gen_statem.call(server, {:publish, Jason.encode!(notification)})
  end

  @doc """
  Returns the current state of `server`: `:connected` or `:disconnected`.
  """
  @spec status(:gen_statem.server_ref()) :: :connected | :disconnected
  def status(server) do
    :gen_statem.call(server, :status)
  end

  @impl :gen_statem
  def init(conn_opts) do
    data = %{
      channel: nil,
      config: conn_opts,
      error_count: 0
    }

    {:ok, :disconnected, data}
  end

  # --- :disconnected state -------------------------------------------------

  @doc false
  # State enter: schedule the next connection attempt. The first attempt
  # (error_count == 0) fires immediately; later ones back off.
  def disconnected(:enter, _, data) do
    Logger.error("Entering disconnected state")
    actions = [{:state_timeout, backoff(data.error_count), :connect}]
    data = %{data | error_count: data.error_count + 1}
    {:keep_state, data, actions}
  end

  def disconnected({:call, from}, {:publish, _}, _data) do
    {:keep_state_and_data, [{:reply, from, {:error, :disconnected}}]}
  end

  def disconnected({:call, from}, :status, _data) do
    {:keep_state_and_data, [{:reply, from, :disconnected}]}
  end

  def disconnected(:state_timeout, :connect, data) do
    # Deliberately log only the attempt counter: `data.config` holds the
    # connection options and may contain credentials.
    Logger.info("State timeout, attempting to connect (error_count: #{data.error_count})")

    case Connection.open(data.config) do
      {:ok, conn} ->
        # The resulting :DOWN message is handled in the :connected state.
        Process.monitor(conn.pid)
        {:ok, chan} = Channel.open(conn)
        setup_queues(chan)
        {:next_state, :connected, %{data | channel: chan}}

      {:error, _} ->
        Statix.increment("rabbitmq.connection_failure")
        Logger.error("Could not connect to rabbitmq")
        # Re-enter :disconnected so the enter callback schedules a retry
        # with an increased backoff.
        {:repeat_state, data}
    end
  end

  # --- :connected state ----------------------------------------------------

  @doc false
  # State enter: a successful connection resets the backoff counter.
  def connected(:enter, _, data) do
    Statix.increment("rabbitmq.connected")
    {:keep_state, %{data | error_count: 0}}
  end

  def connected({:call, from}, {:publish, notification}, data) do
    res = Basic.publish(data.channel, @exchange, @queue, notification)
    {:keep_state_and_data, [{:reply, from, res}]}
  end

  def connected({:call, from}, :status, _data) do
    {:keep_state_and_data, [{:reply, from, :connected}]}
  end

  def connected(:info, {:DOWN, _, :process, _pid, _reason}, data) do
    Statix.increment("rabbitmq.disconnect")
    # Drop the now-dead channel so stale state is never reused.
    {:next_state, :disconnected, %{data | channel: nil}}
  end

  # --- helpers -------------------------------------------------------------

  # Default connection options, read from the environment of the application
  # that owns this module, under the module's own name. Falls back to `[]`
  # when the module does not belong to a loaded application or nothing is
  # configured.
  # NOTE(review): the original source called `config/0` without defining it;
  # confirm this is the intended location of the defaults.
  defp config do
    case Application.get_application(__MODULE__) do
      nil -> []
      app -> Application.get_env(app, __MODULE__, [])
    end
  end

  # Declares the error queue, the main queue (dead-lettering into the error
  # queue through the default exchange), the exchange, and the binding.
  # Every step is asserted so a failure crashes instead of leaving the
  # machine "connected" on a half-declared topology.
  defp setup_queues(chan) do
    {:ok, _} = Queue.declare(chan, @queue_error, durable: true)

    {:ok, _} =
      Queue.declare(chan, @queue,
        durable: true,
        arguments: [
          {"x-dead-letter-exchange", :longstr, ""},
          {"x-dead-letter-routing-key", :longstr, @queue_error}
        ]
      )

    :ok = Exchange.declare(chan, @exchange, :direct, durable: true)
    :ok = Queue.bind(chan, @queue, @exchange, routing_key: @queue)
  end

  # Randomized exponential backoff in milliseconds: a random multiple of
  # @retry_time between 1 and 2^n - 1, where n is the number of attempts so
  # far, capped at @max_backoff_exponent. The very first attempt is immediate.
  defp backoff(0), do: 0

  defp backoff(count) do
    exponent = min(count, @max_backoff_exponent)
    factor = :rand.uniform(trunc(:math.pow(2, exponent)) - 1)
    factor * @retry_time
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment