Skip to content

Instantly share code, notes, and snippets.

@mfe5003
Created September 16, 2017 16:36
Show Gist options
  • Select an option

  • Save mfe5003/16140f8255f2a9472e1e762d49b9ee15 to your computer and use it in GitHub Desktop.

Select an option

Save mfe5003/16140f8255f2a9472e1e762d49b9ee15 to your computer and use it in GitHub Desktop.
example producer with no schema
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
# load schema to the registry from one producer
value_schema = avro.load('test_schema.avsc')
# This producer creates the schema and registers it
p1 = AvroProducer({
'bootstrap.servers': 'localhost:9092',
'schema.registry.url': 'http://localhost:8081'
},
default_value_schema=value_schema
)
# This producer doesn't know about the schema
p2 = AvroProducer({
'bootstrap.servers': 'localhost:9092',
'schema.registry.url': 'http://localhost:8081'
}
)
topic = 'test.avro'
for i in range(100):
p1.produce(topic=topic, value={"f1": "value" + str(i)})
# I need to get the schema_id, which is not easily exposed
schema_id = p1._serializer.registry_client.get_latest_schema(topic+'-value')[0]
for i in range(100):
p2.produce(
topic=topic,
value={"f1": "value" + str(i)},
value_schema_id=schema_id
)
p1.flush()
p2.flush()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment