Created
May 26, 2015 22:48
-
-
Save pblocz/fc6e75f27157bad4d881 to your computer and use it in GitHub Desktop.
Script to do some housekeeping of big or sensitive files and erase from repository history
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# coding=utf-8
'''
git-big-files | (c) 2015 Pablo Cabeza
license: [modified BSD](http://opensource.org/licenses/BSD-3-Clause)
Script to do some housekeeping of big or sensitive files and erase
them from the repository history
'''
# Standard python libraries | |
import os | |
import sys | |
import glob | |
import shlex | |
import logging | |
import argparse | |
import subprocess | |
from collections import namedtuple | |
# Custom installed libraries | |
from tabulate import tabulate # pip tabulate | |
from hurry.filesize import size as hsize # pip hurry.filesize | |
logger = logging.getLogger(__name__) # Get current app logger | |
class Git(object):
    '''
    Represents the git repository in the current directory and wraps the
    `git` command line calls this script needs: listing the biggest
    objects stored in the packs, rewriting history to drop files and
    pruning what the rewrite left behind.
    '''

    # An entry from a git pack with extra info, as returned by this class' methods
    Entry = namedtuple("Entry", ["size", "pack", "sha", "path"])

    # Second column of the per-object rows printed by `git verify-pack -v`;
    # used to tell object rows apart from the per-pack summary lines.
    _OBJECT_TYPES = frozenset(("commit", "tree", "blob", "tag"))

    @classmethod
    def _build_call(cls, cmd):
        "Join the argument list `cmd` into a single shell-quoted command string"
        return ' '.join(shlex.quote(a) for a in cmd)

    @classmethod
    def _unquoted_call(cls, cmd, retcode=False, stdout=False, shell=True):
        '''
        Run `cmd` as-is (no extra quoting) and wait for it to finish.

        Arguments
        ---------
        - `cmd`: command to execute passed to Popen
        - `stdout`: when `False` the command output is captured and
          returned; any other value leaves the command attached to the
          parent's stdout and `None` is returned as output
        - `retcode`: return call return code
        - `shell`: whether to execute through shell or not

        **Return**: the decoded output, or `(returncode, output)` when
        `retcode` is true
        '''
        kwargs = {'shell': shell}
        if stdout is False:
            kwargs['stdout'] = subprocess.PIPE

        proc = subprocess.Popen(cmd, **kwargs)
        out, _ = proc.communicate()
        if out is not None:
            out = out.decode('utf-8')
        return (proc.returncode, out) if retcode else out

    @classmethod
    def _call(cls, cmd, *args, **kwargs):
        "Shell-quote the argument list `cmd` and run it through `_unquoted_call`"
        return cls._unquoted_call(cls._build_call(cmd), *args, **kwargs)

    @classmethod
    def _parse_verify_pack(cls, out):
        '''
        Extract the per-object rows from `git verify-pack -v` output.

        Only lines with at least five columns whose second column is a git
        object type are kept, so the summary lines of every pack (not just
        the last one) are discarded.

        **Return**: a list with the whitespace-split columns of each row
        '''
        rows = []
        for line in out.splitlines():
            fields = line.split()
            if len(fields) >= 5 and fields[1] in cls._OBJECT_TYPES:
                rows.append(fields)
        return rows

    @classmethod
    def _filter_branch_cmd(cls, paths, force=False):
        '''
        Build the `git filter-branch` argument list that removes `paths`
        from the index of every commit of every ref.

        Each path is shell-quoted because filter-branch evaluates the
        index filter through the shell; `--` keeps paths starting with a
        dash from being read as options.
        '''
        index_filter = 'git rm -rf --cached --ignore-unmatch -- ' + \
            ' '.join(shlex.quote(p) for p in paths)
        return ["git", "filter-branch"] + (["--force"] if force else []) + \
            ["--prune-empty", "--index-filter", index_filter,
             "--tag-name-filter", "cat", "--", "--all"]

    def __init__(self):
        '''
        Bind to the repository whose root is the current directory.

        **Raises**: `FileNotFoundError` when the current directory is not
        inside a git work tree or is not its top level
        '''
        self.cwd = os.getcwd()
        rt, root = self._unquoted_call(['git', 'rev-parse', '--show-toplevel'],
                                       shell=False, retcode=True)
        if rt != 0:
            raise FileNotFoundError("not a git directory")

        self.root = root.strip()

        # Compare normalized real paths so that a symlinked or differently
        # spelled working directory still matches what git reports.
        def canonical(path):
            return os.path.normcase(os.path.realpath(path))

        if canonical(self.root) != canonical(self.cwd):
            raise FileNotFoundError("not a git root, try from %s" % os.path.relpath(self.root))

    def get_objects(self):
        "List git objects from the packs as rows of `git verify-pack -v` columns"
        # The glob is left unquoted on purpose: the shell expands it to
        # every pack index of the repository.
        cmd = "%s %s" % (self._build_call(["git", "verify-pack", "-v"]),
                         ".git/objects/pack/pack-*.idx")
        return self._parse_verify_pack(self._unquoted_call(cmd))

    def get_sha_dict(self):
        "Get a dict of the objects reachable from any ref as (sha: path)"
        out = self._call(["git", "rev-list", "--all", "--objects"])
        rows = (line.split(maxsplit=1) for line in out.splitlines())
        # Lines holding only a sha (no path) are skipped.
        return {r[0]: r[1] for r in rows if len(r) == 2}

    def get_files_by_size(self, maxlimit=10):
        '''
        Get the biggest packed objects that have a known path.

        Arguments
        ---------
        - `maxlimit`: maximum number of entries to return, `None` for no
          limit; values of 0 or less give an empty list

        **Return**: a list of `Entry`, biggest first, with `size` and
        `pack` formatted by `hsize`
        '''
        paths = self.get_sha_dict()
        ranked = sorted(self.get_objects(), key=lambda row: int(row[2]), reverse=True)

        entries = []
        for row in ranked:
            # Apply the limit to entries that have a path, so path-less
            # objects do not use up slots of the result.
            if maxlimit is not None and len(entries) >= maxlimit:
                break
            path = paths.get(row[0])
            if path is None:
                continue
            entries.append(self.Entry(
                size=hsize(int(row[2])),
                pack=hsize(int(row[3])),
                sha=row[0],
                path=path,
            ))
        return entries

    def filter_file(self, file, force=False):
        '''
        Erase a file from the history of every ref.

        Arguments
        ---------
        - `file`: path or glob; the glob is expanded against the working
          directory and, when nothing matches, passed to git unchanged
        - `force`: pass `--force` to `git filter-branch`

        **Raises**: `subprocess.CalledProcessError` when the rewrite fails
        '''
        files = glob.glob(file) or [file]
        for f in files:
            print(f)

        # A single rewrite removes every matched path; a second
        # filter-branch run would be refused without --force because of
        # the refs/original backup left by the first one.
        cmd = self._filter_branch_cmd(files, force=force)
        rt, _ = self._call(cmd, stdout=True, retcode=True)
        if rt != 0:
            raise subprocess.CalledProcessError(rt, cmd)

    def clear_untracked_history(self):
        '''
        Delete the `refs/original` refs, expire the reflog and garbage
        collect with immediate pruning.

        **Raises**: `subprocess.CalledProcessError` on the first command
        that exits with a non-zero code
        '''
        cmds = [
            # Plain string: it is a shell pipeline and must not be quoted.
            "git for-each-ref --format='delete %(refname)' refs/original | git update-ref --stdin",
            ['git', 'reflog', 'expire', '--expire=now', '--all'],
            ["git", "gc", "--prune=now"],
        ]
        for cmd in cmds:
            run = self._unquoted_call if isinstance(cmd, str) else self._call
            rt, _ = run(cmd, retcode=True)
            if rt != 0:
                raise subprocess.CalledProcessError(rt, cmd)
def _parse_args(arguments): | |
parser = argparse.ArgumentParser(description='') | |
parser.add_argument('--version', '-v', type=bool, | |
help='show version of the program') | |
parser.add_argument('--max', '-m', type=int, nargs='?', default=10, | |
help="maximum number of files") | |
subparsers = parser.add_subparsers(dest='cmd') | |
subparsers.required = False | |
delete_parser = subparsers.add_parser('delete', | |
description='filter out a file from history') | |
delete_parser.add_argument('--force', '-f', action="store_true", default=False, | |
help='force backup overwrite') | |
delete_parser.add_argument('file', help="complete path of file to delete or glob") | |
clean_parser = subparsers.add_parser('clean', | |
description='cleans untracked files from history') | |
clear_parser = subparsers.add_parser('clear', | |
description='filter out a file from history and clean') | |
clear_parser.add_argument('--force', '-f', action="store_true", default=False, | |
help='force backup overwrite') | |
clear_parser.add_argument('file', help="complete path of file to clear or glob") | |
return parser.parse_args(arguments) | |
def main(arguments=None):
    '''
    Main function of the script, use as:

        git big-files [--max N]
        git big-files delete [--force] <file>
        git big-files clean
        git big-files clear [--force] <file>

    `arguments`: list of arguments to execute main. If `None` then
    sys.argv will be used

    **Return**: the return code as an `int`, 0 on success and 1 when the
    current directory is not a git root or a git command fails
    '''
    # Test against None so that an explicit empty list is honoured
    # instead of silently falling back to the process arguments.
    if arguments is None:
        arguments = sys.argv[1:]
    args = _parse_args(arguments)

    # Configure logger
    logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(message)s")

    # Process input from arguments
    try:
        repo = Git()
        if args.cmd == "delete":
            repo.filter_file(args.file, force=args.force)
        elif args.cmd == 'clean':
            repo.clear_untracked_history()
        elif args.cmd == 'clear':
            repo.filter_file(args.file, force=args.force)
            repo.clear_untracked_history()
        else:
            ob = repo.get_files_by_size(maxlimit=args.max)
            print(tabulate(ob, headers=Git.Entry._fields))
    except (FileNotFoundError, subprocess.CalledProcessError) as err:
        # Expected failures are reported as an exit code, not a traceback.
        print("error: %s" % err, file=sys.stderr)
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment