Created
August 25, 2017 17:50
-
-
Save andygock/9610aff6882b26cbd860dbfa4a0af9cc to your computer and use it in GitHub Desktop.
Wrapper script to compress multiple hard linked files.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Wrapper script to compress multiple hard linked files. | |
Hard links are preserved after compression. | |
It does this by compressing the data once (through any one of the links), hard-linking the
compressed file under every other original name with the compressor's suffix added (e.g. ".bz2"),
and finally deleting the original files.
Compatible with files which do not have hard links too. | |
Works with most compressors which use the gzip style syntax. | |
Compatible with Python 2.7+ and Python 3.x only. | |
usage: compress-linked.py [-h] [-v] [--dry] [--compressor TYPE] [--level N] | |
[--search [PATH [PATH ...]]] | |
FILE | |
positional arguments: | |
FILE | |
optional arguments: | |
-h, --help show this help message and exit | |
-v, --verbose verbose flag is sent to compressor | |
--dry perform dry run | |
--compressor TYPE use custom compressor e.g gzip, bzip2, xz (default: | |
lbzip2) | |
--level N compression level (default: 6) | |
--search [PATH [PATH ...]] | |
search path(s) to look for hard linked files (default: | |
.) | |
""" | |
from __future__ import print_function | |
import os | |
import subprocess | |
import sys | |
import argparse | |
class bcolors:
    """ANSI escape sequences used to colorize and style terminal output.

    Concatenate one of these constants before a piece of text and append
    ``ENDC`` after it to return the terminal to its default rendering.
    """

    # Foreground colors.
    RED = "\033[91m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"
    BLUE = "\033[94m"

    # Text attributes.
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"

    # Resets every color and attribute set above.
    ENDC = "\033[0m"
class Compress:
    """Compress one file and re-create all of its hard links on the result.

    Constructing an instance performs the whole job: it locates every hard
    link of ``filename`` (with ``find -samefile``), verifies that all links
    are writable and that no target name is taken, compresses the data once,
    hard-links the compressed file under every other name (original name plus
    the compressor's suffix) and finally deletes the originals.

    Any failure is reported on stderr and terminates the process through
    ``sys.exit(1)``.

    Args:
        filename: Path of the file to compress.
        args: Object carrying the options as attributes, typically the
            ``argparse`` namespace built by this script. Recognised
            attributes are ``dry``, ``search``, ``compressor``, ``level`` and
            ``verbose``; any missing attribute falls back to the script's
            command-line default.
    """

    # Suffix produced by each supported compressor executable.
    EXTENSIONS = {
        "bzip2": ".bz2",
        "lbzip2": ".bz2",
        "pbzip2": ".bz2",
        "gzip": ".gz",
        "pigz": ".gz",
        "xz": ".xz",
        "pxz": ".xz",
        "lzip": ".lz",
        "plzip": ".lz",
        "rzip": ".rz",
        "lrzip": ".lrz",
    }

    def __init__(self, filename, args=None):
        # Write access is required because the original is deleted at the end.
        if not os.access(filename, os.W_OK):
            self._fail("File '{}' does not exist or is not writable/removable.".format(filename))

        info = os.stat(filename)
        self.link_count = info.st_nlink
        self.inode = info.st_ino
        self.filename = filename

        # getattr() keeps the class usable with a partial (or missing)
        # options object instead of crashing on the first attribute access.
        self.dry = getattr(args, "dry", False)
        # An empty list (``--search`` given without paths) would produce a
        # `find` command with no start directory, so fall back to ".".
        self.search_path = list(getattr(args, "search", None) or ["."])
        compressor = getattr(args, "compressor", "lbzip2")

        # determine extension from selected compressor
        self.extension = self.EXTENSIONS.get(compressor)
        if self.extension is None:
            self._fail("Compressor '{}' not supported.".format(compressor))

        # Validate the level before it is turned into a command-line flag so
        # that non-numeric input yields a clean error instead of a traceback.
        try:
            level = self._parse_level(getattr(args, "level", "6"))
        except ValueError:
            self._fail("Compression level is not valid. Use integer value between 1 and 9.")

        # "-c" makes the compressor write to stdout and keep the input file.
        self.compressor = [compressor, "-c", "-{}".format(level)]
        if getattr(args, "verbose", False):
            self.compressor.append("-v")

        self.find_links()
        self.check_files()
        self.compress()

    @staticmethod
    def _fail(message):
        """Print ``message`` as an error on stderr and exit with status 1."""
        print(bcolors.RED + "ERROR:" + bcolors.ENDC + " " + message, file=sys.stderr)
        sys.exit(1)

    @staticmethod
    def _parse_level(level):
        """Return ``level`` as an int in the range 1..9.

        Raises:
            ValueError: If ``level`` is not an integer or is out of range.
        """
        try:
            value = int(level)
        except (TypeError, ValueError):
            raise ValueError("compression level is not an integer: {!r}".format(level))
        if not 1 <= value <= 9:
            raise ValueError("compression level out of range: {!r}".format(level))
        return value

    @staticmethod
    def _decode_path(raw):
        """Convert a path emitted by ``find`` (bytes) into a native path string.

        On Python 3 this uses ``os.fsdecode`` so that file names containing
        non-ASCII bytes survive the round trip; on Python 2 the byte string
        already is the native ``str`` type and is returned unchanged.
        """
        fsdecode = getattr(os, "fsdecode", None)
        if fsdecode is None:
            return raw
        return fsdecode(raw)

    @staticmethod
    def _discard(path):
        """Announce and remove a partially written output file, if present."""
        print(bcolors.GREEN + "DELETE:" + bcolors.ENDC + " '{}'".format(path), file=sys.stderr)
        try:
            os.remove(path)
        except OSError:
            # The file was never created (e.g. open() itself failed).
            pass

    def find_links(self):
        """Populate ``self.links`` with every path that shares the file's inode.

        Exits with status 1 if the number of paths found differs from the
        link count reported by ``os.stat``, because compressing with links
        unaccounted for would leave uncompressed copies of the data behind.
        """
        print(bcolors.YELLOW + "SEARCH:" + bcolors.ENDC + " Hard links to '" + bcolors.YELLOW + "{}".format(self.filename) + bcolors.ENDC + "'. Expecting to find {} for inode {}.".format(self.link_count, self.inode))

        if self.link_count == 1:
            # only 1 link, we don't need to search for any more
            self.links = [self.filename]
        else:
            # search for other files with the same inode (by using find -samefile)
            command = ["find"] + self.search_path + ["-samefile", self.filename, "-print0"]
            try:
                output = subprocess.check_output(command)
            except KeyboardInterrupt:
                print(bcolors.RED + "QUIT:" + bcolors.ENDC + " Interrupted by user.", file=sys.stderr)
                sys.exit(1)
            except subprocess.CalledProcessError as err:
                # find exits non-zero when part of the tree is unreadable but
                # still prints what it found; the count check below decides
                # whether that partial result is complete.
                output = err.output or b""
            except OSError as err:
                self._fail("Could not run 'find': {}".format(err))

            # Entries are NUL-terminated, so splitting leaves one empty
            # trailing element that must be skipped.
            self.links = [self._decode_path(path) for path in output.split(b"\0") if path]

        # show user what files were found
        for path in self.links:
            print(bcolors.GREEN + "FOUND:" + bcolors.ENDC + " '{}'".format(path))

        # check if we found all of them
        if len(self.links) != self.link_count:
            self._fail("Counted mismatch of hard links. Quitting.")

    def check_files(self):
        """Verify every link can be removed and no target name is taken.

        All problems are reported before exiting with status 1, so the user
        sees the complete list in one run.
        """
        errors = 0
        for path in self.links:
            # check if original file with multiple links exists
            if not os.access(path, os.W_OK):
                print(bcolors.RED + "ERROR:" + bcolors.ENDC + " File '{}' does not exist or is not writable/removable.".format(path), file=sys.stderr)
                errors += 1

            # lexists() also catches unreadable files and dangling symlinks,
            # which a read-access test would report as "free" and overwrite.
            target = path + self.extension
            if os.path.lexists(target):
                print(bcolors.RED + "ERROR:" + bcolors.ENDC + " File '{}' already exists.".format(target), file=sys.stderr)
                errors += 1

        if errors:
            print("Quitting", file=sys.stderr)
            sys.exit(1)

    def compress(self):
        """Compress the data once, link the result under all names, delete originals.

        In dry-run mode every step is only printed. If the compressor cannot
        be started, is interrupted, or returns a non-zero status, the partial
        output file is removed and the process exits with status 1; the
        original files are left untouched in that case.
        """
        if not self.links:
            # should not reach here
            sys.exit(1)

        # compress the first file, any one of the files would do really
        first_file = self.links[0]
        target_file = first_file + self.extension

        if self.dry:
            print(bcolors.GREEN + "COMPRESS:" + bcolors.ENDC + " '{}' -> '{}' ({})".format(first_file, target_file, " ".join(self.compressor)))
        else:
            print(bcolors.GREEN + "COMPRESS:" + bcolors.ENDC + " '{}' -> '{}'".format(first_file, target_file))
            try:
                # Binary mode: the compressor writes raw bytes to this handle.
                with open(target_file, "wb") as outfile:
                    return_code = subprocess.call(self.compressor + [first_file], stdout=outfile)
            except KeyboardInterrupt:
                print(bcolors.RED + "ERROR:" + bcolors.ENDC + " Compression interrupted. Quitting.", file=sys.stderr)
                self._discard(target_file)
                sys.exit(1)
            except (IOError, OSError) as err:
                # Typically the compressor executable is not installed; do not
                # leave an empty target behind that would block the next run.
                print(bcolors.RED + "ERROR:" + bcolors.ENDC + " Could not run '{}': {}. Quitting.".format(self.compressor[0], err), file=sys.stderr)
                self._discard(target_file)
                sys.exit(1)

            if return_code != 0:
                print(bcolors.RED + "ERROR:" + bcolors.ENDC + " Compression failed. Quitting.", file=sys.stderr)
                self._discard(target_file)
                sys.exit(1)

        # create hard links to the compressed file
        for path in self.links:
            if path == first_file:
                continue
            print(bcolors.GREEN + "LINK:" + bcolors.ENDC + " '{}' -> '{}'".format(path + self.extension, target_file))
            if not self.dry:
                os.link(target_file, path + self.extension)

        # delete originals (only after every compressed link exists)
        for path in self.links:
            print(bcolors.GREEN + "DELETE:" + bcolors.ENDC + " '{}'".format(path))
            if not self.dry:
                os.remove(path)
def build_parser():
    """Return the ``argparse`` parser describing this script's command line.

    ``--level`` is restricted to the strings "1" through "9" so that an
    invalid level is rejected with a usage error during parsing. The value is
    deliberately kept as a string because it is later concatenated into the
    compressor's command-line flag.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-v", "--verbose", action="store_true", help="verbose flag is sent to compressor")
    parser.add_argument("--dry", action="store_true", help="perform dry run")
    parser.add_argument("--compressor", default="lbzip2", metavar="TYPE", help="use custom compressor e.g gzip, bzip2, xz (default: lbzip2)")
    parser.add_argument("--level", default="6", choices=[str(n) for n in range(1, 10)], metavar="N", help="compression level (default: 6)")
    parser.add_argument("--search", nargs="*", default=["."], metavar="PATH", help="search path(s) to look for hard linked files (default: .)")
    parser.add_argument("file", metavar="FILE")
    return parser


def main(argv=None):
    """Parse ``argv`` (``sys.argv[1:]`` when None) and compress the given file.

    Constructing ``Compress`` performs the whole operation, so its instance
    is not kept.
    """
    args = build_parser().parse_args(argv)
    Compress(args.file, args)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment