Skip to content

Instantly share code, notes, and snippets.

@lemenkov
Created June 23, 2018 06:23
Show Gist options
  • Save lemenkov/0b6e8b9488280e07b939edc5bd21522f to your computer and use it in GitHub Desktop.
Save lemenkov/0b6e8b9488280e07b939edc5bd21522f to your computer and use it in GitHub Desktop.
Test MTU in RabbitMQ
#!/usr/bin/env python
import amqp
import random
import string
if __name__ == '__main__':
Conn = amqp.Connection(host='localhost:5672', userid='guest', password='guest')
Chan = Conn.channel()
QueueName = ''.join(random.choices(string.ascii_uppercase + string.digits, k=10))
ExchangeName = ''.join(random.choices(string.ascii_uppercase + string.digits, k=10))
Chan.queue_declare(QueueName)
Chan.exchange_declare(ExchangeName, 'fanout')
Chan.queue_bind(QueueName, ExchangeName, '')
x = 1
while x < 9000+1:
Payload = "x" * x
Chan.basic_publish(amqp.Message(Payload), ExchangeName, '')
M = Chan.basic_get(QueueName)
Chan.basic_ack(M.delivery_tag)
print("MTU ", x, " received ", M.body, "[", M.body == Payload, "]")
x += 1
# purge any remaining messages (shouldn't happen)
Chan.queue_purge(QueueName)
# Cleanup everything
Chan.queue_unbind(QueueName, ExchangeName, '')
Chan.exchange_delete(ExchangeName)
Chan.queue_delete(QueueName)
Chan.close()
Conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment