#!/usr/bin/env python3
# Copyright (c) 2020 Kolja Glogowski
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
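
# Scan a directory tree and emit one CSV/TSV row per entry (files,
# directories and symbolic links), recording permissions, mtime and
# size, and optionally adding checksums for regular files.
#
# Illustrative invocation (the script name is a placeholder, not part
# of the original source):
#
#   python3 dirscan.py -c md5,sha256 -o listing.csv /path/to/dir
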
import os
import sys
import stat
import zlib
import hashlib
import logging
import csv
import re
import argparse

__version__ = '0.3.0'

logger = logging.getLogger(__name__)


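# Hashlib-style wrapper around zlib.crc32, so CRC-32 can be selected
# alongside the hashlib constructors in FileChecksumCreator below.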
class Crc32Hash:
    name = 'crc32'
    digest_size = 4

    def __init__(self, data=b''):
        self._digest = 0
        self.update(data)

    def update(self, data):
        self._digest = zlib.crc32(data, self._digest)

    def hexdigest(self):
        return f'{self._digest:08x}'


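# Computes one or more digests per file in a single pass: every chunk
# read from disk is fed to all selected hash objects.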
class FileChecksumCreator:
    supported_checksums = {
        'crc32': Crc32Hash,
        'md5': hashlib.md5,
        'sha1': hashlib.sha1,
        'sha256': hashlib.sha256,
        'sha384': hashlib.sha384,
        'sha512': hashlib.sha512
    }

    def __init__(self, algorithms, chunksize=65536):
        self.chunksize = chunksize
        self._checksums = {}
        for name in map(str.lower, algorithms):
            self._checksums[name] = self.supported_checksums[name]

    def hexdigest(self, fpath):
        hobjs = tuple(v() for v in self._checksums.values())
        with open(fpath, 'rb') as f:
            while True:
                chunk = f.read(self.chunksize)
                if len(chunk) == 0:
                    break
                for h in hobjs:
                    h.update(chunk)
        return dict(zip(
            self._checksums.keys(),
            (h.hexdigest() for h in hobjs)))


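# Recursively walks a directory, yielding (depth, entry, error)
# triples; OSErrors are yielded instead of raised, so a failed
# subdirectory does not abort the rest of the scan.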
def recursive_scandir(path, depth=0, subdir_filter=None):
    try:
        with os.scandir(path) as it:
            for entry in it:
                yield depth, entry, None
                if entry.is_dir(follow_symlinks=False):
                    if subdir_filter and not subdir_filter(depth, entry):
                        continue
                    for subdir_depth, subdir_entry, subdir_error in (
                            recursive_scandir(
                                entry.path, (depth + 1), subdir_filter)):
                        yield subdir_depth, subdir_entry, subdir_error
    except OSError as error:
        yield depth, None, error


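# Yields one info dict per entry, starting with the scan root as '.'.
# Regular files are tagged with the internal '_isreg' key so the
# caller knows which entries can be checksummed.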
def scan_directory(path, max_depth=None, exclude_regex=None,
                   scandir_error='warn'):
    def create_entry_info(path, depth, st):
        blocks = st.st_blocks if hasattr(st, 'st_blocks') else None
        return dict((
            ('path', path),
            ('depth', depth),
            ('mode', stat.filemode(st.st_mode)),
            ('mask', stat.S_IMODE(st.st_mode)),
            ('mtime', st.st_mtime_ns),
            ('size', st.st_size),
            ('blocks', blocks)))

    def depth_subdir_filter(depth, entry):
        return depth < max_depth

    def relpath_subdir_filter(depth, entry):
        relpath = os.path.relpath(entry.path, path)
        return exclude_regex.match(relpath) is None

    def depth_relpath_subdir_filter(depth, entry):
        return (depth_subdir_filter(depth, entry) and
                relpath_subdir_filter(depth, entry))

    if max_depth is not None and exclude_regex is None:
        subdir_filter = depth_subdir_filter
    elif max_depth is None and exclude_regex is not None:
        subdir_filter = relpath_subdir_filter
    elif max_depth is not None and exclude_regex is not None:
        subdir_filter = depth_relpath_subdir_filter
    else:
        subdir_filter = None

    if not os.path.isdir(path):
        raise NotADirectoryError(f'Not a directory: {path!r}')

    d = create_entry_info(path='.', depth=0, st=os.stat(path))
    yield d

    for depth, entry, error in recursive_scandir(
            path, depth=1, subdir_filter=subdir_filter):
        if error is None:
            relpath = os.path.relpath(entry.path, path)
            if exclude_regex is not None:
                if exclude_regex.match(relpath):
                    continue
            d = create_entry_info(
                path=relpath, depth=depth,
                st=entry.stat(follow_symlinks=False))
            if entry.is_symlink():
                d['target'] = os.readlink(entry.path)
            elif entry.is_file(follow_symlinks=False):
                d['_isreg'] = True
            yield d
        else:
            if scandir_error == 'warn':
                logger.warning(f'Directory scan failed: {error}')
            elif scandir_error == 'ignore':
                pass
            elif scandir_error == 'raise':
                raise error
            else:
                raise ValueError(
                    f'Unknown exception handler: {scandir_error!r}')


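# Ties everything together: scans the tree, optionally checksums
# regular files, and writes the collected rows via csv.DictWriter.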
class Application:
    default_csv_fieldnames = [
        'depth', 'mode', 'mask', 'mtime', 'size', 'blocks',
        'path', 'target'
    ]

    def __init__(self,
                 csv_dialect='excel',
                 checksum_algorithms=None,
                 checksum_chunksize=1048576,
                 checksum_njobs=1,
                 max_depth=None,
                 exclude_pattern=None,
                 scandir_error='warn',
                 checksum_error='warn'):
        if csv_dialect not in csv.list_dialects():
            raise ValueError(f'Unknown CSV dialect: {csv_dialect}')
        self.csv_dialect = csv_dialect
        self.csv_fieldnames = self.default_csv_fieldnames.copy()
        if checksum_algorithms is not None:
            self.checksum_creator = FileChecksumCreator(
                checksum_algorithms, chunksize=checksum_chunksize)
            self.checksum_njobs = int(checksum_njobs)
            self.csv_fieldnames += checksum_algorithms
        else:
            self.checksum_creator = None
        self.max_depth = max_depth
        if exclude_pattern is not None:
            try:
                self.exclude_regex = re.compile(exclude_pattern)
            except re.error as e:
                raise ValueError(f'Invalid exclude pattern: {e}')
        else:
            self.exclude_regex = None
        for name, value in [
                ('scandir_error', scandir_error),
                ('checksum_error', checksum_error)]:
            if value not in ['warn', 'ignore', 'raise']:
                raise ValueError(f'Invalid {name} value: {value!r}')
        self.scandir_error = scandir_error
        self.checksum_error = checksum_error

    def _iter_entries_sequential(self, path):
        for d in scan_directory(
                path,
                max_depth=self.max_depth,
                exclude_regex=self.exclude_regex,
                scandir_error=self.scandir_error):
            if self.checksum_creator and d.get('_isreg'):
                try:
                    checksums = self.checksum_creator.hexdigest(
                        os.path.join(path, d['path']))
                    d.update(checksums)
                except IOError as error:
                    if self.checksum_error == 'warn':
                        logger.warning(f'Cannot compute checksum: {error}')
                    elif self.checksum_error == 'ignore':
                        pass
                    else:
                        raise
            yield d

    def _iter_entries_parallel(self, path):
        # Placeholder: parallel checksum computation (checksum_njobs)
        # is not implemented in this version; run() always uses the
        # sequential iterator above.
        pass

    def run(self, path, out=None):
        if out is None:
            out = sys.stdout
        writer = csv.DictWriter(
            out,
            fieldnames=self.csv_fieldnames,
            dialect=self.csv_dialect,
            extrasaction='ignore')
        writer.writeheader()
        for d in self._iter_entries_sequential(path):
            writer.writerow(d)


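# Command-line entry point: maps the CLI flags onto Application
# arguments ('-t tsv' selects the 'excel-tab' CSV dialect).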
def main():
    parser = argparse.ArgumentParser(
        description="""Collect information (e.g. sizes, permissions,
            modification time) about the contents of a directory, and
            (optionally) compute checksums for regular files.""")
    parser.add_argument(
        '-c', dest='checksums',
        help='checksum algorithm(s) (default: None)')
    parser.add_argument(
        '-s', dest='chunksize', type=int, default=1048576,
        help='chunksize in bytes for checksum computation (default: 1048576)')
    parser.add_argument(
        '-o', dest='outfile',
        help='write results to this file instead of printing to stdout')
    parser.add_argument(
        '-t', dest='outfmt', choices=['csv', 'tsv'], default='csv',
        help='output format (default: csv)')
    parser.add_argument(
        '-q', '--quiet', action='store_true',
        help='suppress warnings')
    parser.add_argument(
        '--max-depth', metavar='DEPTH', type=int,
        help='maximum directory scan depth')
    parser.add_argument(
        '--exclude', metavar='PATTERN',
        help='regex matched against relative path names')
    parser.add_argument(
        '--list-checksums', action='version',
        version='Available checksum algorithms: {}'.format(', '.join(
            list(FileChecksumCreator.supported_checksums))),
        help='list available checksum algorithms and exit')
    parser.add_argument(
        '--version', action='version',
        version='%(prog)s {}'.format(__version__))
    parser.add_argument(
        dest='path', metavar='DIR', nargs=1,
        help='Directory to be scanned')
    args = parser.parse_args()

    if args.outfmt == 'csv':
        csv_dialect = 'excel'
    elif args.outfmt == 'tsv':
        csv_dialect = 'excel-tab'
    else:
        raise RuntimeError('Unknown output format')

    if args.checksums is not None:
        checksum_algorithms = re.split(r'[\s,]+', args.checksums)
    else:
        checksum_algorithms = None

    if args.quiet:
        error_method = 'ignore'
    else:
        error_method = 'warn'

    app = Application(
        csv_dialect=csv_dialect,
        checksum_algorithms=checksum_algorithms,
        checksum_chunksize=args.chunksize,
        checksum_njobs=1,
        max_depth=args.max_depth,
        exclude_pattern=args.exclude,
        scandir_error=error_method,
        checksum_error=error_method
    )

    if args.outfile is None:
        app.run(args.path[0])
    else:
        with open(args.outfile, 'w', newline='', encoding='utf-8') as f:
            app.run(args.path[0], out=f)


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        pass
    except BrokenPipeError:
        pass
  