Wrappers for the Bunny gem (RabbitMQ) providing: 1. multiple consumer threads; 2. simple setup of RPC call/callback publishers and consumers.
# ---------- gemspec ----------
# frozen_string_literal: true | |
# This gemspec currently describes a gem hosted on Github as a Gist only. | |
# See https://bundler.io/guides/git.html | |
require_relative "bunny_wrapper_version.rb" | |
Gem::Specification.new do |spec| | |
spec.name = "bunny_wrapper" | |
spec.version = BunnyWrapper::VERSION | |
spec.authors = ["wbr"] | |
spec.email = ["[email protected]"] | |
spec.summary = "RabbitMQ RPC with Bunny" | |
#spec.description = "TODO: Write a longer description or delete this line." | |
spec.homepage = "https://gist.github.com/ginjo/99f169332778725ac3fc9f65bb1a0fff" | |
spec.license = "MIT" | |
spec.required_ruby_version = ">= 2.6.0" | |
spec.metadata["allowed_push_host"] = "TODO: Set to your gem server 'https://example.com'" | |
spec.metadata["homepage_uri"] = spec.homepage | |
spec.metadata["source_code_uri"] = spec.homepage | |
#spec.metadata["changelog_uri"] = "TODO: Put your gem's CHANGELOG.md URL here." | |
# Specify which files should be added to the gem when it is released. | |
# The `git ls-files -z` loads the files in the RubyGem that have been added into git. | |
spec.files = Dir.chdir(__dir__) do | |
`git ls-files -z`.split("\x0").reject do |f| | |
(f == __FILE__) || f.match(%r{\A(?:(?:bin|test|spec|features)/|\.(?:git|travis|circleci)|appveyor)}) | |
end | |
end | |
spec.bindir = "." | |
spec.executables = spec.files.grep(%r{\Aexe/}) { |f| File.basename(f) } | |
spec.require_paths = ["."] | |
# Uncomment to register a new dependency of your gem | |
# spec.add_dependency "example-gem", "~> 1.0" | |
spec.add_dependency "bunny" | |
spec.add_dependency "json_rpc_object" | |
spec.add_dependency "connection_pool" | |
# For more information and examples about making a new gem, check out our | |
# guide at: https://bundler.io/guides/creating_gem.html | |
end |
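# A minimal sketch (not part of the original gemspec) of pulling this gist-hosted gem
# into a consuming application's Gemfile, following the same Bundler gist-source pattern
# used for json_rpc_object in the Gemfile further down. The gist ID comes from spec.homepage above.
#
#   # Gemfile of a consuming application
#   source "https://rubygems.org"
#   gem 'bunny_wrapper', gist: '99f169332778725ac3fc9f65bb1a0fff'
#   gem 'json_rpc_object', gist: '5b128c655e79e1b4e0e10c27a5098177'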
# ---------- bunny_wrapper.rb ----------
### Custom classes for Bunny gem (RabbitMQ client for Ruby). | |
### Allows creation of multiple consumer threads each with their own channel. | |
### This is a Gist on Github. | |
### | |
### https://gist.github.com/ginjo/99f169332778725ac3fc9f65bb1a0fff
### | |
### This script depends on the 'bunny' gem. | |
### | |
### This script depends on the JsonRpcObject gist (json_rpc_object.rb). Download the file here: | |
### https://gist.github.com/ginjo/5b128c655e79e1b4e0e10c27a5098177
### | |
### You have (at least) three options for loading the JsonRpcObject gist: | |
### 1. Manually require it in your project.
### 2. Put the file in the same directory as this bunny_wrapper.rb file.
### 3. Set the constant JSONRPC_GIST_PATH with the path to the json_rpc_object.rb file. | |
### | |
### If you set manual_ack:true on a consumer, you must acknowledge manually: | |
### channel.ack(delivery_info[:delivery_tag]) | |
### | |
### Example basic usage: | |
### r = BunnyWrapper.new host:'172.31.4.108' | |
### r.subscribe('task_queue', thread_pool_size:3, durable:true, manual_ack:true){|a, b, c, ch| puts [a, b, c]; ch.ack(a[:delivery_tag])} | |
### r.publish('task_queue','Hello13!') | |
### | |
### Example json_rpc usage: | |
### # on machine 1 | |
### caller = BunnyWrapper.json_rpc_caller('task_queue', reply_to:'callback_queue'){|json_rpc_response| puts json_rpc_response.to_yaml} | |
### | |
### # on machine 2 | |
### BunnyWrapper.json_rpc_responder('task_queue', reply_to:'callback_queue'){|json_rpc_request| {some_response_data:'anything'}} | |
### | |
### # on machine 1 | |
### caller['my_method', {my_data:{a:1, b:2}}] | |
### # asynchronous response...
### # => {"json_rpc"=>"2.0", "id"=>"<automatically-managed-id>", "response"=>{"some_response_data"=>"anything"}}
### | |
### For concurrency reference, see: | |
### http://rubybunny.info/articles/concurrency.html#consumer_work_pools | |
### | |
### See below for TODO's. | |
### | |
### See here for Ruby Bunny documentation: http://rubybunny.info/articles/guides.html | |
### See here for helpful rabbitmq explanation: https://www.rabbitmq.com/tutorials/amqp-concepts.html | |
### | |
### | |
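### Example synchronous rpc usage (a sketch, not from the original examples; it assumes
### a responder is already listening on 'task_queue' -- see the rpc() method below):
### r = BunnyWrapper.new host:'localhost'
### result = r.rpc('my_method', {my_data:{a:1, b:2}}, routing_key:'task_queue')
### result[:payload] # => the responder's reply, decoded according to its content_type
###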
require 'bunny' | |
require 'securerandom' | |
require 'logger' | |
require 'connection_pool' | |
require 'delegate' | |
require_relative 'bunny_wrapper_version' | |
begin | |
require 'json_rpc_object' | |
rescue LoadError => err | |
puts "BunnyWrapper requires the json_rpc_object library from the the following gist:" | |
puts " https://gist.github.com/ginjo/5b128c655e79e1b4e0e10c27a5098177" | |
puts "Add this gem requirement to your Gemfile, then bundle install:" | |
puts " gem 'json_rpc_object', gist:'5b128c655e79e1b4e0e10c27a5098177'" | |
raise | |
end | |
# This assumes the json_rpc_object.rb gist file at the following path. | |
# You may also manually require the json_rpc_object.rb file. See above for gist link. | |
# JSONRPC_GIST_PATH ||= ENV['JSONRPC_GIST_PATH'] | |
# begin | |
# Module.const_defined?(:JsonRpcObject) || | |
# require_relative(Module.const_defined?(:JSONRPC_GIST_PATH) ? JSONRPC_GIST_PATH : 'json_rpc_object.rb') | |
# rescue | |
# end | |
class BunnyWrapper | |
attr_reader :session, :threads, :consumers, :channels, :channel_opts, :queue_opts, :subscription_opts, :logger | |
attr_reader :channel_pool, :lock, :responders | |
CHANNEL_OPTS_KEYS = [ | |
# create_channel() takes positional args, not kwargs; these options
# are converted to positional args in create_channel() below.
:thread_pool_size, | |
:channel_pool_size, | |
:channel_pool_timeout, | |
:consumer_pool_abort_on_exception, | |
:consumer_pool_shutdown_timeout, | |
:prefetch, | |
:prefetch_global, | |
] | |
QUEUE_OPTS_KEYS = [ | |
:durable, # (Boolean) — default: false — Should this queue be durable? | |
:auto_delete, # (Boolean) — default: false — Should this queue be automatically deleted when the last consumer disconnects? | |
:exclusive, # (Boolean) — default: false — Should this queue be exclusive (only can be used by this connection, removed when the connection is closed)? | |
:arguments, # (Hash) — default: {} — Additional optional arguments (typically used by RabbitMQ extensions and plugins) | |
] | |
SUBSCRIPTION_OPTS_KEYS = [ | |
:ack, # (Boolean) — default: false — [DEPRECATED] Use :manual_ack instead | |
:manual_ack, # (Boolean) — default: false — Will this consumer use manual acknowledgements? | |
:exclusive, # (Boolean) — default: false — Should this consumer be exclusive for this queue? | |
:on_cancellation, # (#call) — Block to execute when this consumer is cancelled remotely (e.g. via the RabbitMQ Management plugin) | |
:consumer_tag, # (String) — Unique consumer identifier. It is usually recommended to let Bunny generate it for you. | |
:arguments, # (Hash) — default: {} — Additional (optional) arguments, typically used by RabbitMQ extensions | |
:thread_pool_size,# (Int) - It is used in subscribe() to tell us how many consumers to spawn. | |
] | |
def default_logger | |
_logger = ::Logger.new(STDOUT) | |
log_level = (::Logger.const_get(ENV['LOG_LEVEL'] || 'INFO') rescue Logger::INFO) | |
_logger.level = log_level | |
_logger.datetime_format = '%Y-%m-%d %H:%M:%6N %z' | |
_logger | |
end | |
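# Example (sketch): pass your own logger when constructing the wrapper, or rely on the
# LOG_LEVEL env var picked up by default_logger above.
#
#   BunnyWrapper.new host:'localhost', logger: Logger.new($stdout, level: Logger::WARN)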
# Define consumer subclass. | |
# I saw this in an example somewhere but don't know what it's for. | |
# Update: I saw it used in rabbitmq/bunny docs somewhere. If you cancel | |
# subscriptions, you might want to subclass Bunny::Consumer. | |
# | |
# class Consumer < Bunny::Consumer | |
# def cancelled? | |
# @cancelled | |
# end | |
# | |
# def handle_cancellation(_) | |
# @cancelled = true | |
# end | |
# end | |
# Creates a new instance of this class with an associated session to rabbitmq server. | |
# | |
# You can pass in Bunny.new opts AND channel/queue/subscription opts here, and BunnyWrapper will sort them out.
# Make sure that the opts you want to use are included in the *_OPTS_KEYS constants above.
# | |
# Note that reset_queues does not currently do anything. You must issue a cli command to do that: | |
# rabbitmqadmin delete queue name=name-of-queue | |
# | |
def initialize(reset_queues:[], existing_connection:nil, **opts) | |
@logger = opts.delete(:logger) || default_logger | |
# See here for durability info. | |
# http://rubybunny.info/articles/durability.html | |
@channel_opts = opts.slice(*CHANNEL_OPTS_KEYS) | |
@queue_opts = opts.slice(*QUEUE_OPTS_KEYS) | |
@subscription_opts = opts.slice(*SUBSCRIPTION_OPTS_KEYS) | |
opts = opts.except(*CHANNEL_OPTS_KEYS, *QUEUE_OPTS_KEYS, *SUBSCRIPTION_OPTS_KEYS) | |
logger.info{ "BW initializing with @channel_opts: #{channel_opts}" } | |
logger.info{ "BW initializing with @queue_opts: #{queue_opts}" } | |
logger.info{ "BW initializing with @subscription_opts: #{subscription_opts}" } | |
logger.info{ "BW initializing with (remaining) opts: #{opts}" } | |
@session = (existing_connection ? existing_connection : Bunny.new(**opts)) | |
# Loops until connected to rabbitmq server. | |
until ( | |
begin | |
@session.start | |
rescue Bunny::Exception | |
false | |
end | |
) do | |
logger.info{ "#{self} waiting for rabbitmq server..." } | |
sleep 3 | |
end | |
logger.info{ "#{self} connected to rabbitmq server: #{opts}" } | |
@session | |
end | |
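# Example (sketch): mixed Bunny, channel, queue, and subscription opts can be passed
# together; initialize() slices them apart using the *_OPTS_KEYS constants above.
#
#   bw = BunnyWrapper.new(
#     host: 'localhost',        # remaining opts go to Bunny.new
#     channel_pool_size: 8,     # CHANNEL_OPTS_KEYS
#     prefetch: 1,              # CHANNEL_OPTS_KEYS
#     durable: true,            # QUEUE_OPTS_KEYS
#     manual_ack: true          # SUBSCRIPTION_OPTS_KEYS
#   )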
# Creates channel, using global and local opts. | |
# See https://www.rubydoc.info/github/ruby-amqp/bunny/Bunny%2FSession:create_channel | |
# | |
def create_channel(**_opts) | |
opts = channel_opts.merge(_opts) | |
channel_id = opts[:channel_id] | |
thread_pool_size = (opts[:thread_pool_size] || 1).to_i
consumer_pool_abort_on_exception = (opts[:consumer_pool_abort_on_exception] || false) | |
consumer_pool_shutdown_timeout = (opts[:consumer_pool_shutdown_timeout] || 60).to_i | |
prefetch = (opts[:prefetch] || 1).to_i | |
prefetch_global = (opts[:prefetch_global] || false) | |
ch = @session.create_channel(channel_id, thread_pool_size, consumer_pool_abort_on_exception, consumer_pool_shutdown_timeout) | |
# See here for confusing info about rabbitmq prefetch. | |
# https://www.rabbitmq.com/consumer-prefetch.html | |
ch.prefetch(prefetch, prefetch_global) | |
ch | |
end | |
def channel_pool(**_opts) | |
return @channel_pool if @channel_pool | |
opts = channel_opts.merge(_opts) | |
# See here for 'channel' pooling. This ~should~ solve our problems with sharing publishing channels between threads: | |
# https://wework.github.io/ruby/rails/bunny/rabbitmq/threads/concurrency/puma/errors/2015/11/12/bunny-threads/ | |
# https://github.com/mperham/connection_pool
# | |
channel_pool_size = (opts[:channel_pool_size] || 1).to_i | |
channel_pool_timeout = (opts[:channel_pool_timeout] || 15).to_i | |
# | |
# Channels for all publishing and rpc consuming. | |
# Persistent consumers get their own dedicated channel, not from this pool.
@channel_pool = ConnectionPool::Wrapper.new( | |
size: channel_pool_size, | |
timeout: channel_pool_timeout | |
# The thread-pool is set here, but in most normal cases (publish, rpc-subscribe), | |
# it won't actually grow to a size > 1. And it won't even be constructed | |
# if no consumer is active on the channel. | |
#){ @session.create_channel(nil, thread_pool_size) } | |
){ create_channel(**opts) } | |
end | |
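# Example (sketch): borrow a pooled channel explicitly, or let the ConnectionPool::Wrapper
# proxy calls straight through (which is how publish() below uses it).
# 'health_check' is just an illustrative queue name.
#
#   channel_pool.with { |ch| ch.queue('health_check', auto_delete: true) }
#   channel_pool.default_exchange.publish('ping', routing_key: 'health_check')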
# Stop using this, as it could conflict with method-local vars. | |
#alias :channel :channel_pool | |
# Publicly accessible publish method. | |
# Handles message_id, object-to-json conversion, channel pool | |
# | |
# Returns message_id | |
# | |
# See here for info on rabbit queues & exchanges. | |
# https://www.rabbitmq.com/tutorials/amqp-concepts.html | |
# | |
# | |
# This original version works great. | |
# | |
# def publish(payload, routing_key:, **opts) | |
# | |
# opts[:message_id] ||= payload.is_a?(JsonRpcObject) ? payload['id'] : SecureRandom.uuid | |
# opts[:content_type] ||= payload.is_a?(String) ? 'application/octet-stream' : 'application/json' | |
# payload_string = payload.is_a?(String) ? payload : payload.to_h.to_json | |
# | |
# channel_pool.default_exchange.publish(payload_string, routing_key:, **opts) | |
# | |
# logger.debug{ "BW published to '#{routing_key}', '#{opts[:message_id]}', '#{opts[:content_type]}'" } | |
# | |
# return opts[:message_id] | |
# end | |
# | |
# This is the composite publish/publish_jrpc version.
# √ TODO: Consider flipping the order of payload, jrpc_method, since Jamulii | |
# is doing it that way, and it looks much better when actually using the jrpc_method. | |
# | |
# √ TODO: The rpc response message_id is the same as the request message_id, | |
# if the response payload is a JsonRpcObject (even if it's a JsonRpcObject result) | |
# | |
# TODO: Maybe we should create subclasses for JsonRpcObject for each type, | |
# but that's a JsonRpcObject gem issue. | |
# | |
def publish(jrpc_method=nil, payload, jrpc_type:'request', routing_key:, **opts) | |
logger.debug{"BW#publish '#{routing_key}' payload is_a? #{payload.class}"} | |
# If payload is JsonRpcObject | |
if payload.is_a?(JsonRpcObject) | |
payload_string = payload.to_h.to_json | |
opts[:message_id] ||= (payload.has_key?('result') ? SecureRandom.uuid : payload['id']) | |
opts[:content_type] ||= 'application/json-rpc' | |
jrpc_type = nil # just to be tidy | |
# If this is JsonRpc call with method + data | |
#elsif ( payload.is_a?(Hash) || payload.is_a?(Array) ) && jrpc_method | |
elsif jrpc_method | |
# Create a JsonRpcObject from hash or array. | |
payload_jrpc = JsonRpcObject.send(jrpc_type, jrpc_method, payload) | |
payload_string = payload_jrpc.to_h.to_json | |
opts[:message_id] ||= payload_jrpc['id'] | |
opts[:content_type] ||= 'application/json-rpc' | |
# If payload is Hash | |
elsif payload.is_a?(Hash) | |
# Just convert to json. | |
payload_string = payload.to_h.to_json | |
opts[:message_id] ||= SecureRandom.uuid | |
opts[:content_type] ||= 'application/json' | |
# If payload is Array | |
elsif payload.is_a?(Array) | |
# Just convert to json. | |
payload_string = payload.to_a.to_json | |
opts[:message_id] ||= SecureRandom.uuid | |
opts[:content_type] ||= 'application/json' | |
# If payload is String | |
elsif payload.is_a?(String) | |
# Basically a pass-thru. | |
payload_string = payload.to_s | |
opts[:message_id] ||= SecureRandom.uuid | |
opts[:content_type] ||= 'application/octet-stream' | |
end | |
# Chooses a channel source to publish with. | |
# Will accept passed channel:<channel-or-pool>, or default to channel(). | |
# | |
if opts[:channel] | |
logger.debug{"BW using chanel passed as option in BW#publish()."} | |
end | |
_channel = opts[:channel] || channel_pool | |
_channel.default_exchange.publish(payload_string, routing_key:, **opts) | |
logger.debug{ "BW published to '#{routing_key}', '#{opts[:message_id]}', '#{opts[:content_type]}'" } | |
return opts[:message_id] | |
end | |
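# Example (sketch) of the payload forms publish() accepts, matching the branches above
# (bw is a BunnyWrapper instance). Each call returns the message_id it published with.
#
#   bw.publish('plain text', routing_key:'q1')                              # application/octet-stream
#   bw.publish({a:1}, routing_key:'q1')                                     # application/json
#   bw.publish('my_method', {a:1}, routing_key:'q1')                        # wrapped in a JsonRpcObject request
#   bw.publish(JsonRpcObject.request('my_method', {a:1}), routing_key:'q1') # application/json-rpc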
# Subscribes to a queue, optionally creating thread_pool_size instances/consumers. | |
# Takes a block which is called upon receiving a message. | |
# The user block is passed these params - see receive() method. | |
# The params: are all extra positional args. | |
# The options: are all extra kwargs. | |
# | |
# |delivery_info:, properties:, payload:, params:, options:| | |
# | |
# See here about queues: http://rubybunny.info/articles/queues.html | |
# | |
# Default is auto-ack. Here's a manual-ack example. | |
# delivery_info[:channel].ack(delivery_info[:delivery_tag]) | |
# | |
# TODO: Which method do the opts go to? @channels[n].queue() or q.subscribe_with()? | |
# Currently the opts are going to both methods (without any apparent errors?). | |
# | |
def subscribe(listen_queue, **_opts, &block) | |
opts = subscription_opts.merge(_opts) | |
# We create a dedicated channel for persistent consumers.
# We only pass _opts, since subscription_opts doesn't belong in the | |
# create_channel() method. Same with ch.queue() below. | |
ch = create_channel(**_opts) | |
#ch = channel_pool | |
logger.info "BW subscribing to queue '#{listen_queue}' with opts:#{opts}" | |
queue = ch.queue(listen_queue.to_s, **queue_opts, **_opts) | |
# Creates multiple consumers to match thread_pool_size. | |
# We have to do this because Bunny doesn't create extra consumer
# instances on its own; thread_pool_size only sizes the channel's work pool.
# | |
thread_pool_size = (opts[:thread_pool_size] || 1).to_i
consumers = thread_pool_size.times.map do |n| | |
#consumers = 1.times.map do |n| | |
consumer = queue.subscribe(consumer_tag: "#{queue.name}_#{n}", **opts) do | |
|delivery_info, properties, payload| | |
logger.debug{"BW consumer block (#{queue.name} - #{n}) received message"} | |
if block_given? | |
#block.call(delivery_info, metadata, payload, channel, consumer) | |
enhanced_message = receive(delivery_info, properties, payload, opts) | |
logger.debug{"BW consumer block (#{queue.name} - #{n}) calling inner block with: #{enhanced_message}"} | |
block.call(**enhanced_message) | |
else | |
logger.warn "BunnyWrapper consumer block (#{queue.name} - #{n}), but no handler block was given." | |
logger.debug payload | |
nil | |
end | |
end | |
end | |
#logger.info "#{self} listening to queue '#{listen_queue}' (#{queue.name}) with pool size #{(opts[:thread_pool_size] || subscription_opts[:thread_pool_size] || 1)}." | |
# Returns the consumer objects so we can use them in downstream methods.
consumers | |
end | |
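# Example (sketch): a manual-ack subscription. The block receives the kwargs built by
# receive() below; delivery_info[:channel] carries the channel to ack on. 'process' is
# a hypothetical handler.
#
#   bw.subscribe('task_queue', thread_pool_size:3, manual_ack:true) do |delivery_info:, payload:, **rest|
#     process(payload)
#     delivery_info[:channel].ack(delivery_info[:delivery_tag])
#   end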
# Processes all params and payload returned from bunnywrapper | |
# subscribe() block, and converts payload to ruby object, | |
# if appropriate, based on content_type. | |
# | |
def receive(delivery_info, properties, payload, *params, **options) | |
begin | |
message = case | |
when properties[:content_type].to_s.match?(/json-rpc/i) | |
logger.debug{"#{self}#receive() is processing json-rpc"} | |
JsonRpcObject.load(payload) | |
when properties[:content_type].to_s.match?(/json/i) | |
logger.debug{"#{self}#receive() is processing json"} | |
JSON.load(payload) | |
else | |
logger.debug{"#{self}#receive() is processing text"} | |
payload | |
end | |
rescue => err | |
logger.warn{"#{self}#receive() was not able to process the payload: #{err}"} | |
message = payload | |
end | |
return {delivery_info:, properties:, payload:message, params:, options:} | |
end | |
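# Example (sketch) of the hash receive() hands to subscriber blocks:
#
#   {
#     delivery_info: <Bunny::DeliveryInfo>,
#     properties:    <Bunny::MessageProperties>,
#     payload:       <String, Hash, Array, or JsonRpcObject, per content_type>,
#     params:        [],   # any extra positional args
#     options:       {}    # any extra kwargs
#   }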
# A Subscription that publishes the result to the reply_to queue. | |
# See subscribe() or receive() for the kwargs passed to the user block. | |
# | |
def responder(listen_queue, reply_to:nil, **opts, &block) | |
subscribe(listen_queue, **opts) do |*params, delivery_info:, properties:, payload:, **options| | |
# The extra *params and **options above will capture any extra args or kwargs. | |
#puts properties.inspect | |
_reply_to = (options[:reply_to] || properties[:reply_to] || reply_to)
#puts "rpc_responder reply_to: #{reply_to}" | |
callback_headers = { | |
original_payload: payload, | |
original_properties: properties.to_h, | |
} | |
out_params = { | |
headers: callback_headers.to_h, #.merge({'exit_code'=>0}), | |
content_type: properties[:content_type], | |
correlation_id: (properties[:message_id] || properties[:correlation_id]), | |
} | |
block_call_params = { | |
params:, #params.dup, | |
delivery_info:, # delivery_info.to_h, | |
properties:, #properties.to_h, | |
payload:, #payload.dup, | |
options:, #options.to_h, | |
} | |
if block_given? | |
rslt = block.call(**block_call_params) | |
else | |
rslt = {error:"BW responder had no block to call, but here are the params to pass:", | |
block_call_params: block_call_params.except(:delivery_info) | |
} | |
end | |
logger.debug{ "Responder '#{listen_queue}' publishing result:" } | |
logger.debug{ rslt.to_yaml } | |
publish(rslt, routing_key:_reply_to, **out_params) | |
end | |
end # responder | |
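# Example (sketch): a responder whose block result is published back to the caller's
# reply_to queue. 'math_queue' is just an illustrative queue name.
#
#   bw.responder('math_queue') do |payload:, **_rest|
#     payload.is_a?(JsonRpcObject) ? payload.response('the answer') : {answer: 42}
#   end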
# Convenience storage hash for responders, keyed by listen_queue name. | |
# TODO: Hmm, I don't know if we want this here. It may be more of an app-level function. | |
def responders(listen_queue, reply_to:nil, **opts, &block) | |
@responders ||= {} | |
@responders[listen_queue] ||= responder(listen_queue, reply_to:, **opts, &block) | |
end | |
# This class creates an instance to wrap a synchronous rpc call/response. | |
# Pass this an instance of BunnyWrapper, when initializing. | |
# SyncRpc.new(<bunnywrapper-instance>, send_queue, **opts)[payload] | |
# | |
# Use the convenience method rpc() for most cases. | |
# | |
# The output is the same as the subscribe() or receive() methods. | |
# | |
# This handles everything for a single synchronous send/receive | |
# rpc call through rabbitmq. This can also handle multiple threads
# calling this class at the same time.
# | |
# It assumes there is a handler on the send-to queue that returns | |
# something to the given reply-to queue. | |
# | |
# This uses the custom BunnyWrapper publish() method, but does NOT | |
# use the BunnyWrapper subscribe() method. | |
# | |
# Extra _opts are currently passed on to each call to Bunny/Rabbitmq. | |
# For most cases, it should not be necessary to pass extra opts. | |
# | |
# For examples of setup and usage of rabbitmq/bunny/bunnywrapper/ruby/threaded/synchronous operations, | |
# see the Terraform rabbitmq/test_examples.rb file. | |
# | |
class SyncRpc < SimpleDelegator | |
attr_accessor :send_queue, :response, :condition, :opts, :lock, :sync_queue | |
def initialize(bw_inst, _send_queue=nil, **_opts) | |
super(bw_inst) | |
@send_queue = _send_queue | |
@opts = _opts | |
end | |
def call(jrpc_method=nil, payload, routing_key:@send_queue, **__opts) | |
_opts = opts.merge(__opts) | |
# Set up channel. | |
# We can actually use a channel from the pool here. | |
# TODO: Would passing **opts here break anything? It would be intended | |
# for users who want to customize their rpc, but I don't know if it's appropriate here. | |
# | |
channel_pool.with do |ch| | |
# Or we can create/destroy a dedicated channel just for this instance. | |
#ch = session.create_channel | |
# exclusive:true was here but didn't do anything, so moved it to subscribe(). | |
# TODO: Is it ok to pass _opts here? | |
reply_queue = ch.queue('', exclusive:true, auto_delete:true, **_opts) | |
reply_queue_name = reply_queue.name | |
# If routing_key is '__test__' we short-circuit the remote responder | |
# and just send to the listen-queue. | |
(routing_key == '__test__') && (routing_key = reply_queue_name) | |
# Set up the lock | |
@lock = Monitor.new | |
@condition = MonitorMixin::ConditionVariable.new(@lock) | |
that = self | |
# Using Queue to make sure condition.wait happens before condition.signal. | |
# See https://stackoverflow.com/questions/52068527/simple-thread-conditional-variable-example-giving-deadlock-in-ruby | |
@sync_queue = Queue.new | |
# Set up consumer | |
consumer = reply_queue.subscribe(manual_ack:true, exclusive:true, **_opts) do |_delivery_info, _properties, payload| | |
that.lock.synchronize do | |
logger.debug{ "SyncRpc#call #{reply_queue_name} received '#{payload}'" } | |
that.response = receive(_delivery_info, _properties, payload, **_opts) | |
logger.debug{"SyncRpc#call #{reply_queue_name} sending ack"} | |
ch.ack(_delivery_info[:delivery_tag]) | |
logger.debug{"SyncRpc#call #{reply_queue_name} calling condition.signal"} | |
that.sync_queue.pop | |
that.condition.signal | |
logger.debug{"SyncRpc#call #{reply_queue_name} after condition.signal"} | |
end | |
# that.lock.synchronize { that.condition.signal } | |
end | |
#ch_publish = session.create_channel | |
lock.synchronize do | |
publish(jrpc_method, payload, routing_key:, channel:ch, reply_to: reply_queue_name, **_opts) # channel: ch_publish | |
logger.debug{"SyncRpc#call #{routing_key} synchronizing condition.wait"} | |
sync_queue << 1 | |
condition.wait | |
logger.debug{"SyncRpc#call #{routing_key} canceling consumer #{consumer}"} | |
consumer.cancel | |
logger.debug{"SyncRpc#call #{routing_key} returning response: #{response}"} | |
end | |
# If discarding one-off channel. | |
#consumer.cancel | |
# If using persistent channel pool | |
end # channel_pool.with | |
return response | |
end | |
alias :[] :call | |
end # SyncRpc | |
# Convenience method that creates a synchronous publish-subscribe | |
# instance with SyncRpc and calls it with the given params. | |
# | |
# The output is the same as the subscribe() or receive() methods.
# | |
# Payload can be string, json, hash, array, or JsonRpcObject. | |
# | |
def rpc(jrpc_method=nil, payload, routing_key:, **opts) | |
rslt = SyncRpc.new(self, routing_key, **opts)[jrpc_method, payload, **opts] | |
end | |
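# Example (sketch): the special '__test__' routing key makes SyncRpc publish straight to
# its temporary reply queue, short-circuiting any remote responder, so the synchronous
# machinery can be exercised without a worker (see test5c in the test examples file).
#
#   bw.rpc('loopback test', routing_key:'__test__')[:payload]   # => "loopback test"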
### LEGACY METHODS ### | |
# # For informational use. | |
# def channels | |
# @session.instance_variable_get(:@channels) | |
# # Bunny session@channels holds a hash of channels {1=>ch1, 2=>ch2} | |
# end | |
# | |
# | |
# # All consumers on all session channels. | |
# # For informational use. | |
# def consumers | |
# channels.values.inject({}) do |a,b| | |
# a.merge(b.consumers) | |
# end | |
# end | |
# | |
# # For informational use. | |
# def exchanges | |
# channels.values.collect{|ch| ch.exchanges rescue nil}.flatten | |
# end | |
# | |
# | |
# def queues | |
# channels.values.collect{|ch| ch.queues rescue nil}.flatten | |
# end | |
# | |
# def find_queue(name) | |
# queues.find{|q| q[name]}&.values&.fetch(0) | |
# end | |
# | |
# | |
# Cancels consumers and kills threads. | |
# Note: This does not free memory or delete queues! | |
# | |
# See Queue#purge to clear all messages from a queue. | |
# | |
# def reset! | |
# channels.each(&:close) | |
# end | |
# | |
# | |
# # Experimental: Deletes the queue, triggering the consumer cancellation handler | |
# # Update: This may no longer work since we implemented connection pooling. | |
# # | |
# def delete_queue(_q) | |
# @ch_x.queue(_q).delete | |
# end | |
end | |
##### TODO / Issues ##### | |
# √ Put basic boilerplate code from tf_messaging.rb and terraform/tf_worker.rb into this library. | |
# Things like error-handling and call-response callback functionality. And/or...
# Handle rpc_call() with no block (a jsonrpc 'notification'), which | |
# doesn't expect a response. This is important for app-servers vs app-workers. | |
# √ For call-response functionality, create two BunnyWrapper methods that are closely coupled. | |
# These methods would be used to set up call and response functionality between | |
# two different types of nodes in a distributed computing scenario. | |
# | |
# # sets up a response listener and returns a 'caller' proc. | |
# r.rpc_caller(queue, reply_to:'callback_queue', thread_pool_size:1, **opts, &block)
# # => proc | |
# | |
# # Subscribes to queue which sends result of block to the callback-queue. | |
# r.rpc_responder(queue, reply_to:<given in caller's headers>, thread_pool_size:1, **opts, &block) | |
# √ TODO: rpc_caller() thread_pool_size:<integer> is not working; it does not cause multiple threads to spawn.
# Consider rufus-scheduler gem for timed/recurring jobs.
# https://rubygems.org/gems/rufus-scheduler | |
# Consider concurrent-ruby gem for better concurrency handling. | |
# https://github.com/ruby-concurrency/concurrent-ruby
# Update: What would be the benefits of using concurrent-ruby for BunnyWrapper? | |
# Is this item possibly irrelevant now that we use Bunny's built-in concurrency tools? | |
# Consider bypassing BunnyWrapper#publish and just use Bunny...publish for all non-user initiated publishing | |
# like callbacks and Proc publishing. Look at every place where you call 'publish' from within this library. | |
# BunnyWrapper#publish should just be for users. | |
# Update: What is the benefit of making this change? | |
# √ Change rpc_responder cb_queue to callback_queue, and make it a keyword instead of a positional param. | |
# Update: now using properties[:reply_to]. | |
# Review the keyword args vs **opts of each function definition. Keyword args keeps extra stuff out of **opts, | |
# but it's more to manage and more verbose. | |
# √ We need a generic json-rpc-publish method, so we can send one-off json-rpc messages without having | |
# to set up listeners. This would be especially helpful for testing (so we can send test messages to local
# callback-listeners and responder listeners). | |
# √ Finish handling correlation_id for json_rpc and regular rpc call/response. | |
# √ It's working for rpc_caller/rpc_responder, but not for json_rpc_caller/json_rpc_responder.
# √ Eliminate callback_headers[:callback_queue] in favor of properties[:reply_to]. | |
# Make sure content_type handling is correct. | |
# Do the above 3 things for the publish_jrpc() method. | |
# √ The synchronous rpc system works but is not threadsafe. | |
# If multiple threads make synchronous rpc calls at the same time, | |
# the results will be scrambled. | |
# Try putting the rpc_caller's entire block (of the response handler) in a mutex.synchronize again;
# maybe that will solve multi-threading problems?
# Update: this might be solved now by doing two things:
# 1. Making sure all rpc-callers have their own channel. | |
# No two threads should ever publish to rabbitmq on the same channel. | |
# 2. Using Monitor class instead of Mutex class. Monitor is re-entrant and handles recursive locking. | |
# Refactor handles all of this. | |
# √ Clean up the one-channel-per-synchronous-rpc-call mechanism. | |
# Find a way to allow multiple instances of BunnyWrapper to share a single connection. | |
# Or find a way to have a BunnyWrapper instance for each rpc-call that is made, | |
# so we can avoid recursive collisions when two or more rpc calls/responses try to
# access the same variables. I think this is only an issue when chaining synchronous | |
# rpc calls/responses together, which we do in JamService. | |
# Update: I think the refactor can handle all of this. | |
# Create a Logger object that automatically overrides itself with $logger, if it exists.
# Or allow instantiation of the BunnyWrapper class to accept a logger instance.
# Or both. | |
# - TODO: Customize the procs that are created, so that they are regular objects (hashes?) | |
# that can be introspected and configured. They will need a 'call' method, so | |
# they can behave like procs. This feature will make it easier to debug and/or modify | |
# caller/responder proc objects. | |
# Update: I believe this is no longer relevant. | |
# | |
# TODO: Make sure SUBSCRIPTION_DEFAULTS contains proper, or at least sane, defaults | |
# for all possible Bunny#subscribe options. | |
# | |
# SEE here for good tips on managing threads/channels/queues/exchanges: | |
# https://www.cloudamqp.com/blog/part4-rabbitmq-13-common-errors.html | |
# http://rubybunny.info/articles/concurrency.html | |
# √ TODO: Provide a way to disable listeners while still allowing callers to work. | |
# This is needed in testing where we don't want to foul up the data flow, just by booting | |
# an app in the cli (instead of as a daemon on the production server). | |
# | |
# √ TODO: Currently we cannot create a jrpc-caller without also setting up a listener. | |
# According to Bunny docs, if you create a channel without attaching a consumer, | |
# a listener will not be set up. Figure out how to implement that in this library. | |
# See 'consumer work pools' section here: http://rubybunny.info/articles/concurrency.html | |
# | |
# √ TODO: I think thread_count should be changed to listener_pool_size, or does Bunny actually use 'thread_count'? | |
# Note that we also have a channel_pool_size, managed by connection_pool gem. | |
# Update: I don't think bunny has a name for it, it just takes an integer with create_channel(). | |
# Update: Renamed in the refactor. | |
# | |
# √TODO: Currently, async rpc calls return the exchange. They should return the request ID! | |
# It should ultimately be possible to query BunnyWrapper about the state of an async job
# by using the request ID. | |
# √ TODO: I think BunnyWrapper may need to have 2 kinds of public 'base' methods which all others are based on. | |
# This will help us create background workers that can listen (and call), plus front-end servers that can call | |
# but NOT listen (except to syncr responses). | |
# 1. Caller | |
# 2. Listener | |
# 3. Responder (listener + caller) | |
# 4. Async Caller (with async listener) | |
# 5. Syncr Caller (with synchronous listener) | |
# Update: The refactor follows the above, mostly. | |
# | |
# I think right now, a responder can respond to either an async or syncr caller. | |
# I don't think the responder cares (or has any idea about) whether the caller | |
# is syncr or async. It just responds to the given reply_to queue. | |
# ---------- bunny_wrapper_version.rb ----------
class BunnyWrapper | |
VERSION = '0.1.0' | |
end |
# ---------- Gemfile ----------
source "https://rubygems.org" | |
gem 'json_rpc_object', gist:'5b128c655e79e1b4e0e10c27a5098177' | |
#gem 'connection_pool' | |
#gem 'json_rpc_object', path:'../JsonRpcRuby' | |
# Declare your gem's dependencies in the bunny_wrapper gemspec.
# Bundler will treat runtime dependencies like base dependencies, and | |
# development dependencies will be added by default to the :development group. | |
gemspec |
# ---------- test examples ----------
RABBITMQ_HOST = ENV['RABBITMQ_HOST'] || 'localhost' | |
CONSUMER_POOL_SIZE = (ENV['CONSUMER_POOL_SIZE'] || 16).to_i
CHANNEL_POOL_SIZE = (ENV['CHANNEL_POOL_SIZE'] || 32).to_i
PREFETCH = (ENV['PREFETCH'] || 1).to_i
ENV['LOG_LEVEL'] ||= 'DEBUG' | |
require 'bundler' | |
Bundler.setup | |
require 'bunny_wrapper' | |
# require 'bunny' | |
# require 'connection_pool' | |
require 'yaml' | |
require 'json' | |
# See for examples: https://www.rabbitmq.com/tutorials/tutorial-one-ruby.html | |
# See for the AMQP model explained: https://www.rabbitmq.com/tutorials/amqp-concepts.html
# See for queues: http://rubybunny.info/articles/queues.html | |
# See for concurrency: http://rubybunny.info/articles/concurrency.html#:~:text=are%20not%20used.-,Consumer%20Work%20Pools,ordered%20message%20processing%20by%20default. | |
# See for synchronous rpc: https://www.rabbitmq.com/tutorials/tutorial-six-ruby.html | |
# | |
# NOTE: The rabbitmq chapter with examples of synchronous rpc does not work in | |
# multiple concurrent threads... without some modification. Mainly, the synchronize blocks | |
# need to enclose more of the code. See the sync examples below. | |
# | |
# You may also want to use a ruby Queue object to make sure the order of competing threads | |
# (publisher vs consumer) works with our code. For that, see the following: | |
# https://stackoverflow.com/questions/52068527/simple-thread-conditional-variable-example-giving-deadlock-in-ruby | |
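# A minimal standalone sketch (not from the gist) of the Queue hand-off referenced above:
# the waiter announces itself before waiting, and the signaler refuses to signal until
# that announcement arrives, so the signal can never be lost.
#
#   lock      = Monitor.new
#   condition = lock.new_cond
#   handoff   = Queue.new
#
#   signaler = Thread.new do
#     handoff.pop                              # wait until the waiter is about to wait
#     lock.synchronize { condition.signal }    # blocks until the waiter releases the monitor in wait
#   end
#
#   lock.synchronize do
#     handoff << 1                             # announce: about to wait
#     condition.wait                           # releases the monitor while waiting
#   end
#   signaler.join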
module Bun | |
extend self | |
def connection | |
Bun.const_defined?(:Connection) && (return Connection) | |
host = RABBITMQ_HOST
Bun.const_set('Connection', Bunny.new(hostname: host, automatic_recovery: true))
Connection.start
puts "Bunny connected to rabbitmq host: #{host}."
Connection | |
end | |
def wrapper | |
@wrapper ||= BunnyWrapper.new( | |
existing_connection: connection, | |
channel_pool_size: CHANNEL_POOL_SIZE, | |
thread_pool_size: CONSUMER_POOL_SIZE, | |
prefetch: PREFETCH, | |
) | |
#@wrapper.logger.level = 1 | |
#puts "Logger object: #{@wrapper.logger.object_id}" | |
end | |
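# Example (sketch): from an irb/pry session you can drive the tests below through the
# shared wrapper, e.g.:
#
#   Bun.setup5
#   Bun.wrapper.publish('hello', routing_key:'test5')
#   Bun.test5c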
### BASIC QUEUE OPERATIONS ### | |
def setup1 | |
#connect | |
@ch1 = connection.create_channel | |
@ch2 = connection.create_channel | |
@ch3 = connection.create_channel | |
@ch1.queue('q1') | |
begin | |
#queue.subscribe(block: true) do |_delivery_info, _properties, body| | |
@ch2.queue('q1').subscribe do |_delivery_info, _properties, body| | |
puts "q1 Received '#{body}'" | |
end | |
puts "Subscribed to 'q1'." | |
rescue Interrupt => _ | |
puts "Closing connection." | |
connection.close | |
end | |
end | |
def test1(txt='Hi there!') | |
@ch1 || setup1 | |
# channel.default_exchange.publish('Hello World!', routing_key: queue.name) | |
@ch3.default_exchange.publish(txt, routing_key: 'q1') | |
3.times do |n| | |
Thread.new{instance_variable_get("@ch#{n+1}").default_exchange.publish("q1 (#{n}) text message.", routing_key: :q1)} | |
end | |
end | |
### WITH CHANNEL POOL and CONSUMER WORK POOL ### | |
attr_reader :pool | |
def setup2 | |
#connect | |
@pool = ConnectionPool::Wrapper.new(size: CHANNEL_POOL_SIZE){ connection.create_channel(nil, CONSUMER_POOL_SIZE) } | |
@pool.queue('q1') | |
begin | |
#queue.subscribe(block: true) do |_delivery_info, _properties, body| | |
@pool.queue('q1').subscribe do |_delivery_info, _properties, body| | |
sleep 1 | |
puts "q1 received '#{body}'" | |
end | |
puts "Subscribed to 'q1'." | |
rescue Interrupt => _ | |
puts "Closing connection." | |
connection.close | |
end | |
end | |
def test2(txt='Hi there!') | |
@pool || setup2 | |
# channel.default_exchange.publish('Hello World!', routing_key: queue.name) | |
@pool.default_exchange.publish(txt, routing_key: 'q1') | |
20.times do |n| | |
Thread.new{@pool.default_exchange.publish("q1 (#{n}) text message.", routing_key: :q1)} | |
end | |
end | |
### BLOCKING ### Doesn't really do what we want, and it may not be threadsafe. | |
def test3(txt="test3 synchronous Yay!") | |
#connect | |
ch = connection.create_channel | |
q = ch.queue('', exclusive:true, auto_delete:true) | |
q_name = q.name | |
ch.default_exchange.publish(txt, routing_key:q_name) | |
rslt = nil | |
sub = q.subscribe(block:true) do |_delivery_info, _properties, body| | |
puts "#{q_name} received '#{body}'" | |
ch.close | |
rslt = body | |
next body | |
end | |
rslt | |
end | |
### SYNCHRONOUS ### Note the use of Monitor and MonitorMixin. | |
attr_accessor :response, :lock, :condition | |
# This handles everything for a single synchronous send/receive | |
# rpc call through rabbitmq. | |
# | |
# It assumes there is a handler on the send-to queue that returns | |
# something to the given reply-to queue. | |
# | |
# This is as concise as the code can get (pretty much), | |
# and it does NOT USE BunnyWrapper. | |
# | |
# This example creates/destroys a dedicated channel for each rpc publish->consume | |
# operation, and it ~should~ be threadsafe. If not, consider using a ruby Queue | |
# to order the publish->consume deterministically, or try using a separate | |
# channel for publish vs consume. See BunnyWrapper for examples of each of those. | |
# | |
# NOTE: For this example, the send-to and reply-to queue is the same | |
# anonymous '' queue, so we're simply short-circuiting the remote responder. | |
# The functionality and behavior is identical to a real-world scenario where | |
# they are different queues, each with their own handler. | |
# | |
def test4(txt="Synchronous Yay!") | |
#connect | |
ch = connection.create_channel | |
q = ch.queue('', exclusive:true, auto_delete:true) | |
q_name = q.name | |
@lock = Monitor.new | |
@condition = MonitorMixin::ConditionVariable.new(@lock) | |
that = self | |
q.subscribe do |_delivery_info, _properties, payload| | |
that.lock.synchronize do | |
puts "#{q_name} received '#{payload}'" | |
that.response = payload | |
that.condition.signal | |
end | |
#that.lock.synchronize { that.condition.signal } | |
end | |
lock.synchronize do | |
ch.default_exchange.publish(txt, routing_key:q_name) | |
#lock.synchronize { condition.wait } | |
condition.wait | |
ch.close | |
end | |
return response | |
end | |
class SyncRpc | |
include Bun | |
def initialize(msg) | |
test4(msg) | |
end | |
end | |
# Runs multiple test4 calls in threads, each in an instance of SyncRpc, so we | |
# don't clash the ivars, which happens because we're not syncing those vars | |
# (I think we could though). | |
# | |
def test4a(how_many=5) | |
#connect | |
threads = how_many.times.map do |n| | |
Thread.new(n) do |i| | |
Thread.current[:output] = SyncRpc.new("test4a synced #{i}").response | |
puts "Thread #{i} done, with output #{Thread.current[:output]}." | |
end | |
end | |
puts "Thread count: #{threads.size}" | |
threads.map do |t| | |
t.join | |
t[:output] | |
end | |
end | |
### WITH BUNNYWRAPPER ### | |
#attr_reader :wrapper | |
def setup5 | |
@test5_consumer ||= wrapper.subscribe('test5') do |payload:, **params| | |
wrapper.logger.info{payload} | |
payload | |
end | |
end | |
# Runs basic BW (BunnyWrapper) publish() and subscribe() methods. | |
def test5(payload='Basic BunnyWrapper text payload.') | |
setup5 | |
wrapper.publish(payload, routing_key:'test5') | |
end | |
# Runs basic BW publish() and subscribe() methods with payload as hash, | |
# which should convert it to json. | |
def test5a(payload={a:1, b:2}) | |
setup5 | |
wrapper.publish(payload, routing_key:'test5') | |
end | |
# Runs BW publish() method with payload as a hash, | |
# which should convert to JsonRpcObject. | |
def test5b(payload={c:5, d:6}) | |
setup5 | |
#@wrapper.publish(payload, routing_key:'test6b') | |
wrapper.publish(payload, routing_key:'test5') | |
end | |
### SYNCHRONOUS BUNNYWRAPPER ### | |
# Runs BW basic synchronous subscribe-publish-receive functionality. | |
# The routing-key '__test__' causes the publisher to push to the | |
# reply-to queue, short circuiting any responders (of which there | |
# are none for this test). So this just tests the sender and receiver. | |
# | |
def test5c | |
setup5 | |
wrapper.rpc('Sync rpc call from test5c()', routing_key:'__test__').slice(:payload, :properties) | |
end | |
# Runs multiple BW synchronous calls in threads. | |
# You can pass a block to be evaluated as the 'payload', it will receive i from the iterator. | |
# | |
# See test5c for note about using the __test__ routing key.
# | |
# Example usage: | |
# rpc = Bun.test5d | |
# jrpc = Bun.test5d(25){|i|{iterator:i, self:self}} | |
# | |
def test5d(how_many=5, &block) | |
#wrapper || setup5 | |
threads = how_many.times.map do |n| | |
t = Thread.new(n) do |i| | |
puts "Thread #{n} beginning" | |
Thread.current[:output] = wrapper.rpc((block_given? ? block.call(i) : "test5d #{i}"), routing_key:'__test__').slice(:payload, :properties) | |
puts "Thread #{n} done" | |
#wrapper.logger.info{ "Thread #{i} done, with output #{Thread.current[:output]}." } | |
end | |
#t.join | |
t | |
end | |
wrapper.logger.info{ "Thread count: #{threads.size}" } | |
threads.map do |t| | |
t.join | |
t[:output] | |
end | |
end | |
# Runs test5d with hash input | |
def test5e | |
test5d(5){|i|{iterator:i, self:self}} | |
end | |
### WITH RESPONDER ### | |
attr_reader :responders, :test6_consumer | |
# Consumer to receive responses. | |
def setup6_consumer | |
@test6_consumer ||= wrapper.subscribe('test6_consumer') do |*params, delivery_info:, properties:, payload:, **options| | |
#puts params.inspect | |
puts '' | |
puts "TEST6 Consumer received response from publishing to responder:" | |
#puts({params:, properties:, payload:, options:, delivery_info:}.to_yaml) | |
puts({params:, properties:, payload:, options:}.to_yaml) | |
end | |
end | |
# Sends message to consumer, to show it's working. | |
def test6(txt="Hey test6") | |
setup6_consumer | |
wrapper.publish({text:txt}, routing_key:'test6_consumer') | |
end | |
# Sets up a responder with NO block. | |
def setup6a | |
setup6_consumer | |
responder('test6a_responder') | |
end | |
# Sets up a responder with a block. | |
def setup6b | |
setup6_consumer | |
responder('test6b_responder') do |payload:, properties:, delivery_info:, **extra_opts_and_prms| | |
puts "setup6b responder inner block running" # with payload: #{payload}" | |
# Result of this is sent (as payload, by BW) to the reply_to queue. | |
if payload.is_a?(JsonRpcObject) | |
payload.response('setup6b jrpc result text') | |
elsif payload.is_a?(Hash) | |
payload.merge({note:'This hash is just copied from the test6_ request'})
else | |
"This is the setup6b rpc text response, and here is the original payload: #{payload.chomp.strip} ." | |
end | |
end | |
end | |
# Publish text to responder-without-block. | |
def test6a(payload="Hi from test6a! Full async rpc") | |
setup6a | |
# Publishes to the responder. | |
wrapper.publish(payload, routing_key:'test6a_responder', reply_to:'test6_consumer') | |
end | |
# Publish text to responder-with-block. | |
def test6b(payload="Hi from test6b! Full async rpc") | |
setup6b | |
# Publishes to the responder. | |
wrapper.publish(payload, routing_key:'test6b_responder', reply_to:'test6_consumer') | |
end | |
# Publish hash to responder-with-block. | |
def test6c | |
test6b({a:1, b:2}) | |
end | |
# Publish JsonRpcObject to responder-with-block. | |
def test6d | |
test6b(JsonRpcObject.request('do-my-thing', {c:3, d:4})) | |
end | |
# Publish synchronous-rpc with text to responder-with-block | |
def test6e(payload="Test6 rpc to responder--with-block") | |
setup6b | |
wrapper.rpc(payload, routing_key:'test6b_responder').slice(:payload, :properties) | |
end | |
# Publish synchronous-rpc with hash to responder-with-block | |
def test6f | |
test6e({x:'one', y:'two'}) | |
end | |
# Publish synchronous-rpc with jrpc to responder-with-block | |
def test6g(_method='test-method', payload={m:'Mmmm', n:'Good!'}) | |
setup6b | |
wrapper.rpc(_method, payload, routing_key:'test6b_responder').slice(:payload, :properties) | |
end | |
# Sets up a native BunnyWrapper responder with a block. | |
def setup7 | |
setup6_consumer | |
wrapper.responders('test7_responder') do |payload:, properties:, delivery_info:, **extra_opts_and_prms| | |
puts "setup7 responder inner block running" # with payload #{payload}" | |
# Result of this is sent (as payload, by BW) to the reply_to queue. | |
if payload.is_a?(JsonRpcObject) | |
payload.response('setup7 responder result text') | |
elsif payload.is_a?(Hash) | |
payload.merge({note:'This hash is just copied from the test7_ request'})
else | |
"This is the setup7 rpc text response, and here is the original test7 payload: #{payload.chomp.strip} ." | |
end | |
end | |
end # setup7 | |
# Tests syncr rpc call to native BunnyWrapper responder. | |
def test7(_method='test-method', payload={ggg:"They're", great:'Awesome!'}) | |
setup7 | |
wrapper.rpc(_method, payload, routing_key:'test7_responder').slice(:payload, :properties) | |
end | |
# Test concurrency between long-running responder and one-off rpc() calls.
# | |
def setup8 | |
setup7 | |
wrapper.responders('test8_responder', manual_ack:true) do |payload:, properties:, delivery_info:, **extra_opts_and_prms| | |
puts "setup8 responder inner block running" # with payload #{payload}" | |
# Result of this is sent (as payload, by BW) to the reply_to queue. | |
sleep 5 | |
output = if payload.is_a?(JsonRpcObject) | |
payload.response('setup8 jrpc result text') | |
elsif payload.is_a?(Hash) | |
payload.merge({note:'This hash is just copied from the test8_ request'})
else | |
"This is the setup8 rpc text response, and here is the original test8 payload: #{payload.chomp.strip} ." | |
end | |
delivery_info[:channel].ack(delivery_info[:delivery_tag]) | |
output | |
end | |
end # setup8 | |
# Tests syncr rpc call to native BunnyWrapper responder. | |
def test8(_method='test-method', payload={green:"eggs", with:'ham!'}) | |
setup8 | |
results = [] | |
n=0 | |
5.times do |n| | |
#Thread.new do | |
results << wrapper.publish(_method, "test8 publishing to test8_responder", routing_key:"test8_responder") | |
#end | |
end | |
results << wrapper.rpc(_method, payload, routing_key:'test7_responder').slice(:payload, :properties) | |
puts "Wait for it..." | |
return results | |
end | |
def run_all | |
results = { | |
test1:, | |
test2:, | |
test3:, | |
test4:, | |
test4a:, | |
test5:, | |
test5a:, | |
test5b:, | |
test5c:, | |
test5d:, | |
test5e:, | |
test6:, | |
test6a:, | |
test6b:, | |
test6c:, | |
test6d:, | |
test6e:, | |
test6f:, | |
test6g:, | |
test7:, | |
test8:, | |
} | |
end | |
### EXIT POLITELY ### | |
def exit(*args) | |
connection.close | |
super | |
end | |
### GENERIC RESPONDER PROTOTYPE ### | |
# This was used as the base of the current BunnyWrapper#responder() method. | |
# I think this is still used by one of the tests above, but otherwise we should
# be using the BunnyWrapper responder! | |
def responder(listen_queue, reply_to:nil, &block) | |
@responders ||= {} | |
@responders[listen_queue] ||= wrapper.subscribe(listen_queue) do |*params, delivery_info:, properties:, payload:, **options| | |
# The extra *params and **options above will capture any extra args or kwargs. | |
#puts properties.inspect | |
_reply_to = (options[:reply_to] || properties[:reply_to] || reply_to)
#puts "rpc_responder reply_to: #{reply_to}" | |
callback_headers = { | |
original_payload: payload, | |
original_properties: properties.to_h, | |
} | |
out_params = { | |
headers: callback_headers.to_h, #.merge({'exit_code'=>0}), | |
content_type: properties[:content_type], | |
correlation_id: (properties[:message_id] || properties[:correlation_id]), | |
} | |
block_call_params = { | |
params:, #params.dup, | |
delivery_info:, # delivery_info.to_h, | |
properties:, #properties.to_h, | |
payload:, #payload.dup, | |
options:, #options.to_h, | |
} | |
if block_given? | |
rslt = block.call(**block_call_params) | |
else | |
rslt = {error:"BunnyWrapper responder had no block to call, but here are the params to pass:", | |
block_call_params: block_call_params.except(:delivery_info) | |
} | |
end | |
puts "Responder '#{listen_queue}' publishing result:" | |
puts rslt.to_yaml | |
wrapper.publish(rslt, routing_key:_reply_to, **out_params) | |
end | |
end # responder | |
end # Bun |
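# Example (sketch, assuming this file is saved as test_examples.rb next to the Gemfile
# above and a RabbitMQ server is reachable): load it in irb and run the tests.
#
#   RABBITMQ_HOST=localhost irb -r ./test_examples
#   irb> Bun.test5c
#   irb> Bun.run_all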