#!/usr/bin/env python
# encoding: utf-8
from eventlet import patcher, GreenPool

# Monkey-patch the standard library before importing anything that does I/O.
patcher.monkey_patch(all=True)

import errno
import sys

from eventlet.queue import Queue, Empty

from boto import connect_s3
from boto.s3.connection import S3Connection
class GreenPoolDownloader(object):

    def __init__(self, aws_id, aws_secret, greenthreads=80):
        self.aws_id = aws_id
        self.aws_secret = aws_secret
        self.pool = GreenPool(size=greenthreads)
        self.queue = Queue()
        self._broken_pipe = False

    @classmethod
    def from_boto_conn(cls, boto_conn, greenthreads=20):
        return cls(boto_conn.access_key, boto_conn.secret_key,
                   greenthreads=greenthreads)
    def _new_conn(self, bucket_name):
        """
        Return a new connection to a bucket.
        """
        conn = S3Connection(self.aws_id, self.aws_secret)
        return conn.get_bucket(bucket_name)
    def download_key(self, out_file):
        try:
            bucket_name, key_name = self.queue.get_nowait()
        except Empty:
            return

        # Once one writer has seen a broken pipe, drain the queue without
        # downloading anything else.
        if self._broken_pipe:
            self.queue.task_done()
            return

        bucket = self._new_conn(bucket_name)
        k = bucket.get_key(key_name)
        if k is None:
            raise ValueError("key {name} does not exist".format(name=key_name))

        try:
            k.get_contents_to_file(out_file)
            out_file.flush()
        except IOError as ioe:
            # EPIPE means whoever was reading the output file went away.
            if ioe.errno == errno.EPIPE:
                self._broken_pipe = True
            else:
                raise
        finally:
            self.queue.task_done()
    def download_all(self, bucket_name, prefix, out_file):
        bucket = self._new_conn(bucket_name)
        for key in bucket.list(prefix=prefix):
            # Skip zero-byte keys (e.g. directory placeholders).
            if key.size != 0:
                self.queue.put((bucket_name, key.name))

        # Spawn one greenthread per queued key. spawn_n blocks when the pool
        # is full; any extra threads that find the queue empty simply return.
        while not self.queue.empty():
            self.pool.spawn_n(self.download_key, out_file)
        self.pool.waitall()
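
# A minimal sketch of driving GreenPoolDownloader directly rather than via
# main() below. The credentials, bucket, and prefix are placeholders:
#
#   downloader = GreenPoolDownloader('AWS_ID', 'AWS_SECRET', greenthreads=40)
#   with open('merged.log', 'wb') as out:
#       downloader.download_all('my-bucket', 'logs/2013/03/', out)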

def parse_s3_uri(uri):
    """
    Split an S3 URI into a (bucket, path) tuple.
    """
    if not uri.startswith('s3://'):
        raise ValueError("invalid S3 URI: " + uri)
    uri = uri[5:].lstrip('/')
    bucket_name, path = uri.split('/', 1)
    return bucket_name, path
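
# A quick check of the parser above; the bucket and path are made-up values:
#
#   >>> parse_s3_uri('s3://my-bucket/logs/2013/03/')
#   ('my-bucket', 'logs/2013/03/')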

# NOTE: This is from the serial version. Kept here for when I break the
# concurrent version! Async I/O is hard.
def merge_to_file(s3_conn, uri, fp, join_with=None):
    """
    Given an S3 URI, merge the contents of all keys that match the object
    prefix into the given file. Keys are written to the file in an arbitrary
    order. If `join_with` is given, it is written after every object
    downloaded from S3.
    """
    bucket_name, key_prefix = parse_s3_uri(uri)
    bucket = s3_conn.get_bucket(bucket_name)
    for key in bucket.list(prefix=key_prefix):
        if key.size:
            key.get_contents_to_file(fp)
            if join_with is not None:
                fp.write(join_with)
            fp.flush()
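
# Hypothetical use of the serial version, joining objects with newlines:
#
#   conn = connect_s3()
#   with open('merged.tsv', 'wb') as fp:
#       merge_to_file(conn, 's3://my-bucket/parts/', fp, join_with='\n')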

def main(args):
    if len(args) < 1:
        print >> sys.stderr, "USAGE: s3merge s3_uri [s3_uri...]"
        sys.exit(-1)

    s3_conn = connect_s3()
    downloader = GreenPoolDownloader.from_boto_conn(s3_conn, greenthreads=20)
    for arg in args:
        try:
            bucket_name, key_prefix = parse_s3_uri(arg)
            downloader.download_all(bucket_name, key_prefix, sys.stdout)
        except Exception as e:
            print >> sys.stderr, "ERROR: %s" % e
            sys.exit(-1)
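
# Typical invocation, concatenating every non-empty key under one or more
# prefixes to stdout (the script name and bucket here are illustrative):
#
#   $ python s3merge.py s3://my-bucket/logs/2013/03/ > merged.log
#
# boto's connect_s3() picks up credentials from the usual environment
# variables or boto config, so none are passed on the command line.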
if __name__ == '__main__':
    main(sys.argv[1:])