Created
January 15, 2018 09:14
-
-
Save ContrastingSounds/c7048e375c88d3a85720554dbd0190a4 to your computer and use it in GitHub Desktop.
Stub functions to dynamically create an Avro schema and save records to a file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import avro.schema | |
from avro.datafile import DataFileWriter | |
from avro.io import DatumWriter | |
# Default output paths for the generated schema and the serialized records.
STATE_SCHEMA = 'state_schema.avsc'
STATE_RECORDS = 'state_records.avro'

# Template model: each category holds a set of column names. In the schema
# built by create_avro_schema, 'dimensions' columns become nullable strings,
# 'integers' nullable ints, and 'floats' nullable floats.
model_dictionary = dict(
    dimensions=set(),
    integers=set(),
    floats=set(),
)
def _ordered_columns(columns):
    """Return column names in a stable order for schema generation.

    Sets (the container used by ``model_dictionary``) have no reproducible
    iteration order across interpreter runs, so they are sorted; any other
    iterable keeps the order the caller supplied.
    """
    if isinstance(columns, (set, frozenset)):
        return sorted(columns)
    return list(columns)


def _build_state_definition(model: dict) -> dict:
    """Return the Avro 'State' record definition for ``model`` as a plain dict.

    :param model: dict mapping 'dimensions', 'integers' and 'floats' to the
        column names of each category; a missing category is treated as empty
    :return: JSON-serializable dict describing the Avro record
    """
    state = {
        'namespace': 'state.avro',
        'type': 'record',
        'name': 'State',
        'fields': [
            {'name': 'first_created', 'type': ['int', 'null']},
            {'name': 'entity', 'type': 'string'},
            {'name': 'uuid', 'type': 'string'},
            {'name': 'parent_uuid', 'type': ['string', 'null']},
        ],
    }
    # Each model category maps to one nullable Avro primitive type.
    for category, avro_type in (('dimensions', 'string'),
                                ('integers', 'int'),
                                ('floats', 'float')):
        state['fields'] += [
            {'name': column, 'type': [avro_type, 'null']}
            for column in _ordered_columns(model.get(category, ()))
        ]
    return state


def create_avro_schema(model: dict, file_name: str):
    """
    Build the Avro 'State' record schema for a model, save it as JSON and return it.

    The record always contains the fields first_created, entity, uuid and
    parent_uuid, followed by one nullable field per model column.

    :param model: dict mapping 'dimensions' (string columns), 'integers' (int
        columns) and 'floats' (float columns) to collections of column names
    :param file_name: path of the file to write the schema JSON to (used as-is)
    :return: the parsed Avro schema object
    """
    state = _build_state_definition(model)
    # Validate before touching the disk so an invalid definition (e.g. a
    # duplicated column name) does not leave a bad schema file behind.
    schema = avro.schema.Parse(json.dumps(state))
    with open(file_name, 'w', encoding='utf-8') as schema_file:
        json.dump(state, schema_file, indent=4)
    return schema
def save_avro_output(model: dict, file_name: str, records: list, schema_file_name=None):
    """
    Build the State schema for a model and write records to an Avro data file.

    :param model: dict mapping 'dimensions' / 'integers' / 'floats' to the column
        names of each category (same shape as ``model_dictionary``)
    :param file_name: path of the Avro data file to write
    :param records: list of dicts, containing the actual records to be saved
    :param schema_file_name: path where the generated schema JSON is saved;
        defaults to STATE_SCHEMA when omitted
    :return: None
    """
    # create_avro_schema requires a destination for the schema JSON.
    if schema_file_name is None:
        schema_file_name = STATE_SCHEMA
    schema = create_avro_schema(model, schema_file_name)
    # The `with` guarantees the file handle is released even if the writer
    # cannot be constructed.
    with open(file_name, 'wb') as avro_output:
        writer = DataFileWriter(avro_output, DatumWriter(), schema)
        try:
            for record in records:
                writer.append(record)
        finally:
            # Always close the writer, even if append() raises.
            writer.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment