Created
April 4, 2024 00:58
-
-
Save mattfysh/9177a229ed06de9043bfb963e24c4097 to your computer and use it in GitHub Desktop.
read delta table as a stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import os
from typing import Any
from urllib.parse import unquote
# Commit operations (as recorded in commitInfo["operation"]) whose commits are
# skipped wholesale by get_incremental_files_for_partition: they reorganise or
# clean up existing files rather than contribute new data to the stream.
SKIP_OPERATIONS = [
    "OPTIMIZE",
    "VACUUM START",
    "VACUUM END",
]
def is_subset(a: dict[str, Any], b: dict[str, Any]):
    """Return True when every key/value pair in ``a`` is also present in ``b``.

    Values are compared with ``==``; an empty ``a`` is a subset of any ``b``.
    """
    # Dict item views support set-style comparison (and, unlike a real set,
    # tolerate unhashable values), so ``<=`` is an item-wise containment test.
    return a.items() <= b.items()
def file_version(filename: str):
    """Return the integer before the first ``.`` of a log file name.

    For example ``"00000000000000000007.json"`` yields ``7``. Raises
    ``ValueError`` if that leading part is not an integer.
    """
    stem, _sep, _rest = filename.partition(".")
    return int(stem)
def _list_commits(log_path: str):
    """Return ``(version, filename)`` pairs for the commit files in ``log_path``, sorted by version.

    Only names of the form ``<digits>.json`` are commits. Other ``.json`` files
    that may live in ``_delta_log`` (e.g. log-compaction files named
    ``<from>.<to>.compacted.json``) are ignored.
    """
    suffix = ".json"
    return sorted(
        (file_version(name), name)
        for name in os.listdir(log_path)
        if name.endswith(suffix) and name[: -len(suffix)].isdecimal()
    )


def _read_commit(commit_path: str):
    """Parse one commit file into ``(commit_info, other_actions)``.

    The ``commitInfo`` action is picked out wherever it occurs, because the
    Delta protocol does not require it to be the last line. Every other
    non-blank line is returned, in file order, as a parsed action dict.

    Raises:
        Exception: if the commit contains no ``commitInfo`` action.
    """
    commit_info = None
    actions = []
    # Log entries are UTF-8 JSON, one per line. Iterate the file rather than
    # using str.splitlines(), which would also split on characters such as
    # U+2028 that may legally appear unescaped inside JSON strings.
    with open(commit_path, "r", encoding="utf-8") as file:
        for line in file:
            if not line.strip():
                continue
            entry = json.loads(line)
            if "commitInfo" in entry:
                commit_info = entry["commitInfo"]
            else:
                actions.append(entry)
    if commit_info is None:
        raise Exception(f"No commitInfo action found in {commit_path}")
    return commit_info, actions


def get_incremental_files_for_partition(
    partition_values: dict[str, str], table_path: str, cursor: int
):
    """Collect data files appended to one partition of a Delta table since ``cursor``.

    Reads the JSON commit files in ``<table_path>/_delta_log`` whose version
    is ``>= cursor``. It returns the paths of files that were added to the
    partition selected by ``partition_values``. Commits whose operation is in
    ``SKIP_OPERATIONS`` are ignored. Only ``CREATE TABLE`` and ``WRITE`` in
    ``Append`` mode may add files to the partition. Any other add or remove
    touching the partition raises, because its effect cannot be expressed as a
    list of new files.

    Args:
        partition_values: Partition column -> value pairs that a file's
            ``partitionValues`` must contain to be selected (``{}`` selects
            every file).
        table_path: Root directory of the Delta table.
        cursor: First commit version to read.

    Returns:
        ``None`` if the log holds no commit files. Otherwise a tuple
        ``(files, next_cursor)``. ``files`` lists the added files' paths
        (percent-decoded and joined onto ``table_path``) in commit order.
        ``next_cursor`` is the version to pass on the next call.

    Raises:
        Exception: in any of these cases:
            - ``cursor`` precedes the earliest commit left in the log;
            - the commit versions from ``cursor`` onwards are not contiguous;
            - a commit has no ``commitInfo``;
            - a non-append commit adds or removes a file in the partition;
            - an unsupported action type is encountered.
    """
    log_path = os.path.join(table_path, "_delta_log")
    commits = _list_commits(log_path)
    if not commits:
        return None

    start_version = commits[0][0]
    end_version = commits[-1][0]
    if start_version > cursor:
        raise Exception(f"Version {cursor} precedes the earliest commit")

    # Select by version rather than by list offset: if a version were missing,
    # an offset of ``cursor - start_version`` would land past unread commits.
    pending = [(version, name) for version, name in commits if version >= cursor]
    expected_versions = list(range(cursor, end_version + 1))
    if [version for version, _ in pending] != expected_versions:
        raise Exception(
            f"Version {cursor} could not be located at expected index, are there entries missing from the transaction log?"
        )

    files = []
    next_cursor = cursor
    for version, name in pending:
        commit_info, actions = _read_commit(os.path.join(log_path, name))
        # Every commit read here is fully handled (skipped or not), so move the
        # cursor past it; otherwise trailing skipped commits are re-read on
        # every call.
        next_cursor = version + 1
        operation = commit_info["operation"]
        if operation in SKIP_OPERATIONS:
            continue

        parameters = commit_info.get("operationParameters") or {}
        is_append_only = operation == "CREATE TABLE" or (
            operation == "WRITE" and parameters.get("mode") == "Append"
        )
        for entry in actions:
            (action, spec), *_ = entry.items()
            if action in ("protocol", "metaData"):
                continue
            if action not in ("add", "remove"):
                raise Exception(f"Not implemented: {action}")

            file_partition = spec.get("partitionValues")
            # partitionValues is optional on remove actions; when it is absent
            # the file cannot be ruled out, so treat it as in the partition.
            if file_partition is not None and not is_subset(
                partition_values, file_partition
            ):
                continue
            if action == "add" and is_append_only:
                # The action's path is a percent-encoded URI. Decoding it and
                # joining onto table_path assumes it is relative to the table
                # root (NOTE(review): absolute URIs are not handled).
                files.append(os.path.join(table_path, unquote(spec["path"])))
            else:
                raise Exception(
                    f"Found action for partition within incompatible commit operation '{operation}'"
                )
    return files, next_cursor
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment