Skip to content

Instantly share code, notes, and snippets.

@jeffweiss
Last active June 29, 2022 21:02
Show Gist options
  • Save jeffweiss/677edeac22796d2889c8 to your computer and use it in GitHub Desktop.
Save jeffweiss/677edeac22796d2889c8 to your computer and use it in GitHub Desktop.
RabbitMQ Tutorial - Elixir examples
{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)
message = Enum.join(System.argv, " ") || "Hello World!"
AMQP.Exchange.declare(channel, "logs", :fanout)
AMQP.Basic.publish(channel, "logs", "", message)
IO.puts " [x] Sent '#{message}'"
{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)
{severities, raw_message, _} =
System.argv
|> OptionParser.parse(strict: [info: :boolean, warning: :boolean, error: :boolean])
|> case do
{[], msg, _} -> {[info: true], msg, []}
other -> other
end
message = Enum.join(raw_message, " ")
AMQP.Exchange.declare(channel, "direct_logs", :direct)
for {severity, true} <- severities do
severity = severity |> to_string
AMQP.Basic.publish(channel, "direct_logs", severity, message)
IO.puts " [x] Sent '[#{severity}] #{message}'"
end
defmodule RabbitmqTutorials.Mixfile do
use Mix.Project
def project do
[app: :rabbitmq_tutorials,
version: "0.0.1",
elixir: "~> 1.1",
build_embedded: Mix.env == :prod,
start_permanent: Mix.env == :prod,
deps: deps]
end
# Configuration for the OTP application
#
# Type "mix help compile.app" for more information
def application do
[applications: [:logger, :amqp]]
end
# Dependencies can be Hex packages:
#
# {:mydep, "~> 0.3.0"}
#
# Or git/path repositories:
#
# {:mydep, git: "https://github.com/elixir-lang/mydep.git", tag: "0.1.0"}
#
# Type "mix help deps" for more examples and options
defp deps do
[
{:amqp, "~> 0.1.4"},
]
end
end
{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)
AMQP.Queue.declare(channel, "task_queue", durable: true)
message = Enum.join(System.argv, " ") || "Hello World!"
AMQP.Basic.publish(channel, "", "hello", message, persistent: true)
IO.puts " [x] Sent '#{message}'"
defmodule Receive do
def wait_for_messages do
receive do
{:basic_deliver, payload, _meta} ->
IO.puts " [x] Received #{payload}"
wait_for_messages
end
end
end
{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)
AMQP.Queue.declare(channel, "hello")
AMQP.Basic.consume(channel, "hello", nil, no_ack: true)
IO.puts " [*] Waiting for messages. To exist press CTRL+C, CTRL+C"
Receive.wait_for_messages
defmodule ReceiveLogs do
def wait_for_messages(channel) do
receive do
{:basic_deliver, payload, _meta} ->
IO.puts " [x] Received #{payload}"
wait_for_messages(channel)
end
end
end
{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)
AMQP.Exchange.declare(channel, "logs", :fanout)
{:ok, %{queue: queue_name}} = AMQP.Queue.declare(channel, "", exclusive: true)
AMQP.Queue.bind(channel, queue_name, "logs")
AMQP.Basic.consume(channel, queue_name, nil, no_ack: true)
IO.puts " [*] Waiting for messages. To exist press CTRL+C, CTRL+C"
ReceiveLogs.wait_for_messages(channel)
defmodule ReceiveLogsDirect do
def wait_for_messages(channel) do
receive do
{:basic_deliver, payload, _meta} ->
IO.puts " [x] Received #{payload}"
wait_for_messages(channel)
end
end
end
{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)
AMQP.Exchange.declare(channel, "direct_logs", :direct)
{:ok, %{queue: queue_name}} = AMQP.Queue.declare(channel, "", exclusive: true)
|> IO.inspect
{severities, _, _} = OptionParser.parse(System.argv, strict: [info: :boolean, warning: :boolean, error: :boolean])
IO.inspect severities
for {severity, true} <- severities do
severity = severity |> to_string
IO.puts "binding with #{severity}"
AMQP.Queue.bind(channel, queue_name, "direct_logs", routing_key: severity)
|> IO.inspect
end
AMQP.Basic.consume(channel, queue_name, nil, no_ack: true)
IO.puts " [*] Waiting for messages. To exist press CTRL+C, CTRL+C"
ReceiveLogsDirect.wait_for_messages(channel)
{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)
AMQP.Queue.declare(channel, "hello")
AMQP.Basic.publish(channel, "", "hello", "Hello World!")
IO.puts " [x] Sent 'Hello World!'"
defmodule Worker do
def wait_for_messages(channel) do
receive do
{:basic_deliver, payload, meta} ->
IO.puts " [x] Received #{payload}"
payload
|> String.split(".")
|> Enum.count
|> Kernel.-(1)
|> Kernel.*(1000)
|> :timer.sleep
IO.puts " [x] Done."
AMQP.Basic.ack(channel, meta.delivery_tag)
wait_for_messages(channel)
end
end
end
{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)
AMQP.Queue.declare(channel, "task_queue", durable: true)
AMQP.Basic.qos(channel, prefetch_count: 1)
AMQP.Basic.consume(channel, "hello")
IO.puts " [*] Waiting for messages. To exist press CTRL+C, CTRL+C"
Worker.wait_for_messages(channel)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment