Prototype Python implementation of `fsverity digest`
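For orientation, here is a minimal sketch of the same computation restricted to files of at most one block (assumptions: sha256, 4096-byte blocks, no salt; the name fsverity_digest_small is invented for this illustration). The full script below generalises it to a streaming, multi-level Merkle tree:

import hashlib
import struct

def fsverity_digest_small(data):
    # Sketch: the Merkle root of a file <= 4096 bytes is the hash of its single
    # zero-padded block; for an empty file the root hash is all zeroes.
    assert len(data) <= 4096
    if data:
        root = hashlib.sha256(data.ljust(4096, b'\0')).digest()
    else:
        root = b'\0' * 32
    # Pack the 256-byte fsverity_descriptor, then hash it to get the digest.
    descriptor = struct.pack(
        '<BBBBIQ64s32s144s',
        1,          # version
        1,          # hash_algorithm: sha256
        12,         # log2 of the 4096-byte block size
        0,          # salt_size
        0,          # reserved
        len(data),  # data_size
        root,       # root_hash, zero-padded to 64 bytes by struct
        b'',        # salt
        b'',        # reserved
    )
    return 'sha256:' + hashlib.sha256(descriptor).hexdigest()

fsverity_digest_small(b'') should agree with the digest recorded for the empty test file in the sample output further down.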
#!/usr/bin/env python3
'''
Compute the fs-verity digest of the given file(s).
'''
import argparse
import enum
import functools
import hashlib
import io
import itertools
import math
import struct
import sys

DEFAULT_BLOCK_SIZE = 4096
NUL = b'\0'


class FSVerityHashAlg(enum.IntEnum):
    sha256 = 1
    sha512 = 2

    @classmethod
    def from_str(cls, s):
        return cls[s]

    def new(self, data=b''):
        return FSVerityBlock(data=data, alg=self.name)

    @property
    def digest_size(self):
        # sha256 (value 1) -> 32 bytes, sha512 (value 2) -> 64 bytes
        return self.value * (256 // 8)


class FSVerityBlock:
    def __init__(self, data=b'', alg='sha256', size=DEFAULT_BLOCK_SIZE):
        self.data_size = 0
        self.hash = hashlib.new(alg)
        self.size = size
        self.update(data)

    def update(self, data):
        assert (self.data_size + len(data)) <= self.size
        self.data_size += len(data)
        self.hash.update(data)

    def digest(self):
        if self.data_size == 0:
            raise ValueError
        # fs-verity zero-pads a partial block to the full block size before hashing
        h = self.hash.copy()
        h.update(NUL * (self.size - self.data_size))
        return h.digest()

    def hexdigest(self):
        return self.digest().hex()

    @property
    def digest_size(self):
        return self.hash.digest_size

    @property
    def digests_capacity(self):
        return self.size // self.digest_size
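

# Field layout of struct fsverity_descriptor, quoted from the kernel docs near
# the end of this script: version, hash_algorithm, log_blocksize, salt_size,
# a reserved __le32, data_size, root_hash[64], salt[32] and 144 reserved bytes,
# all packed little-endian.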
FSVerityDescriptor = struct.Struct('<BBBBIQ64s32s144s')


class FSVerityHash:
    name = 'fsverity'
    salt = b''
    version = 1

    def __init__(self, data=b'', alg='sha256', block_size=DEFAULT_BLOCK_SIZE):
        self.algorithm = FSVerityHashAlg[alg]
        self.block_size = block_size
        self.data_size = 0
        self.hashes = []
        self.update(data)

    @property
    def digests_per_block(self):
        return self.block_size // self.algorithm.digest_size

    def update_(self, data):
        # Unused leftover from an earlier, single-block implementation.
        if not data:
            return
        self.data_size += len(data)
        assert self.data_size <= self.block_size
        merkle_root = self.hashes[0]
        merkle_root.update(data)

    def update(self, data):
        remaining = memoryview(data)
        while remaining:
            block_used = self.data_size % self.block_size
            if block_used == 0:
                self._update_hashes(remaining)
            block_free = self.block_size - block_used
            chunk, remaining = remaining[:block_free], remaining[block_free:]
            self.hashes[-1].update(chunk)
            self.data_size += len(chunk)
            block_used += len(chunk)
            assert block_used <= self.block_size
            assert self.data_size <= 8 * 2**30

    def _update_hashes(self, pending_data):
        # These transitions must happen only at a block boundary
        # when there is pending data
        assert self.data_size % self.block_size == 0
        assert pending_data
        if self.data_size == 0:
            assert self.hashes == []
            self.hashes[:] = [
                self.algorithm.new(),
            ]
        elif self.data_size == self.block_size:
            assert len(self.hashes) == 1
            self.hashes[:] = [
                self.algorithm.new(self.hashes[0].digest()),
                self.algorithm.new(),
            ]
        elif self.data_size == self.block_size * self.digests_per_block:
            assert len(self.hashes) == 2
            oldroot, oldleaf = self.hashes
            oldroot.update(oldleaf.digest())
            self.hashes[:] = [
                self.algorithm.new(oldroot.digest()),
                self.algorithm.new(),
                self.algorithm.new(),
            ]
        elif self.data_size == self.block_size * self.digests_per_block**2:
            assert len(self.hashes) == 3
            self.hashes[1].update(self.hashes[2].digest())
            self.hashes[0].update(self.hashes[1].digest())
            self.hashes[:] = [
                self.algorithm.new(self.hashes[0].digest()),
                self.algorithm.new(),
                self.algorithm.new(),
                self.algorithm.new(),
            ]
        elif 0 == self.data_size % (self.block_size * self.digests_per_block**2):
            assert len(self.hashes) >= 4, f'expected >= 4, got {len(self.hashes)}'
            self.hashes[-2].update(self.hashes[-1].digest())
            self.hashes[-3].update(self.hashes[-2].digest())
            self.hashes[-4].update(self.hashes[-3].digest())
            self.hashes[-3:] = [
                self.algorithm.new(),
                self.algorithm.new(),
                self.algorithm.new(),
            ]
        elif 0 == self.data_size % (self.block_size * self.digests_per_block):
            assert len(self.hashes) >= 3, f'expected >= 3, got {len(self.hashes)}'
            self.hashes[-2].update(self.hashes[-1].digest())
            self.hashes[-3].update(self.hashes[-2].digest())
            self.hashes[-2:] = [
                self.algorithm.new(),
                self.algorithm.new(),
            ]
        elif 0 == self.data_size % self.block_size:
            assert len(self.hashes) >= 2, f'expected >= 2, got {len(self.hashes)}'
            self.hashes[-2].update(self.hashes[-1].digest())
            self.hashes[-1:] = [
                self.algorithm.new(),
            ]

    def merkle_root_digest(self):
        if self.data_size == 0:
            assert self.hashes == []
            return NUL * self.algorithm.digest_size
        if self.data_size <= self.block_size:
            assert len(self.hashes) == 1
            root = self.hashes[0]
            return root.digest()
        if self.data_size <= self.block_size * self.digests_per_block:
            assert len(self.hashes) == 2, f'expected 2, got {len(self.hashes)}'
            root, leaf = self.hashes
            root.update(leaf.digest())
            return root.digest()
        if self.data_size <= self.block_size * self.digests_per_block**2:
            assert len(self.hashes) == 3, f'expected 3, got {len(self.hashes)}'
            root, mid, leaf = self.hashes
            mid.update(leaf.digest())
            root.update(mid.digest())
            return root.digest()
        if self.data_size <= self.block_size * self.digests_per_block**3:
            assert len(self.hashes) == 4, f'expected 4, got {len(self.hashes)}'
            root, mid1, mid2, leaf = self.hashes
            mid2.update(leaf.digest())
            mid1.update(mid2.digest())
            root.update(mid1.digest())
            return root.digest()
        raise ValueError(f'{self.data_size} not supported yet')

    def digest(self):
        descriptor = FSVerityDescriptor.pack(
            self.version,
            self.algorithm.value,
            math.ceil(math.log2(self.block_size)),
            len(self.salt),
            0,  # reserved
            self.data_size,
            self.merkle_root_digest(),
            self.salt,
            b'',  # reserved
        )
        return hashlib.new(self.algorithm.name, descriptor).digest()

    def hexdigest(self):
        return self.digest().hex()
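
# Illustrative check against the recorded output further below:
# FSVerityHash(b'').hexdigest() should return
# '3d248ca542a24fc62d1c43b916eae5016878e2533c88238480b26128a1f1af95',
# the digest listed for the 0-byte test file.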


def pad(s, size, fill=NUL):
    r'''
    >>> pad(b'abc', 5)
    b'abc\x00\x00'
    '''
    unpadded_size = len(s)
    if unpadded_size < size:
        s += fill * (size - unpadded_size)
    return s


def blocks(file, size=DEFAULT_BLOCK_SIZE):
    r'''
    >>> file = io.BytesIO(b'abcdefghijklm')
    >>> list(blocks(file, size=5))
    [b'abcde', b'fghij', b'klm']
    '''
    while block := file.read(size):
        yield block


def blocks_padded(file, size=DEFAULT_BLOCK_SIZE, fill=NUL):
    r'''
    >>> file = io.BytesIO(b'abcdefghijklm')
    >>> list(blocks_padded(file, size=5))
    [(5, b'abcde'), (5, b'fghij'), (3, b'klm\0\0')]
    '''
    for block in blocks(file, size):
        unpadded_size = len(block)
        yield (unpadded_size, pad(block, size, fill))


# https://docs.python.org/3/library/itertools.html#itertools-recipes
def take(n, iterable):
    "Return first n items of the iterable as a list"
    return list(itertools.islice(iterable, n))


# https://more-itertools.readthedocs.io/en/stable/_modules/more_itertools/more.html#chunked
def chunked(iterable, n, strict=False):
    """Break *iterable* into lists of length *n*:

    >>> list(chunked([1, 2, 3, 4, 5, 6], 3))
    [[1, 2, 3], [4, 5, 6]]

    By default, the last yielded list will have fewer than *n* elements
    if the length of *iterable* is not divisible by *n*:

    >>> list(chunked([1, 2, 3, 4, 5, 6, 7, 8], 3))
    [[1, 2, 3], [4, 5, 6], [7, 8]]

    To use a fill-in value instead, see the :func:`grouper` recipe.

    If the length of *iterable* is not divisible by *n* and *strict* is
    ``True``, then ``ValueError`` will be raised before the last
    list is yielded.
    """
    iterator = iter(functools.partial(take, n, iter(iterable)), [])
    if strict:
        if n is None:
            raise ValueError('n must not be None when using strict mode.')

        def ret():
            for chunk in iterator:
                if len(chunk) != n:
                    raise ValueError('iterable is not divisible by n.')
                yield chunk

        return iter(ret())
    else:
        return iterator


def merkle_tree(file, algorithm='sha256', block_size=DEFAULT_BLOCK_SIZE):
    algorithm = FSVerityHashAlg[algorithm]
    digests = []
    data_size = 0
    for block in blocks(file, block_size):
        fsvblock = FSVerityBlock(block, algorithm.name, block_size)
        digests.append(fsvblock.digest())
        data_size += len(block)
    # | Input size      | Tree                                                  |
    # | 0               | [] (root digest is all NUL)                           |
    # | 1 - 4096        | [[block_digest]]                                      |
    # | 4097 - 4096*128 | [[digest], [block_digest_0, .. block_digest_128]]     |
    # | 4096*128+1 -    | [[digest], [digest_0 .. digest_128], [..., ..., ...]] |
    if data_size == 0:
        assert digests == []
        return (data_size, [])
    if data_size <= block_size:
        assert len(digests) == 1
        return (data_size, [digests])
    digests_per_block = block_size // algorithm.digest_size
    layers_count = math.ceil(math.log(len(digests), digests_per_block)) + 1
    tree = [[] for _ in range(layers_count - 1)]
    tree.append(digests)
    for depth, layer in reversed(list(enumerate(tree[1:], 1))):
        for chunk in chunked(layer, digests_per_block):
            block = b''.join(chunk)
            fsvblock = FSVerityBlock(block, algorithm.name, block_size)
            tree[depth-1].append(fsvblock.digest())
    return (data_size, tree)


def merkle_root_digest(data_size, tree, algorithm='sha256'):
    algorithm = FSVerityHashAlg[algorithm]
    if data_size == 0:
        assert tree == []
        return NUL * algorithm.digest_size
    return tree[0][0]


parser = argparse.ArgumentParser(description=__doc__)
#parser.add_argument('--block-size', metavar='N', type=int, default=DEFAULT_BLOCK_SIZE)
#parser.add_argument('--compact', action='store_true')
#parser.add_argument('--hash-alg', default='sha256', choices=tuple(e.name for e in FSVerityHashAlg), type=FSVerityHashAlg.from_str)
parser.add_argument(
    'files', metavar='FILE',
    type=argparse.FileType('rb'), default=[sys.stdin.buffer], nargs='*',
    help='Input file(s) to process (default: stdin)',
)
args = parser.parse_args()

# https://www.kernel.org/doc/html/latest/filesystems/fsverity.html#fs-verity-descriptor
'''
struct fsverity_descriptor {
        __u8 version;           /* must be 1 */
        __u8 hash_algorithm;    /* Merkle tree hash algorithm */
        __u8 log_blocksize;     /* log2 of size of data and tree blocks */
        __u8 salt_size;         /* size of salt in bytes; 0 if none */
        __le32 __reserved_0x04; /* must be 0 */
        __le64 data_size;       /* size of file the Merkle tree is built over */
        __u8 root_hash[64];     /* Merkle tree root hash */
        __u8 salt[32];          /* salt prepended to each hashed block */
        __u8 __reserved[144];   /* must be 0's */
};
'''
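# Python equivalent of the C struct above: '<' little-endian, four __u8 (BBBB),
# the reserved __le32 (I), data_size as a __le64 (Q), then the three fixed-size
# byte fields (64s 32s 144s).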
fsverity_descriptor = struct.Struct('<BBBBIQ64s32s144s')

for file in args.files:
    with file as f:
        h = FSVerityHash(f.read())
        print(f'sha256:{h.digest().hex()} {f.name}')
        # The streaming FSVerityHash path above supersedes the tree-building
        # path below; the `continue` leaves that older code unreachable.
        continue
        size, tree = merkle_tree(f)
        root_digest = merkle_root_digest(size, tree)
        fsverity_digest = hashlib.sha256(fsverity_descriptor.pack(
            1,
            FSVerityHashAlg.sha256.value,
            math.ceil(math.log2(DEFAULT_BLOCK_SIZE)),
            0,
            0,
            size,
            root_digest,
            b'',
            b'',
        )).digest()
        print(f'sha256:{fsverity_digest.hex()} {f.name}')
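
Sample output for the generated test files: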
sha256:3d248ca542a24fc62d1c43b916eae5016878e2533c88238480b26128a1f1af95 tests/data/0_000_000_000_bytes
sha256:9845e616f7d2f7a1cd6742f0546a36d2e74d4eb8ae7d9bdc0b0df982c27861b7 tests/data/0_000_000_001_bytes
sha256:e40425eaca55b3aca9994575b03b1585ff756c4684395fa144ee2642aeaf1d49 tests/data/0_000_000_100_bytes
sha256:d67826aecb67a64705ac70f82cd0878394be2c1e19bdced9b0aad653fa2e73ae tests/data/0_000_001_000_bytes
sha256:aab040052277ce2baa6de148f1b3c8798de72f760d019427f44ce7caac6a69ef tests/data/0_000_004_095_bytes
sha256:3fd7a78101899a79cd337b1b4e5414be8bcb376b133370156ef6e65026d930ed tests/data/0_000_004_096_bytes
sha256:7319937c445f61df1a8f310eb4d6da157618d80713972a78d70a7528fc1a6f32 tests/data/0_000_004_097_bytes
sha256:1998a5f553ea5cb29aec221561290c29eca2f30d7ed2be832745ec7ddc73c5a8 tests/data/0_000_008_191_bytes
sha256:8bddf40a09fef722e6f2b141d69ab2cc23010aa824a78186d38abb05cb8b47f5 tests/data/0_000_008_192_bytes
sha256:d45adc8817b342124448dcd0a06184aa2df5a919eec7ab900bca0c14bccd129c tests/data/0_000_008_193_bytes
sha256:5c00a54bd1d8341d7bbad060ff1b8e88ed2646d7bb38db6e752cd1cff66c0a78 tests/data/0_000_524_287_bytes
sha256:f5c2b9ded1595acfe8a996795264d488dd6140531f6a01f8f8086a83fd835935 tests/data/0_000_524_288_bytes
sha256:a7abb76568871169a79104d00679fae6521dfdb2a2648e380c02b10e96e217ff tests/data/0_000_524_289_bytes
sha256:04b74498f0fafcd91622f0b218f3f1a024221e7ccb1732ea180cd4833808edd6 tests/data/0_001_234_567_bytes
sha256:91634eb57e4b47f077b18c596cdae9ebf60bbded408f19423ccd5703028e95a4 tests/data/0_067_108_863_bytes
sha256:2dbb93b12267f696804876449e4874e543ad7fbf5715dbf6ff5a277553d9edfe tests/data/0_067_108_864_bytes
sha256:d0d2d7311c25c6d121011a569274bec0929afe1ca56543cb1235d5056efc4f7b tests/data/0_067_108_865_bytes
sha256:94d063568b3969e63306f5eedeae760702d8f64fb75b4eb4de2e7df999662997 tests/data/0_123_456_789_bytes
sha256:1aa9222fd2be3b557708e07ab6bee8ae29012e86f8a9b05872302f2e9286e918 tests/data/8_589_934_592_bytes
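
The test files named above are created by the following script: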
#!/usr/bin/env python3
import pathlib

KIB = 2**10
MIB = 2**20
GIB = 2**30
BUF = b'A' * MIB

SIZES = [
    # 0 layers, Merkle root hash is all NUL
    0,
    # 1 layer, 0 B < N <= 4096 B, Merkle root hash is the hash of the only block
    1, 100, 1000, 4095, 4096,
    # 2 layers, 4 KiB < N <= 512 KiB
    4097, 8191, 8192, 8193, 524287, 524288,
    # 3 layers, 512 KiB < N <= 64 MiB
    524289, 1234567, 64*MIB-1, 64*MIB,
    # 4 layers, 64 MiB < N <= 8 GiB
    64*MIB+1, 123456789, 8*GIB,
]

dest_dir = pathlib.Path('tests/data')
dest_dir.mkdir(parents=True, exist_ok=True)

for size in SIZES:
    with open(dest_dir / f'{size:012_}_bytes', 'wb') as f:
        while size > 0:
            f.write(BUF[:size])
            size -= len(BUF)
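
To reproduce, run the generator and then the prototype over the fixtures (the generator's filename here is hypothetical):

$ python3 make_test_data.py
$ ./fsverity_digest tests/data/*_bytes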
moreati commented Jan 29, 2023

$ fsverity digest empty 4kibzero 8kibzero 10kibzero 
sha256:3d248ca542a24fc62d1c43b916eae5016878e2533c88238480b26128a1f1af95 empty
sha256:babc284ee4ffe7f449377fbf6692715b43aec7bc39c094a95878904d34bac97e 4kibzero
sha256:be54121da3877f8852c65136d731784f134c4dd9d95071502e80d7be9f99b263 8kibzero
sha256:5272a91444eff2df8873e096e453e13063c3518ad7b8517408c00d433bda413d 10kibzero
$ ./fsverity_digest empty 4kibzero 8kibzero 10kibzero
sha256:3d248ca542a24fc62d1c43b916eae5016878e2533c88238480b26128a1f1af95 empty
sha256:babc284ee4ffe7f449377fbf6692715b43aec7bc39c094a95878904d34bac97e 4kibzero
sha256:be54121da3877f8852c65136d731784f134c4dd9d95071502e80d7be9f99b263 8kibzero
sha256:5272a91444eff2df8873e096e453e13063c3518ad7b8517408c00d433bda413d 10kibzero
