-
-
Save datavudeja/a3fc97e8528b50d8d82865924b3ca877 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import hashlib | |
import fnmatch | |
from pathlib import Path | |
from collections import namedtuple | |
def calculate_sha1(path, chunksize=8192):
    """Return the SHA1 hex digest of a file's contents.

    The file is streamed in blocks of 'chunksize' bytes so that large files
    never have to be held in memory all at once.

    Parameters:
    - path (str or Path): Location of the file to hash.
    - chunksize (int, optional): Number of bytes requested per read
      [default: 8192 bytes].

    Returns:
    Hexadecimal SHA1 digest (str) of the file found at the path.

    Raises:
    AssertionError if the path does not refer to an existing regular file.
    """
    target = Path(path)
    assert target.is_file()

    digest = hashlib.sha1()
    with target.open('rb') as stream:
        # read() hands back b"" at end of file, which is the iter() sentinel
        for block in iter(lambda: stream.read(chunksize), b""):
            digest.update(block)
    return digest.hexdigest()
# Cheap-to-collect file metadata; field order is relied upon when a
# fingerprint is serialised to, and parsed back from, an index file.
Fingerprint = namedtuple(
    "Fingerprint",
    ["type", "mode", "mtime", "ctime", "uid", "gid", "ino", "size"],
)


def file_fingerprint(path):
    """Collect a stat-based fingerprint of a file on disk.

    The fingerprint is a cheap snapshot of the file's metadata. Comparing it
    with an earlier snapshot tells whether the file has possibly changed, so
    that a more expensive check (generally hashing) only has to run when the
    two snapshots differ.

    Parameters:
    - path (str or Path): The path to the file to be fingerprinted.

    Returns:
    Fingerprint tuple (type, mode, mtime, ctime, uid, gid, ino, size).

    Raises:
    AssertionError if the path does not refer to an existing regular file.
    """
    target = Path(path)
    assert target.is_file()

    info = target.stat()
    kind = "file"
    if os.path.islink(target):
        kind = "link"
    # Only the execute permission bits are recorded in the fingerprint.
    exec_bits = info.st_mode & 0o111

    return Fingerprint(
        type=kind,
        mode=exec_bits,
        mtime=info.st_mtime,
        ctime=info.st_ctime,
        uid=info.st_uid,
        gid=info.st_gid,
        ino=info.st_ino,
        size=info.st_size,
    )
def match_gitignore_pattern(path, patterns):
    """Tell whether a path matches at least one ignore pattern.

    Each pattern is tested with fnmatch-style (shell wildcard) matching.

    Parameters:
    - path (str): The name or path to test.
    - patterns (iterable of str): The ignore patterns to test against.

    Returns:
    True if any pattern matches the path else False
    """
    return any(fnmatch.fnmatch(path, candidate) for candidate in patterns)
def is_ignored_directory(path, patterns):
    """Tell whether a directory matches any of the ignore patterns.

    A directory is ignored when either its bare name or its full path
    matches a pattern. Both candidates are suffixed with a "/" before
    matching, so directory patterns are expected to end in "/".

    Parameters:
    - path (Path): The directory path to test.
    - patterns (iterable of str): The ignore patterns to test against.

    Returns:
    True if path matches any pattern else False
    """
    candidates = (f"{path.name}/", f"{path}/")
    return any(
        match_gitignore_pattern(candidate, patterns)
        for candidate in candidates
    )
def filter_files(root_path, patterns):
    """Walk a directory tree and yield the files that are not ignored.

    Yields file paths relative to root_path. Directories matching an ignore
    pattern are not descended into, and files whose name matches an ignore
    pattern are skipped. Negated ("!") patterns are not supported.

    Parameters:
    - root_path (str or Path): The directory to walk.
    - patterns (iterable of str): The ignore patterns to apply.
    """
    base = Path(root_path)
    for current, subdirs, names in os.walk(base):
        rel_dir = Path(current).relative_to(base)

        # os.walk only honours pruning when its own list object is edited
        # in place, hence the slice assignment.
        kept = []
        for sub in subdirs:
            if is_ignored_directory(rel_dir / sub, patterns):
                continue
            kept.append(sub)
        subdirs[:] = kept

        yield from (
            rel_dir / name
            for name in names
            if not match_gitignore_pattern(name, patterns)
        )
class IntegrityIndex:
    """Filesystem Integrity Index

    Records an index to keep track of a working directory's contents. The
    index works much like a git index, which can determine differences
    between the working directory and the last commit. Intended to be used to
    ensure a working directory's contents match a saved index exactly.

    By default the index file is stored in the root of the worktree as
    ".fscache.index". An index path can be provided to override this, for
    example to store an index in a git repo to ensure some software is
    installed properly when running a build.

    Additionally a .gitignore-like list of patterns can be provided to keep
    files from being added to the index.

    Index file format: the first line is MAGIC, the second is VERSION, and
    every following line describes one file as whitespace-separated fields:
    type, mode (octal), mtime, ctime, uid, gid, ino, size, sha1, path.

    Main methods provided are update and diff.
    """

    MAGIC = "INTIDX"
    VERSION = "1"

    def __init__(self, worktree_path, index_path=None, ignore_patterns=None):
        """Create an index handle for a working directory.

        Parameters:
        - worktree_path (str or Path): Root of the directory tree to index.
        - index_path (str or Path, optional): Where the index file lives
          [default: "<worktree_path>/.fscache.index"].
        - ignore_patterns (list of str, optional): Patterns of files and
          directories to leave out of the index.
        """
        self.worktree_path = Path(worktree_path)
        if index_path is None:
            self.index_path = self.worktree_path / ".fscache.index"
        else:
            self.index_path = Path(index_path)
        # Work on a copy: appending to the caller's list would leak the
        # extra pattern into (and keep growing) a list this class does not own.
        self.ignore_patterns = list(ignore_patterns or [])
        # The default index file must never index itself.
        self.ignore_patterns.append(".fscache.index")

    def generate(self):
        """Return an up-to-date index dict for the worktree (nothing is saved)."""
        index, *_ = self.diff()
        return index

    def update(self):
        """Regenerate the index and write it to the index file."""
        index = self.generate()
        self.write(index)

    def diff(self):
        """Diff the worktree against the saved index.

        The saved index is used as a cache: a file whose fingerprint is
        unchanged is assumed to be unmodified and is not hashed again.

        Returns:
        Tuple (index, added, modified, deleted) where index is the refreshed
        mapping of relative Path -> (sha1, Fingerprint) and the other three
        are lists of relative paths. The deleted list is sorted.
        """
        index = self.read()
        added = []
        modified = []
        # Everything starts out as "deleted" until it is seen on disk.
        deleted = set(index)

        for path in filter_files(self.worktree_path, self.ignore_patterns):
            deleted.discard(path)
            # filter_files yields paths relative to the worktree; resolve
            # them against it so this works from any current directory.
            full_path = self.worktree_path / path
            is_fingerprint = file_fingerprint(full_path)

            if path in index:
                was_sha1, was_fingerprint = index[path]
                # fingerprint matches: assume the file is not changed
                if is_fingerprint == was_fingerprint:
                    continue
                # Fingerprint is different. Need to recalculate the hash.
                is_sha1 = calculate_sha1(full_path)
                if was_sha1 != is_sha1:
                    modified.append(path)
            else:
                added.append(path)
                is_sha1 = calculate_sha1(full_path)

            index[path] = (is_sha1, is_fingerprint)

        # remove from the index anything that is no longer there
        index = {k: v for k, v in index.items() if k not in deleted}
        return index, added, modified, sorted(deleted)

    def write(self, index):
        """Serialise an index dict to the index file.

        Parameters:
        - index (dict): Mapping of relative Path -> (sha1, Fingerprint).
        """
        lines = [self.MAGIC, self.VERSION]
        for path, (sha1, fp) in index.items():
            fields = [
                fp.type,
                f"{fp.mode:03o}",
                f"{fp.mtime:>20}",
                f"{fp.ctime:>20}",
                fp.uid,
                fp.gid,
                fp.ino,
                f"{fp.size:<12}",
                sha1,
                path,  # kept last so a path containing spaces still parses
            ]
            lines.append(" ".join(str(field) for field in fields))
        self.index_path.write_text("\n".join(lines))

    def read(self, index_path=None):
        """Parse an index file into an index dict.

        Parameters:
        - index_path (str or Path, optional): Index file to read
          [default: this object's own index path].

        Returns:
        Mapping of relative Path -> (sha1, Fingerprint); an empty dict when
        the index file does not exist.

        Raises:
        AssertionError if the magic or version header does not match.
        ValueError if the header lines are missing or an entry is malformed.
        """
        if index_path is None:
            index_path = self.index_path
        else:
            index_path = Path(index_path)

        if not index_path.exists():
            return {}

        magic, version, *lines = index_path.read_text().splitlines()
        # NOTE: asserts are stripped when Python runs with -O.
        assert magic == self.MAGIC, f"not an index file: {index_path}"
        # only support current version
        assert version == self.VERSION, f"unsupported index version: {version}"

        index = {}
        for line in lines:
            # Tolerate blank or trailing lines instead of failing to unpack.
            if not line.strip():
                continue
            # maxsplit keeps any whitespace inside the path field intact
            (_type, mode, mtime, ctime, uid, gid, ino, size,
             sha1, pathstr) = line.split(maxsplit=9)
            fingerprint = Fingerprint(
                _type,
                int(mode, 8),
                float(mtime),
                float(ctime),
                int(uid),
                int(gid),
                int(ino),
                int(size),
            )
            index[Path(pathstr)] = (sha1, fingerprint)
        return index

    def verify_tree(self, was_index_path):
        """Compare the current tree against the tree saved in another index.

        The tree within an index is simply the file paths and their sha1
        hashes; fingerprints are not compared.

        Parameters:
        - was_index_path (str or Path): Index file to verify against.

        Returns:
        Tuple (is_same, added, modified, deleted). added and deleted are
        lists of paths (deleted is sorted); modified is a list of
        (path, current sha1, saved sha1) tuples. is_same is True only when
        all three lists are empty.
        """
        was_index = self.read(was_index_path)
        is_index = self.generate()

        added = []
        modified = []
        for is_path, (is_sha1, _) in is_index.items():
            if is_path not in was_index:
                added.append(is_path)
                continue
            was_sha1 = was_index[is_path][0]
            if is_sha1 != was_sha1:
                modified.append((is_path, is_sha1, was_sha1))

        # Deleted means: recorded in the saved index, missing from the tree.
        deleted = sorted(set(was_index) - set(is_index))

        is_same = not (added or modified or deleted)
        return is_same, added, modified, deleted
if __name__ == "__main__":
    # Demo: build an index for a scratch directory, change the directory,
    # then print what the index reports as added / modified / deleted.
    dirpath = Path("c:/temp")
    ignore = [
        ".ipynb_checkpoints/",
        ".git/",
        ".git",
        "__pycache__/",
        ".venv/",
    ]
    integrity = IntegrityIndex(dirpath, ignore_patterns=ignore)

    # initial index state
    (dirpath / "modified123.txt").write_text("HELLO WORLD")
    (dirpath / "deleted123.txt").write_text("HELLO WORLD")

    # save index to 'c:/temp/.fscache.index'
    integrity.update()

    # modify the worktree so it no longer matches the saved index
    (dirpath / "added123.txt").write_text("HELLO WORLD")
    (dirpath / "modified123.txt").write_text("HELLO WORLD 2")
    (dirpath / "deleted123.txt").unlink()

    # see the diff (kept under a separate name so the IntegrityIndex
    # object is not clobbered by the returned dict)
    index, added, modified, deleted = integrity.diff()

    # Each section lists its own result set.
    sections = (
        ("Added", added),
        ("Modified", modified),
        ("Deleted", deleted),
    )
    for title, paths in sections:
        print(f"{title}\n--------")
        for entry in paths:
            print(f"- {entry}")
        print("")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment