Skip to content

Instantly share code, notes, and snippets.

@Inveracity
Created July 23, 2017 10:30
Show Gist options
  • Save Inveracity/f3f146b6fd067458fd4cb5e4a34b786c to your computer and use it in GitHub Desktop.
Save Inveracity/f3f146b6fd067458fd4cb5e4a34b786c to your computer and use it in GitHub Desktop.
async rmq
import asyncio
from aio_pika import connect, Message
async def main(future):
    """Fetch a single message from RabbitMQ and deliver its body via *future*.

    Connects to the broker, declares the exchange named ``direct`` and the
    ``text`` queue, binds them with the ``example.text`` routing key, pulls
    one message, acknowledges it and closes the connection.

    Args:
        future: Future that receives the raw message body (bytes-like, not
            decoded) on success, or the raised exception if any step fails,
            so that whoever waits on it is never left hanging.
    """
    # Local import: only needed for the ack() compatibility check below.
    import inspect

    USERNAME = 'myuser'
    PASSWORD = 'mypass'
    HOST = 'localhost'
    PORT = 5672
    QUEUE = 'text'
    ROUTING_KEY = 'example.text'

    # PORT is part of the URL so that changing the constant actually has an effect.
    url = "amqp://{}:{}@{}:{}/".format(USERNAME, PASSWORD, HOST, PORT)

    try:
        # Use the loop that is running this coroutine rather than depending on
        # a module-level global being defined by the caller.
        connection = await connect(url, loop=asyncio.get_event_loop())
        try:
            channel = await connection.channel()
            exchange = await channel.declare_exchange('direct', auto_delete=True)
            queue = await channel.declare_queue(QUEUE, auto_delete=False)
            await queue.bind(exchange, ROUTING_KEY)

            # Receiving message
            print("waiting to recieve")
            # Original author's note: pass timeout=10 here to bound the wait.
            incoming_message = await queue.get()
            print("recieved message")

            # NOTE(review): ack() appears to be a plain call in older aio_pika
            # releases and awaitable in newer ones - confirm against the
            # pinned version. Awaiting only when needed supports both.
            ack_result = incoming_message.ack()
            if inspect.isawaitable(ack_result):
                await ack_result
            print("acknowledged message")

            body = incoming_message.body
        finally:
            # Close before resolving the future: the future's done-callback
            # may stop the event loop, which would cut the close short.
            await connection.close()
    except Exception as exc:
        # Report the failure through the future instead of leaving it pending.
        future.set_exception(exc)
    else:
        future.set_result(body)
        print("set future")
def doit(future):
    """Done-callback: print the future's result, then stop the event loop.

    Args:
        future: The completed future whose result is printed.

    Raises:
        Whatever ``future.result()`` raises when the future holds an
        exception or was cancelled; the loop is stopped regardless.
    """
    try:
        print(future.result())
    finally:
        # Stop even when result() raises, otherwise run_forever() never returns.
        loop.stop()
# Create the loop explicitly: asyncio.get_event_loop() without a current loop
# is deprecated and raises on recent Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Build the future from the loop so it is bound to the loop that runs main().
future = loop.create_future()
future.add_done_callback(doit)

# Keep a reference to the task: the event loop only holds weak references,
# so an unreferenced task can be garbage-collected mid-execution.
main_task = loop.create_task(main(future))

try:
    # Runs until the done-callback calls loop.stop().
    loop.run_forever()
finally:
    loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment