Created
December 7, 2015 06:05
-
-
Save aeroevan/988dde466a17b70ff4ae to your computer and use it in GitHub Desktop.
Fixup for concatenated avro files
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
From 28938c5e63262d69f3c670e3cc7de9963318d239 Mon Sep 17 00:00:00 2001 | |
From: Evan McClain <[email protected]> | |
Date: Mon, 7 Dec 2015 00:53:39 -0500 | |
Subject: [PATCH] Add optional offset to _read_header. | |
--- | |
src/avro/datafile.py | 21 ++++++++++++--------- | |
1 file changed, 12 insertions(+), 9 deletions(-) | |
diff --git a/src/avro/datafile.py b/src/avro/datafile.py | |
index e12a68f..208e965 100644 | |
--- a/src/avro/datafile.py | |
+++ b/src/avro/datafile.py | |
@@ -98,7 +98,7 @@ class DataFileWriter(object): | |
else: | |
# open writer for reading to collect metadata | |
dfr = DataFileReader(writer, io.DatumReader()) | |
- | |
+ | |
# TODO(hammer): collect arbitrary metadata | |
# collect metadata | |
self._sync_marker = dfr.sync_marker | |
@@ -179,7 +179,7 @@ class DataFileWriter(object): | |
# Write block | |
self.writer.write(compressed_data) | |
- | |
+ | |
# Write CRC32 checksum for Snappy | |
if self.get_meta(CODEC_KEY) == 'snappy': | |
self.encoder.write_crc32(uncompressed_data) | |
@@ -188,7 +188,7 @@ class DataFileWriter(object): | |
self.writer.write(self.sync_marker) | |
# reset buffer | |
- self.buffer_writer.truncate(0) | |
+ self.buffer_writer.truncate(0) | |
self.block_count = 0 | |
def append(self, datum): | |
@@ -223,14 +223,14 @@ class DataFileReader(object): | |
"""Read files written by DataFileWriter.""" | |
# TODO(hammer): allow user to specify expected schema? | |
# TODO(hammer): allow user to specify the encoder | |
- def __init__(self, reader, datum_reader): | |
+ def __init__(self, reader, datum_reader, offset=0): | |
self._reader = reader | |
self._raw_decoder = io.BinaryDecoder(reader) | |
self._datum_decoder = None # Maybe reset at every block. | |
self._datum_reader = datum_reader | |
- | |
+ | |
# read the header: magic, meta, sync | |
- self._read_header() | |
+ self._read_header(offset) | |
# ensure codec is valid | |
self.codec = self.get_meta('avro.codec') | |
@@ -290,9 +290,10 @@ class DataFileReader(object): | |
def is_EOF(self): | |
return self.reader.tell() == self.file_length | |
- def _read_header(self): | |
+ def _read_header(self, offset): | |
# seek to the beginning of the file to get magic block | |
- self.reader.seek(0, 0) | |
+ pos = self.reader.tell() | |
+ self.reader.seek(offset, 0) | |
# read header into a dict | |
header = self.datum_reader.read_data( | |
@@ -309,6 +310,8 @@ class DataFileReader(object): | |
# set sync marker | |
self._sync_marker = header['sync'] | |
+ if pos > 0: | |
+ self.reader.seek(pos) | |
def _read_block_header(self): | |
self.block_count = self.raw_decoder.read_long() | |
@@ -359,7 +362,7 @@ class DataFileReader(object): | |
else: | |
self._read_block_header() | |
- datum = self.datum_reader.read(self.datum_decoder) | |
+ datum = self.datum_reader.read(self.datum_decoder) | |
self.block_count -= 1 | |
return datum | |
-- | |
2.6.3 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import os | |
import avro.schema | |
from avro.datafile import DataFileReader, DataFileWriter | |
from avro.io import DatumReader, DatumWriter | |
def fix_file(schemafile, infile, outfile):
    """Rewrite a concatenated Avro file as a single Avro container file.

    ``infile`` is read as a sequence of Avro container files written back to
    back, each starting with its own header.  Every record that can be read
    is appended to ``outfile`` using the schema parsed from ``schemafile``
    and the deflate codec.

    Args:
        schemafile: Path to a JSON-formatted Avro schema.
        infile: Path to the concatenated Avro input file.
        outfile: Path of the Avro file to create (opened with "wb", so an
            existing file is truncated).

    Requires a patched ``DataFileReader`` whose constructor accepts a third
    ``offset`` argument giving where the header starts.
    """
    with open(schemafile) as schema_fp:
        schema = avro.schema.parse(schema_fp.read())

    writer = DataFileWriter(open(outfile, "wb"), DatumWriter(), schema,
                            codec='deflate')
    try:
        with open(infile, "rb") as f:
            # The input does not change while we read it, so size it once.
            size = os.fstat(f.fileno()).st_size
            while f.tell() != size:
                print("{0:6.2f}%".format(
                    100.0 * float(f.tell()) / float(size)))
                prev = f.tell()
                # HACK: had to modify DataFileReader's _read_header() to seek
                # to the given offset.
                # NOTE(review): the accompanying patch seeks back to the
                # caller's position after reading the header whenever that
                # position is > 0; here that is the header start itself.
                # Confirm block reading really resumes after the header.
                reader = DataFileReader(f, DatumReader(), prev)
                try:
                    for row in reader:
                        writer.append(row)
                        # End of the last successfully read record; the
                        # 16-byte sync marker is expected to follow it.
                        prev = f.tell()
                except Exception:
                    # Assumed cause: the decoder ran into the header of the
                    # next concatenated file.  Close the writer to flush what
                    # we have, reopen the output in append mode, and restart
                    # just past the sync marker.
                    writer.close()
                    # Cleared first so that, if the reopen fails, the finally
                    # clause does not close the same writer twice.
                    writer = None
                    writer = DataFileWriter(open(outfile, "ab+"),
                                            DatumWriter())
                    f.seek(prev + 16)
                else:
                    # The reader reached end of input cleanly; we are done.
                    break
    finally:
        # Always flush/close the output, even on an empty input or when an
        # error escapes the loop.
        if writer is not None:
            writer.close()
if __name__ == '__main__':
    # Command-line entry point: every option is mandatory and maps one-to-one
    # onto a fix_file() argument.
    import argparse

    cli = argparse.ArgumentParser(
        description="Fix avro file that has its schema defined multiple times.")
    for short_flag, long_flag, help_text in (
            ("-s", "--schema", "JSON formatted Avro schema"),
            ("-i", "--input", "Input Avro file to read"),
            ("-o", "--output", "Output Avro file to write"),
    ):
        cli.add_argument(short_flag, long_flag, help=help_text, required=True)
    options = cli.parse_args()
    fix_file(options.schema, options.input, options.output)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment