Skip to content

Instantly share code, notes, and snippets.

@cicorias
Created March 23, 2022 13:16
Show Gist options
  • Save cicorias/31c2deb5403f08db0bb4a7086dd922db to your computer and use it in GitHub Desktop.
Save cicorias/31c2deb5403f08db0bb4a7086dd922db to your computer and use it in GitHub Desktop.
Convert AVRO files to JSONL files using Python

Overview

Use this script on a directory tree that contains *.avro files at any level of nesting.

It walks the tree and writes the converted JSONL output alongside each of the AVRO files.

python avroConvert.py "mypath"
import os
import sys
import json
import logging
from datetime import datetime
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
# Derive a per-run log file name from the current timestamp. Spaces become
# underscores, while colons and dots become dashes, before ".log" is appended.
current_datetime = datetime.now()
str_current_datetime = current_datetime.isoformat(sep=" ")
log_file = str_current_datetime.translate(str.maketrans(" :.", "_--")) + '.log'

# Route records of level INFO and above to both the log file and stdout.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler(log_file),
        logging.StreamHandler(sys.stdout),
    ],
)
def convert_file(file_name: os.DirEntry):
    """Convert one AVRO container file into a JSON Lines file beside it.

    The ``"Body"`` field of every record is parsed as JSON and written back
    out as one line of the output file. The output path is the input path
    with its extension swapped for ``.jsonl``. The output is opened in append
    mode, so converting the same input twice appends its records again.

    Args:
        file_name: Directory entry of the AVRO file to convert; its ``path``
            attribute is used to locate the file.

    Returns:
        None. Files of 508 bytes or fewer are skipped and produce no output.
    """
    # NOTE(review): 508 bytes is presumably the size of a capture file that
    # holds only a header and no records -- confirm against the producer.
    if os.stat(file_name.path).st_size <= 508:
        return
    logging.info(f'f: {file_name.path}')
    with open(file_name.path, 'rb') as avro_file:
        reader = DataFileReader(avro_file, DatumReader())
        try:
            # Swap only the trailing extension. A substring replace over the
            # whole path would also rewrite any "avro" in parent directory
            # names and point the output at a directory that may not exist.
            stem, _extension = os.path.splitext(file_name.path)
            new_filename = stem + '.jsonl'
            logging.info(f'writing to: {new_filename}')
            with open(new_filename, 'a') as myfile:
                for reading in reader:
                    parsed_json = json.loads(reading["Body"])
                    myfile.write(json.dumps(parsed_json) + '\n')
        finally:
            # Release the reader even when a record is missing "Body" or
            # carries a payload that is not valid JSON.
            reader.close()
def get_avro_files(base_dir):
    """Recursively yield the directory entries of ``*.avro`` files.

    Regular files whose name does not end in ``.avro`` are skipped silently.
    Entries that are neither a file nor a directory are reported with a
    warning and skipped.

    Args:
        base_dir: Directory to scan; sub-directories are descended into.

    Yields:
        os.DirEntry: One entry per file whose name ends with ``.avro``.
    """
    # Use scandir as a context manager so the directory handle is released
    # even if the consumer stops iterating early.
    with os.scandir(base_dir) as entries:
        for entry in entries:
            if entry.is_file():
                # Non-AVRO regular files (e.g. the generated .jsonl output)
                # are simply ignored rather than reported as "neither a file,
                # nor a dir".
                if entry.name.endswith(".avro"):
                    yield entry
            elif entry.is_dir():
                yield from get_avro_files(entry.path)
            else:
                logging.warning(f"Neither a file, nor a dir: {entry.path}")
def process_path(base_path: str):
    """Convert every AVRO file discovered beneath ``base_path``.

    Args:
        base_path: Root directory passed to ``get_avro_files`` for discovery;
            each entry it yields is handed to ``convert_file``.
    """
    avro_entries = get_avro_files(base_dir=base_path)
    for avro_entry in avro_entries:
        convert_file(avro_entry)
# Script entry point: expects the base directory as the first CLI argument.
# Exits with status 1 on a missing or non-directory argument, 0 on success.
if __name__ == "__main__":
    if len(sys.argv) < 2:
        logging.error("provide base path where avro files are located")
        # sys.exit rather than the bare exit() builtin: the latter is injected
        # by the site module for interactive use and is not always available.
        sys.exit(1)
    base_path = os.path.abspath(sys.argv[1])
    if not os.path.isdir(base_path):
        logging.error(f'provided argv is not a directory: {sys.argv[1]}')
        sys.exit(1)
    process_path(base_path)
    sys.exit(0)
autopep8==1.6.0
avro-python3==1.10.2
azure-core==1.23.0
azure-eventhub==5.7.0
azure-storage-blob==12.10.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment