Created
January 19, 2016 00:49
-
-
Save Rudd-O/da8bc169e2cccb3a3707 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import Queue | |
import collections | |
import cStringIO | |
import signal # Important. | |
import subprocess | |
import tarfile | |
import threading | |
def which_files_to_compress():
    """Yield the absolute path of every file that should go into the backup.

    The selection is a fixed, single-entry list; each path is produced
    lazily, one at a time.
    """
    selected = ("/var/lib/qubes/appvms/somelargevm/private.img",)
    for image_path in selected:
        yield image_path
def transform(path):
    """Return ``path`` with every leading ``/`` removed.

    The result is the name under which the file is stored inside the
    archive; slashes elsewhere in the path are left untouched.
    """
    relative = path
    while relative.startswith("/"):
        relative = relative[1:]
    return relative
def chunked_reader(fobject, chunk_size=1024 * 1024 * 1024):
    """Read and return the next chunk of data from ``fobject``.

    Args:
        fobject: any object with a ``read(size)`` method.
        chunk_size: maximum number of bytes to request in one call.
            Defaults to 1 GiB.

    Returns:
        Whatever ``fobject.read(chunk_size)`` returns; an empty result
        means the stream is exhausted.
    """
    # Original author's note on the default size: this particular value
    # is the existing value in qubes backup.  Given that the threadpool
    # is only 4 deep, and four processes are expected to run at this
    # time at the maximum, the memory consumption will be bounded by
    # twice this value.  If this value is lowered, or the parallelism is
    # lowered from four to two, then memory consumption will be bounded
    # by the buffer size.
    return fobject.read(chunk_size)
class SerializedTarWriter(object):
    """Wrap a tar archive object so that member writes happen one at a time.

    ``addfile`` calls are serialized through a lock; ``gettarinfo`` is
    forwarded to the wrapped object without taking the lock.
    """

    def __init__(self, tarfileobject):
        """Store the archive object to wrap and create the write lock."""
        self.m = threading.Lock()
        self.t = tarfileobject

    def addfile(self, tarinfoobject, stringio):
        """Append one member to the archive while holding the write lock.

        ``tarinfoobject`` and ``stringio`` are passed straight through to
        the wrapped object's ``addfile``.
        """
        with self.m:
            self.t.addfile(tarinfoobject, stringio)

    def gettarinfo(self, name):
        """Return the wrapped object's ``gettarinfo(name)`` result."""
        info = self.t.gettarinfo(name)
        return info
class TaskArbitrator(object):
    """Run callables on background threads, a bounded number at a time.

    Each callable handed to :meth:`add` runs on its own thread.  A
    semaphore created with ``numtasks`` permits caps how many run
    concurrently; :meth:`add` blocks until a permit is free.  If a
    callable raises, the exception is remembered and re-raised from a
    later :meth:`add` or from :meth:`end`.

    A daemon "supervisor" thread prints the number of tracked worker
    threads once per second.
    """

    def __init__(self, numtasks):
        """Create the arbitrator and start the supervisor thread.

        Args:
            numtasks: maximum number of callables allowed to run at once.
        """
        self.sem = threading.Semaphore(numtasks)
        self.exception = None
        self.threads = []
        self.lock = threading.RLock()
        self.supervisor_thread = threading.Thread(target=self._supervisor)
        # Daemon thread: it loops forever and must not keep the process alive.
        self.supervisor_thread.daemon = True
        self.supervisor_thread.start()

    def _supervisor(self):
        """Print the number of tracked worker threads once per second."""
        # Function-scope import: the module does not import ``time`` at
        # top level, so a bare ``time.sleep`` here would raise NameError.
        import time
        while True:
            with self.lock:
                tracked = len(self.threads)
            print(tracked)
            # Sleep outside the lock so workers and callers are never
            # blocked for the duration of the pause.
            time.sleep(1)

    def add(self, kallable):
        """Start ``kallable`` on a new thread once a slot is available.

        Finished threads are pruned first.  If an earlier callable
        failed, :meth:`end` is invoked, which waits for the remaining
        workers and re-raises that failure.

        Args:
            kallable: zero-argument callable to run.

        Raises:
            BaseException: whatever a previously added callable raised.
        """
        with self.lock:
            for t in self.threads[:]:
                t.join(0)
                if t.is_alive():
                    continue
                print("removing dead thread %s" % (t,))
                self.threads.remove(t)
            failed = self.exception is not None
        if failed:
            self.end()
        # Wait for a free slot WITHOUT holding self.lock: a failing
        # worker needs that lock to record its exception before it can
        # release the semaphore, so waiting under the lock can deadlock.
        self.sem.acquire()
        worker = threading.Thread(target=self._run_thread, args=(kallable,))
        try:
            worker.start()
        except BaseException:
            # The thread never ran, so nobody else will free the slot.
            self.sem.release()
            raise
        with self.lock:
            self.threads.append(worker)

    def _run_thread(self, kallable):
        """Thread body: run ``kallable``, record any failure, free the slot."""
        try:
            kallable()
        except BaseException as e:
            with self.lock:
                self.exception = e
        finally:
            self.sem.release()

    def end(self):
        """Wait for all tracked workers, then re-raise any recorded failure.

        Raises:
            BaseException: the exception recorded by a failed callable.
        """
        print("ending")
        with self.lock:
            pending = self.threads[:]
        # Join outside the lock so that a worker which fails while we are
        # waiting can still take the lock to record its exception.
        for t in pending:
            t.join()
        with self.lock:
            self.threads[:] = [t for t in self.threads if t.is_alive()]
            if self.exception is not None:
                raise self.exception
def add_contents_to_tar(tarqueue, originalname, nameintar, data, delay=5):
    """Store ``data`` in the archive as a member named ``nameintar``.

    The member's metadata is obtained from
    ``tarqueue.gettarinfo(originalname)``; its size and name are then
    overridden to describe this chunk.

    Args:
        tarqueue: object exposing ``gettarinfo(name)`` and
            ``addfile(tarinfo, fileobj)``.
        originalname: name passed to ``tarqueue.gettarinfo``.
        nameintar: member name to record in the archive.
        data: byte string holding the member's contents.
        delay: seconds to sleep before writing.  The default of 5 keeps
            the original "simulate slow write" behaviour; pass 0 to
            disable it.
    """
    # Function-scope imports: neither module is imported at file level.
    # io.BytesIO is used instead of cStringIO because it exists on both
    # Python 2.6+ and Python 3.
    import io
    import time

    ti = tarqueue.gettarinfo(originalname)
    ti.size = len(data)
    ti.name = nameintar
    if delay:
        time.sleep(delay)  # Simulate slow write. Unnecessary.
    tarqueue.addfile(ti, io.BytesIO(data))
    print("added contents of file %s" % (nameintar,))
def add_hmac_to_tar(tarqueue, originalname, nameintar, data, delay=5):
    """Store the HMAC of ``data`` in the archive as member ``nameintar``.

    ``data`` is piped to ``openssl dgst -SHA512 -hmac password`` and the
    command's standard output becomes the member's contents.  Metadata
    comes from ``tarqueue.gettarinfo(originalname)`` with name and size
    overridden.

    Args:
        tarqueue: object exposing ``gettarinfo(name)`` and
            ``addfile(tarinfo, fileobj)``.
        originalname: name passed to ``tarqueue.gettarinfo``.
        nameintar: member name to record in the archive.
        data: byte string to authenticate.
        delay: seconds to sleep before writing.  The default of 5 keeps
            the original "simulate slow write" behaviour; pass 0 to
            disable it.

    Raises:
        AssertionError: if ``openssl`` exits with a non-zero status; the
            status is the exception argument.
    """
    # Function-scope imports: neither module is imported at file level.
    # io.BytesIO is used instead of cStringIO because it exists on both
    # Python 2.6+ and Python 3.
    import io
    import time

    # NOTE(review): the HMAC key is a hard-coded placeholder and is
    # passed on the command line, where other local users can usually
    # see it in the process list.  Replace before any real use.
    p = subprocess.Popen(["openssl", "dgst",
                          "-SHA512", "-hmac", "password"],
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE)
    # communicate() waits for the process, so returncode is already set.
    stdout, _ = p.communicate(data)
    ret = p.returncode
    if ret != 0:
        # Raised explicitly (rather than ``assert``) so the check still
        # runs under ``python -O``; the exception type is unchanged.
        raise AssertionError(ret)
    ti = tarqueue.gettarinfo(originalname)
    ti.name = nameintar
    ti.size = len(stdout)
    if delay:
        time.sleep(delay)  # Simulate slow write. Unnecessary.
    tarqueue.addfile(ti, io.BytesIO(stdout))
    print("added hmac to %s" % (nameintar,))
def run_backup(compressor="cat", backup_path="/path/to/backup"):
    """Write every selected file into a tar archive as HMAC'd chunks.

    Each path from ``which_files_to_compress()`` is piped through
    ``compressor``.  The compressor's output is read chunk by chunk with
    ``chunked_reader``; for every chunk two jobs are queued on a
    ``TaskArbitrator`` created with a limit of 4: one stores the chunk
    as ``<name>-NNN`` and one stores its HMAC as ``<name>-NNN.hmac``.

    Args:
        compressor: command handed to ``subprocess.Popen``; it receives
            the file on stdin and must write the result to stdout.
        backup_path: where the tar archive is created.

    Raises:
        AssertionError: if a file yields more than 1000 chunks, or the
            compressor exits with a non-zero status.
        BaseException: any failure recorded by a worker job is re-raised
            when the jobs are drained.
    """
    # Function-scope import: functools is not imported at file level.
    import functools

    # Chunk names use a "%03d" suffix, so only indices 000-999 exist.
    max_chunks = 1000

    tar = tarfile.open(backup_path, "w")
    try:
        tarqueue = SerializedTarWriter(tar)
        tasker = TaskArbitrator(4)
        try:
            for path in which_files_to_compress():
                pathintar = transform(path)
                with open(path, "rb") as originalfobject:
                    cpr = subprocess.Popen(compressor,
                                           stdin=originalfobject,
                                           stdout=subprocess.PIPE)
                # The child holds its own copy of the descriptor, so the
                # parent's handle is closed as soon as it has started.
                try:
                    n = 0
                    while True:
                        red = chunked_reader(cpr.stdout)
                        if not red:
                            break
                        if n >= max_chunks:
                            # Stop the compressor so wait() cannot block
                            # on a child stuck writing to a full pipe.
                            cpr.kill()
                            cpr.wait()
                            raise AssertionError(n)
                        chunkname = "%s-%03d" % (pathintar, n)
                        # functools.partial binds the CURRENT chunk and
                        # names.  A bare lambda would look the variables
                        # up when the worker thread runs, by which time
                        # the loop may already have rebound them.
                        tasker.add(functools.partial(
                            add_contents_to_tar,
                            tarqueue, path, chunkname, red))
                        tasker.add(functools.partial(
                            add_hmac_to_tar,
                            tarqueue, path, chunkname + ".hmac", red))
                        n += 1
                    ret = cpr.wait()
                finally:
                    cpr.stdout.close()
                if ret != 0:
                    # Raised explicitly (rather than ``assert``) so the
                    # check still runs under ``python -O``.
                    raise AssertionError(ret)
        finally:
            # Always drain the workers before the archive is closed;
            # this also re-raises any failure a worker recorded.
            tasker.end()
    finally:
        tar.close()
# Script entry point: run the backup with its default arguments when this
# file is executed directly (nothing happens on import).
if __name__ == "__main__":
    run_backup()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment