Skip to content

Instantly share code, notes, and snippets.

@LuisCusihuaman
Last active January 6, 2022 15:26
Show Gist options
  • Save LuisCusihuaman/f6aa2d1146d787cb6189bcd6b5ea6856 to your computer and use it in GitHub Desktop.
Save LuisCusihuaman/f6aa2d1146d787cb6189bcd6b5ea6856 to your computer and use it in GitHub Desktop.
RABBITMQ CLIENT CONNECTION PYTHON TEST
import pika
from bottle import run, Bottle
# Exchange that POST /message/<your_message> publishes to.
EXCHANGE_NAME = "domain"
# Queue that GET /messages reads from.
QUEUE_NAME = "domain"
# Routing key attached to every published message.
# NOTE(review): presumably QUEUE_NAME is bound to EXCHANGE_NAME with this key
# on the broker; this script does not declare or bind anything itself.
ROUTING_EX_KEY = "domain-key"
def main():
    """Run a small Bottle HTTP server that talks to RabbitMQ.

    Routes:
        POST /message/<your_message>: publish ``your_message`` (UTF-8
            encoded) to ``EXCHANGE_NAME`` with ``ROUTING_EX_KEY`` and echo
            it back in the response.
        GET /messages: fetch a single message from ``QUEUE_NAME`` with
            auto-ack and return its body, or a fixed notice when the queue
            has nothing to deliver.

    The broker URI is read from the ``RABBITMQ_URI`` environment variable;
    the built-in default is only a placeholder for a local broker.

    The call blocks inside ``run`` until the server stops. The RabbitMQ
    connection is closed on the way out, whether the server stopped
    normally or an exception propagated.
    """
    import os  # function-scope import keeps this block self-contained

    # Placeholder default; set RABBITMQ_URI to point at a real broker.
    rabbitmq_uri_app = os.environ.get(
        "RABBITMQ_URI",
        "amqp://user:password@localhost:5672/target-vhost",
    )
    connection = pika.BlockingConnection(pika.URLParameters(rabbitmq_uri_app))
    try:
        app = Bottle()
        ch = connection.channel()

        @app.post('/message/<your_message>')
        def hello(your_message: str):
            # The AMQP body is bytes; encode once at the I/O boundary.
            ch.basic_publish(
                EXCHANGE_NAME,
                routing_key=ROUTING_EX_KEY,
                body=your_message.encode('utf-8'),
            )
            return your_message

        @app.get('/messages')
        def get_last_message():
            # basic_get yields (None, None, None) when nothing is available,
            # so the unpacking is safe and ``body`` is falsy in that case.
            _, _, body = ch.basic_get(queue=QUEUE_NAME, auto_ack=True)
            return body.decode('utf-8') if body else "La queue esta vacia"

        run(app, host='0.0.0.0', port=5000, debug=True)
    finally:
        # Release the broker connection even if the server raised; skip the
        # close when the broker already dropped the connection.
        if connection.is_open:
            connection.close()
# Start the server only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment