Skip to content

Instantly share code, notes, and snippets.

@OneCricketeer
Created March 16, 2020 20:06
Show Gist options
  • Save OneCricketeer/a72a9366e3c24c497680f66f320c38e1 to your computer and use it in GitHub Desktop.
Save OneCricketeer/a72a9366e3c24c497680f66f320c38e1 to your computer and use it in GitHub Desktop.
Confluent Schema Registry Python utils
'''1556029180121 {"subject":"topic-value","version":1,"magic":1,"keytype":"SCHEMA"} null
'''
from confluent_kafka import Producer
import sys
# Keep prompting until a non-blank bootstrap server list is entered.
bootstrap = ''
while not bootstrap.strip():
    bootstrap = input('bootstrap>')

# The only producer setting supplied is the broker address list typed above.
conf = {'bootstrap.servers': bootstrap}
p = Producer(**conf)
def delivery_callback(err, msg):
    """Report the outcome of one produced message on stderr.

    Handed to ``produce`` as its ``callback`` argument.

    Args:
        err: Falsy when delivery succeeded; otherwise the error, which is
            formatted with ``%s``.
        msg: The message object; only consulted on success, where its
            ``topic()``, ``partition()`` and ``offset()`` are printed.
    """
    if err:
        sys.stderr.write('%% Message failed delivery: %s\n' % err)
    else:
        sys.stderr.write('%% Message delivered to %s [%d] @ %d\n' %
                         (msg.topic(), msg.partition(), msg.offset()))
# The record is written to the '_schemas' topic.
topic = '_schemas'

# Prompt until a non-blank record key is entered.
key = ''
while not key.strip():
    key = input('key>')

# Prompt until a non-blank record value is entered.
value = ''
while not value.strip():
    value = input('value>')

try:
    # Trailing whitespace is dropped from both key and value before sending.
    p.produce(topic, key=key.rstrip(), value=value.rstrip(),
              callback=delivery_callback)
except BufferError:
    # Adjacent literals (instead of a backslash continuation inside the
    # string) so source indentation can never leak into the message.
    sys.stderr.write(
        '%% Local producer queue is full '
        '(%d messages awaiting delivery): try again\n' % len(p))

# Non-blocking poll; per the confluent_kafka docs this services any
# delivery callbacks that are already pending.
p.poll(0)

# Wait until all messages have been delivered
sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
p.flush()
'''1556029180121 {"subject":"topic-value","version":1,"magic":1,"keytype":"SCHEMA"} null
'''
from confluent_kafka import Producer
import os
import sys
# The broker address list is taken from the BOOTSTRAP_SERVERS environment
# variable (for example: localhost:29092). A missing variable raises KeyError.
broker = os.environ['BOOTSTRAP_SERVERS']

# Build the producer from a config dict holding only the broker address list.
conf = dict([('bootstrap.servers', broker)])
p = Producer(**conf)
def delivery_callback(err, msg):
    """Report the outcome of one produced message on stderr.

    Handed to ``produce`` as its ``callback`` argument.

    Args:
        err: Falsy when delivery succeeded; otherwise the error, which is
            formatted with ``%s``.
        msg: The message object; only consulted on success, where its
            ``topic()``, ``partition()`` and ``offset()`` are printed.
    """
    if err:
        sys.stderr.write('%% Message failed delivery: %s\n' % err)
    else:
        sys.stderr.write('%% Message delivered to %s [%d] @ %d\n' % (msg.topic(), msg.partition(), msg.offset()))
# The record is written to the '_schemas' topic.
topic = '_schemas'

# Prompt until a non-blank record key is entered.
key = ''
while not key.strip():
    key = input('>')

try:
    # value=None sends a record with no value for the entered key
    # (presumably a tombstone for that schema key; confirm before use).
    p.produce(topic, key=key.rstrip(), value=None, callback=delivery_callback)
except BufferError:
    sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p))

# Non-blocking poll; per the confluent_kafka docs this services any
# delivery callbacks that are already pending.
p.poll(0)

# Wait until all messages have been delivered
sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
p.flush()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment