Skip to content

Instantly share code, notes, and snippets.

@hannes
Created December 19, 2024 15:27
Show Gist options
  • Save hannes/7cf7743560333890ad9b3b8e0d9acfc2 to your computer and use it in GitHub Desktop.
Save hannes/7cf7743560333890ad9b3b8e0d9acfc2 to your computer and use it in GitHub Desktop.
from uuid import uuid4
from argparse import ArgumentParser
from datetime import datetime, timedelta
# avro stuff
from avro.datafile import DataFileWriter
from avro.io import DatumWriter, BinaryEncoder
# kafka stuff
from confluent_kafka import Producer, Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
# Command-line interface. All options are declared in one table and registered
# in order, so the generated --help output lists them in the same sequence.
parser = ArgumentParser(description='Convert Avro messages from Kafka to file')
_CLI_OPTIONS = (
    ('--schema_registry', {
        'help': 'Kafka Schema Registry URL (https://user:password@hostname)',
        'required': True,
    }),
    ('--schema_id', {
        'help': 'Schema ID',
        'required': True,
        'type': int,
    }),
    ('--out_file', {
        'help': 'File name for mesages to be stored in',
        'required': True,
    }),
    ('--topic', {
        'help': 'Kafka topic to read',
        'required': True,
    }),
    ('--bootstrap_server', {
        'help': 'Kafka bootstrap server',
        'default': '',
    }),
    ('--sasl_username', {
        'help': 'Kafka authentication username',
        'required': True,
    }),
    ('--sasl_password', {
        'help': 'Kafka authentication password',
        'required': True,
    }),
    ('--flush_batch_size', {
        'help': 'File flush batch size',
        'type': int,
        'default': 100,
    }),
)
for _flag, _settings in _CLI_OPTIONS:
    parser.add_argument(_flag, action='store', **_settings)
# Parsed options; read by the rest of the script as args.<option_name>.
args = parser.parse_args()
# Fetch the Avro schema from the registry by its numeric ID.
schema_registry_client = SchemaRegistryClient({'url': args.schema_registry})
schema = schema_registry_client.get_schema(args.schema_id)
# Validate with an explicit raise: an assert would be stripped under
# `python -O` and a non-Avro schema would then go unnoticed.
if schema.schema_type != 'AVRO':
    raise ValueError(
        f'Schema {args.schema_id} has type {schema.schema_type!r}, expected AVRO')
# Write an empty Avro container (header only, no data blocks) using the schema
# string from the registry. The writer is closed right away; the `with` block
# guarantees the file handle is released even if writing the header fails.
with open(args.out_file, 'wb') as header_file:
    DataFileWriter(header_file, DatumWriter(), schema.schema_str).close()
# Now we 'steal' the random sync marker for later: it is read back as the
# last 16 bytes of the header-only file that was just written.
with open(args.out_file, 'rb') as header_file:
    header_file.seek(-16, 2)  # 2 means seek relative to end of file
    avro_sync_marker = header_file.read(16)
# --- Output-side state shared with flush() ---
# Running total of messages written to the output file.
total_messages = 0
# Raw Avro-encoded message bodies buffered until the next flush.
avro_messages = []
# Opening with 'a' means append: new data goes after what the file already holds.
out = open(args.out_file, mode='ab')
# Use the raw Avro encoder directly on the file.
encoder = BinaryEncoder(out)

# --- Kafka consumer settings ---
# A fresh random group id is generated for every run.
consumer_config = {
    'bootstrap.servers': args.bootstrap_server,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': args.sasl_username,
    'sasl.password': args.sasl_password,
    'group.id': str(uuid4()),
}
def flush():
    """Append the buffered messages to the output file as one Avro data block.

    The block is written in this order: a long with the number of objects,
    a long with the total size in bytes of the serialized objects, the
    serialized objects themselves, and the file's 16-byte sync marker. The
    file is then flushed and the buffer is reset.

    Does nothing when the buffer is empty, so no zero-object block is ever
    written (for example on the final flush right after a full batch was
    already written).

    Uses the module-level ``encoder``, ``out`` and ``avro_sync_marker``, and
    rebinds the module-level ``avro_messages`` and ``total_messages``.
    """
    global total_messages, avro_messages  # cough
    if not avro_messages:
        return
    payload = b''.join(avro_messages)
    # A long indicating the count of objects in this block.
    encoder.write_long(len(avro_messages))
    # A long indicating the size in bytes of the serialized objects in the
    # current block.
    encoder.write_long(len(payload))
    # The serialized objects.
    out.write(payload)
    # The file's 16-byte sync marker.
    out.write(avro_sync_marker)
    out.flush()
    # Count the messages only once the block has actually been written.
    total_messages += len(avro_messages)
    avro_messages = []
consumer = Consumer(consumer_config)


def on_assign(client, partitions):
    """Rewind every newly assigned partition, then apply the assignment.

    Passed to ``subscribe`` as the ``on_assign`` callback. Each partition's
    offset is set to -2 (the logical "beginning" offset, i.e.
    ``confluent_kafka.OFFSET_BEGINNING``) so that all messages are read,
    and the modified partition list is handed back via ``assign``.
    """
    # magic incantation to get all messages
    for partition in partitions:
        partition.offset = -2
    client.assign(partitions)


topics = [args.topic]
consumer.subscribe(topics, on_assign=on_assign)
# Main loop: drain the topic, strip the Confluent framing from every message
# and buffer the raw Avro body, flushing to the file in batches.
try:
    while True:
        msg = consumer.poll(1)
        # NOTE(review): the loop ends on the first poll that times out after
        # one second; this is treated as "topic drained". A slow initial
        # partition assignment could presumably end the run early -- confirm.
        if msg is None:
            break
        if msg.error():
            # ValueError is the built-in exception; the name used previously
            # (ValueException) does not exist and raised NameError instead.
            raise ValueError(msg.error())
        value = msg.value()
        # see https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#messages-wire-format
        # Byte 0: Confluent serialization format version number; currently
        # always 0. Messages without a value, or too short to hold the 5-byte
        # framing, are skipped just like messages with another version byte.
        if value is None or len(value) < 5 or value[0] != 0:
            continue
        # Bytes 1-4: 4-byte schema ID as returned by Schema Registry. (big endian!)
        message_schema_id = int.from_bytes(value[1:5], byteorder='big')
        # Explicit raise instead of assert, which is stripped under `python -O`.
        if message_schema_id != args.schema_id:
            raise ValueError(
                f'Message schema ID {message_schema_id} does not match '
                f'expected schema ID {args.schema_id}')
        # Bytes 5ff: Avro binary data
        avro_messages.append(value[5:])
        # Flush as soon as the batch holds flush_batch_size messages.
        if len(avro_messages) >= args.flush_batch_size:
            flush()
    # Write whatever is still buffered after the last full batch.
    flush()
finally:
    # Release the consumer and the output file even when an error occurred.
    consumer.close()
    out.close()
print(f'Wrote {total_messages} messages to {args.out_file}')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment