# Avro Debugger Script
# GitHub gist dharamsk/473d87f8f1cf1fe1af87296db8def2a9, created April 14, 2018 00:33.
# GitHub warning: this file contains bidirectional Unicode text that may be interpreted
# or compiled differently than what appears below. To review, open the file in an
# editor that reveals hidden Unicode characters.
import random | |
import argparse | |
import mock | |
from where_ever.stream_handler import * | |
from where_ever.tests.test_stream_handler import * | |
"""
This checks an avro schema against an example data record.
It tells you two things:
1) Which, if any, schema fields make serialization of the record fail (e.g. avro syntax or type errors)
2) A list of fields in the data record that don't have field definitions in the schema
To use this:
- Change the imports above to import * from your stream_handler and any other necessary files
- Change the 3 Config Variables below
- Also replace the placeholder names YOUR_SPESHUL_PROTOCOL (your avro protocol definition) and
  your_speshul_serializer (an object with a serialize(schema, records) method) with your own
"""
# Config Variables -- point these at your own project's objects.
# Example data record, imported from test_stream_handler.
FIXTURE_OBJECT = EXAMPLE_RECORD
# stream_handler function applied to each converted record; its output is what gets serialized.
avro_for_object = avro_for_record
# Name of the record type inside the avro protocol; selects the schema under test.
AVRO_OBJECT_NAME = 'Record'

# Setup
# The fixture is expected to carry its item under ['dynamodb']['NewImage'].
# NOTE: YOUR_SPESHUL_PROTOCOL is a placeholder as well -- it is JSON-dumped and
# parsed as an avro protocol, so point it at your own protocol definition.
records = [
    dynamo_item_to_python_dict_converter(FIXTURE_OBJECT[u'dynamodb'][u'NewImage'])
]
batch = list(map(avro_for_object, records))
proto = avro.protocol.parse(json.dumps(YOUR_SPESHUL_PROTOCOL))
schema = proto.types_dict[AVRO_OBJECT_NAME]
# Functions | |
def _remove_a_field(schema): | |
i = random.randint(0,len(schema.fields)-1) | |
field = schema.fields[i] | |
del schema.fields[i] | |
return schema, field | |
def _replace_a_field(schema, field): | |
schema.fields.append(field) | |
return schema | |
def debug_avro_fields(schema, batch, serializer=None):
    """Find schema fields whose definitions make serialization of ``batch`` fail.

    Works on deep copies, so ``schema`` and ``batch`` are left untouched.

    Phase 1: try to serialize; on failure remove a randomly chosen field and
    retry, until serialization succeeds.
    Phase 2: put the removed fields back one at a time (most recently removed
    first), re-serializing after each; a field whose return makes
    serialization fail again is taken back out and reported.

    :param schema: the record schema under test (needs a mutable ``fields`` list).
    :param batch: the records to serialize.
    :param serializer: object with a ``serialize(schema, batch)`` method.
        Defaults to ``your_speshul_serializer``, looked up at call time.
    :returns: list of the field objects that caused serialization to fail.
    :raises ValueError: if serialization still fails after every field has been
        removed, i.e. the problem is not in any single field definition.
    """
    if serializer is None:
        serializer = your_speshul_serializer
    z_schema = deepcopy(schema)
    z_batch = deepcopy(batch)
    removed = []
    failing = []

    # Phase 1. Success means serialize() did not raise -- do not judge it by
    # the truthiness of the return value, which may legitimately be None/empty.
    while True:
        try:
            serializer.serialize(z_schema, z_batch)
        except Exception as exc:  # not bare: let KeyboardInterrupt/SystemExit through
            if not z_schema.fields:
                raise ValueError(
                    "serialization fails even with every field removed; "
                    "the problem is not a field definition: %r" % (exc,))
            z_schema, field = _remove_a_field(z_schema)
            removed.append(field)
        else:
            break

    # Phase 2. Re-add fields one by one and keep only those that still serialize.
    while removed:
        z_schema = _replace_a_field(z_schema, removed.pop())
        try:
            serializer.serialize(z_schema, z_batch)
        except Exception:
            # _replace_a_field appended the field, so it is the last one.
            failing.append(z_schema.fields.pop())
    return failing
def missing_fields(schema, batch):
    """Return the record keys that have no field definition in ``schema``.

    Every record in ``batch`` is inspected, so a key that appears only in a
    later record is still reported. Each missing key is listed once, in the
    order it is first seen. An empty ``batch`` yields an empty list.

    :param schema: schema whose ``fields`` each expose a ``name``.
    :param batch: iterable of dict-like records.
    :returns: list of record keys absent from the schema's field names.
    """
    known = {f.name for f in schema.fields}
    out = []
    for record in batch:
        for key in record:
            if key not in known:
                known.add(key)  # report each missing key only once
                out.append(key)
    return out
# Run it | |
if __name__ == "__main__":
    # Single-argument print(...) calls behave identically on Python 2 and
    # are required syntax on Python 3.
    # 1) Schema fields whose definitions break serialization of the batch.
    fails = debug_avro_fields(schema, batch)
    if fails:
        print("Found avro syntax errors: ")
    else:
        print("Syntax looks good!")
    for f in fails:
        print(f.name)
    # 2) Record keys that have no field definition in the schema.
    missing = missing_fields(schema, batch)
    if missing:
        print("Found fields missing from schema: ")
    else:
        print("All fields accounted for!")
    for m in missing:
        print(m)
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.