#!/usr/bin/env python3
# Copyright (c) 2020 Kolja Glogowski
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
"""Collect metadata about a directory's contents and write it as CSV,
optionally with checksums for regular files."""

import os
import sys
import stat
import zlib
import hashlib
import logging
import csv
import re
import argparse

__version__ = '0.3.0'

logger = logging.getLogger(__name__)


class Crc32Hash:
    """CRC-32 checksum wrapped in a minimal hashlib-like interface."""

    name = 'crc32'
    digest_size = 4

    def __init__(self, data=b''):
        self._digest = 0
        self.update(data)

    def update(self, data):
        # zlib.crc32() takes the running CRC as its second argument,
        # which allows the checksum to be computed incrementally.
        self._digest = zlib.crc32(data, self._digest)

    def hexdigest(self):
        return f'{self._digest:08x}'
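
# Illustrative sketch (not part of the original script): Crc32Hash mimics
# the hashlib object interface, e.g.
#   h = Crc32Hash(b'hello ')
#   h.update(b'world')
#   h.hexdigest()  # -> '0d4a1185'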


class FileChecksumCreator:
    """Computes one or more checksums for a file in a single pass."""

    supported_checksums = {
        'crc32': Crc32Hash,
        'md5': hashlib.md5,
        'sha1': hashlib.sha1,
        'sha256': hashlib.sha256,
        'sha384': hashlib.sha384,
        'sha512': hashlib.sha512,
    }

    def __init__(self, algorithms, chunksize=65536):
        self.chunksize = chunksize
        self._checksums = {}
        for name in map(str.lower, algorithms):
            self._checksums[name] = self.supported_checksums[name]

    def hexdigest(self, fpath):
        hobjs = tuple(v() for v in self._checksums.values())
        with open(fpath, 'rb') as f:
            while True:
                chunk = f.read(self.chunksize)
                if len(chunk) == 0:
                    break
                for h in hobjs:
                    h.update(chunk)
        return dict(zip(
            self._checksums.keys(),
            (h.hexdigest() for h in hobjs)))
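
# Illustrative sketch (not part of the original script): all requested
# digests are computed while reading the file only once, e.g.
#   fcc = FileChecksumCreator(['md5', 'sha256'])
#   fcc.hexdigest(__file__)  # -> {'md5': '...', 'sha256': '...'}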


def recursive_scandir(path, depth=0, subdir_filter=None):
    """Recursively scan a directory, yielding (depth, entry, error) tuples.

    Exactly one of ``entry`` and ``error`` is None in each yielded tuple.
    """
    try:
        with os.scandir(path) as it:
            for entry in it:
                yield depth, entry, None
                if entry.is_dir(follow_symlinks=False):
                    if subdir_filter and not subdir_filter(depth, entry):
                        continue
                    yield from recursive_scandir(
                        entry.path, depth + 1, subdir_filter)
    except OSError as error:
        yield depth, None, error
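
# Illustrative sketch (not part of the original script): errors are yielded
# instead of raised, so a scan can continue past unreadable directories, e.g.
#   for depth, entry, error in recursive_scandir('.'):
#       if error is not None:
#           print('skipped:', error)
#       else:
#           print('  ' * depth + entry.name)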


def scan_directory(path, max_depth=None, exclude_regex=None,
                   scandir_error='warn'):
    """Scan a directory tree, yielding a dict of metadata per entry."""

    def create_entry_info(path, depth, st):
        # st_blocks is not available on all platforms (e.g. Windows).
        blocks = st.st_blocks if hasattr(st, 'st_blocks') else None
        return {
            'path': path,
            'depth': depth,
            'mode': stat.filemode(st.st_mode),
            'mask': stat.S_IMODE(st.st_mode),
            'mtime': st.st_mtime_ns,
            'size': st.st_size,
            'blocks': blocks,
        }

    def depth_subdir_filter(depth, entry):
        return depth < max_depth

    def relpath_subdir_filter(depth, entry):
        relpath = os.path.relpath(entry.path, path)
        return exclude_regex.match(relpath) is None

    def depth_relpath_subdir_filter(depth, entry):
        return (depth_subdir_filter(depth, entry) and
                relpath_subdir_filter(depth, entry))

    if max_depth is not None and exclude_regex is None:
        subdir_filter = depth_subdir_filter
    elif max_depth is None and exclude_regex is not None:
        subdir_filter = relpath_subdir_filter
    elif max_depth is not None and exclude_regex is not None:
        subdir_filter = depth_relpath_subdir_filter
    else:
        subdir_filter = None

    if not os.path.isdir(path):
        raise NotADirectoryError(f'Not a directory: {path!r}')

    # The scanned directory itself is reported first, as '.' at depth 0.
    yield create_entry_info(path='.', depth=0, st=os.stat(path))

    for depth, entry, error in recursive_scandir(
            path, depth=1, subdir_filter=subdir_filter):
        if error is None:
            relpath = os.path.relpath(entry.path, path)
            if exclude_regex is not None and exclude_regex.match(relpath):
                continue
            d = create_entry_info(
                path=relpath, depth=depth,
                st=entry.stat(follow_symlinks=False))
            if entry.is_symlink():
                d['target'] = os.readlink(entry.path)
            elif entry.is_file(follow_symlinks=False):
                # Mark regular files so checksums can be computed later.
                d['_isreg'] = True
            yield d
        elif scandir_error == 'warn':
            logger.warning(f'Directory scan failed: {error}')
        elif scandir_error == 'ignore':
            pass
        elif scandir_error == 'raise':
            raise error
        else:
            raise ValueError(
                f'Unknown exception handler: {scandir_error!r}')
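
# Illustrative sketch (not part of the original script): the generator
# yields plain dicts, so results can be filtered or aggregated directly, e.g.
#   total = sum(d['size'] for d in
#               scan_directory('.', exclude_regex=re.compile(r'\.git')))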


class Application:

    default_csv_fieldnames = [
        'depth', 'mode', 'mask', 'mtime', 'size', 'blocks',
        'path', 'target'
    ]

    def __init__(self,
                 csv_dialect='excel',
                 checksum_algorithms=None,
                 checksum_chunksize=1048576,
                 checksum_njobs=1,
                 max_depth=None,
                 exclude_pattern=None,
                 scandir_error='warn',
                 checksum_error='warn'):
        if csv_dialect not in csv.list_dialects():
            raise ValueError(f'Unknown CSV dialect: {csv_dialect}')
        self.csv_dialect = csv_dialect
        self.csv_fieldnames = self.default_csv_fieldnames.copy()
        if checksum_algorithms is not None:
            self.checksum_creator = FileChecksumCreator(
                checksum_algorithms, chunksize=checksum_chunksize)
            self.checksum_njobs = int(checksum_njobs)
            # FileChecksumCreator lower-cases the algorithm names, so the
            # CSV field names must be lower-cased as well; otherwise the
            # checksum columns would stay empty for upper-case input.
            self.csv_fieldnames += [a.lower() for a in checksum_algorithms]
        else:
            self.checksum_creator = None
        self.max_depth = max_depth
        if exclude_pattern is not None:
            try:
                self.exclude_regex = re.compile(exclude_pattern)
            except re.error as e:
                raise ValueError(f'Invalid exclude pattern: {e}')
        else:
            self.exclude_regex = None
        for name, value in [
                ('scandir_error', scandir_error),
                ('checksum_error', checksum_error)]:
            if value not in ['warn', 'ignore', 'raise']:
                raise ValueError(f'Invalid {name} value: {value!r}')
        self.scandir_error = scandir_error
        self.checksum_error = checksum_error

    def _iter_entries_sequential(self, path):
        for d in scan_directory(
                path,
                max_depth=self.max_depth,
                exclude_regex=self.exclude_regex,
                scandir_error=self.scandir_error):
            if self.checksum_creator and d.get('_isreg'):
                try:
                    checksums = self.checksum_creator.hexdigest(
                        os.path.join(path, d['path']))
                    d.update(checksums)
                except OSError as error:
                    if self.checksum_error == 'warn':
                        logger.warning(f'Cannot compute checksum: {error}')
                    elif self.checksum_error == 'ignore':
                        pass
                    else:
                        raise
            yield d

    def _iter_entries_parallel(self, path):
        # Placeholder for parallel checksum computation (checksum_njobs > 1);
        # not implemented yet, run() always uses the sequential variant.
        pass

    def run(self, path, out=None):
        if out is None:
            out = sys.stdout
        writer = csv.DictWriter(
            out,
            fieldnames=self.csv_fieldnames,
            dialect=self.csv_dialect,
            # '_isreg' is an internal marker, not a CSV field, so extra
            # keys are ignored instead of raising an error.
            extrasaction='ignore')
        writer.writeheader()
        for d in self._iter_entries_sequential(path):
            writer.writerow(d)
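
# Illustrative sketch (not part of the original script): Application can
# also be used programmatically, e.g.
#   app = Application(checksum_algorithms=['sha256'], max_depth=3)
#   app.run('.')  # writes CSV to stdout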


def main():
    parser = argparse.ArgumentParser(
        description="""Collect information (e.g. sizes, permissions,
            modification time) about the contents of a directory, and
            (optionally) compute checksums for regular files.""")
    parser.add_argument(
        '-c', dest='checksums',
        help='comma- or whitespace-separated list of checksum algorithms '
             '(default: None)')
    parser.add_argument(
        '-s', dest='chunksize', type=int, default=1048576,
        help='chunk size in bytes for checksum computation '
             '(default: 1048576)')
    parser.add_argument(
        '-o', dest='outfile',
        help='write results to this file instead of printing to stdout')
    parser.add_argument(
        '-t', dest='outfmt', choices=['csv', 'tsv'], default='csv',
        help='output format (default: csv)')
    parser.add_argument(
        '-q', '--quiet', action='store_true',
        help='suppress warnings')
    parser.add_argument(
        '--max-depth', metavar='DEPTH', type=int,
        help='maximum directory scan depth')
    parser.add_argument(
        '--exclude', metavar='PATTERN',
        help='regex matched against relative path names')
    # The 'version' action is reused here to print the list and exit.
    parser.add_argument(
        '--list-checksums', action='version',
        version='Available checksum algorithms: {}'.format(', '.join(
            FileChecksumCreator.supported_checksums)),
        help='list available checksum algorithms and exit')
    parser.add_argument(
        '--version', action='version',
        version='%(prog)s {}'.format(__version__))
    parser.add_argument(
        dest='path', metavar='DIR', nargs=1,
        help='directory to be scanned')
    args = parser.parse_args()

    if args.outfmt == 'csv':
        csv_dialect = 'excel'
    elif args.outfmt == 'tsv':
        csv_dialect = 'excel-tab'
    else:
        raise RuntimeError('Unknown output format')

    if args.checksums is not None:
        # Drop empty strings, which re.split() produces for leading,
        # trailing or repeated separators (e.g. '-c "md5,"').
        checksum_algorithms = [
            a for a in re.split(r'[\s,]+', args.checksums) if a]
    else:
        checksum_algorithms = None

    error_method = 'ignore' if args.quiet else 'warn'

    app = Application(
        csv_dialect=csv_dialect,
        checksum_algorithms=checksum_algorithms,
        checksum_chunksize=args.chunksize,
        checksum_njobs=1,
        max_depth=args.max_depth,
        exclude_pattern=args.exclude,
        scandir_error=error_method,
        checksum_error=error_method)

    if args.outfile is None:
        app.run(args.path[0])
    else:
        with open(args.outfile, 'w', newline='', encoding='utf-8') as f:
            app.run(args.path[0], out=f)
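

# Illustrative usage sketches (not part of the original script):
#   scandir.py DIR                            # metadata only, CSV to stdout
#   scandir.py -c md5,sha256 -o out.csv DIR   # with checksums, into a file
#   scandir.py -t tsv --max-depth 2 DIR       # TSV output, limited depth
#   scandir.py --exclude '\.git' DIR          # skip paths matching the regex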


if __name__ == '__main__':
    try:
        main()
    except (KeyboardInterrupt, BrokenPipeError):
        # Exit silently when interrupted or when stdout is closed early
        # (e.g. when piping the output into `head`).
        pass