Last active
June 29, 2022 21:02
-
-
Save jeffweiss/677edeac22796d2889c8 to your computer and use it in GitHub Desktop.
RabbitMQ Tutorial - Elixir examples
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# emit_log.exs — publishes one log line to the "logs" fanout exchange.
#
# Usage: mix run emit_log.exs [words of the message ...]
{:ok, connection} = AMQP.Connection.open()
{:ok, channel} = AMQP.Channel.open(connection)

# Enum.join/2 returns "" for an empty list, and "" is truthy in Elixir, so the
# previous `Enum.join(...) || "Hello World!"` could never pick the default.
# Branch on the argument list itself instead.
message =
  case System.argv() do
    [] -> "Hello World!"
    words -> Enum.join(words, " ")
  end

AMQP.Exchange.declare(channel, "logs", :fanout)

# The routing key is left empty; the message is addressed to the "logs" exchange only.
AMQP.Basic.publish(channel, "logs", "", message)
IO.puts " [x] Sent '#{message}'"

# Close the connection explicitly instead of relying on VM shutdown.
AMQP.Connection.close(connection)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# emit_log_direct.exs — publishes a message to the "direct_logs" direct exchange
# once per selected severity, using the severity name as the routing key.
#
# Usage: mix run emit_log_direct.exs [--info] [--warning] [--error] message words ...
{:ok, connection} = AMQP.Connection.open()
{:ok, channel} = AMQP.Channel.open(connection)

severity_switches = [info: :boolean, warning: :boolean, error: :boolean]

# When no severity switch is given, fall back to `--info`.
{severities, raw_message, _invalid} =
  case OptionParser.parse(System.argv(), strict: severity_switches) do
    {[], words, _invalid} -> {[info: true], words, []}
    parsed -> parsed
  end

message = Enum.join(raw_message, " ")

AMQP.Exchange.declare(channel, "direct_logs", :direct)

# Only switches that parsed to `true` are published; `--no-info` style
# negations yield `false` and are filtered out by the generator pattern.
for {severity, true} <- severities do
  routing_key = to_string(severity)
  AMQP.Basic.publish(channel, "direct_logs", routing_key, message)
  IO.puts " [x] Sent '[#{routing_key}] #{message}'"
end

# Close the connection explicitly instead of relying on VM shutdown.
AMQP.Connection.close(connection)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule RabbitmqTutorials.Mixfile do
  use Mix.Project

  # Project definition read by Mix.
  #
  # Zero-arity calls are written with explicit parentheses (`Mix.env()`,
  # `deps()`): a bare `deps` is treated as a variable reference by current
  # Elixir versions rather than as a call to the private function below.
  def project do
    [
      app: :rabbitmq_tutorials,
      version: "0.0.1",
      elixir: "~> 1.1",
      build_embedded: Mix.env() == :prod,
      start_permanent: Mix.env() == :prod,
      deps: deps()
    ]
  end

  # Configuration for the OTP application
  #
  # Type "mix help compile.app" for more information
  def application do
    [applications: [:logger, :amqp]]
  end

  # Dependencies can be Hex packages:
  #
  #   {:mydep, "~> 0.3.0"}
  #
  # Or git/path repositories:
  #
  #   {:mydep, git: "https://github.com/elixir-lang/mydep.git", tag: "0.1.0"}
  #
  # Type "mix help deps" for more examples and options
  defp deps do
    [
      {:amqp, "~> 0.1.4"}
    ]
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# new_task.exs — enqueues one persistent task message on the durable
# "task_queue" queue.
#
# Usage: mix run new_task.exs [words of the task ...]
{:ok, connection} = AMQP.Connection.open()
{:ok, channel} = AMQP.Channel.open(connection)

AMQP.Queue.declare(channel, "task_queue", durable: true)

# Enum.join/2 returns "" for an empty list, and "" is truthy in Elixir, so the
# previous `Enum.join(...) || "Hello World!"` could never pick the default.
message =
  case System.argv() do
    [] -> "Hello World!"
    words -> Enum.join(words, " ")
  end

# Publish through the default exchange ("") using the queue name as routing key.
# The routing key must be the queue declared above; it previously pointed at
# "hello", so tasks never reached "task_queue".
AMQP.Basic.publish(channel, "", "task_queue", message, persistent: true)
IO.puts " [x] Sent '#{message}'"

# Close the connection explicitly instead of relying on VM shutdown.
AMQP.Connection.close(connection)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Receive do
  @moduledoc """
  Receive loop for the "hello" queue example.
  """

  @doc """
  Blocks on the calling process's mailbox and prints the payload of every
  `{:basic_deliver, payload, meta}` message, then waits for the next one.

  Never returns. Messages of any other shape are not matched and stay in the
  mailbox.
  """
  @spec wait_for_messages() :: no_return()
  def wait_for_messages do
    receive do
      {:basic_deliver, payload, _meta} ->
        IO.puts " [x] Received #{payload}"
        # Explicit parentheses: a bare `wait_for_messages` is read as a
        # variable reference by current Elixir versions, not as a call.
        wait_for_messages()
    end
  end
end
# receive.exs — consumes messages from the "hello" queue and prints them.
{:ok, connection} = AMQP.Connection.open()
{:ok, channel} = AMQP.Channel.open(connection)

AMQP.Queue.declare(channel, "hello")

# `nil` consumer pid: presumably the amqp library then delivers to the calling
# process (the receive loop below runs in this process) — confirm against the
# amqp version in use. `no_ack: true` means deliveries are not acknowledged.
AMQP.Basic.consume(channel, "hello", nil, no_ack: true)

# Banner typo fixed: "To exist" -> "To exit".
IO.puts " [*] Waiting for messages. To exit press CTRL+C, CTRL+C"

Receive.wait_for_messages()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule ReceiveLogs do
  @moduledoc """
  Receive loop for the "logs" fanout exchange example.
  """

  @doc """
  Blocks on the calling process's mailbox and prints the payload of every
  `{:basic_deliver, payload, meta}` message, then waits for the next one.

  `channel` is only threaded through the recursion; this loop does not use it
  otherwise. Never returns. Messages of any other shape are not matched and
  stay in the mailbox.
  """
  @spec wait_for_messages(term()) :: no_return()
  def wait_for_messages(channel) do
    receive do
      {:basic_deliver, payload, _meta} ->
        IO.puts " [x] Received #{payload}"
        wait_for_messages(channel)
    end
  end
end
# receive_logs.exs — binds a fresh exclusive queue to the "logs" fanout
# exchange and prints every message delivered to it.
{:ok, connection} = AMQP.Connection.open()
{:ok, channel} = AMQP.Channel.open(connection)

AMQP.Exchange.declare(channel, "logs", :fanout)

# Declaring with an empty name returns the actual queue name in the reply;
# that returned name is what gets bound and consumed below.
{:ok, %{queue: queue_name}} = AMQP.Queue.declare(channel, "", exclusive: true)
AMQP.Queue.bind(channel, queue_name, "logs")

# `no_ack: true` means deliveries are not acknowledged by this consumer.
AMQP.Basic.consume(channel, queue_name, nil, no_ack: true)

# Banner typo fixed: "To exist" -> "To exit".
IO.puts " [*] Waiting for messages. To exit press CTRL+C, CTRL+C"

ReceiveLogs.wait_for_messages(channel)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule ReceiveLogsDirect do
  @moduledoc """
  Receive loop for the "direct_logs" direct exchange example.
  """

  @doc """
  Blocks on the calling process's mailbox and prints the payload of every
  `{:basic_deliver, payload, meta}` message, then waits for the next one.

  `channel` is only threaded through the recursion; this loop does not use it
  otherwise. Never returns. Messages of any other shape are not matched and
  stay in the mailbox.
  """
  @spec wait_for_messages(term()) :: no_return()
  def wait_for_messages(channel) do
    receive do
      {:basic_deliver, payload, _meta} ->
        IO.puts " [x] Received #{payload}"
        wait_for_messages(channel)
    end
  end
end
# receive_logs_direct.exs — binds a fresh exclusive queue to the "direct_logs"
# exchange once per selected severity and prints every message delivered.
#
# Usage: mix run receive_logs_direct.exs [--info] [--warning] [--error]
{:ok, connection} = AMQP.Connection.open()
{:ok, channel} = AMQP.Channel.open(connection)

AMQP.Exchange.declare(channel, "direct_logs", :direct)

# The declare reply is echoed to stdout before being matched (diagnostic
# output kept from the original script).
{:ok, %{queue: queue_name}} =
  channel
  |> AMQP.Queue.declare("", exclusive: true)
  |> IO.inspect()

{severities, _args, _invalid} =
  OptionParser.parse(System.argv(),
    strict: [info: :boolean, warning: :boolean, error: :boolean]
  )

IO.inspect(severities)

# One binding per switch that parsed to `true`; with no switches the queue is
# bound to nothing and the loop below just waits.
for {severity, true} <- severities do
  routing_key = to_string(severity)
  IO.puts "binding with #{routing_key}"

  channel
  |> AMQP.Queue.bind(queue_name, "direct_logs", routing_key: routing_key)
  |> IO.inspect()
end

# `no_ack: true` means deliveries are not acknowledged by this consumer.
AMQP.Basic.consume(channel, queue_name, nil, no_ack: true)

# Banner typo fixed: "To exist" -> "To exit".
IO.puts " [*] Waiting for messages. To exit press CTRL+C, CTRL+C"

ReceiveLogsDirect.wait_for_messages(channel)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# send.exs — publishes a single "Hello World!" message to the "hello" queue.
{:ok, connection} = AMQP.Connection.open()
{:ok, channel} = AMQP.Channel.open(connection)

AMQP.Queue.declare(channel, "hello")

# Publish through the default exchange ("") using the queue name as routing key.
AMQP.Basic.publish(channel, "", "hello", "Hello World!")
IO.puts " [x] Sent 'Hello World!'"

# Close the connection explicitly instead of relying on VM shutdown.
AMQP.Connection.close(connection)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Worker do
  @moduledoc """
  Receive loop for the work-queue example: simulates work proportional to the
  number of dots in each payload, then acknowledges the delivery.
  """

  @doc """
  Blocks on the calling process's mailbox. For every
  `{:basic_deliver, payload, meta}` message it prints the payload, sleeps one
  second per `"."` found in it, prints a completion line, acknowledges
  `meta.delivery_tag` on `channel`, and waits for the next message.

  Never returns. Messages of any other shape are not matched and stay in the
  mailbox.
  """
  @spec wait_for_messages(term()) :: no_return()
  def wait_for_messages(channel) do
    receive do
      {:basic_deliver, payload, meta} ->
        IO.puts " [x] Received #{payload}"

        payload
        |> simulated_work_ms()
        |> :timer.sleep()

        IO.puts " [x] Done."
        AMQP.Basic.ack(channel, meta.delivery_tag)
        wait_for_messages(channel)
    end
  end

  # Milliseconds of simulated work: 1000 per "." in the payload.
  # Splitting on "." yields one more piece than there are dots.
  defp simulated_work_ms(payload) do
    dot_count = length(String.split(payload, ".")) - 1
    dot_count * 1000
  end
end
# worker.exs — consumes tasks from the durable "task_queue" queue, one
# unacknowledged delivery at a time.
{:ok, connection} = AMQP.Connection.open()
{:ok, channel} = AMQP.Channel.open(connection)

AMQP.Queue.declare(channel, "task_queue", durable: true)

# prefetch_count: 1 — ask the broker not to hand this consumer a new message
# until the previous one has been acknowledged (Worker acks after each task).
AMQP.Basic.qos(channel, prefetch_count: 1)

# Consume from the queue declared above. This previously subscribed to
# "hello", so the worker never saw anything routed to "task_queue".
AMQP.Basic.consume(channel, "task_queue")

# Banner typo fixed: "To exist" -> "To exit".
IO.puts " [*] Waiting for messages. To exit press CTRL+C, CTRL+C"

Worker.wait_for_messages(channel)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment