Created
May 23, 2013 19:14
-
-
Save bbengfort/5638664 to your computer and use it in GitHub Desktop.
A chunker that appends data to file buckets in a specified directory such that no file exceeds the specified size. It appends the data to the first file that has enough space available; if none does, it appends the data to a new file. Note that an exception is raised if the data is bigger than the maximum file size. To test it with rando…
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
DEFAULT_SIZE = 128 * 1024 * 1024  # 128 MB


class Bucket(object):
    """
    Handles the writing of a stream into a bucket.

    Pass a path to a directory which holds the bucket, and
    the chunk size required for each file in the bucket.

    Slice files inside the directory are named ``data<N>`` where ``<N>``
    is a positive integer.  Any other entry in the directory is ignored,
    so stray files (e.g. ``.DS_Store``) are never written to and never
    break the numbering of new slices.
    """

    # Name prefix shared by every slice file created by the bucket.
    SLICE_PREFIX = "data"

    def __init__(self, path, size=DEFAULT_SIZE):
        """
        Normalizes `path` (environment variables, ``~`` and relative
        segments are expanded) and creates the directory if missing.

        Raises a TypeError if `path` points to an existing file.
        """
        path = os.path.expandvars(path)
        path = os.path.expanduser(path)
        path = os.path.abspath(path)

        if not os.path.exists(path):
            # 0o755 is accepted by Python 2.6+ and Python 3 alike.
            os.makedirs(path, 0o755)
        elif os.path.isfile(path):
            raise TypeError("Please specify the path to a directory, not a file")

        self.path = path
        self.size = size

    def append(self, data, debug=False):
        """
        Appends `data` to the first slice with enough free space, or to
        a brand new slice if no existing slice can hold it.

        Text is encoded as UTF-8 before writing so that the size check
        is made against the number of bytes that actually reach disk.

        Raises a ValueError if `data` is larger than the bucket size.
        """
        if not isinstance(data, (bytes, bytearray)):
            data = data.encode("utf-8")

        # len() is the payload size; sys.getsizeof() would also count the
        # interpreter's per-object overhead and skew every comparison.
        dsize = len(data)
        if dsize > self.size:
            raise ValueError("Size of data is greater than bucket size.")

        outpath = None
        for path in self:
            if os.path.getsize(path) + dsize <= self.size:
                outpath = path
                break

        if outpath is None:
            outpath = self.new_slice()

        # Binary mode: no newline translation, so the on-disk size always
        # grows by exactly dsize bytes.
        with open(outpath, 'ab') as outfile:
            if debug:
                print("Appending %i bytes of data to %s" % (dsize, outpath))
            outfile.write(data)

    def new_slice(self):
        """
        Returns the path for the next slice: one past the highest slice
        number currently present in the directory (starting at 1).
        """
        current = 0
        for name in os.listdir(self.path):
            # Directories named like a slice are counted as well, so the
            # returned path never collides with an existing entry.
            index = self._slice_index(name)
            if index is not None and index > current:
                current = index
        current += 1
        return os.path.join(self.path, "%s%i" % (self.SLICE_PREFIX, current))

    def _slice_index(self, name):
        """
        Returns the number encoded in a slice file name, or None if
        `name` does not follow the ``data<N>`` naming scheme.
        """
        prefix = self.SLICE_PREFIX
        if not name.startswith(prefix):
            return None
        suffix = name[len(prefix):]
        if not suffix.isdigit():
            return None
        try:
            return int(suffix)
        except ValueError:
            # isdigit() accepts a few characters that int() rejects.
            return None

    def __iter__(self):
        """
        Yields the paths of the slice files, ordered by slice number so
        that "the first slice with space" is deterministic.
        """
        slices = []
        for name in os.listdir(self.path):
            index = self._slice_index(name)
            if index is None:
                continue
            fpath = os.path.join(self.path, name)
            if os.path.isfile(fpath):
                slices.append((index, fpath))

        for _, fpath in sorted(slices):
            yield fpath

    def __len__(self):
        """
        Returns the number of slice files in the bucket.
        """
        return sum(1 for _ in self)

    def __str__(self):
        return "<Bucket at %s with %i chunks of less than %i bytes>" % (self.path, len(self), self.size)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
import string | |
import bucket | |
import time | |
def generate(length=300):
    """
    Returns a random string of `length` characters drawn from the ASCII
    letters, digits and punctuation.
    """
    # string.ascii_letters exists on Python 2 and 3 and, unlike
    # string.letters, does not vary with the locale.
    chars = string.ascii_letters + string.digits + string.punctuation
    return ''.join(random.choice(chars) for _ in range(length))
def random_length(low=1024, high=4096):
    """
    Picks a random integer N such that low <= N <= high.
    """
    length = random.randint(low, high)
    return length
if __name__ == "__main__":
    # Keep appending random payloads of random length to the bucket,
    # twice a second, until interrupted with Ctrl-C; then print a
    # summary of the bucket.
    manager = bucket.Bucket(path="~/Desktop/RandomBucket", size=10240)
    try:
        while True:
            manager.append(generate(random_length()), debug=True)
            time.sleep(0.5)
    except KeyboardInterrupt:
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("Finished!")
        print(str(manager))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment