Created
July 18, 2020 19:40
-
-
Save gmr/535c68a72b0338b3c4dd1832403422b1 to your computer and use it in GitHub Desktop.
Python CLI application that converts all quorum queues on a RabbitMQ cluster to classic queues with the intent of not losing any messages or impacting production workloads (aside from disconnecting consumers on a queue)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import json | |
import logging | |
import sys | |
import time | |
import typing | |
from urllib import parse | |
import httpx | |
LOGGER = logging.getLogger(__name__) | |
class Converter:
    """Convert the quorum queues of one RabbitMQ vhost into classic queues.

    All work is done through the RabbitMQ management HTTP API. For each
    queue, :meth:`run` performs these steps:

    1. create a classic ``<name>-temp`` queue and move the bindings to it,
    2. wait until the original queue reports zero messages, then delete it,
    3. re-create the queue under its original name as a classic queue and
       move the bindings back,
    4. shovel whatever accumulated in the temporary queue into the new
       queue, wait for the temporary queue to empty, and delete it.

    Any failed step logs an error and terminates the process with exit
    status 1.
    """

    def __init__(self, url: str, vhost: str) -> None:
        """Fetch the quorum queues to convert.

        :param url: Base URL of the management API, including credentials.
        :param vhost: Unquoted virtual host name, e.g. ``/``.
        :raises httpx.HTTPStatusError: if the queue listing request does
            not succeed (bad credentials, unknown vhost, ...).
        """
        self.base_url = url.rstrip('/')
        # Stored percent-encoded because it is only ever used in URL paths.
        self.vhost = parse.quote(vhost, '')
        response = httpx.get(
            '{}/api/queues/{}'.format(self.base_url, self.vhost))
        # Fail with a clear HTTP error rather than a TypeError raised while
        # iterating over an error document.
        response.raise_for_status()
        self.queues = sorted(
            (queue for queue in response.json()
             if queue['type'] == 'quorum' and not queue['auto_delete']),
            key=lambda queue: queue['name'])

    def run(self) -> None:
        """Convert every queue collected by ``__init__``, in name order."""
        for queue in self.queues:
            name = queue['name']
            LOGGER.info('Processing %s', name)
            temp_queue = '{}-temp'.format(name)
            arguments = self._strip_x_queue_type(queue['arguments'])

            if not self.create_queue(temp_queue, arguments,
                                     durable=queue['durable'],
                                     exclusive=queue['exclusive']):
                self._abort('Failed to create temporary queue: %s',
                            temp_queue)

            # New publishes land in the temporary queue while consumers
            # drain what is left in the quorum queue.
            self.switch_bindings(name, temp_queue)
            self.wait_for_queue_to_drain(name)

            if not self.delete_queue(name):
                self._abort('Failed to delete queue: %s', name)
            if not self.create_queue(name, arguments,
                                     durable=queue['durable'],
                                     exclusive=queue['exclusive']):
                self._abort('Failed to create queue: %s', name)

            self.switch_bindings(temp_queue, name)
            if not self.create_temp_shovel(temp_queue, name):
                self._abort('Failed to create temp shovel from %s to %s',
                            temp_queue, name)

            self.wait_for_queue_to_drain(temp_queue)
            if not self.delete_queue(temp_queue):
                self._abort('Failed to delete queue: %s', temp_queue)

    def bind_queue(self,
                   exchange: str,
                   routing_key: str,
                   queue: str,
                   arguments: dict) -> bool:
        """Bind ``queue`` to ``exchange``; return True on HTTP 201."""
        url = '{}/api/bindings/{}/e/{}/q/{}'.format(
            self.base_url, self.vhost,
            self._quote(exchange), self._quote(queue))
        response = httpx.post(
            url, json={'routing_key': routing_key, 'arguments': arguments})
        return response.status_code == 201

    def create_queue(self,
                     name: str,
                     arguments: dict,
                     durable: bool,
                     exclusive: bool) -> bool:
        """Declare a classic, non auto-delete queue; True on HTTP 201."""
        response = httpx.put(
            self._api_url_queue(name),
            json={
                'arguments': arguments,
                'auto_delete': False,
                'durable': durable,
                'exclusive': exclusive,
                'type': 'classic',
            })
        LOGGER.debug('Response: %r', response)
        return response.status_code == 201

    def create_temp_shovel(self, from_queue: str, to_queue: str) -> bool:
        """Create a dynamic shovel moving messages between two queues.

        The shovel is declared with ``delete-after: queue-length``.

        :returns: True when the API answers HTTP 201.
        """
        shovel_name = '{}-to-{}'.format(from_queue, to_queue)
        response = httpx.put(
            '{}/api/parameters/shovel/{}/{}'.format(
                self.base_url, self.vhost, self._quote(shovel_name)),
            json={
                'component': 'shovel',
                'name': shovel_name,
                'value': {
                    'ack-mode': 'on-confirm',
                    'add-forward-headers': False,
                    'delete-after': 'queue-length',
                    'dest-queue': to_queue,
                    'dest-uri': 'amqp://',
                    'prefetch-count': 100,
                    'reconnect-delay': 30,
                    'src-queue': from_queue,
                    'src-uri': 'amqp://',
                },
                # The body carries the plain vhost name; self.vhost is the
                # percent-encoded form meant for URL paths only.
                'vhost': parse.unquote(self.vhost),
            })
        if response.status_code != 201:
            # .text never raises, unlike .json() on a non-JSON error body.
            LOGGER.error('Response: %r: %r', response, response.text)
        return response.status_code == 201

    def delete_queue(self, name: str) -> bool:
        """Delete the queue ``name``; return True on HTTP 204."""
        response = httpx.delete(self._api_url_queue(name))
        LOGGER.debug('Response: %r', response)
        return response.status_code == 204

    def get_queue_bindings(self, name: str) -> typing.List[typing.Dict]:
        """Return the bindings of ``name`` that need to be moved.

        Bindings whose source is the empty string, or whose destination
        equals their routing key or properties key, are left out.
        """
        response = httpx.get(
            '{}/api/queues/{}/{}/bindings'.format(
                self.base_url, self.vhost, self._quote(name)))
        return [binding for binding in response.json()
                if binding['destination'] != binding['properties_key']
                and binding['destination'] != binding['routing_key']
                and binding['source'] != '']

    def switch_bindings(self, from_queue: str, to_queue: str) -> None:
        """Move every binding of ``from_queue`` over to ``to_queue``.

        Each binding is created on the target before it is removed from
        the source. Exits the process on the first failure.
        """
        for binding in self.get_queue_bindings(from_queue):
            if not self.bind_queue(binding['source'],
                                   binding['routing_key'],
                                   to_queue,
                                   binding['arguments']):
                self._abort('Failed to bind queue %s to %s with %s',
                            to_queue, binding['source'],
                            binding['routing_key'])
            if not self.unbind_queue(binding['source'],
                                     from_queue,
                                     binding['properties_key']):
                self._abort('Failed to unbind queue %s to %s with %s',
                            from_queue, binding['source'],
                            binding['routing_key'])

    def unbind_queue(self,
                     exchange: str,
                     queue: str,
                     properties_key: str) -> bool:
        """Remove one binding; return True on HTTP 204."""
        url = '{}/api/bindings/{}/e/{}/q/{}/{}'.format(
            self.base_url, self.vhost,
            self._quote(exchange), self._quote(queue),
            self._quote(properties_key))
        response = httpx.delete(url)
        return response.status_code == 204

    def wait_for_queue_to_drain(self, name: str) -> None:
        """Poll the queue every five seconds until it has no messages.

        Exits the process if the queue details cannot be fetched.
        """
        while True:
            response = httpx.get(self._api_url_queue(name))
            if response.status_code != 200:
                self._abort('Failed to get queue details: %s', name)
            body = response.json()
            if body['messages'] == 0:
                return
            LOGGER.info('Queue %s has %i messages', name, body['messages'])
            time.sleep(5)

    def _api_url_queue(self, name: str) -> str:
        """Return the management API URL of the queue ``name``."""
        return '{}/api/queues/{}/{}'.format(
            self.base_url, self.vhost, self._quote(name))

    @staticmethod
    def _quote(segment: str) -> str:
        """Percent-encode one URL path segment, including ``/``."""
        return parse.quote(segment, '')

    @staticmethod
    def _abort(message: str, *args: typing.Any) -> typing.NoReturn:
        """Log ``message`` at error level and exit with status 1."""
        LOGGER.error(message, *args)
        sys.exit(1)

    @staticmethod
    def _strip_x_queue_type(args: dict) -> dict:
        """Return a copy of ``args`` without the ``x-queue-type`` key.

        The input mapping is left untouched.
        """
        return {key: value for key, value in args.items()
                if key != 'x-queue-type'}
def main() -> None:
    """Parse the command line and run the conversion.

    Accepts an optional ``--vhost`` flag and an optional positional base
    URL of the management API; logging is configured at INFO level before
    the converter starts.
    """
    cli = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description='Convert Quorum queues to Classic queues')
    cli.add_argument('--vhost', default='/')
    cli.add_argument(
        'url',
        nargs='?',
        metavar='URL',
        default='http://guest:guest@localhost:15672',
        help='The Base URL to the RabbitMQ Management UI, '
             'including credentials')
    options = cli.parse_args()

    logging.basicConfig(level=logging.INFO)
    converter = Converter(options.url, options.vhost)
    converter.run()
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment