Skip to content

Instantly share code, notes, and snippets.

@Alfex4936
Created December 24, 2020 00:26
Show Gist options
  • Save Alfex4936/8ca7af03f673b76249ed3c6c0917f6c2 to your computer and use it in GitHub Desktop.
Save Alfex4936/8ca7af03f673b76249ed3c6c0917f6c2 to your computer and use it in GitHub Desktop.
Python: Confluent Kafka + FastAvro (Producer + Consumer)
from io import BytesIO
from fastavro import parse_schema, schemaless_reader, writer
# a data to send
message = {
"id": 10000,
"title": "[FastAVRO] title",
"date": "20.12.23",
"link": "https://somelink",
"writer": "alfex4936",
}
# Producer part
producer_rb = BytesIO() # write to bytes
schemaless_writer(producer_rb, parsed_schema, message) # single data write
produced_data = producer_rb.getvalue() # read from bytes
producer.produce(produced_data) # Producer produce
# ... py
# Consumer part
# SIGINT can't be handled when polling, limit timeout to 1 second.
msg = c.poll(1.0) # consume data from producer
message = msg.value() # get value of msg
consumer_rb = BytesIO(message)
decoded = schemaless_reader(consumer_rb, parsed_schema) # single data read
assert decoded == {
"id": 10000,
"title": "[FastAVRO] test title",
"date": "20.12.23",
"link": "https://somelink",
"writer": "alfex4936",
}
# decoded will be dictionary again
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment