Last active
April 12, 2023 08:12
-
-
Save baelter/76776bceaa5d8d860ce559837730ca34 to your computer and use it in GitHub Desktop.
Finds broken bindings, possible bug in RabbitMQ
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
require "excon" | |
require "optparse" | |
require "json" | |
# Finds broken bindings, possibly bug in RabbitMQ. | |
# Check if any unrouted message was published to an exchange that should have routed it. | |
banner = "Usage: validate_bindings.rb username password hostname" | |
OptionParser.new do |opts| | |
opts.banner = banner | |
opts.on("-h", "--help", "Prints this help") do | |
puts opts | |
exit | |
end | |
end.parse! | |
hostname = ARGV.pop | |
password = ARGV.pop | |
username = ARGV.pop | |
raise banner unless hostname && password && username | |
class BindingsValidator | |
def initialize(hostname, username, password) | |
@hostname = hostname | |
@username = username | |
@password = password | |
end | |
def for_each_vhost(&block) | |
resp = api.get(path: "/api/vhosts?columns=name") | |
vhosts = JSON.parse(resp.body, symbolize_names: true).map { |vhost| vhost[:name] } | |
vhosts.each(&block) | |
end | |
def broken_binding(message, bindings) | |
bindings.find do |binding| | |
binding[:source] == message[:exchange] && | |
amqp_match?(message[:routing_key], binding[:routing_key]) | |
end | |
end | |
def bindings(vhost) | |
resp = api.get(path: "/api/bindings/#{encode(vhost)}") | |
JSON.parse(resp.body, symbolize_names: true) | |
end | |
def unroutable_mesages(vhost) | |
resp = api.post(path: "/api/queues/#{encode(vhost)}/unroutables/get", | |
body: { | |
count: 1000, | |
ackmode: "ack_requeue_true", | |
encoding: "auto", | |
truncate: 0 | |
}.to_json) | |
JSON.parse(resp.body, symbolize_names: true) | |
end | |
def amqp_match?(key, pattern) | |
return true if key == pattern | |
replaced = pattern.gsub('*', '([^.]+)').gsub('#', '([^.]+.?)+') | |
regex_string = /^#{replaced}$/ | |
regex_string.match?(key) | |
end | |
def api(**options) | |
return @api if @api | |
excon_options = { expects: [200, 201, 204], read_timeout: 15, write_timeout: 5, | |
connect_timeout: 5, persistent: true, debug_response: true } | |
excon_options.merge!(options) | |
@api = Excon.new("https://#{@username}:#{@password}@#{@hostname}", **excon_options) | |
end | |
def encode(str) | |
URI.encode_www_form_component(str) | |
end | |
end | |
validator = BindingsValidator.new(hostname, username, password) | |
errors = {} | |
message_count = 0 | |
validator.for_each_vhost do |vhost| | |
next if vhost == "/" | |
bindings = validator.bindings(vhost) | |
messages = validator.unroutable_mesages(vhost) | |
message_count += messages.size | |
messages.each do |message| | |
next if errors[message[:routing_key]] | |
binding = validator.broken_binding(message, bindings) | |
next unless binding | |
errors[message[:routing_key]] = binding | |
end | |
rescue => e | |
puts "Cloud not validate vhost=#{vhost} #{e.full_message}" | |
end | |
puts "Analyzed #{message_count} unrouted messages" | |
puts "Found #{errors.size} routing keys that should have been routed" | |
puts errors.to_json if errors.any? |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment