Skip to content

Instantly share code, notes, and snippets.

@fillet54
Last active February 5, 2025 20:10
Show Gist options
  • Save fillet54/185fcb9b365327fd1c1b23011fd662bc to your computer and use it in GitHub Desktop.
Save fillet54/185fcb9b365327fd1c1b23011fd662bc to your computer and use it in GitHub Desktop.
import os
import hashlib
import fnmatch
from pathlib import Path
from collections import namedtuple
def calculate_sha1(path, chunksize=8192):
    """Calculate the SHA-1 hexdigest of a file.

    The file is read in ``chunksize``-byte chunks so that a large file never
    has to be held in memory all at once.

    Parameters:
        - path (str or Path): The path to the file whose SHA-1 hash is to be
          calculated.
        - chunksize (int, optional): The size, in bytes, of each chunk read
          from the file. [default: 8192 bytes]

    Returns:
        The SHA-1 hexdigest of the contents of the file at ``path``.

    Raises:
        FileNotFoundError: If ``path`` is not an existing regular file.
        ValueError: If ``chunksize`` is 0.
    """
    path = Path(path)
    # Explicit check instead of ``assert`` so it still runs under ``python -O``.
    if not path.is_file():
        raise FileNotFoundError(f"not a regular file: {path}")
    # read(0) always returns b"", which would end the loop immediately and
    # silently produce the digest of empty input.
    if chunksize == 0:
        raise ValueError("chunksize must not be 0")

    sha1 = hashlib.sha1()
    with path.open("rb") as f:
        while chunk := f.read(chunksize):
            sha1.update(chunk)
    return sha1.hexdigest()
# Cheap-to-compute stat summary of a file; two equal fingerprints are taken
# to mean "probably unchanged" without having to re-hash the contents.
Fingerprint = namedtuple("Fingerprint", "type mode mtime ctime uid gid ino size")


def file_fingerprint(path):
    """Fingerprint a file on disk.

    A fingerprint is used to determine if a file has possibly changed since
    it was last inspected. Typically this is done for efficiency: comparing
    fingerprints decides whether a more expensive operation (generally
    hashing) is required to determine the file's current state on disk.

    Parameters:
        - path (str or Path): The path to the file to be fingerprinted.

    Returns:
        Fingerprint tuple of the file. ``type`` is "link" when ``path`` itself
        is a symbolic link and "file" otherwise; ``mode`` keeps only the
        execute permission bits; the remaining fields are copied from the
        result of ``Path.stat()``.

    Raises:
        FileNotFoundError: If ``path`` is not an existing regular file.
    """
    path = Path(path)
    # Explicit check instead of ``assert`` so it still runs under ``python -O``.
    if not path.is_file():
        raise FileNotFoundError(f"not a regular file: {path}")

    stat = path.stat()
    return Fingerprint(
        "link" if path.is_symlink() else "file",
        stat.st_mode & 0o111,  # execute bits only
        stat.st_mtime,
        stat.st_ctime,
        stat.st_uid,
        stat.st_gid,
        stat.st_ino,
        stat.st_size,
    )
def match_gitignore_pattern(path, patterns):
    """Check if a given path matches any of the ignore patterns.

    Matching is done with ``fnmatch.fnmatch``, i.e. shell-style wildcards
    (``*``, ``?``, ``[seq]``). gitignore-specific syntax such as ``!``
    negation is not given any special meaning.

    Parameters:
        - path (str): The name or path to test.
        - patterns (iterable of str): The ignore patterns to test against.

    Returns:
        True if ``path`` matches at least one pattern else False
        (always False when ``patterns`` is empty).
    """
    return any(fnmatch.fnmatch(path, pattern) for pattern in patterns)
def is_ignored_directory(path, patterns):
    """Check if a directory matches any of the ignore patterns.

    A directory is ignored if its name matches a pattern or if its full
    path matches a pattern. Both the name and the path have a trailing "/"
    appended before matching, so directory patterns must end with "/".

    Parameters:
        - path (str or Path): The directory path to test.
        - patterns (iterable of str): The ignore patterns to test against.

    Returns:
        True if path matches any pattern else False
    """
    path = Path(path)  # also accept plain strings
    return (
        match_gitignore_pattern(f"{path.name}/", patterns)
        or match_gitignore_pattern(f"{path}/", patterns)
    )
def filter_files(root_path, patterns):
    """Filter files based on .gitignore-like patterns.

    Walks the tree under ``root_path`` and yields the path, relative to
    ``root_path``, of every file that does not match any of the passed-in
    ignore patterns. Files are matched by their bare file name only;
    directories are matched by ``is_ignored_directory``. Currently does not
    support "!" logic.

    Parameters:
        - root_path (str or Path): The directory to walk.
        - patterns (iterable of str): The ignore patterns.

    Yields:
        Path of each non-ignored file, relative to ``root_path``.
    """
    root_path = Path(root_path)

    for dirpath, dirnames, filenames in os.walk(root_path):
        root = Path(dirpath).relative_to(root_path)

        # Prune in place: os.walk only descends into what is left in
        # ``dirnames``, so ignored directories are never traversed.
        dirnames[:] = [
            d for d in dirnames
            if not is_ignored_directory(root / d, patterns)
        ]

        for filename in filenames:
            if not match_gitignore_pattern(filename, patterns):
                yield root / filename
class IntegrityIndex:
    """Filesystem Integrity Index

    Records an index to keep track of a working directory's contents. The
    index works much like a git index, which can determine differences
    between the working directory and the last commit. Intended to be used
    to ensure a working directory's contents exactly match a saved index.

    By default the index file is stored in the root of the worktree as
    ".fscache.index". The index path can be provided to override this in
    cases such as storing an index in a git repo to ensure some software is
    installed properly when running a build.

    Additionally a .gitignore-like list of patterns can be provided to
    filter files so they are not added to an index.

    On-disk format: a magic line, a version line, then one line per file
    with whitespace-separated fields
    ``type mode mtime ctime uid gid ino size sha1 path``.

    Main methods provided are update and diff.
    """

    MAGIC = "INTIDX"
    VERSION = "1"

    def __init__(self, worktree_path, index_path=None, ignore_patterns=None):
        """Create an index for ``worktree_path``.

        Parameters:
            - worktree_path (str or Path): Root of the directory tree to track.
            - index_path (str or Path, optional): Where the index file lives.
              [default: ``<worktree_path>/.fscache.index``]
            - ignore_patterns (iterable of str, optional): Patterns of files
              and directories to leave out of the index.
        """
        self.worktree_path = Path(worktree_path)
        if index_path is None:
            self.index_path = self.worktree_path / ".fscache.index"
        else:
            self.index_path = Path(index_path)

        # Copy so the caller's list is never mutated by the append below.
        self.ignore_patterns = list(ignore_patterns or [])
        # Never index the default index file itself.
        self.ignore_patterns.append(".fscache.index")

    def generate(self):
        """Return the up-to-date index for the worktree without saving it."""
        index, *_ = self.diff()
        return index

    def update(self):
        """Regenerate the index and write it to ``self.index_path``."""
        self.write(self.generate())

    def diff(self):
        """Diff the worktree against the saved index.

        Returns:
            Tuple ``(index, added, modified, deleted)`` where ``index`` is
            the refreshed mapping of relative Path -> (sha1, Fingerprint)
            and the other three are lists of relative Paths.
        """
        index = self.read()

        added = []
        modified = []
        # Everything starts out as "deleted"; paths are struck off as they
        # are found in the worktree.
        deleted = set(index)

        for path in filter_files(self.worktree_path, self.ignore_patterns):
            deleted.discard(path)

            # ``path`` is relative to the worktree; resolve it so this works
            # regardless of the current working directory.
            full_path = self.worktree_path / path
            is_fingerprint = file_fingerprint(full_path)

            if path in index:
                was_sha1, was_fingerprint = index[path]

                # fingerprint matches: assume the file is not changed
                if is_fingerprint == was_fingerprint:
                    continue

                # Fingerprint is different. Need to recalculate the hash.
                is_sha1 = calculate_sha1(full_path)
                if was_sha1 != is_sha1:
                    modified.append(path)
            else:
                added.append(path)
                is_sha1 = calculate_sha1(full_path)

            index[path] = (is_sha1, is_fingerprint)

        # remove from the index anything that is no longer there
        index = {k: v for k, v in index.items() if k not in deleted}

        return index, added, modified, list(deleted)

    def write(self, index):
        """Write ``index`` to ``self.index_path``.

        Entries are written sorted by path, with "/" as the path separator,
        so the same tree always produces the same file.

        Parameters:
            - index (dict): Mapping of path -> (sha1, fingerprint).
        """
        lines = [self.MAGIC, self.VERSION]
        for path in sorted(index, key=str):
            sha1, fp = index[path]
            fields = [
                fp.type,
                f"{fp.mode:03o}",
                f"{fp.mtime:>20}",
                f"{fp.ctime:>20}",
                fp.uid,
                fp.gid,
                fp.ino,
                f"{fp.size:<12}",
                sha1,
                Path(path).as_posix(),
            ]
            lines.append(" ".join(str(field) for field in fields))

        self.index_path.write_text("\n".join(lines))

    def read(self, index_path=None):
        """Read an index file.

        Parameters:
            - index_path (str or Path, optional): The index file to read.
              [default: ``self.index_path``]

        Returns:
            Mapping of relative Path -> (sha1, Fingerprint); empty if the
            index file does not exist.

        Raises:
            ValueError: If the file does not start with the expected magic
                and version lines.
        """
        if index_path is None:
            index_path = self.index_path
        else:
            index_path = Path(index_path)

        if not index_path.exists():
            return {}

        header_and_lines = index_path.read_text().splitlines()
        if len(header_and_lines) < 2:
            raise ValueError(f"{index_path}: missing index header")
        magic, version, *lines = header_and_lines

        # Explicit checks instead of ``assert`` so they still run under -O.
        if magic != self.MAGIC:
            raise ValueError(f"{index_path}: bad magic {magic!r}")
        if version != self.VERSION:  # only support current version
            raise ValueError(f"{index_path}: unsupported version {version!r}")

        index = {}
        for line in lines:
            if not line.strip():
                continue  # tolerate blank lines
            # maxsplit keeps any spaces inside the trailing path field
            (_type, mode, mtime, ctime, uid, gid, ino, size, sha1,
             pathstr) = line.split(maxsplit=9)
            fingerprint = Fingerprint(
                _type,
                int(mode, 8),
                float(mtime),
                float(ctime),
                int(uid),
                int(gid),
                int(ino),
                int(size),
            )
            index[Path(pathstr)] = (sha1, fingerprint)

        return index

    def verify_tree(self, was_index_path):
        """Compare the current worktree against the tree in a saved index.

        The tree within an index is simply the file paths and sha1.

        Parameters:
            - was_index_path (str or Path): The index file to compare against.

        Returns:
            Tuple ``(is_same, added, modified, deleted)``. ``added`` and
            ``deleted`` are lists of Paths; ``modified`` is a list of
            ``(path, is_sha1, was_sha1)`` tuples.
        """
        was_index = self.read(was_index_path)
        is_index = self.generate()

        added = []
        modified = []
        # Start from the saved index's paths; whatever is never seen in the
        # current tree has been deleted.
        deleted = set(was_index)

        for is_path, (is_sha1, _) in is_index.items():
            deleted.discard(is_path)

            if is_path not in was_index:
                added.append(is_path)
            elif is_sha1 != was_index[is_path][0]:
                modified.append((is_path, is_sha1, was_index[is_path][0]))

        is_same = not (added or modified or deleted)

        return is_same, added, modified, list(deleted)
if __name__ == "__main__":
    # Demo: build an index for a scratch directory, change some files, then
    # show what diff() reports.
    dirpath = Path("c:/temp")
    ignore = [
        ".ipynb_checkpoints/",
        ".git/",
        ".git",
        "__pycache__/",
        ".venv/",
    ]

    index = IntegrityIndex(dirpath, ignore_patterns=ignore)

    # initial worktree state
    (dirpath / "modified123.txt").write_text("HELLO WORLD")
    (dirpath / "deleted123.txt").write_text("HELLO WORLD")

    # save index to 'c:/temp/.fscache.index'
    index.update()

    # modify the worktree
    (dirpath / "added123.txt").write_text("HELLO WORLD")
    (dirpath / "modified123.txt").write_text("HELLO WORLD 2")
    (dirpath / "deleted123.txt").unlink()

    # see the diff (the refreshed index itself is not needed here, and
    # ``index`` must keep referring to the IntegrityIndex object)
    _, added, modified, deleted = index.diff()

    # Each heading is followed by an underline on its own line.
    print("Added\n--------")
    for a in added:
        print(f"- {a}")
    print("")

    print("Modified\n--------")
    for m in modified:
        print(f"- {m}")
    print("")

    print("Deleted\n--------")
    for d in deleted:
        print(f"- {d}")
    print("")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment