@yuvalif
Last active September 7, 2023 15:44

Setup

  • start the cluster
MON=1 OSD=1 MDS=0 MGR=0 RGW=1 ../src/vstart.sh -n -d
  • create a persistent amqp topic:
aws --region=default --endpoint-url http://localhost:8000 sns create-topic --name=fishtopic \
  --attributes='{"push-endpoint": "amqp://localhost", "amqp-exchange": "ex1", "persistent": "true"}'
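The TopicArn used in the commands below follows the arn:aws:sns:<region>:<tenant>:<topic> pattern (arn:aws:sns:default::fishtopic here, with an empty tenant). A minimal sketch of that pattern; the build_topic_arn helper is mine, not part of any tool:

```python
# Hypothetical helper: builds the topic ARN string following the
# arn:aws:sns:<region>:<tenant>:<topic> pattern used in this walkthrough.
def build_topic_arn(topic, region="default", tenant=""):
    return f"arn:aws:sns:{region}:{tenant}:{topic}"

print(build_topic_arn("fishtopic"))  # arn:aws:sns:default::fishtopic
```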
  • create a bucket:
aws --endpoint-url http://localhost:8000 s3 mb s3://fish
  • create a notification:
aws --region=default --endpoint-url http://localhost:8000 s3api put-bucket-notification-configuration --bucket fish \
  --notification-configuration='{"TopicConfigurations": [{"Id": "notif1", "TopicArn": "arn:aws:sns:default::fishtopic", "Events": []}]}'
  • start a python amqp topic receiver
    • this requires the pika package: sudo pip install pika
    • and the following python program:
import pika, sys, os, time

def main():
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='ex1', exchange_type='topic', durable=True)

    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue

    # the routing key must match the topic name
    channel.queue_bind(exchange='ex1', queue=queue_name, routing_key='fishtopic')

    def callback(ch, method, properties, body):
        print(" [x] %r" % body.decode())

    print(' [*] Waiting for logs. To exit press CTRL+C')
    channel.basic_consume(
        queue=queue_name, on_message_callback=callback, auto_ack=True)

    channel.start_consuming()


if __name__ == '__main__':
    while True:
        try:
            main()
        except KeyboardInterrupt:
            print('Interrupted')
            try:
                sys.exit(0)
            except SystemExit:
                os._exit(0)
        except Exception as e:
            print("consumer stopped with error", e)
            time.sleep(0.1)
  • upload a file: aws --endpoint-url http://localhost:8000 s3 cp myfile s3://fish and verify that the receiver prints the notification
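The notification body received by the callback is a JSON document in the S3 event-record layout. A minimal sketch of pulling out the interesting fields; the field names assume the standard Records structure, and the sample event below is synthetic:

```python
import json

def describe_event(body):
    # Extract event name, bucket name and object key from an S3-style
    # notification record (standard "Records" layout; an assumption here).
    record = json.loads(body)["Records"][0]
    return (record["eventName"],
            record["s3"]["bucket"]["name"],
            record["s3"]["object"]["key"])

# Synthetic event for illustration only:
sample = json.dumps({"Records": [{"eventName": "ObjectCreated:Put",
                                  "s3": {"bucket": {"name": "fish"},
                                         "object": {"key": "myfile"}}}]})
print(describe_event(sample))
```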

Simple Reconnect

  • stop the broker: sudo systemctl stop rabbitmq-server
  • try to upload another file, then restart the broker (sudo systemctl start rabbitmq-server) and verify that the pending notification is delivered

Reconnect after Slowdown

  • we use hsbench to generate enough load to fill the persistent topic queue.
  • first we need to create the bucket:
hsbench -a 0555b35654ad1656d804 -s h7GhxuBLTrlhVUyxSPUKUV8r/2EI4ngqJxD7iBdBYLhwluN30JaT3Q== \
  -u http://localhost:8000 -bp bk -m i
  • then, create the notification on the bucket (reuse the same topic from before):
aws --region=default --endpoint-url http://localhost:8000 s3api put-bucket-notification-configuration \
  --bucket bk000000000000 --notification-configuration='{"TopicConfigurations": [{"Id": "notif1", "TopicArn": "arn:aws:sns:default::fishtopic", "Events": []}]}'
  • and last, put and delete objects (when the broker is down):
hsbench -a 0555b35654ad1656d804 -s h7GhxuBLTrlhVUyxSPUKUV8r/2EI4ngqJxD7iBdBYLhwluN30JaT3Q== \
  -u http://localhost:8000 -bp bk -m pd -t $(nproc) -z 1K -l 100
  • once we see "slow down" errors, we bring the broker up again: sudo systemctl start rabbitmq-server
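The "slow down" errors mean RGW is rejecting writes while the persistent topic queue is full, so a client would typically retry with backoff. A minimal sketch of that pattern; put_object and the error-string check are illustrative assumptions, not hsbench's actual behavior:

```python
import time

def put_with_backoff(put_object, max_attempts=5, base_delay=0.1):
    # Retry a put that may fail with a SlowDown-style error, doubling the
    # delay between attempts (exponential backoff).
    for attempt in range(max_attempts):
        try:
            return put_object()
        except RuntimeError as e:
            if "SlowDown" not in str(e) or attempt == max_attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))

# Usage with a fake put that succeeds on the third try:
attempts = []
def fake_put():
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("SlowDown")
    return "ok"

print(put_with_backoff(fake_put))  # ok
```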