Skip to content

Instantly share code, notes, and snippets.

@martindurant
Last active November 19, 2021 19:28
Show Gist options
  • Save martindurant/33afb1e1ec3dd15e974df5c9755b328a to your computer and use it in GitHub Desktop.
Save martindurant/33afb1e1ec3dd15e974df5c9755b328a to your computer and use it in GitHub Desktop.
How to read an Avro file header easily
from __future__ import unicode_literals
import json
# Every Avro object container file begins with ASCII "Obj" followed by the
# format version byte 1.
MAGIC = b'Obj' + b'\x01'
# Length, in bytes, of the sync marker that closes the header and that is
# written after every data block.
SYNC_SIZE = 16
def read_long(fo):
    """Read one Avro long: a variable-length, zig-zag coded integer.

    Each byte contributes 7 payload bits, least-significant group first;
    a set high bit (0x80) means another byte follows. The accumulated
    value is then zig-zag decoded (even -> non-negative, odd -> negative).

    fo: file-like in bytes mode, positioned at the start of the integer.

    Returns the decoded int. If the stream is already exhausted before the
    first byte, 0 is returned instead of raising; callers in this module
    treat a 0 as a possible end of input.

    Raises EOFError if the stream ends in the middle of a multi-byte value.
    """
    c = fo.read(1)
    if not c:
        # end of input before any byte: reported as 0, not as an error
        return 0
    b = ord(c)
    n = b & 0x7F
    shift = 7
    while b & 0x80:
        c = fo.read(1)
        if not c:
            # continuation bit was set but no byte follows: truncated data
            raise EOFError("Truncated variable-length integer")
        b = ord(c)
        n |= (b & 0x7F) << shift
        shift += 7
    # zig-zag decode
    return (n >> 1) ^ -(n & 1)
def read_bytes(fo):
    """Read an Avro bytes value: a long length followed by that many bytes.

    fo: file-like in bytes mode.

    Returns the raw bytes (empty if the stream is already exhausted, since
    read_long then reports a length of 0).

    Raises ValueError if the encoded length is negative, and EOFError if
    the stream holds fewer bytes than the length announces.
    """
    size = read_long(fo)
    if size < 0:
        # fo.read() treats a negative size as "read to the end", which would
        # silently swallow the rest of the stream on corrupt input.
        raise ValueError("Negative byte-string length: %d" % size)
    data = fo.read(size)
    if len(data) != size:
        raise EOFError("Expected %d bytes, got %d" % (size, len(data)))
    return data
def read_header(fo, file_size=0):
    """Extract an avro file's header.

    fo: file-like
        This should be in bytes mode, e.g., io.BytesIO, positioned at the
        start of the file.
    file_size: int
        Currently unused.

    Returns dict representing the header, with keys:
        meta: file metadata; 'avro.schema' is JSON-decoded, all other
            values are left as raw bytes
        sync: the sync marker bytes read after the metadata
        header_size: offset just past the sync marker, i.e. where the
            first data block starts
        first_block_count, first_block_bytes: the two longs read from the
            first block's prefix (0 if the file ends after the header)
        first_block_data: offset just past that prefix
        blocks: list holding one {'offset', 'size', 'num'} dict for the
            first block

    Raises ValueError if the magic bytes are missing.
    """
    # explicit check rather than assert, so it still runs under python -O
    if fo.read(len(MAGIC)) != MAGIC:
        raise ValueError('Magic avro bytes missing')
    meta = {}
    while True:
        n_keys = read_long(fo)
        if n_keys == 0:
            break
        if n_keys < 0:
            # A negative block count means: abs(count) entries, and the
            # count is followed by the block's size in bytes (not needed).
            n_keys = -n_keys
            read_long(fo)
        for _ in range(n_keys):
            key = read_bytes(fo).decode('utf8')
            val = read_bytes(fo)
            if key == 'avro.schema':
                val = json.loads(val.decode('utf8'))
            meta[key] = val
    out = {'meta': meta}
    out['sync'] = fo.read(SYNC_SIZE)
    out['header_size'] = fo.tell()
    # peek at the first block's count/size prefix, leaving fo at its payload
    out['first_block_count'] = read_long(fo)
    out['first_block_bytes'] = read_long(fo)
    out['first_block_data'] = fo.tell()
    out['blocks'] = [{'offset': out['header_size'],
                      'size': (out['first_block_bytes'] + out['first_block_data']
                               - out['header_size']),
                      'num': out['first_block_count']}]
    return out
def scan_blocks(fo, header, file_size):
    """Find offsets of the blocks by jumping through.

    Useful where the blocks are large compared to read buffers.
    If blocks are small compared to read buffers, better off searching for the
    sync delimiter.

    fo: file-like in bytes mode; must support seek().
    header: dict as returned by read_header. Its 'blocks' list is extended
        in place with one {'offset', 'size', 'num'} dict per further block.
    file_size: total length of the file in bytes; scanning stops when a
        sync marker ends at or beyond this offset.

    Returns None; results are attached to the header dict. If 'blocks'
    already has more than one entry the scan is skipped.

    Raises ValueError if a sync marker does not match header['sync'], or if
    a block reports zero objects or zero bytes.
    """
    if len(header['blocks']) > 1:
        # already done
        return
    data_start = header['first_block_data']
    n_bytes = header['first_block_bytes']
    if data_start >= file_size and n_bytes == 0:
        # Nothing follows the header: the file holds no data blocks, so
        # there is no sync marker to find.
        return
    while True:
        sync_start = data_start + n_bytes
        fo.seek(sync_start)
        # explicit check rather than assert, so it still runs under python -O
        if fo.read(SYNC_SIZE) != header['sync']:
            raise ValueError("Sync failed")
        off = fo.tell()
        if off >= file_size:
            return
        num = read_long(fo)
        n_bytes = read_long(fo)
        data_start = fo.tell()
        if num == 0 or n_bytes == 0:
            # read_long returns 0 at end of input, so a zero here most
            # likely means the file was truncated inside a block prefix.
            raise ValueError("Unexpected end of input")
        header['blocks'].append({'offset': off,
                                 'size': data_start - off + n_bytes,
                                 'num': num})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment