Skip to content

Instantly share code, notes, and snippets.

@jakubczaplicki
Last active September 8, 2017 07:40
Show Gist options
  • Save jakubczaplicki/8993755aa70a73c506c07a05c6f65da1 to your computer and use it in GitHub Desktop.
Save jakubczaplicki/8993755aa70a73c506c07a05c6f65da1 to your computer and use it in GitHub Desktop.
import asyncio
import sys
from aio_pika import connect, Message
import logging
# Route all log output to stdout at DEBUG verbosity by default.
logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)

# Per-logger overrides: silence the chatty loggers, keep channel traffic visible.
for _logger_name, _logger_level in (
    ("pika.heartbeat", logging.CRITICAL),
    ("pika.callback", logging.CRITICAL),
    ("pika.channel", logging.DEBUG),
    ("aio_pika.exchange", logging.CRITICAL),
):
    logging.getLogger(_logger_name).setLevel(_logger_level)
# Do not leave the loop variables behind in the module namespace.
del _logger_name, _logger_level
async def setup(loop=None):
    """Connect to the broker and prepare a fresh queue bound to an exchange.

    Opens a connection and a channel, declares the exchange, deletes and
    re-declares the queue, purges it, and binds it to the exchange with the
    routing key.

    Args:
        loop: Event loop passed through to ``connect``; ``None`` is
            forwarded unchanged.

    Returns:
        A ``(exchange, routing_key, queue)`` tuple.

    Note:
        The connection and channel objects are not returned, so callers
        cannot close them explicitly.
    """
    url = 'amqp://<user>:<pswd>@localhost:5672/<virtualhost>'
    exchange_name = 'my_exchange'
    queue_name = 'queue_in'
    routing_key = 'my_routing_key'

    connection = await connect(url, loop=loop)
    channel = await connection.channel()
    exchange = await channel.declare_exchange(exchange_name)

    # Drop any queue left over from a previous run before re-declaring it.
    # (The original referenced an undefined name here, raising NameError.)
    await channel.queue_delete(queue_name)
    queue = await channel.declare_queue(queue_name)
    await queue.purge()
    await queue.bind(exchange, routing_key)

    # let's try to publish a message BEFORE queue.get
    # msg = Message(bytes('Hello World: {!r}'.format(-1), 'utf-8'))
    # await exchange.publish(msg, routing_key)
    return exchange, routing_key, queue
async def publish_data(exchange, routing_key_consumer):
    """Publish five numbered "Hello World" messages, pausing after each.

    Args:
        exchange: Object whose ``publish(message, routing_key)`` coroutine
            is awaited for every message.
        routing_key_consumer: Routing key handed to ``exchange.publish``.
    """
    for counter in range(5):
        payload = 'Hello World: {!r}'.format(counter).encode('utf-8')
        outgoing = Message(payload)
        print("Publish {}".format(outgoing.body))
        await exchange.publish(outgoing, routing_key_consumer)
        # Pause half a second between messages.
        await asyncio.sleep(0.5)
async def consumer_consume(queue):
    """Register a printing callback on *queue* via ``queue.consume``.

    Args:
        queue: Object exposing ``consume(callback, exclusive=...)``. The
            method may be a plain function or a coroutine function.
    """
    import inspect  # local import: keeps this block self-contained

    def on_message_callback(message):
        print('Received (consume callback): {}'.format(message.body))

    registration = queue.consume(on_message_callback, exclusive=True)
    # If ``consume`` is a coroutine, the call above only creates a coroutine
    # object; it must be awaited or the consumer is never registered.
    # A non-awaitable result means registration already happened.
    if inspect.isawaitable(registration):
        await registration
async def consumer_get(queue):
    """Poll *queue* forever with ``queue.get`` and print each message body.

    The loop never returns on its own; it ends only when the surrounding
    task is cancelled or ``queue.get`` raises something other than a
    timeout.

    Args:
        queue: Object exposing an awaitable ``get()`` whose result has a
            ``body`` attribute.
    """
    while True:
        try:
            message = await queue.get()
        except (TimeoutError, asyncio.TimeoutError):
            # Before Python 3.11 ``asyncio.TimeoutError`` is a distinct class
            # from the builtin ``TimeoutError``, so both must be listed.
            print('TimeoutError')
        else:
            print('Received (get): {}'.format(message.body))
def main(args=sys.argv[1:]):
    """Run the demo: set up the broker objects, start a consumer, publish.

    Args:
        args: Command-line arguments. If the first one is ``'consume'`` the
            callback-based consumer is used; otherwise (including when no
            argument is given) the ``queue.get`` polling consumer is used.
    """
    print("starting..")
    loop = asyncio.get_event_loop()
    exchange, routing_key, queue = loop.run_until_complete(setup(loop=loop))

    # Guard against an empty argument list; the original indexed args[0]
    # unconditionally and raised IndexError when run without arguments.
    if args and args[0] == 'consume':
        print('Using queue.consume')
        consumer = consumer_consume(queue)
    else:
        print('Using queue.get')
        consumer = consumer_get(queue)
    consumer_task = loop.create_task(consumer)

    try:
        loop.run_until_complete(publish_data(exchange, routing_key))
    finally:
        # Cancel the background consumer and let the cancellation finish
        # before closing, so the loop is not closed with a pending task.
        consumer_task.cancel()
        outcome, = loop.run_until_complete(
            asyncio.gather(consumer_task, return_exceptions=True))
        loop.close()

    # Report a consumer failure instead of silently dropping it.
    if isinstance(outcome, BaseException) and not isinstance(
            outcome, asyncio.CancelledError):
        logging.getLogger(__name__).error(
            "consumer task failed", exc_info=outcome)
    print('done')
# Run the demo only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment