Skip to content

Instantly share code, notes, and snippets.

@3zcurdia
Last active March 18, 2022 20:32
Show Gist options
  • Save 3zcurdia/c4d3efd0117bd5a045bd0228fac81162 to your computer and use it in GitHub Desktop.
Save 3zcurdia/c4d3efd0117bd5a045bd0228fac81162 to your computer and use it in GitHub Desktop.
RabbitMQ Basic Implementation
# frozen_string_literal: true
require 'bunny'
# Shared base for the publisher and subscriber: opens a Bunny connection and a
# channel on construction and exposes the direct exchange used for routing.
class BunnyClient
  # Broker URL used when CLOUDAMQP_URL is not set. The previous fallback was
  # not a parseable AMQP URI; this is Bunny's documented default
  # (guest/guest on the local broker).
  DEFAULT_URL = "amqp://guest:guest@127.0.0.1:5672"

  # @param routing_key [Symbol, String] key later passed to publish/bind calls
  def initialize(routing_key)
    @routing_key = routing_key
    @conn = Bunny.new(ENV.fetch("CLOUDAMQP_URL", DEFAULT_URL))
    start
  end

  protected

  attr_reader :conn, :channel, :routing_key

  # Memoized handle on the "amq.direct" exchange for this client's channel.
  def exchange
    @exchange ||= channel.direct("amq.direct")
  end

  private

  # Starts the connection and opens the channel every other call relies on.
  def start
    conn.start
    @channel = conn.create_channel
  end
end
# frozen_string_literal: true
# Resolve gems from the public RubyGems index.
source "https://rubygems.org"
# Bunny client gem; the pessimistic constraint allows >= 2.19 and < 3.0.
gem "bunny", "~> 2.19"
GEM
remote: https://rubygems.org/
specs:
amq-protocol (2.3.2)
bunny (2.19.0)
amq-protocol (~> 2.3, >= 2.3.1)
sorted_set (~> 1, >= 1.0.2)
rbtree (0.4.5)
set (1.0.2)
sorted_set (1.0.3)
rbtree
set (~> 1.0)
PLATFORMS
arm64-darwin-21
DEPENDENCIES
bunny (~> 2.19)
BUNDLED WITH
2.3.7
# frozen_string_literal: true
require_relative "./publisher.rb"
# Publish 25 numbered greetings on each routing key, one publisher per key,
# in the same order as before (:test first, then :prueba).
{ test: "Hello world", prueba: "Hola mundo" }.each do |route, greeting|
  publisher = Publisher.new(route)
  25.times { |n| publisher.push("#{greeting} #{n}") }
end
# frozen_string_literal: true
require_relative "./bunny_client.rb"
# Publishes messages to the shared direct exchange under a fixed routing key.
class Publisher < BunnyClient
  # One-shot helper: connects, publishes a single message, and closes the
  # connection again so repeated calls do not accumulate open connections.
  #
  # @param routing_key [Symbol, String] routing key to publish under
  # @param message [String] payload to publish
  # @return whatever the instance-level #push returns
  def self.push(routing_key, message)
    publisher = new(routing_key)
    publisher.push(message)
  ensure
    # publisher is nil when the constructor itself raised.
    publisher&.close
  end

  # Publishes +msg+ to the exchange using this publisher's routing key.
  #
  # @param msg [String] payload to publish
  def push(msg)
    exchange.publish(msg, routing_key: routing_key)
  end

  # Closes the underlying connection; does nothing if it is no longer open.
  def close
    conn.close if conn.open?
  end
end
# frozen_string_literal: true
require_relative "./subscriber.rb"
# Run one subscriber per routing key, each in its own thread, then wait on
# all of them.
workers = %i[test prueba].map do |route|
  Thread.new { Subscriber.subscribe(route) }
end
workers.each(&:join)
# frozen_string_literal: true
require_relative "./bunny_client.rb"
# Consumes messages for one routing key from a queue bound to the shared
# direct exchange, printing each payload as it arrives.
class Subscriber < BunnyClient
  # Builds a subscriber for +routing_key+ and starts consuming.
  #
  # @param routing_key [Symbol, String] key the queue is bound with
  # @yield [delivery_info, properties, payload] optional per-message handler
  def self.subscribe(routing_key, &block)
    new(routing_key).subscribe(&block)
  end

  # Consumes with block: true, logging every payload and forwarding it to the
  # caller's block when one is given.
  #
  # @yield [delivery_info, properties, payload] optional per-message handler
  def subscribe
    queue.subscribe(block: true) do |delivery_info, properties, payload|
      puts("[#{queue.name}] received: #{payload}")
      yield(delivery_info, properties, payload) if block_given?
    end
  ensure
    # Stop the connection however the consume call ends, including when it
    # is left via an exception or an interrupt.
    conn.stop
  end

  private

  # Memoized queue declared with an empty name and auto_delete, bound to the
  # exchange under this subscriber's routing key.
  def queue
    @queue ||= channel.queue("", auto_delete: true).bind(exchange, routing_key: routing_key)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment