Last active
October 29, 2016 01:59
-
-
Save rajeshl/ec9650dc53f106bd44c23b2b6b5418d5 to your computer and use it in GitHub Desktop.
A simple tool to migrate messages from one AMQP queue to another (by republishing them to a destination exchange), with throttling. Shovel works well for bulk copies; this tool is useful when you need to control the speed at which the migration happens.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import pika | |
import sys | |
import signal | |
import time | |
import datetime | |
import argparse | |
#TODO cleanup at signal handler | |
def signal_handler(signum, frame):
    """Log a shutdown notice and terminate the process.

    The script entry point registers this handler for SIGINT and SIGTERM.

    Args:
        signum: Number of the delivered signal (unused).
        frame: Stack frame that was interrupted (unused).

    Raises:
        SystemExit: Always, to stop the program.
    """
    log("Received signal. Cleaning up..")
    log("Over and out!")
    # sys.exit() instead of the bare exit() helper: exit() is injected by the
    # `site` module for interactive sessions and is missing under `python -S`.
    sys.exit()
def log(message):
    """Print *message* to stdout, prefixed with the current local timestamp.

    Args:
        message: Text to print. It must be a string because it is
            concatenated to the timestamp.
    """
    stamp = datetime.datetime.now().strftime("%Y-%b-%d %H:%M:%S")
    # Build the whole line first and hand it over as ONE parenthesised
    # expression, which is valid both as a Python 2 print statement and as a
    # Python 3 print() call. The previous `print (stamp) + " " + message`
    # printed only the timestamp and then raised TypeError on Python 3.
    print(stamp + " " + message)
def setup_connection(mq_url, mq_vhost, batch_size):
    """Open a blocking AMQP connection and a channel with a prefetch limit.

    Args:
        mq_url: Broker URL without the vhost part, e.g.
            ``amqp://guest:guest@localhost``.
        mq_vhost: Virtual host name, appended to ``mq_url`` after a ``/``.
            The default vhost ``/`` is replaced by its percent-encoded form
            ``%2F`` so it survives as a URL path segment.
        batch_size: Prefetch count applied to the channel via ``basic_qos``.

    Returns:
        A ``(connection, channel)`` tuple, or ``(None, None)`` when the
        broker could not be reached or rejected the connection.
    """
    if mq_vhost == '/':
        mq_vhost = "%2F"
    amq_url = mq_url + '/' + mq_vhost
    # NOTE(review): amq_url embeds the user name and password, so the log
    # lines below write credentials to stdout -- confirm this is acceptable.
    log("Setting up AMQ connection. url: " + amq_url)
    parameters = pika.URLParameters(amq_url)
    try:
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        channel.basic_qos(prefetch_count=batch_size)
    except pika.exceptions.AMQPConnectionError as e:
        # AMQPConnectionError is the base class of ConnectionClosed and
        # ProbableAuthenticationError (the two errors handled before), so
        # those are still covered, and other connection failures no longer
        # escape as a traceback.
        log("Error connecting - " + amq_url)
        log("Cause: " + repr(e))
        return None, None
    return connection, channel
#TODO use two channels? | |
def migrate_messages(channel, mq_src_queue_name, mq_dst_exchange, mq_routing_key,
                     throttle_interval, batch_size, max_msg):
    """Republish messages from a queue to an exchange, pausing between batches.

    Each consumed message is published to ``mq_dst_exchange``, appended to an
    audit file named ``migrated_messages-<timestamp>.dat`` (relative path, one
    body per line) and only then acknowledged on the source queue.

    The loop ends when ``max_msg`` messages have been migrated or when the
    channel is closed; any other exception propagates to the caller.

    Args:
        channel: Channel object providing ``consume``, ``basic_publish`` and
            ``basic_ack``.
        mq_src_queue_name: Name of the queue to consume from.
        mq_dst_exchange: Name of the exchange to publish to.
        mq_routing_key: Routing key used for every published message, or
            ``None`` to reuse each message's own routing key.
        throttle_interval: Seconds to sleep after every full batch.
        batch_size: Number of messages per batch; must be non-zero.
        max_msg: Stop after this many messages have been migrated.

    Returns:
        The number of messages migrated.
    """
    messages_migrated = 0
    ts = datetime.datetime.now().strftime("%Y-%b-%d-%H-%M-%S")
    # Binary mode: message bodies are bytes, which a text-mode file rejects on
    # Python 3. The `with` block closes the file even if an unexpected
    # exception escapes the loop.
    with open("migrated_messages-" + ts + ".dat", 'wb') as file_handle:
        try:
            for method_frame, properties, body in channel.consume(mq_src_queue_name):
                # Choose the key per message. Assigning the fallback back to
                # mq_routing_key (as before) froze the FIRST message's key and
                # applied it to every following message.
                routing_key = mq_routing_key
                if routing_key is None:
                    routing_key = method_frame.routing_key
                # Forward the original properties so headers, content type,
                # delivery mode etc. are not lost by the migration.
                channel.basic_publish(exchange=mq_dst_exchange,
                                      routing_key=routing_key,
                                      body=body,
                                      properties=properties)
                if not isinstance(body, bytes):
                    body = body.encode("utf-8")
                file_handle.write(body)
                file_handle.write(b"\n")
                # Ack last, so a failure above leaves the message on the queue.
                channel.basic_ack(method_frame.delivery_tag)
                messages_migrated += 1
                batch_complete = messages_migrated % batch_size == 0
                if batch_complete:
                    log("Migrated " + str(messages_migrated) + " messages.")
                if messages_migrated >= max_msg:
                    break
                if batch_complete:
                    time.sleep(throttle_interval)
        except pika.exceptions.ChannelClosed:  # TODO - catch invalid exchange exception
            log("Unable to migrate. Check exchange and queue exist before attempting migration!")
    return messages_migrated
def setup_arg_parser():
    """Build the command-line parser for the migration tool.

    The three numeric options are declared with ``type=int`` so that argparse
    rejects non-numeric input with a usage error instead of letting it fail
    later as a ValueError traceback. Their parsed values are therefore ints.

    Returns:
        An ``argparse.ArgumentParser`` with ``--in_queue`` and
        ``--out_exchange`` required and every other option defaulted.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-u", "--url", default="amqp://guest:guest@localhost",
                        help="AMQP url of the form amqp://<user>:<password>@<host>[:<port>]")
    parser.add_argument("-v", "--vhost", default='/',
                        help="AMQP virtual host. example: /")
    parser.add_argument("-q", "--in_queue", required=True,
                        help="Q from where to pull the messages.")
    parser.add_argument("-x", "--out_exchange", required=True,
                        help="Exchange to push the messages.")
    parser.add_argument("-k", "--routing_key",
                        help="AMQP routing key. By default the key from original message will be used.")
    parser.add_argument("-t", "--throttle_interval", type=int, default=1,
                        help="Throttle interval in seconds. Default is 1sec.")
    parser.add_argument("-b", "--batch_size", type=int, default=5,
                        help="Message batch size. Default is 5.")
    parser.add_argument("-m", "--max_messages", type=int, default=25,
                        help="Total number of messages to be migrated. Default is 25.")
    return parser
# Script entry point: parse options, connect, migrate, then report totals.
if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    parser = setup_arg_parser()
    args = parser.parse_args()

    # Convert the numeric options once, up front, so a bad value is reported
    # before any connection is opened.
    batch_size = int(args.batch_size)
    throttle_interval = int(args.throttle_interval)
    max_messages = int(args.max_messages)
    if batch_size < 1:
        # migrate_messages computes `count % batch_size`; zero would raise
        # ZeroDivisionError in the middle of a migration.
        parser.error("batch_size must be at least 1")
    if throttle_interval < 0:
        # time.sleep() rejects negative durations.
        parser.error("throttle_interval must not be negative")

    connection, channel = setup_connection(args.url, args.vhost, batch_size)
    if connection is None:
        # Non-zero status so calling scripts can detect the failure.
        sys.exit(1)

    log("Starting migration...")
    log("From: " + args.in_queue)
    log("To: " + args.out_exchange + "[" + str(args.routing_key) + "]")
    if throttle_interval > 0:
        # Floor division keeps the whole-number rate the Python 2 `/` printed.
        log("Throttling at " + str(batch_size // throttle_interval) + "mps")
    else:
        # An interval of 0 used to crash here with ZeroDivisionError.
        log("Throttling disabled (interval is 0)")

    start_time = time.time()
    messages_migrated = migrate_messages(channel, args.in_queue, args.out_exchange,
                                         args.routing_key, throttle_interval,
                                         batch_size, max_messages)
    try:
        try:
            channel.close()
        finally:
            # Close the connection even when closing the channel fails.
            connection.close()
    except pika.exceptions.ChannelClosed:
        sys.exit()  # ignore as this could happen only because of invalid params

    log("Done!")
    end_time = time.time()
    log("Migrated " + str(messages_migrated) + " messages in " + str(end_time - start_time) + " seconds.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment