Run this script against a directory that contains nested folders of `*.avro` files.
It walks the whole tree and writes a JSON Lines (`.jsonl`) file alongside each Avro file:
each record's `Body` field is parsed as JSON and written out as one line.

Usage:

    python avroConvert.py "mypath"
import os | |
import sys | |
import json | |
import logging | |
from datetime import datetime | |
from avro.datafile import DataFileReader, DataFileWriter | |
from avro.io import DatumReader, DatumWriter | |
# Name the log file after the script's start time. Spaces become "_" and
# colons/dots become "-", so the stem is e.g. "2022-03-10_12-34-56-123456"
# before the ".log" extension. Records go both to that file (created in the
# current working directory) and to stdout.
current_datetime = datetime.now()
str_current_datetime = str(current_datetime)
log_file = str_current_datetime.translate(str.maketrans(" :.", "_--")) + '.log'

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.FileHandler(log_file), logging.StreamHandler(sys.stdout)],
)
def convert_file(file_name: os.DirEntry, min_size: int = 508):
    """Convert one Avro file into a JSON Lines file next to it.

    Each record's ``Body`` field is parsed with ``json.loads`` and re-serialised
    as one line of the output. The output path is the input path with its
    extension replaced by ``.jsonl``. The output is opened in write mode, so
    re-running the conversion replaces the file rather than duplicating records.

    Args:
        file_name: Directory entry of the Avro file to convert.
        min_size: Files of at most this many bytes are skipped without being
            opened. NOTE(review): the default of 508 presumably matches the
            size of a header-only Avro file with no records from the producer
            in use; confirm against real data.
    """
    if file_name.stat().st_size <= min_size:
        return
    logging.info(f'f: {file_name.path}')
    # The `with` + `finally` release the input even if a record fails to
    # decode or its Body is not valid JSON.
    with open(file_name.path, 'rb') as avro_file:
        reader = DataFileReader(avro_file, DatumReader())
        try:
            # Swap only the final extension. A blanket str.replace would also
            # rewrite any directory name containing "avro", pointing at a path
            # that may not exist. For an input path without "avro", the blanket
            # replace would also leave the output path equal to the input.
            new_filename = os.path.splitext(file_name.path)[0] + '.jsonl'
            logging.info(f'writing to: {new_filename}')
            with open(new_filename, 'w', encoding='utf-8') as myfile:
                for reading in reader:
                    parsed_json = json.loads(reading["Body"])
                    myfile.write(json.dumps(parsed_json) + '\n')
        finally:
            reader.close()
def get_avro_files(base_dir):
    """Recursively yield an ``os.DirEntry`` for every ``.avro`` file under *base_dir*.

    The tree is walked depth-first in ``os.scandir`` order. ``is_dir()`` /
    ``is_file()`` follow symlinks, as they do by default. Regular files without
    the ``.avro`` suffix are skipped silently. Only entries that are neither a
    file nor a directory (for example broken symlinks) are reported with a
    warning.
    """
    # Using scandir as a context manager closes its directory handle even if
    # the consumer stops iterating early.
    with os.scandir(base_dir) as entries:
        for entry in entries:
            if entry.is_dir():
                yield from get_avro_files(entry.path)
            elif entry.is_file():
                if entry.name.endswith(".avro"):
                    yield entry
            else:
                logging.warning(f"Neither a file, nor a dir: {entry.path}")
def process_path(base_path: str):
    """Run ``convert_file`` on each ``.avro`` file found beneath *base_path*."""
    avro_entries = get_avro_files(base_dir=base_path)
    for avro_entry in avro_entries:
        convert_file(avro_entry)
def main(argv=None) -> int:
    """Command-line entry point: convert every Avro file under a directory.

    Args:
        argv: Argument vector in ``sys.argv`` form, where ``argv[1]`` is the
            base directory. Defaults to ``sys.argv``.

    Returns:
        Process exit status: 0 on success, or 1 when no path was given or the
        path is not a directory.
    """
    args = sys.argv if argv is None else argv
    if len(args) < 2:
        logging.error("provide base path where avro files are located")
        return 1
    base_path = os.path.abspath(args[1])
    if not os.path.isdir(base_path):
        logging.error(f'provided argv is not a directory: {args[1]}')
        return 1
    process_path(base_path)
    return 0


if __name__ == "__main__":
    # sys.exit rather than the site-injected exit(): the latter is absent
    # under `python -S` and in frozen executables.
    sys.exit(main())
autopep8==1.6.0
avro-python3==1.10.2
azure-core==1.23.0
azure-eventhub==5.7.0
azure-storage-blob==12.10.0