Last active
July 11, 2024 12:41
-
-
Save t04glovern/aec9e2391aa94eb017f1bc265feeac7c to your computer and use it in GitHub Desktop.
This script reads Apache Iceberg Puffin files and prints the metadata of every blob they contain. It reads the file metadata from the footer at the end of the file and decompresses each blob with the compression codec declared in its metadata (uncompressed and zstd are supported; LZ4 is not).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
Read an Apache Iceberg Puffin file and print the metadata of every blob it contains.

The file metadata is read from the footer at the end of the file, and each
blob's payload is decompressed with the compression codec declared in its
metadata (uncompressed and zstd data are supported; LZ4 is not).

Spec:
    https://iceberg.apache.org/puffin-spec/

Install:
    python3 -m venv .venv
    source .venv/bin/activate
    pip3 install zstandard
    # -L is required: the gist /raw URL redirects to the raw file content.
    curl -L https://gist.github.com/t04glovern/aec9e2391aa94eb017f1bc265feeac7c/raw \
        > iceberg-puffin-reader.py \
        && chmod +x iceberg-puffin-reader.py

Usage:
    ./iceberg-puffin-reader.py --file <file-path> [--pretty]
"""
import argparse | |
import io | |
import logging | |
import json | |
import struct | |
from enum import Enum | |
from typing import List, Tuple, Dict, Optional, Any | |
import zstandard as zstd | |
import pprint | |
# Configure the root logger at INFO level so the CLI's logging.info output
# (blob metadata, "created by") is actually emitted.
logging.basicConfig(
    level=logging.INFO,
)
class PuffinCompressionCodec(Enum):
    """Compression codecs that Puffin blob or footer data may declare.

    Each member's value is the codec name as it appears in the file metadata
    (e.g. a blob's ``compression-codec`` entry); ``NONE`` has the value
    ``None`` and means the data is stored uncompressed.
    """

    NONE = None
    LZ4 = "lz4"
    ZSTD = "zstd"

    @staticmethod
    def for_name(name: Optional[str]) -> 'PuffinCompressionCodec':
        """Return the member whose value equals *name*.

        Raises:
            ValueError: if no member has that value.
        """
        found = next(
            (member for member in PuffinCompressionCodec if member.value == name),
            None,
        )
        if found is None:
            raise ValueError(f"Unknown codec name: {name}")
        return found
class Flag(Enum):
    """Known bit flags in the flags bytes of the Puffin footer struct.

    Each member's value is a ``(byte_number, bit_number)`` pair locating the
    flag, where bit 0 is the least-significant bit of that byte.
    """

    FOOTER_PAYLOAD_COMPRESSED = (0, 0)

    @staticmethod
    def from_bit(byte_number: int, bit_number: int) -> Optional['Flag']:
        """Return the flag stored at this byte/bit position, or ``None`` if none is known."""
        position = (byte_number, bit_number)
        return next((candidate for candidate in Flag if candidate.value == position), None)
class PuffinFormat:
    """Layout constants and low-level helpers for the Puffin binary format.

    ``FOOTER_START_MAGIC_*`` describe the magic that opens the footer.
    ``FOOTER_STRUCT_*`` offsets are relative to the fixed-size struct that
    ends the file: payload size, then flags, then magic.
    """

    # Magic at the very start of the footer.
    FOOTER_START_MAGIC_OFFSET = 0
    FOOTER_START_MAGIC_LENGTH = 4

    # Trailing footer struct: [payload size (4)][flags (4)][magic (4)].
    FOOTER_STRUCT_PAYLOAD_SIZE_OFFSET = 0
    FOOTER_STRUCT_FLAGS_OFFSET = 4
    FOOTER_STRUCT_FLAGS_LENGTH = 4
    FOOTER_STRUCT_MAGIC_OFFSET = 8
    FOOTER_STRUCT_LENGTH = 12

    # Codec applied to the footer payload when FOOTER_PAYLOAD_COMPRESSED is set.
    FOOTER_COMPRESSION_CODEC = PuffinCompressionCodec.LZ4

    @staticmethod
    def get_magic() -> bytes:
        """Return the 4-byte Puffin magic, ``b"PFA1"`` (0x50 0x46 0x41 0x31)."""
        return b"PFA1"

    @staticmethod
    def read_integer_little_endian(data: bytes, offset: int) -> int:
        """Decode the unsigned 32-bit little-endian integer at *offset* in *data*.

        Raises ``struct.error`` if fewer than 4 bytes are available there.
        """
        (value,) = struct.unpack_from('<I', data, offset)
        return value

    @staticmethod
    def compress(codec: PuffinCompressionCodec, input_data: bytes) -> bytes:
        """Compress *input_data* with *codec*.

        Raises:
            NotImplementedError: for codecs other than NONE and ZSTD.
        """
        if codec == PuffinCompressionCodec.NONE:
            return input_data
        if codec == PuffinCompressionCodec.ZSTD:
            return zstd.compress(input_data)
        raise NotImplementedError(f"Unsupported codec: {codec}")

    @staticmethod
    def decompress(codec: PuffinCompressionCodec, input_data: bytes) -> bytes:
        """Decompress *input_data* that was compressed with *codec*.

        Raises:
            NotImplementedError: for codecs other than NONE and ZSTD.
        """
        if codec == PuffinCompressionCodec.NONE:
            return input_data
        if codec == PuffinCompressionCodec.ZSTD:
            return zstd.decompress(input_data)
        raise NotImplementedError(f"Unsupported codec: {codec}")
class PuffinReader:
    """Reader for a Puffin file held in memory.

    The footer is located via the trailing footer struct. Its JSON payload is
    parsed on first access and cached. Blob payloads are read by offset/length
    and decompressed with each blob's declared codec.

    The reader can be used as a context manager; ``close()`` releases the
    buffer and clears the cached footer state.
    """

    class BlobMetadata:
        """Metadata for one blob, as listed in the footer's ``blobs`` array."""

        def __init__(self, type: str, input_fields: List[int], snapshot_id: int, sequence_number: int,
                     offset: int, length: int, compression_codec: Optional[str], properties: Dict[str, Any]):
            # ``type`` shadows the builtin but is kept: callers pass it by keyword.
            self.type = type
            self.input_fields = input_fields  # the footer's ``fields`` list
            self.snapshot_id = snapshot_id
            self.sequence_number = sequence_number
            self.offset = offset  # absolute byte offset of the stored blob bytes
            self.length = length  # number of stored (possibly compressed) bytes
            self.compression_codec = compression_codec  # codec name, or None if uncompressed
            self.properties = properties

        def __repr__(self) -> str:
            fields = ", ".join(f"{key}={value!r}" for key, value in self.__dict__.items())
            return f"{type(self).__name__}({fields})"

    def __init__(self, input_file: bytes, file_size: Optional[int] = None, footer_size: Optional[int] = None):
        """Wrap *input_file* for reading.

        Args:
            input_file: The complete file contents.
            file_size: Logical file size; defaults to ``len(input_file)``.
            footer_size: Footer size if already known, which skips reading the
                footer struct. It must be positive and leave at least
                ``len(PuffinFormat.get_magic())`` bytes before it.

        Raises:
            ValueError: if *footer_size* is out of range.
        """
        self.file_size = file_size if file_size is not None else len(input_file)
        self.input = io.BytesIO(input_file)
        self.known_footer_size = footer_size
        self.known_file_metadata: Optional[Dict[str, Any]] = None
        if footer_size is not None:
            if not (0 < footer_size <= self.file_size - len(PuffinFormat.get_magic())):
                raise ValueError(f"Invalid footer size: {footer_size}")

    def __enter__(self) -> 'PuffinReader':
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def file_metadata(self) -> Dict[str, Any]:
        """Return the parsed footer JSON, reading and validating the footer on first call.

        Raises:
            ValueError: on a bad magic, an unknown flag, or an inconsistent payload size.
            NotImplementedError: if the footer payload uses an unsupported codec.
        """
        if self.known_file_metadata is None:
            footer_size = self.footer_size()
            self.input.seek(self.file_size - footer_size)
            footer = self.input.read(footer_size)

            # The footer carries magic both at its start and inside its trailing struct.
            self.check_magic(footer, PuffinFormat.FOOTER_START_MAGIC_OFFSET)
            footer_struct_offset = footer_size - PuffinFormat.FOOTER_STRUCT_LENGTH
            self.check_magic(footer, footer_struct_offset + PuffinFormat.FOOTER_STRUCT_MAGIC_OFFSET)

            footer_compression = PuffinCompressionCodec.NONE
            if Flag.FOOTER_PAYLOAD_COMPRESSED in self.decode_flags(footer, footer_struct_offset):
                footer_compression = PuffinFormat.FOOTER_COMPRESSION_CODEC

            footer_payload_size = PuffinFormat.read_integer_little_endian(
                footer, footer_struct_offset + PuffinFormat.FOOTER_STRUCT_PAYLOAD_SIZE_OFFSET)
            expected_footer_size = (PuffinFormat.FOOTER_START_MAGIC_LENGTH + footer_payload_size
                                    + PuffinFormat.FOOTER_STRUCT_LENGTH)
            if footer_size != expected_footer_size:
                raise ValueError(f"Unexpected footer payload size value {footer_payload_size} for footer size {footer_size}")

            # The payload sits between the start magic and the trailing struct.
            payload_start = PuffinFormat.FOOTER_START_MAGIC_LENGTH
            footer_payload = footer[payload_start:payload_start + footer_payload_size]
            footer_json = PuffinFormat.decompress(footer_compression, footer_payload)
            self.known_file_metadata = self.parse_file_metadata(footer_json)
        return self.known_file_metadata

    def decode_flags(self, footer: bytes, footer_struct_offset: int) -> set:
        """Decode the footer struct's flag bytes into a set of ``Flag`` members.

        Raises:
            ValueError: if a set bit does not correspond to a known ``Flag``.
        """
        flags = set()
        flags_start = footer_struct_offset + PuffinFormat.FOOTER_STRUCT_FLAGS_OFFSET
        for byte_number in range(PuffinFormat.FOOTER_STRUCT_FLAGS_LENGTH):
            flag_byte = footer[flags_start + byte_number]
            bit_number = 0
            # Walk the bits from least- to most-significant until none remain set.
            while flag_byte != 0:
                if flag_byte & 0x1:
                    flag = Flag.from_bit(byte_number, bit_number)
                    if flag is None:
                        raise ValueError(f"Unknown flag byte {byte_number} and bit {bit_number} set")
                    flags.add(flag)
                flag_byte >>= 1
                bit_number += 1
        return flags

    def read_all(self, blobs: List['PuffinReader.BlobMetadata']) -> List[Tuple['PuffinReader.BlobMetadata', bytes]]:
        """Read and decompress every blob in *blobs*, in ascending offset order.

        The caller's list is left unmodified.

        Returns:
            ``(metadata, decompressed_bytes)`` pairs ordered by blob offset.

        Raises:
            ValueError: if a blob's declared range extends past the available data,
                or it names an unknown codec.
            NotImplementedError: if a blob uses an unsupported codec.
        """
        results = []
        # sorted() rather than list.sort(): do not reorder the caller's list.
        for blob_metadata in sorted(blobs, key=lambda b: b.offset):
            self.input.seek(blob_metadata.offset)
            raw_data = self.input.read(blob_metadata.length)
            if len(raw_data) != blob_metadata.length:
                raise ValueError(
                    f"Invalid file: blob at offset {blob_metadata.offset} declares length "
                    f"{blob_metadata.length} but only {len(raw_data)} bytes are available")
            codec = PuffinCompressionCodec.for_name(blob_metadata.compression_codec)
            results.append((blob_metadata, PuffinFormat.decompress(codec, raw_data)))
        return results

    def check_magic(self, data: bytes, offset: int):
        """Raise ``ValueError`` unless *data* holds the Puffin magic at *offset*."""
        magic = PuffinFormat.get_magic()
        read_magic = data[offset:offset + len(magic)]
        if read_magic != magic:
            raise ValueError(f"Invalid file: expected magic at offset {offset}: {magic}, but got {read_magic}")

    def footer_size(self) -> int:
        """Return the total footer size, deriving it from the trailing footer struct if unknown.

        Raises:
            ValueError: if the file is too short, the struct magic is wrong, or the
                declared payload size does not fit in the file.
        """
        if self.known_footer_size is None:
            if self.file_size < PuffinFormat.FOOTER_STRUCT_LENGTH:
                raise ValueError(f"Invalid file: file length {self.file_size} is less than minimal length of the footer tail {PuffinFormat.FOOTER_STRUCT_LENGTH}")
            self.input.seek(self.file_size - PuffinFormat.FOOTER_STRUCT_LENGTH)
            footer_struct = self.input.read(PuffinFormat.FOOTER_STRUCT_LENGTH)
            self.check_magic(footer_struct, PuffinFormat.FOOTER_STRUCT_MAGIC_OFFSET)
            footer_payload_size = PuffinFormat.read_integer_little_endian(
                footer_struct, PuffinFormat.FOOTER_STRUCT_PAYLOAD_SIZE_OFFSET)
            footer_size = (PuffinFormat.FOOTER_START_MAGIC_LENGTH + footer_payload_size
                           + PuffinFormat.FOOTER_STRUCT_LENGTH)
            # Same bound that __init__ enforces for a caller-supplied footer size.
            # Without it, a corrupt payload size turns into a negative seek in
            # file_metadata(), or a footer overlapping the leading magic.
            max_footer_size = self.file_size - len(PuffinFormat.get_magic())
            if footer_size > max_footer_size:
                raise ValueError(
                    f"Invalid file: footer size {footer_size} (payload size {footer_payload_size}) "
                    f"exceeds the {max_footer_size} bytes available after the leading magic")
            self.known_footer_size = footer_size
        return self.known_footer_size

    def parse_file_metadata(self, data: bytes) -> Dict[str, Any]:
        """Decode the UTF-8 footer payload and parse it as JSON (returned as a plain dict)."""
        footer_json = data.decode('utf-8')
        return json.loads(footer_json)

    def get_blobs(self) -> List['PuffinReader.BlobMetadata']:
        """Build a ``BlobMetadata`` for every entry in the footer's ``blobs`` array.

        The Puffin spec makes ``compression-codec`` and ``properties`` optional.
        When absent, they default to ``None`` (uncompressed) and ``{}``.

        Raises:
            KeyError: if ``blobs`` or a required blob key is missing.
        """
        metadata = self.file_metadata()
        return [
            PuffinReader.BlobMetadata(
                type=blob_info['type'],
                input_fields=blob_info['fields'],
                snapshot_id=blob_info['snapshot-id'],
                sequence_number=blob_info['sequence-number'],
                offset=blob_info['offset'],
                length=blob_info['length'],
                compression_codec=blob_info.get('compression-codec'),
                properties=blob_info.get('properties', {}),
            )
            for blob_info in metadata['blobs']
        ]

    def get_created_by(self) -> Optional[str]:
        """Return the file-level ``created-by`` property, or ``None`` if absent."""
        metadata = self.file_metadata()
        return metadata.get('properties', {}).get('created-by', None)

    def close(self):
        """Close the underlying buffer and drop cached footer state."""
        self.input.close()
        self.known_footer_size = None
        self.known_file_metadata = None
def main():
    """Command-line entry point: print the metadata of every blob in a Puffin file.

    With ``--pretty``, blob metadata is printed via ``pprint``; otherwise it is
    logged at INFO level. With ``--verbose``, each blob's decompressed bytes are
    also logged at DEBUG level.
    """
    parser = argparse.ArgumentParser(description="Puffin File Reader")
    parser.add_argument(
        "--file",
        type=str,
        help="The path to the Puffin file to read.",
        required=True,
    )
    parser.add_argument(
        "--pretty",
        action="store_true",
        help="Pretty print the blob metadata.",
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Also log the decompressed blob data (DEBUG level).",
    )
    args = parser.parse_args()

    if args.verbose:
        # The module-level basicConfig sets INFO; lower the root logger's level
        # so the per-blob debug output below is actually emitted.
        logging.getLogger().setLevel(logging.DEBUG)

    with open(args.file, 'rb') as f:
        input_file_content = f.read()

    reader = PuffinReader(input_file_content)
    try:
        blobs = reader.get_blobs()
        created_by = reader.get_created_by()
        if created_by:
            logging.info("Created by: %s", created_by)
        for blob, data in reader.read_all(blobs):
            if args.pretty:
                pprint.pprint(blob.__dict__)
            else:
                logging.info("Blob: %s", blob.__dict__)
            # Lazy %-args: the (possibly large) blob bytes are only rendered
            # when DEBUG logging is enabled.
            logging.debug("Data: %s", data)
    finally:
        # Release the reader even if parsing or decompression fails.
        reader.close()
if __name__ == "__main__":
    # Run the CLI only when executed as a script, not when imported as a module.
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment