Last active
December 20, 2015 20:29
-
-
Save stuart/6190685 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby
require 'json'
require 'shellwords'
require 'uri'
# Removes repeated messages from one of the AMQP queues.
#
# USAGE: ruby queue_uniq.rb "queue_name"
#
# This dumps the whole queue to a temp file (in case anything goes wrong),
# strips out any messages with duplicated payloads, and writes the cleaned
# data to a second temp file.
# The cleaned file is then read back and each message is requeued.
#
# TODO: use an HTTP library instead of calls to curl.
class QueueUniq
  # Drives curl against an HTTP API (paths under /api/queues and
  # /api/exchanges) to fetch every ready message from a queue, drop messages
  # whose "payload" repeats, and publish the remaining ones again.
  #
  # Connection settings are read from the environment: AMQP_USER,
  # AMQP_PASSWORD, AMQP_HOST and (optionally) AMQP_PORT.

  attr_accessor :queue

  # @param queue [String] name of the queue to de-duplicate; it is interpolated
  #   verbatim into API paths and temp-file names
  def initialize(queue)
    @queue = queue
  end

  # Asks the API for the queue description and reads its ready-message count.
  #
  # @return [Integer] value of "messages_ready" (0 when the key is absent)
  def get_queue_length
    info = JSON.parse(run(curl("/api/queues/%2f/#{queue}")))
    info["messages_ready"].to_i
  end

  # Fetches every ready message and stores the raw response in #dump_file.
  # The request is sent with `requeue: false`.
  #
  # @return [String] standard output of the shell command (output is redirected
  #   into the dump file, so normally empty)
  def dump
    options = { count: get_queue_length, requeue: false, encoding: "auto" }.to_json
    command = curl("/api/queues/%2f/#{queue}/get", Shellwords.escape(options), "POST")
    # Let the shell do the redirection; the path is escaped because it embeds
    # the caller-supplied queue name.
    run("#{command} > #{Shellwords.escape(dump_file)}")
  end

  # Publishes every record stored in #clean_file to the "town_crier" exchange.
  #
  # @return [Array] the records that were read from the clean file
  def requeue
    JSON.parse(File.read(clean_file)).each do |record|
      # Shell-escape the JSON so payloads containing quotes or other shell
      # metacharacters reach curl intact.
      body = Shellwords.escape(record.to_json)
      run(curl("/api/exchanges/%2f/town_crier/publish", body, "POST"))
    end
  end

  # Reads #dump_file, keeps the first record for each distinct "payload" and
  # writes the survivors to #clean_file as JSON.
  #
  # @return [Integer] number of bytes written to the clean file
  # @raise [RuntimeError] if the dump does not contain a JSON array
  def clean_data
    records = JSON.parse(File.read(dump_file))
    unless records.is_a?(Array)
      raise "Expected #{dump_file} to contain a JSON array of messages"
    end

    unique = records.uniq { |record| record["payload"] }
    File.write(clean_file, unique.to_json)
  end

  # Runs the whole pipeline: dump, de-duplicate, requeue.
  def clean
    dump
    clean_data
    requeue
  end

  # @return [String] path of the raw dump for this queue
  def dump_file
    "/tmp/#{queue}.dump.json"
  end

  # @return [String] path of the de-duplicated dump for this queue
  def clean_file
    "#{dump_file}.clean"
  end

  # Builds (and echoes) the curl command line for an API request.
  #
  # @param path [String] request path, e.g. "/api/queues/%2f/name"
  # @param data [String] request body, already quoted/escaped for the shell
  # @param method [String] HTTP verb passed to `curl -X`
  # @param host [String] host name; defaults to #host
  # @param port [Integer, String] port; defaults to #port
  # @return [String] the shell command
  def curl(path, data = '{}', method = 'GET', host = self.host, port = self.port)
    # The defaults call the accessors through `self`: a bare `host = host`
    # default refers to the parameter itself and evaluates to nil.
    uri = URI::HTTP.build(host: host, port: port, path: path)
    credentials = Shellwords.escape("#{user}:#{password}")
    command = "curl -u #{credentials} -X #{method} '#{uri}' -d #{data}"
    # NOTE(review): this echoes the credentials to stdout; kept for
    # compatibility with the existing output.
    puts command
    command
  end

  # @return [String, nil] value of AMQP_USER
  def user
    ENV["AMQP_USER"]
  end

  # @return [String, nil] value of AMQP_PASSWORD
  def password
    ENV["AMQP_PASSWORD"]
  end

  # @return [String, nil] value of AMQP_HOST
  def host
    ENV["AMQP_HOST"]
  end

  # @return [String, Integer] value of AMQP_PORT, or 15672 when it is unset
  def port
    ENV["AMQP_PORT"] || 15672
  end

  private

  # Executes a shell command and returns its standard output.
  def run(command)
    `#{command}`
  end
end
# Prints usage instructions to standard error and terminates the script.
# It is only called for invalid invocations, so it exits with a non-zero status.
def usage
  warn "USAGE: ruby queue_uniq.rb 'queue_name'"
  warn "Make sure the environment variables for AMQP_USER, AMQP_PASSWORD and AMQP_HOST are set."
  exit(1)
end
# Entry point: the queue name is the only required argument.
usage if ARGV.empty?
cleaner = QueueUniq.new(ARGV[0])
# User, host and password come from the environment; show usage if any is missing.
usage if [cleaner.user, cleaner.host, cleaner.password].any?(&:nil?)
cleaner.clean
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment