Prototype Python implementation of `fsverity digest`
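Usage (per the argparse setup in the script): `python3 <script> FILE [FILE ...]` prints `sha256:<digest> <name>` for each file; with no arguments it reads from stdin.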
#!/usr/bin/env python3
'''
Compute fs-verity digest for the given file(s).
'''
import argparse
import enum
import functools
import hashlib
import io  # needed by the doctests for blocks() and blocks_padded()
import itertools
import math
import struct
import sys
DEFAULT_BLOCK_SIZE = 4096
NUL = b'\0'


class FSVerityHashAlg(enum.IntEnum):
    sha256 = 1
    sha512 = 2

    @classmethod
    def from_str(cls, s):
        return cls[s]

    def new(self, data=b''):
        return FSVerityBlock(data=data, alg=self.name)

    @property
    def digest_size(self):
        # sha256 (value 1) -> 32 bytes, sha512 (value 2) -> 64 bytes;
        # this works because the enum values scale with the digest sizes.
        return self.value * (256 // 8)
class FSVerityBlock:
    def __init__(self, data=b'', alg='sha256', size=DEFAULT_BLOCK_SIZE):
        self.data_size = 0
        self.hash = hashlib.new(alg)
        self.size = size
        self.update(data)

    def update(self, data):
        assert (self.data_size + len(data)) <= self.size
        self.data_size += len(data)
        self.hash.update(data)

    def digest(self):
        if self.data_size == 0:
            raise ValueError
        h = self.hash.copy()
        h.update(NUL * (self.size - self.data_size))
        return h.digest()

    def hexdigest(self):
        return self.digest().hex()

    @property
    def digest_size(self):
        return self.hash.digest_size

    @property
    def digests_capacity(self):
        return self.size // self.digest_size
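# A block is hashed as if zero-padded to the full block size. Illustrative
# check (a sketch relying only on hashlib, not a recorded test vector):
#     >>> FSVerityBlock(b'x').digest() == hashlib.sha256(b'x' + b'\0' * 4095).digest()
#     True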
FSVerityDescriptor = struct.Struct('<BBBBIQ64s32s144s')
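# Field layout (little-endian), matching struct fsverity_descriptor quoted
# near the bottom of this file: B version, B hash_algorithm, B log_blocksize,
# B salt_size, I reserved, Q data_size, 64s root_hash, 32s salt, 144s reserved.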
class FSVerityHash:
    name = 'fsverity'
    salt = b''
    version = 1

    def __init__(self, data=b'', alg='sha256', block_size=DEFAULT_BLOCK_SIZE):
        self.algorithm = FSVerityHashAlg[alg]
        self.block_size = block_size
        self.data_size = 0
        self.hashes = []
        self.update(data)

    @property
    def digests_per_block(self):
        return self.block_size // self.algorithm.digest_size

    def update_(self, data):
        # Unused earlier draft of update(); kept for reference.
        if not data:
            return
        self.data_size += len(data)
        assert self.data_size <= self.block_size
        merkle_root = self.hashes[0]
        merkle_root.update(data)
    def update(self, data):
        remaining = memoryview(data)
        while remaining:
            block_used = self.data_size % self.block_size
            if block_used == 0:
                self._update_hashes(remaining)
            block_free = self.block_size - block_used
            chunk, remaining = remaining[:block_free], remaining[block_free:]
            self.hashes[-1].update(chunk)
            self.data_size += len(chunk)
            block_used += len(chunk)
            assert block_used <= self.block_size
            # The prototype handles at most 4 tree layers, i.e. inputs up to
            # 4096 * 128**3 bytes = 8 GiB with sha256 and 4096-byte blocks.
            assert self.data_size <= 8 * 2**30
    def _update_hashes(self, pending_data):
        # self.hashes holds one in-progress FSVerityBlock per tree level,
        # root first and leaf last. Each time the leaf block fills, its
        # digest is folded into its parent; when a whole level fills, the
        # tree grows a layer.
        # These transitions must happen only at a block boundary
        # when there is pending data.
        assert self.data_size % self.block_size == 0
        assert pending_data
        if self.data_size == 0:
            assert self.hashes == []
            self.hashes[:] = [
                self.algorithm.new(),
            ]
        elif self.data_size == self.block_size:
            assert len(self.hashes) == 1
            self.hashes[:] = [
                self.algorithm.new(self.hashes[0].digest()),
                self.algorithm.new(),
            ]
        elif self.data_size == self.block_size * self.digests_per_block:
            assert len(self.hashes) == 2
            oldroot, oldleaf = self.hashes
            oldroot.update(oldleaf.digest())
            self.hashes[:] = [
                self.algorithm.new(oldroot.digest()),
                self.algorithm.new(),
                self.algorithm.new(),
            ]
        elif self.data_size == self.block_size * self.digests_per_block**2:
            assert len(self.hashes) == 3
            self.hashes[1].update(self.hashes[2].digest())
            self.hashes[0].update(self.hashes[1].digest())
            self.hashes[:] = [
                self.algorithm.new(self.hashes[0].digest()),
                self.algorithm.new(),
                self.algorithm.new(),
                self.algorithm.new(),
            ]
        elif 0 == self.data_size % (self.block_size * self.digests_per_block**2):
            assert len(self.hashes) >= 4, f'expected >= 4, got {len(self.hashes)}'
            self.hashes[-2].update(self.hashes[-1].digest())
            self.hashes[-3].update(self.hashes[-2].digest())
            self.hashes[-4].update(self.hashes[-3].digest())
            self.hashes[-3:] = [
                self.algorithm.new(),
                self.algorithm.new(),
                self.algorithm.new(),
            ]
        elif 0 == self.data_size % (self.block_size * self.digests_per_block):
            assert len(self.hashes) >= 3, f'expected >= 3, got {len(self.hashes)}'
            self.hashes[-2].update(self.hashes[-1].digest())
            self.hashes[-3].update(self.hashes[-2].digest())
            self.hashes[-2:] = [
                self.algorithm.new(),
                self.algorithm.new(),
            ]
        elif 0 == self.data_size % self.block_size:
            assert len(self.hashes) >= 2, f'expected >= 2, got {len(self.hashes)}'
            self.hashes[-2].update(self.hashes[-1].digest())
            self.hashes[-1:] = [
                self.algorithm.new(),
            ]
    def merkle_root_digest(self):
        if self.data_size == 0:
            assert self.hashes == []
            return NUL * self.algorithm.digest_size
        if self.data_size <= self.block_size:
            assert len(self.hashes) == 1
            root = self.hashes[0]
            return root.digest()
        if self.data_size <= self.block_size * self.digests_per_block:
            assert len(self.hashes) == 2, f'expected 2, got {len(self.hashes)}'
            root, leaf = self.hashes
            root.update(leaf.digest())
            return root.digest()
        if self.data_size <= self.block_size * self.digests_per_block**2:
            assert len(self.hashes) == 3, f'expected 3, got {len(self.hashes)}'
            root, mid, leaf = self.hashes
            mid.update(leaf.digest())
            root.update(mid.digest())
            return root.digest()
        if self.data_size <= self.block_size * self.digests_per_block**3:
            assert len(self.hashes) == 4, f'expected 4, got {len(self.hashes)}'
            root, mid1, mid2, leaf = self.hashes
            mid2.update(leaf.digest())
            mid1.update(mid2.digest())
            root.update(mid1.digest())
            return root.digest()
        raise ValueError(f'{self.data_size} not supported yet')
    def digest(self):
        descriptor = FSVerityDescriptor.pack(
            self.version,
            self.algorithm.value,
            math.ceil(math.log2(self.block_size)),
            len(self.salt),
            0,  # reserved
            self.data_size,
            self.merkle_root_digest(),
            self.salt,
            b'',  # reserved
        )
        return hashlib.new(self.algorithm.name, descriptor).digest()

    def hexdigest(self):
        return self.digest().hex()
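# Illustrative hashlib-style use of FSVerityHash (a sketch; the only value
# asserted is the sha256 digest length, not a recorded test vector):
#     >>> h = FSVerityHash(b'hello world')
#     >>> h.update(b'!')
#     >>> len(h.digest())
#     32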
def pad(s, size, fill=NUL):
    r'''
    >>> pad(b'abc', 5)
    b'abc\x00\x00'
    '''
    unpadded_size = len(s)
    if unpadded_size < size:
        s += fill * (size - unpadded_size)
    return s


def blocks(file, size=DEFAULT_BLOCK_SIZE):
    r'''
    >>> file = io.BytesIO(b'abcdefghijklm')
    >>> list(blocks(file, size=5))
    [b'abcde', b'fghij', b'klm']
    '''
    while block := file.read(size):
        yield block


def blocks_padded(file, size=DEFAULT_BLOCK_SIZE, fill=NUL):
    r'''
    >>> file = io.BytesIO(b'abcdefghijklm')
    >>> list(blocks_padded(file, size=5))
    [(5, b'abcde'), (5, b'fghij'), (3, b'klm\0\0')]
    '''
    for block in blocks(file, size):
        unpadded_size = len(block)
        yield (unpadded_size, pad(block, size, fill))
# https://docs.python.org/3/library/itertools.html#itertools-recipes
def take(n, iterable):
    "Return first n items of the iterable as a list"
    return list(itertools.islice(iterable, n))


# https://more-itertools.readthedocs.io/en/stable/_modules/more_itertools/more.html#chunked
def chunked(iterable, n, strict=False):
    """Break *iterable* into lists of length *n*:

    >>> list(chunked([1, 2, 3, 4, 5, 6], 3))
    [[1, 2, 3], [4, 5, 6]]

    By default, the last yielded list will have fewer than *n* elements
    if the length of *iterable* is not divisible by *n*:

    >>> list(chunked([1, 2, 3, 4, 5, 6, 7, 8], 3))
    [[1, 2, 3], [4, 5, 6], [7, 8]]

    To use a fill-in value instead, see the :func:`grouper` recipe.

    If the length of *iterable* is not divisible by *n* and *strict* is
    ``True``, then ``ValueError`` will be raised before the last
    list is yielded.
    """
    iterator = iter(functools.partial(take, n, iter(iterable)), [])
    if strict:
        if n is None:
            raise ValueError('n must not be None when using strict mode.')

        def ret():
            for chunk in iterator:
                if len(chunk) != n:
                    raise ValueError('iterable is not divisible by n.')
                yield chunk

        return iter(ret())
    else:
        return iterator
def merkle_tree(file, algorithm='sha256', block_size=DEFAULT_BLOCK_SIZE):
    algorithm = FSVerityHashAlg[algorithm]
    digests = []
    data_size = 0
    for block in blocks(file, block_size):
        fsvblock = FSVerityBlock(block, algorithm.name, block_size)
        digests.append(fsvblock.digest())
        data_size += len(block)
    # | Input size      | Tree                                                  |
    # | 0               | [[nul_digest]]                                        |
    # | 1 - 4096        | [[block_digest]]                                      |
    # | 4097 - 4096*128 | [[digest], [block_digest_0, .. block_digest_128]]     |
    # | 4096*128+1 -    | [[digest], [digest_0 .. digest_128], [..., ..., ...]] |
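    # With sha256 and 4096-byte blocks: digests_per_block = 4096 // 32 = 128,
    # so the boundaries above fall at 4 KiB, 4096*128 = 512 KiB,
    # 4096*128**2 = 64 MiB, and so on.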
    if data_size == 0:
        assert digests == []
        return (data_size, [])
    if data_size <= block_size:
        assert len(digests) == 1
        return (data_size, [digests])
    digests_per_block = block_size // algorithm.digest_size
    layers_count = math.ceil(math.log(len(digests), digests_per_block)) + 1
    tree = [[] for _ in range(layers_count - 1)]
    tree.append(digests)
    for depth, layer in reversed(list(enumerate(tree[1:], 1))):
        for chunk in chunked(layer, digests_per_block):
            block = b''.join(chunk)
            fsvblock = FSVerityBlock(block, algorithm.name, block_size)
            tree[depth-1].append(fsvblock.digest())
    return (data_size, tree)


def merkle_root_digest(data_size, tree, algorithm='sha256'):
    algorithm = FSVerityHashAlg[algorithm]
    if data_size == 0:
        assert tree == []
        return NUL * algorithm.digest_size
    return tree[0][0]
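# merkle_tree() and merkle_root_digest() are the earlier whole-file
# (non-streaming) implementation; the main loop below skips past them
# with a `continue` and uses FSVerityHash instead.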
parser = argparse.ArgumentParser(description=__doc__)
#parser.add_argument('--block-size', metavar='N', type=int, default=DEFAULT_BLOCK_SIZE)
#parser.add_argument('--compact', action='store_true')
#parser.add_argument('--hash-alg', default='sha256', choices=tuple(e.name for e in FSVerityHashAlg), type=FSVerityHashAlg.from_str)
parser.add_argument(
    'files', metavar='FILE',
    type=argparse.FileType('rb'), default=[sys.stdin.buffer], nargs='*',
    help='Input file(s) to process (default: stdin)',
)
args = parser.parse_args()
# https://www.kernel.org/doc/html/latest/filesystems/fsverity.html#fs-verity-descriptor
'''
struct fsverity_descriptor {
        __u8 version;           /* must be 1 */
        __u8 hash_algorithm;    /* Merkle tree hash algorithm */
        __u8 log_blocksize;     /* log2 of size of data and tree blocks */
        __u8 salt_size;         /* size of salt in bytes; 0 if none */
        __le32 __reserved_0x04; /* must be 0 */
        __le64 data_size;       /* size of file the Merkle tree is built over */
        __u8 root_hash[64];     /* Merkle tree root hash */
        __u8 salt[32];          /* salt prepended to each hashed block */
        __u8 __reserved[144];   /* must be 0's */
};
'''
# Same layout as FSVerityDescriptor defined above.
fsverity_descriptor = struct.Struct('<BBBBIQ64s32s144s')
for file in args.files:
    with file as f:
        h = FSVerityHash(f.read())
        print(f'sha256:{h.digest().hex()} {f.name}')
        # The streaming FSVerityHash path above is used; the whole-tree
        # path below is skipped and kept only for comparison.
        continue
        size, tree = merkle_tree(f)
        root_digest = merkle_root_digest(size, tree)
        fsverity_digest = hashlib.sha256(fsverity_descriptor.pack(
            1,
            FSVerityHashAlg.sha256.value,
            math.ceil(math.log2(DEFAULT_BLOCK_SIZE)),
            0,
            0,
            size,
            root_digest,
            b'',
            b'',
        )).digest()
        print(f'sha256:{fsverity_digest.hex()} {f.name}')
Expected digests for the test files produced by the generator script below:
sha256:3d248ca542a24fc62d1c43b916eae5016878e2533c88238480b26128a1f1af95 tests/data/0_000_000_000_bytes
sha256:9845e616f7d2f7a1cd6742f0546a36d2e74d4eb8ae7d9bdc0b0df982c27861b7 tests/data/0_000_000_001_bytes
sha256:e40425eaca55b3aca9994575b03b1585ff756c4684395fa144ee2642aeaf1d49 tests/data/0_000_000_100_bytes
sha256:d67826aecb67a64705ac70f82cd0878394be2c1e19bdced9b0aad653fa2e73ae tests/data/0_000_001_000_bytes
sha256:aab040052277ce2baa6de148f1b3c8798de72f760d019427f44ce7caac6a69ef tests/data/0_000_004_095_bytes
sha256:3fd7a78101899a79cd337b1b4e5414be8bcb376b133370156ef6e65026d930ed tests/data/0_000_004_096_bytes
sha256:7319937c445f61df1a8f310eb4d6da157618d80713972a78d70a7528fc1a6f32 tests/data/0_000_004_097_bytes
sha256:1998a5f553ea5cb29aec221561290c29eca2f30d7ed2be832745ec7ddc73c5a8 tests/data/0_000_008_191_bytes
sha256:8bddf40a09fef722e6f2b141d69ab2cc23010aa824a78186d38abb05cb8b47f5 tests/data/0_000_008_192_bytes
sha256:d45adc8817b342124448dcd0a06184aa2df5a919eec7ab900bca0c14bccd129c tests/data/0_000_008_193_bytes
sha256:5c00a54bd1d8341d7bbad060ff1b8e88ed2646d7bb38db6e752cd1cff66c0a78 tests/data/0_000_524_287_bytes
sha256:f5c2b9ded1595acfe8a996795264d488dd6140531f6a01f8f8086a83fd835935 tests/data/0_000_524_288_bytes
sha256:a7abb76568871169a79104d00679fae6521dfdb2a2648e380c02b10e96e217ff tests/data/0_000_524_289_bytes
sha256:04b74498f0fafcd91622f0b218f3f1a024221e7ccb1732ea180cd4833808edd6 tests/data/0_001_234_567_bytes
sha256:91634eb57e4b47f077b18c596cdae9ebf60bbded408f19423ccd5703028e95a4 tests/data/0_067_108_863_bytes
sha256:2dbb93b12267f696804876449e4874e543ad7fbf5715dbf6ff5a277553d9edfe tests/data/0_067_108_864_bytes
sha256:d0d2d7311c25c6d121011a569274bec0929afe1ca56543cb1235d5056efc4f7b tests/data/0_067_108_865_bytes
sha256:94d063568b3969e63306f5eedeae760702d8f64fb75b4eb4de2e7df999662997 tests/data/0_123_456_789_bytes
sha256:1aa9222fd2be3b557708e07ab6bee8ae29012e86f8a9b05872302f2e9286e918 tests/data/8_589_934_592_bytes
Generator script for the test files:
#!/usr/bin/env python3
import pathlib
import sys

KIB = 2**10
MIB = 2**20
GIB = 2**30
BUF = b'A' * MIB

SIZES = [
    # 0 layers, Merkle root hash is all NUL
    0,
    # 1 layer, 0 B < N <= 4096 bytes, Merkle root hash is hash of the only block
    1, 100, 1000, 4095, 4096,
    # 2 layers, 4 KiB < N <= 512 KiB
    4097, 8191, 8192, 8193, 524287, 524288,
    # 3 layers, 512 KiB < N <= 64 MiB
    524289, 1234567, 64*MIB-1, 64*MIB,
    # 4 layers, 64 MiB < N <= 8 GiB
    64*MIB+1, 123456789, 8*GIB,
]
dest_dir = pathlib.Path('tests/data')
dest_dir.mkdir(parents=True, exist_ok=True)
for size in SIZES:
    with open(dest_dir / f'{size:012_}_bytes', 'wb') as f:
        while size > 0:
            f.write(BUF[:size])
            size -= len(BUF)
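# Note: generating every size writes roughly 9 GB in total, dominated by
# the 8 GiB file; make sure tests/data has room for it.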