Skip to content

Instantly share code, notes, and snippets.

@baelter
Last active April 12, 2023 08:12
Show Gist options
  • Save baelter/76776bceaa5d8d860ce559837730ca34 to your computer and use it in GitHub Desktop.
Save baelter/76776bceaa5d8d860ce559837730ca34 to your computer and use it in GitHub Desktop.
Finds broken bindings, possible bug in RabbitMQ
#!/usr/bin/env ruby
require "excon"
require "optparse"
require "json"
# Finds broken bindings, possibly caused by a bug in RabbitMQ.
# Checks whether any unrouted message was published to an exchange that has a
# binding which should have routed it.
# Command-line handling: the script expects exactly three positional
# arguments (username, password, hostname) and supports -h/--help.
banner = "Usage: validate_bindings.rb username password hostname"

OptionParser.new do |opts|
  opts.banner = banner
  opts.on("-h", "--help", "Prints this help") do
    puts opts
    exit
  end
end.parse!

# Take the last three remaining arguments, in order. When fewer than three
# are given the trailing variables end up nil and the check below fires.
username, password, hostname = ARGV.pop(3)

# A usage error is not a program bug: print the banner to stderr and exit
# with status 1 instead of raising (which would dump a backtrace).
abort banner unless hostname && password && username
# Queries the RabbitMQ management HTTP API and checks whether messages that
# ended up unrouted actually match one of the bindings declared in their vhost.
class BindingsValidator
  # @param hostname [String] host part used to build the API base URL
  # @param username [String] management API user
  # @param password [String] management API password
  def initialize(hostname, username, password)
    @hostname = hostname
    @username = username
    @password = password
  end

  # Calls the given block once per vhost name returned by GET /api/vhosts.
  #
  # @yieldparam vhost [String] vhost name
  def for_each_vhost(&block)
    resp = api.get(path: "/api/vhosts?columns=name")
    vhosts = JSON.parse(resp.body, symbolize_names: true).map { |vhost| vhost[:name] }
    vhosts.each(&block)
  end

  # Finds the first binding whose source is the exchange the message was
  # published to and whose routing key matches the message's routing key.
  #
  # @param message [Hash] must provide :exchange and :routing_key
  # @param bindings [Array<Hash>] each providing :source and :routing_key
  # @return [Hash, nil] the matching binding, or nil when none matches
  def broken_binding(message, bindings)
    # Block parameter deliberately not named `binding`, which would shadow
    # Kernel#binding.
    bindings.find do |candidate|
      candidate[:source] == message[:exchange] &&
        amqp_match?(message[:routing_key], candidate[:routing_key])
    end
  end

  # Lists all bindings of a vhost via GET /api/bindings/<vhost>.
  #
  # @param vhost [String]
  # @return [Array<Hash>] parsed JSON with symbolized keys
  def bindings(vhost)
    resp = api.get(path: "/api/bindings/#{encode(vhost)}")
    JSON.parse(resp.body, symbolize_names: true)
  end

  # Fetches up to 1000 messages from the queue named "unroutables" in the
  # given vhost. The messages are requeued (ackmode "ack_requeue_true"), so
  # the queue content is left in place.
  # NOTE(review): presumably that queue collects unrouted messages through an
  # alternate exchange -- confirm against the broker setup.
  #
  # @param vhost [String]
  # @return [Array<Hash>] parsed JSON with symbolized keys
  def unroutable_messages(vhost)
    resp = api.post(path: "/api/queues/#{encode(vhost)}/unroutables/get",
                    body: {
                      count: 1000,
                      ackmode: "ack_requeue_true",
                      encoding: "auto",
                      truncate: 0
                    }.to_json)
    JSON.parse(resp.body, symbolize_names: true)
  end
  # Original (misspelled) method name, kept so existing callers keep working.
  alias unroutable_mesages unroutable_messages

  # Tells whether a routing key matches a binding key using AMQP topic
  # semantics: keys are split into words on ".", "*" stands for exactly one
  # word and "#" for zero or more words. Every other word is compared
  # literally, so regex metacharacters in a binding key have no special meaning.
  # NOTE(review): topic semantics are applied to every binding because the
  # exchange type is not consulted here.
  #
  # @param key [String] routing key of the message
  # @param pattern [String] routing key of the binding
  # @return [Boolean]
  def amqp_match?(key, pattern)
    return true if key == pattern
    return false if key.nil? || pattern.nil?

    # limit -1 keeps trailing empty words ("a." is two words: "a" and "").
    topic_words_match?(key.split(".", -1), pattern.split(".", -1))
  end

  # Returns the Excon connection to the management API, creating it on first
  # use. The connection is memoized, so +options+ only take effect on the
  # first call.
  # NOTE: username and password are interpolated into the URL verbatim;
  # characters that are special in a URL must already be percent-encoded.
  #
  # @param options [Hash] extra Excon options merged over the defaults
  def api(**options)
    return @api if @api

    excon_options = { expects: [200, 201, 204], read_timeout: 15, write_timeout: 5,
                      connect_timeout: 5, persistent: true, debug_response: true }
    excon_options.merge!(options)
    @api = Excon.new("https://#{@username}:#{@password}@#{@hostname}", **excon_options)
  end

  # Percent-encodes a string for use as a single URL path segment.
  #
  # @param str [String]
  # @return [String]
  def encode(str)
    # Form encoding turns a space into "+", which inside a path is a literal
    # plus sign. A real "+" has already become "%2B" at this point, so every
    # remaining "+" stands for a space.
    URI.encode_www_form_component(str).gsub("+", "%20")
  end

  private

  # Word-wise topic matching. reachable[i] is true when the binding words
  # processed so far can match exactly the first i words of the routing key.
  def topic_words_match?(words, pattern_words)
    reachable = Array.new(words.size + 1, false)
    reachable[0] = true

    pattern_words.each do |pattern_word|
      following = Array.new(words.size + 1, false)
      if pattern_word == "#"
        # "#" may swallow any number of words (including none): every
        # position at or after a reachable one stays reachable.
        seen = false
        reachable.each_with_index do |state, index|
          seen ||= state
          following[index] = seen
        end
      else
        # "*" or a literal word consumes exactly one word of the key.
        words.each_with_index do |word, index|
          following[index + 1] = reachable[index] && (pattern_word == "*" || pattern_word == word)
        end
      end
      reachable = following
    end

    reachable.last
  end
end
# Driver: for every vhost, compare the unrouted messages against the vhost's
# bindings and collect one offending binding per routing key.
validator = BindingsValidator.new(hostname, username, password)
errors = {} # routing key => first binding found that should have routed it
message_count = 0

validator.for_each_vhost do |vhost|
  # The default vhost is skipped.
  next if vhost == "/"

  bindings = validator.bindings(vhost)
  messages = validator.unroutable_mesages(vhost)
  message_count += messages.size

  messages.each do |message|
    # Only the first hit per routing key is reported.
    next if errors[message[:routing_key]]

    # Local deliberately not named `binding`, which would shadow Kernel#binding.
    matching_binding = validator.broken_binding(message, bindings)
    next unless matching_binding

    errors[message[:routing_key]] = matching_binding
  end
rescue => e
  # Best effort: a failure for one vhost must not stop the remaining ones.
  puts "Could not validate vhost=#{vhost} #{e.full_message}"
end

puts "Analyzed #{message_count} unrouted messages"
puts "Found #{errors.size} routing keys that should have been routed"
puts errors.to_json if errors.any?
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment