Created
December 24, 2020 00:26
-
-
Save Alfex4936/8ca7af03f673b76249ed3c6c0917f6c2 to your computer and use it in GitHub Desktop.
Python: Confluent Kafka + FastAvro (Producer + Consumer)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from io import BytesIO

from fastavro import parse_schema, schemaless_reader, schemaless_writer, writer
# Example: round-trip a single record through Kafka using schemaless Avro
# encoding (no container header; reader and writer must share the schema).
#
# NOTE(review): `parsed_schema`, `producer` and `c` are not defined in this
# snippet. Presumably `parsed_schema` comes from `parse_schema(...)` and
# `producer` / `c` are a Confluent Kafka producer / consumer created
# elsewhere -- confirm before running.

# The record to send.
message = {
    "id": 10000,
    "title": "[FastAVRO] title",
    "date": "20.12.23",
    "link": "https://somelink",
    "writer": "alfex4936",
}

# Producer part
producer_rb = BytesIO()  # in-memory buffer that receives the encoded bytes
schemaless_writer(producer_rb, parsed_schema, message)  # encode one record
produced_data = producer_rb.getvalue()  # raw Avro bytes
# NOTE(review): confluent_kafka's Producer.produce() takes the topic as its
# first argument -- verify that `producer` here is a wrapper that supplies it.
producer.produce(produced_data)

# ... (code between the producer and consumer parts is omitted here)

# Consumer part
# SIGINT can't be handled when polling, limit timeout to 1 second.
msg = c.poll(1.0)  # consume data from producer
if msg is None:
    # poll() timed out without delivering anything; fail with a clear message
    # instead of an AttributeError on `None.value()`.
    raise RuntimeError("no message received within the 1 second poll timeout")
if msg.error():
    # The returned object carries a consumer error rather than a payload.
    raise RuntimeError(f"consumer error: {msg.error()}")

# Use a separate name for the raw bytes so the original `message` dict stays
# available for the comparison below.
payload = msg.value()
consumer_rb = BytesIO(payload)
decoded = schemaless_reader(consumer_rb, parsed_schema)  # decode one record

# decoded is a dictionary again and must equal exactly what was sent.
assert decoded == message
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment