Last active
September 8, 2017 07:40
-
-
Save jakubczaplicki/8993755aa70a73c506c07a05c6f65da1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import sys | |
from aio_pika import connect, Message | |
import logging | |
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) | |
logging.getLogger("pika.heartbeat").setLevel(logging.CRITICAL) | |
logging.getLogger("pika.callback").setLevel(logging.CRITICAL) | |
logging.getLogger("pika.channel").setLevel(logging.DEBUG) | |
logging.getLogger("aio_pika.exchange").setLevel(logging.CRITICAL) | |
async def setup(loop=None): | |
url = 'amqp://<user>:<pswd>@localhost:5672/<virtualhost>' | |
exchange_name = 'my_exchange' | |
queue_name = 'queue_in' | |
routing_key = 'my_routing_key' | |
connection = await connect(url, loop=loop) | |
channel = await connection.channel() | |
exchange = await channel.declare_exchange(exchange_name) | |
await channel.queue_delete(queue_in_name) | |
queue = await channel.declare_queue(queue_name) | |
await queue.purge() | |
await queue.bind(exchange, routing_key) | |
# let's try to publish a message BEFORE queue.get | |
# msg = Message(bytes('Hello World: {!r}'.format(-1), 'utf-8')) | |
# await exchange.publish(msg, routing_key) | |
return exchange, routing_key, queue | |
async def publish_data(exchange, routing_key_consumer): | |
for n in range(0, 5): | |
msg = Message(bytes('Hello World: {!r}'.format(n), 'utf-8')) | |
print("Publish {}".format(msg.body)) | |
await exchange.publish(msg, routing_key_consumer) | |
await asyncio.sleep(0.5) | |
async def consumer_consume(queue): | |
def on_message_callback(message): | |
print('Received (consume callback): {}'.format(message.body)) | |
queue.consume(on_message_callback, exclusive=True) | |
async def consumer_get(queue): | |
while True: | |
try: | |
message = await queue.get() | |
print('Received (get): {}'.format(message.body)) | |
except TimeoutError: | |
print('TimeoutError') | |
def main(args=sys.argv[1:]): | |
print("starting..") | |
loop = asyncio.get_event_loop() | |
setup_coro = setup(loop=loop) | |
exchange, routing_key, queue = loop.run_until_complete(setup_coro) | |
if args[0] == 'consume': | |
print('Using queue.consume') | |
loop.create_task(consumer_consume(queue)) | |
else: | |
print('Using queue.get') | |
loop.create_task(consumer_get(queue)) | |
loop.run_until_complete(publish_data(exchange, routing_key)) | |
loop.stop() | |
loop.close() | |
print('done') | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment