Confluent-Kafka-Python Avro Values and String Keys
Producer script (sends Avro-serialized LogEvent values with string keys):
from time import time

from confluent_kafka import avro
from confluent_kafka import Producer

from kafka_utils import bootstrap_servers, topic
from kafka_utils import serialize_avro
from kafka_utils import delivery_report
from model import LogEvent

# key_schema = avro.load('kafka/avro_schemas/key.avsc')
value_schema = avro.load('avro_schemas/value.avsc')  # TODO: Create avro_schemas folder (an example value.avsc is sketched after this script)

p = Producer({'bootstrap.servers': bootstrap_servers})
def send_avro(value, key=None, timestamp=None):
    """
    Send value as an Avro-serialized object. The value's class must define an asdict() method.

    :param value: class object
    :param key: <str|bytes> (default: None, so null key)
    :param timestamp: unix millis since epoch
    """
    if timestamp is None:
        timestamp = int(time() * 1000)  # Kafka message timestamps are unix millis

    if hasattr(value, 'asdict'):
        value_payload = serialize_avro(topic, value_schema, value.asdict(), is_key=False)
    else:
        raise TypeError("asdict() is not defined for {}".format(type(value)))

    # Asynchronously produce a message; the delivery report callback
    # will be triggered from poll() or flush() once the message has
    # been successfully delivered or has failed permanently.
    p.produce(topic, key=key, value=value_payload, timestamp=timestamp, callback=delivery_report)
def main():
    for x in range(100):
        # Trigger any available delivery report callbacks from previous produce() calls
        p.poll(0)
        timestamp = int(time() * 1000)  # unix millis, shared by the record payload and the message timestamp
        value_obj = LogEvent.Value("test", "Hello, Python! {}".format(x), timestamp=timestamp)
        send_avro(value_obj, key="record={}".format(x), timestamp=timestamp)
    flush()


def flush():
    p.flush()


if __name__ == '__main__':
    main()
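For reference, a minimal value.avsc sketch that matches LogEvent.Value.asdict(): only the three field names come from model.py below; the record name and namespace are assumptions. While the avro_schemas folder from the TODO does not exist yet, the same schema can be loaded inline with avro.loads():

from confluent_kafka import avro

# Hypothetical schema; field names taken from LogEvent.Value.asdict(), everything else assumed
value_schema = avro.loads('''
{
    "namespace": "example.logging",
    "name": "LogEventValue",
    "type": "record",
    "fields": [
        {"name": "timestamp", "type": "long"},
        {"name": "description", "type": "string"},
        {"name": "message", "type": "string"}
    ]
}
''')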
kafka_utils.py (shared configuration, delivery callback, and serializer helpers):
import os

from confluent_kafka.avro import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer as AvroSerializer

bootstrap_servers = os.environ.get('BOOTSTRAP_SERVERS', 'localhost:9092')
topic = os.environ.get('KAFKA_TOPIC', 'my_topic')

schema_registry = CachedSchemaRegistryClient(os.environ.get('SCHEMA_REGISTRY', 'http://localhost:8081'))


def load_schema_from_registry(subject, is_key=False):
    """Fetch the latest registered schema for a topic's '-key' or '-value' subject."""
    suffix = '-key' if is_key else '-value'
    # get_latest_schema() returns a (schema_id, schema, version) tuple
    schema_id, schema, version = schema_registry.get_latest_schema('{}{}'.format(subject, suffix))
    return schema
def delivery_report(err, msg, verbose=False):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    elif verbose:
        # Success reports are noisy for large batches, so only print them when verbose
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
# ================
# Serializers
# ================
def serialize_str(s):
    """Encode a string key/value as UTF-8 bytes."""
    return s.encode('utf-8')


avro_serializer = AvroSerializer(schema_registry)
# Bound-method alias: serialize_avro(topic, schema, record, is_key=False) returns
# Confluent-framed Avro bytes (magic byte + schema id + payload)
serialize_avro = avro_serializer.encode_record_with_schema
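For completeness, a minimal consumer sketch that reads the messages back and decodes the Confluent-framed Avro values with the same MessageSerializer; the group id and offset reset policy below are assumptions, not part of the original gist:

from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': bootstrap_servers,
    'group.id': 'log-event-reader',   # assumed consumer group name
    'auto.offset.reset': 'earliest',  # assumed: start from the beginning of the topic
})
c.subscribe([topic])
try:
    while True:
        msg = c.poll(1.0)
        if msg is None or msg.error():
            continue
        key = msg.key().decode('utf-8') if msg.key() else None  # string keys from the producer
        value = avro_serializer.decode_message(msg.value())     # dict matching value.avsc
        print(key, value)
finally:
    c.close()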
model.py (plain Python model class mirroring the Avro value schema):
from time import time


class LogEvent:
    """
    An Avro record to send to Kafka

    key: str
    value: LogEvent.Value
    """

    class Value:
        """
        Translated model class from an Avro schema
        """

        def __init__(self, description, message, timestamp=None):
            # Unix millis; carried in the record because Kafka messages before 0.10 have no native timestamp
            self.timestamp = int(time() * 1000) if timestamp is None else timestamp
            self.description = description  # used to trace where the message came from
            self.message = message

        def asdict(self):
            return {
                "timestamp": self.timestamp,
                "description": self.description,
                "message": self.message
            }
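As a quick illustration, the dict that send_avro() hands to the Avro serializer looks like this (the timestamp value is just an example):

event = LogEvent.Value("test", "Hello, Python!", timestamp=1548868680000)
print(event.asdict())
# {'timestamp': 1548868680000, 'description': 'test', 'message': 'Hello, Python!'}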