Skip to content

Instantly share code, notes, and snippets.

@aoirint
Created May 17, 2018 15:30
Show Gist options
  • Save aoirint/41746635e0dde830d44a6414d2ef5bb3 to your computer and use it in GitHub Desktop.
Save aoirint/41746635e0dde830d44a6414d2ef5bb3 to your computer and use it in GitHub Desktop.
import os
import shutil
import time
import argparse
from datetime import datetime as dt
VERSION = '0.1.0'
class BackupItem:
    """Snapshot of a source path and its counterpart in the backup tree.

    The existence, modification time and size of both paths are recorded
    once, at construction time. A missing path is recorded with
    ``*_exists = False``, ``*_modified = -1`` and ``*_size = 0``.
    """

    def __init__(self, src, dest):
        """Record the current state of *src* and *dest*.

        Args:
            src: Path inside the source tree.
            dest: Corresponding path inside the destination tree.
        """
        self.src = src
        self.dest = dest
        self.src_exists, self.src_modified, self.src_size = self._probe(src)
        self.dest_exists, self.dest_modified, self.dest_size = self._probe(dest)

    @staticmethod
    def _probe(path):
        """Return ``(exists, mtime, size)`` for *path* using one stat call.

        A single ``os.stat`` avoids the window between an existence check
        and the later mtime/size reads in which the path could vanish and
        make those reads raise.
        """
        try:
            info = os.stat(path)
        except (OSError, ValueError):
            # Same conditions under which os.path.exists() reports False.
            return False, -1, 0
        return True, info.st_mtime, info.st_size

    def shouldbackup(self):
        """Return True when the destination is missing or older than the source.

        Modification times are compared after truncation to whole seconds.
        """
        if not self.dest_exists:
            return True
        return int(self.dest_modified) < int(self.src_modified)

    def shouldrmdest(self):
        """Return True when the source counterpart did not exist."""
        return not self.src_exists
class BackupEntry:
    """One-way synchronisation of a source directory into a destination.

    ``delete`` removes destination entries that have no source counterpart,
    ``backup`` copies new or newer source entries, and ``mirror`` runs both.
    When ``demo`` is true the work is only counted and reported; nothing is
    created, copied or removed.
    """

    def __init__(self, src, dest, demo, verbose=False):
        """Store the configuration.

        Args:
            src: Root of the source tree.
            dest: Root of the destination tree.
            demo: If true, perform a dry run without touching the disk.
            verbose: If true, report every directory that is scanned.
        """
        self.src = src
        self.dest = dest
        self.demo = demo
        self.verbose = verbose

    def mirror(self):
        """Prune stale destination entries, then copy new/updated ones."""
        self.delete()
        self.backup()

    def delete(self):
        """Remove destination entries whose source counterpart is missing."""
        print('Checking files to delete...')
        if os.path.isdir(self.dest):
            # Children are listed before their parent so a directory is
            # already empty by the time os.rmdir() reaches it.
            deleteitems = [
                item
                for item in self.search_dir(self.dest, depthfirst=True)
                if item.shouldrmdest()
            ]
        else:
            # A destination that does not exist yet has nothing to prune;
            # scanning it would raise in os.listdir().
            deleteitems = []

        n_delete = len(deleteitems)
        s_delete = sum(item.dest_size for item in deleteitems)
        print('%d delete items (%d bytes) found' % (n_delete, s_delete, ))

        n_donedeletepre = 0
        n_donedelete = 0
        ts = time.time()
        for item in deleteitems:
            delitem = item.dest
            if os.path.isfile(delitem):
                if not self.demo:
                    os.remove(delitem)
            elif os.path.isdir(delitem):
                if not self.demo:
                    os.rmdir(delitem)
            n_donedelete += 1

            # Report progress at most once per second.
            now = time.time()
            if now - ts >= 1.0:
                print('%.02f %%, %.02f items/s' % (
                    100.0 * n_donedelete / n_delete,
                    (n_donedelete - n_donedeletepre) / (now - ts)))
                ts = now
                n_donedeletepre = n_donedelete
        print('Done delete')

    def backup(self):
        """Copy source entries that are missing from, or newer than, the destination.

        After a real (non-demo) run, an empty-named-by-timestamp marker file
        (``YYYYmmdd-HHMMSS``) containing the script version is written to the
        destination root.
        """
        print('Checking files to backup...')
        backupitems = [
            item for item in self.search_dir(self.src) if item.shouldbackup()
        ]
        n_backup = len(backupitems)
        s_backup = sum(item.src_size for item in backupitems)
        print('%d backup items (%d bytes) found' % (n_backup, s_backup, ))

        # Top-level files are copied straight into the destination root,
        # so it must exist before the first copy.
        if not self.demo and not os.path.isdir(self.dest):
            os.makedirs(self.dest)

        s_donebackuppre = 0
        s_donebackup = 0
        ts = time.time()
        for item in backupitems:
            bkitem = item.src
            if os.path.isfile(bkitem):
                if not self.demo:
                    shutil.copy2(bkitem, item.dest)
                s_donebackup += item.src_size
            elif os.path.isdir(bkitem):
                if not self.demo:
                    if not os.path.exists(item.dest):
                        os.mkdir(item.dest)
                    shutil.copystat(bkitem, item.dest)

            # Report progress at most once per second.
            now = time.time()
            if now - ts >= 1.0:
                # s_backup is 0 when every pending item is zero bytes long.
                percent = 100.0 * s_donebackup / s_backup if s_backup else 100.0
                print('%.02f %%, %d bytes/s' % (
                    percent, (s_donebackup - s_donebackuppre) / (now - ts)))
                ts = now
                s_donebackuppre = s_donebackup
        print('Done backup')

        # A dry run must not leave anything behind in the destination.
        if not self.demo:
            nowstr = dt.now().strftime('%Y%m%d-%H%M%S')
            with open(os.path.join(self.dest, nowstr), 'w') as fp:
                fp.write('%s\n' % VERSION)

    def search_dir(self, dir, root=None, depthfirst=False):
        """Recursively yield a ``BackupItem`` for every entry below *dir*.

        Args:
            dir: Directory to scan.
            root: Directory that yielded paths are made relative to;
                defaults to *dir* on the outermost call.
            depthfirst: If true, a directory is yielded after its contents;
                otherwise before them.

        Yields:
            ``BackupItem`` pairing ``self.src/<relative path>`` with
            ``self.dest/<relative path>``.
        """
        if root is None:
            root = dir
        relpath_list = [
            os.path.join(dir, basename) for basename in os.listdir(dir)
        ]
        if self.verbose:
            print('Directory %s: %d entries found' % (dir, len(relpath_list)))

        for path in relpath_list:
            treepath = os.path.relpath(path, start=root)
            item = BackupItem(
                os.path.join(self.src, treepath),
                os.path.join(self.dest, treepath))
            if not os.path.isdir(path):
                yield item
                continue
            if not depthfirst:
                yield item
            for child in self.search_dir(path, root, depthfirst):
                yield child
            if depthfirst:
                yield item
if __name__ == '__main__':
    # Command-line entry point: choose mirror, or any combination of
    # remove and backup, for the given source and destination trees.
    parser = argparse.ArgumentParser()
    parser.add_argument('-s', '--src', type=str, required=True,
                        help='source directory')
    parser.add_argument('-d', '--dest', type=str, required=True,
                        help='destination directory')
    parser.add_argument('-m', '--mirror', action='store_true', default=False,
                        help='remove stale destination entries, then back up')
    parser.add_argument('-r', '--remove', action='store_true', default=False,
                        help='remove destination entries missing from the source')
    parser.add_argument('-b', '--backup', action='store_true', default=False,
                        help='copy new or updated source entries')
    parser.add_argument('-t', '--test', action='store_true', default=False,
                        help='dry run: report the work without changing anything')
    parser.add_argument('-v', '--verbose', action='store_true', default=False,
                        help='report every directory that is scanned')
    args = parser.parse_args()

    e = BackupEntry(args.src, args.dest, demo=args.test, verbose=args.verbose)
    if args.mirror:
        e.mirror()
    else:
        if args.remove:
            # BackupEntry names this operation delete(); there is no remove().
            e.delete()
        if args.backup:
            e.backup()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment