Created
March 16, 2020 20:06
-
-
Save OneCricketeer/a72a9366e3c24c497680f66f320c38e1 to your computer and use it in GitHub Desktop.
Confluent Schema Registry Python utils
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''1556029180121 {"subject":"topic-value","version":1,"magic":1,"keytype":"SCHEMA"} null | |
''' | |
from confluent_kafka import Producer | |
import sys | |
bootstrap = '' | |
while len(bootstrap.strip()) == 0: | |
bootstrap = input('bootstrap>') | |
conf = {'bootstrap.servers': bootstrap} | |
p = Producer(**conf) | |
def delivery_callback(err, msg): | |
if err: | |
sys.stderr.write('%% Message failed delivery: %s\n' % err) | |
else: | |
sys.stderr.write('%% Message delivered to %s [%d] @ %d\n' % | |
(msg.topic(), msg.partition(), msg.offset())) | |
topic = '_schemas' | |
key = '' | |
while len(key.strip()) == 0: | |
key = input('key>') | |
value = '' | |
while len(value.strip()) == 0: | |
value = input('value>') | |
try: | |
p.produce(topic, key=key.rstrip(), value=value.rstrip(), | |
callback=delivery_callback) | |
except BufferError: | |
sys.stderr.write( | |
'%% Local producer queue is full \ | |
(%d messages awaiting delivery): try again\n' % len(p)) | |
p.poll(0) | |
# Wait until all messages have been delivered | |
sys.stderr.write('%% Waiting for %d deliveries\n' % len(p)) | |
p.flush() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''1556029180121 {"subject":"topic-value","version":1,"magic":1,"keytype":"SCHEMA"} null | |
''' | |
from confluent_kafka import Producer | |
import os | |
import sys | |
# broker = localhost:29092 | |
broker = os.environ['BOOTSTRAP_SERVERS'] | |
conf = {'bootstrap.servers': broker} | |
p = Producer(**conf) | |
def delivery_callback(err, msg): | |
if err: | |
sys.stderr.write('%% Message failed delivery: %s\n' % err) | |
else: | |
sys.stderr.write('%% Message delivered to %s [%d] @ %d\n' % (msg.topic(), msg.partition(), msg.offset())) | |
topic = '_schemas' | |
key = '' | |
while len(key.strip()) == 0: | |
key = input('>') | |
try: | |
p.produce(topic, key=key.rstrip(), value=None, callback=delivery_callback) | |
except BufferError: | |
sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p)) | |
p.poll(0) | |
# Wait until all messages have been delivered | |
sys.stderr.write('%% Waiting for %d deliveries\n' % len(p)) | |
p.flush() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment