Created
December 19, 2024 15:27
-
-
Save hannes/7cf7743560333890ad9b3b8e0d9acfc2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from uuid import uuid4 | |
from argparse import ArgumentParser | |
from datetime import datetime, timedelta | |
# avro stuff | |
from avro.datafile import DataFileWriter | |
from avro.io import DatumWriter, BinaryEncoder | |
# kafka stuff | |
from confluent_kafka import Producer, Consumer | |
from confluent_kafka.schema_registry import SchemaRegistryClient | |
from confluent_kafka.schema_registry.avro import AvroSerializer | |
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField | |
# Command-line interface. Connection and auth parameters are required so the
# script fails fast instead of half-running against a default cluster.
# (action='store' is argparse's default and has been dropped; fixed the
# 'mesages' typo in the --out_file help text.)
parser = ArgumentParser(description='Convert Avro messages from Kafka to file')
parser.add_argument('--schema_registry',
                    help='Kafka Schema Registry URL (https://user:password@hostname)',
                    required=True)
parser.add_argument('--schema_id',
                    help='Schema ID', required=True, type=int)
parser.add_argument('--out_file',
                    help='File name for messages to be stored in', required=True)
parser.add_argument('--topic',
                    help='Kafka topic to read', required=True)
parser.add_argument('--bootstrap_server',
                    help='Kafka bootstrap server', default='')
parser.add_argument('--sasl_username',
                    help='Kafka authentication username', required=True)
parser.add_argument('--sasl_password',
                    help='Kafka authentication password', required=True)
parser.add_argument('--flush_batch_size',
                    help='File flush batch size', type=int, default=100)
args = parser.parse_args()
# Fetch the Avro schema from the registry; refuse non-Avro schema types with a
# real exception instead of `assert` (asserts are stripped under `python -O`).
schema_registry_client = SchemaRegistryClient({'url': args.schema_registry})
schema = schema_registry_client.get_schema(args.schema_id)
if schema.schema_type != 'AVRO':
    raise ValueError(
        f'Schema {args.schema_id} has type {schema.schema_type}, expected AVRO')
# Write an empty Avro container-file header using the schema string from the
# registry. DataFileWriter generates a random 16-byte sync marker for the file.
with open(args.out_file, 'wb') as header_file:
    DataFileWriter(header_file, DatumWriter(), schema.schema_str).close()
# Now we 'steal' that random sync marker for later: it occupies the last 16
# bytes of the header we just wrote, and every data block appended below must
# be terminated with the same marker.
with open(args.out_file, 'rb') as header_file:
    header_file.seek(-16, 2)  # whence=2: seek relative to end of file
    avro_sync_marker = header_file.read(16)
# Kafka consumer settings. A throw-away random group id means there are no
# previously committed offsets, so each run of the script starts fresh.
consumer_config = {
    'group.id': str(uuid4()),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'bootstrap.servers': args.bootstrap_server,
    'sasl.username': args.sasl_username,
    'sasl.password': args.sasl_password
}
# Re-open the output file in append mode so raw Avro data blocks land after
# the container header written above, and point a raw Avro binary encoder
# straight at it.
out = open(args.out_file, 'ab')
encoder = BinaryEncoder(out)
total_messages = 0    # running count of records written across all flushes
avro_messages = []    # current, not-yet-flushed batch of serialized records
def flush():
    """Append the buffered messages to the file as one Avro data block.

    Per the Avro object-container spec a data block is: a long with the
    record count, a long with the byte size of the serialized records, the
    records themselves, then the file's 16-byte sync marker. Mutates the
    module-level `avro_messages` buffer and `total_messages` counter.
    """
    global total_messages, avro_messages  # cough
    # Guard: without this, the final flush() after the consume loop could
    # write a useless empty data block (count 0, size 0, plus sync marker).
    if not avro_messages:
        return
    total_messages += len(avro_messages)
    # A long indicating the count of objects in this block.
    encoder.write_long(len(avro_messages))
    # A long indicating the size in bytes of the serialized objects in the
    # current block.
    avro_messages_bytes = b''.join(avro_messages)
    encoder.write_long(len(avro_messages_bytes))
    # The serialized objects.
    out.write(avro_messages_bytes)
    # The file's 16-byte sync marker.
    out.write(avro_sync_marker)
    out.flush()
    avro_messages = []
consumer = Consumer(consumer_config)

def on_assign(cons, partitions):
    """Rewind every newly assigned partition so the whole topic is read.

    Magic incantation to get all messages: before accepting the assignment,
    force each partition's start offset to the special value -2 ("read from
    the beginning" — presumably librdkafka's OFFSET_BEGINNING; confirm
    against the confluent_kafka docs).
    """
    for partition in partitions:
        partition.offset = -2
    cons.assign(partitions)

consumer.subscribe([args.topic], on_assign=on_assign)
# Consume until the topic appears drained: a one-second poll that returns
# nothing is treated as end-of-data.
while True:
    msg = consumer.poll(1)
    if msg is None:
        break
    if msg.error():
        # Fixed: original raised `ValueException`, which does not exist and
        # would itself be a NameError the moment a Kafka error arrived.
        raise ValueError(msg.error())
    value = msg.value()
    # A message with no payload (e.g. a tombstone) has no Avro data; indexing
    # it below would raise TypeError, so skip it.
    if value is None:
        continue
    # see https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#messages-wire-format
    # Byte 0: Confluent serialization format version number; currently always 0.
    if value[0] != 0:
        continue
    # Bytes 1-5: 4-byte schema ID as returned by Schema Registry. (big endian!)
    message_schema_id = int.from_bytes(value[1:5], byteorder='big')
    # Explicit raise instead of `assert` (asserts vanish under `python -O`).
    if message_schema_id != args.schema_id:
        raise ValueError(
            f'Message carries schema id {message_schema_id}, '
            f'expected {args.schema_id}')
    # Bytes 5ff: Avro binary data
    avro_messages.append(value[5:])
    # >= so a block really holds flush_batch_size records (was `>`, which
    # flushed one message late relative to the --flush_batch_size option).
    if len(avro_messages) >= args.flush_batch_size:
        flush()
consumer.close()
flush()  # write any remaining partial batch
out.close()
print(f'Wrote {total_messages} messages to {args.out_file}')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment