Created
November 11, 2019 17:09
-
-
Save lucj/84e511003907b9a4d3270e63d896ec1b to your computer and use it in GitHub Desktop.
NATS subscriber
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import json | |
import ssl | |
import sys | |
from nats.aio.client import Client as NATS | |
async def error_cb(e): | |
print("Error:", e) | |
async def run(loop): | |
# Connection to NATS | |
nc = NATS() | |
# Use server's CA | |
ssl_ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH) | |
ssl_ctx.load_verify_locations('./certs/ca.pem') | |
options = { | |
"servers": ["nats://messaging.techwhale.io:4222"], | |
"io_loop": loop, | |
"tls": ssl_ctx, | |
"error_cb": error_cb | |
} | |
await nc.connect(**options) | |
print("Connected to NATS: {}".format(nc.connected_url.netloc)) | |
# Handling incoming messages | |
async def message_handler(msg): | |
data = json.loads(msg.data.decode()) | |
print(data) | |
# Subscription to all messages published to the provided subject (if authorized to) | |
subject = "nats.demo" | |
await nc.subscribe(subject, cb=message_handler) | |
print('Subscribed to [%s]' % subject); | |
if __name__ == '__main__': | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(run(loop)) | |
try: | |
loop.run_forever() | |
finally: | |
loop.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment