Skip to content

Instantly share code, notes, and snippets.

Forked from chrishamant/
Last active August 29, 2015 14:19
Show Gist options
  • Save cloudsiksha/b20ff26ed501c989486b to your computer and use it in GitHub Desktop.
Save cloudsiksha/b20ff26ed501c989486b to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
"""Split large file into multiple pieces for upload to S3.

S3 accepts at most 5 GB in a single direct upload, so larger CloudBioLinux
box images need boto's multipart upload support. Files smaller than 60 MB
are sent with a single standard upload instead.

This parallelizes the task over available cores using multiprocessing.

Usage: <file_to_transfer> <bucket_name> [<s3_key_name>]

  If <s3_key_name> is not specified, the filename will be used.

  --norr -- Do not use reduced redundancy storage.
  --public -- Make uploaded files public.

Files are stored in cheaper reduced redundancy storage by default.
"""
import os
import sys
import glob
import subprocess
import contextlib
import functools
import multiprocessing
from multiprocessing.pool import IMapIterator
from optparse import OptionParser
import boto
def main(transfer_file, bucket_name, s3_key_name=None, use_rr=True,
         make_public=False):
    """Upload ``transfer_file`` to the S3 bucket ``bucket_name``.

    Files under 60 MB are sent with a single standard upload; larger files
    are split and sent with a parallel multipart upload.

    :param transfer_file: path of the local file to upload.
    :param bucket_name: destination bucket; it is created if lookup fails.
    :param s3_key_name: destination key; defaults to the file's basename.
    :param use_rr: store with reduced redundancy storage when True.
    :param make_public: grant public read access to the uploaded key.
    """
    if s3_key_name is None:
        s3_key_name = os.path.basename(transfer_file)
    conn = boto.connect_s3()
    bucket = conn.lookup(bucket_name)
    if bucket is None:
        # lookup() returns None for a missing bucket; create it instead of
        # failing later with an AttributeError on None.
        bucket = conn.create_bucket(bucket_name)
    mb_size = os.path.getsize(transfer_file) / 1e6
    if mb_size < 60:
        _standard_transfer(bucket, s3_key_name, transfer_file, use_rr)
    else:
        _multipart_upload(bucket, s3_key_name, transfer_file, mb_size, use_rr)
    if make_public:
        s3_key = bucket.get_key(s3_key_name)
        s3_key.set_acl("public-read")
def upload_cb(complete, total):
    """Progress callback for boto transfers: write one dot per call.

    :param complete: amount transmitted so far (unused).
    :param total: total amount to transmit (unused).
    """
    sys.stdout.write(".")
    # Flush so each dot appears immediately while the upload is running.
    sys.stdout.flush()
def _standard_transfer(bucket, s3_key_name, transfer_file, use_rr):
    """Upload ``transfer_file`` to ``s3_key_name`` in one non-multipart request.

    Progress dots are written by ``upload_cb`` (``num_cb=10`` callbacks), and
    the progress line is terminated once the upload returns.

    :param bucket: boto bucket to upload into.
    :param s3_key_name: destination key name.
    :param transfer_file: path of the local file to upload.
    :param use_rr: store with reduced redundancy storage when True.
    """
    # Leave the line open so the progress dots follow this message.
    sys.stdout.write(" Upload with standard transfer, not multipart ")
    sys.stdout.flush()
    new_s3_item = bucket.new_key(s3_key_name)
    new_s3_item.set_contents_from_filename(transfer_file,
                                           reduced_redundancy=use_rr,
                                           cb=upload_cb, num_cb=10)
    sys.stdout.write("\n")
def map_wrap(f):
    """Adapt ``f`` so it can be called with one packed argument tuple.

    ``pool.imap`` hands each work item to the function as a single argument;
    the returned wrapper unpacks that tuple (plus an optional keyword dict)
    into a normal call to ``f``.

    ``functools.wraps`` copies ``f``'s name and module onto the wrapper. This
    matters because pickle, which multiprocessing uses to ship the function to
    workers, looks module-level functions up by name.
    """
    @functools.wraps(f)
    def wrapper(args_tuple, kwargs_dict=None):
        # Portable equivalent of the Python 2-only ``apply(f, args, kw)``.
        return f(*args_tuple, **(kwargs_dict or {}))
    return wrapper
def mp_from_ids(mp_id, mp_keyname, mp_bucketname):
    """Get the multipart upload from the bucket and multipart IDs.

    Functions running inside multiprocessing workers receive only these plain
    values. This function opens a fresh S3 connection and reconstitutes a
    handle to the upload from them.

    :param mp_id: ID of the multipart upload.
    :param mp_keyname: key name the multipart upload targets.
    :param mp_bucketname: name of the bucket holding the upload.
    :returns: a ``boto.s3.multipart.MultiPartUpload`` for that upload.
    """
    from boto.s3.multipart import MultiPartUpload

    conn = boto.connect_s3()
    bucket = conn.lookup(mp_bucketname)
    mp = MultiPartUpload(bucket)
    # Two separate assignments: the key name and the upload ID are distinct.
    mp.key_name = mp_keyname
    mp.id = mp_id
    return mp
@map_wrap
def transfer_part(mp_id, mp_keyname, mp_bucketname, i, part):
    """Transfer a part of a multipart upload. Designed to be run in parallel.

    ``map_wrap`` lets ``pool.imap`` call this with one tuple
    ``(mp_id, mp_keyname, mp_bucketname, i, part)``.

    :param mp_id: ID of the multipart upload.
    :param mp_keyname: key name the multipart upload targets.
    :param mp_bucketname: name of the bucket holding the upload.
    :param i: zero-based index of this part; it is sent as part number i + 1.
    :param part: path of the local file holding this part's bytes.
    """
    mp = mp_from_ids(mp_id, mp_keyname, mp_bucketname)
    print(" Transferring %s %s" % (i, part))
    # Binary mode: parts are raw bytes. This avoids newline translation on
    # Windows and text decoding on Python 3.
    with open(part, "rb") as t_handle:
        mp.upload_part_from_file(t_handle, i + 1)
def _multipart_upload(bucket, s3_key_name, tarball, mb_size, use_rr=True):
    """Upload large files using Amazon's multipart upload functionality.

    The file is cut into pieces with the external ``split`` command, and the
    pieces are uploaded concurrently with one worker per CPU core. The upload
    is completed only after every part has been sent. If anything fails, the
    multipart upload is cancelled so no orphaned parts are left on S3. The
    local pieces are deleted after a successful upload.

    :param bucket: boto bucket to upload into.
    :param s3_key_name: destination key name.
    :param tarball: path of the local file to upload.
    :param mb_size: size of ``tarball`` in megabytes.
    :param use_rr: store with reduced redundancy storage when True.
    """
    cores = multiprocessing.cpu_count()

    def split_file(in_file, mb_size, split_num=5):
        """Split ``in_file`` into pieces next to it; return their sorted paths.

        Piece size targets roughly ``2 * split_num`` pieces. Each piece is
        capped at 250 MB and is never below S3's 5 MB minimum part size.
        Pieces left over from a previous run are reused; they are detected
        by the first ``...aa`` file.
        """
        prefix = os.path.join(os.path.dirname(in_file),
                              "%sS3PART" % (os.path.basename(s3_key_name)))
        split_size = max(5, int(min(mb_size / (split_num * 2.0), 250)))
        if not os.path.exists("%saa" % prefix):
            cl = ["split", "-b%sm" % split_size, in_file, prefix]
            subprocess.check_call(cl)
        return sorted(glob.glob("%s*" % prefix))

    # Split before initiating, so a split failure leaves no open upload.
    parts = split_file(tarball, mb_size, cores)
    mp = bucket.initiate_multipart_upload(s3_key_name, reduced_redundancy=use_rr)
    completed = False
    try:
        with multimap(cores) as pmap:
            jobs = ((mp.id, mp.key_name, mp.bucket_name, i, part)
                    for (i, part) in enumerate(parts))
            # Drain the iterator so every part finishes (and any worker
            # exception is re-raised here) before completing.
            for _ in pmap(transfer_part, jobs):
                pass
        mp.complete_upload()
        completed = True
    finally:
        if not completed:
            mp.cancel_upload()
    for part in parts:
        os.remove(part)
@contextlib.contextmanager
def multimap(cores=None):
    """Provide multiprocessing imap like function.

    The context manager sets up a pool of ``cores`` worker processes (by
    default, all cores but one, and at least one) and yields its ``imap``.
    The pool is terminated on exit, including when the ``with`` body raises.

    On Python 2, ``IMapIterator.next`` is patched once so that untimed waits
    use a huge timeout instead. This works around interrupt issues:
    KeyboardInterrupt is not delivered while blocked on an untimed wait.
    """
    if cores is None:
        cores = max(multiprocessing.cpu_count() - 1, 1)

    def wrapper(func):
        def wrap(self, timeout=None):
            return func(self, timeout=timeout if timeout is not None else 1e100)
        # Marker so repeated calls do not stack wrapper upon wrapper.
        wrap._timeout_patched = True
        return wrap

    # Python 3 waits are interruptible, and they raise OverflowError for a
    # 1e100 timeout, so the patch is applied on Python 2 only.
    if (sys.version_info[0] < 3
            and not getattr(IMapIterator.next, "_timeout_patched", False)):
        IMapIterator.next = wrapper(IMapIterator.next)
    pool = multiprocessing.Pool(cores)
    try:
        yield pool.imap
    finally:
        pool.terminate()
if __name__ == "__main__":
    # Command-line entry point; the usage text is the module docstring.
    parser = OptionParser()
    parser.add_option("-r", "--norr", dest="use_rr",
                      action="store_false", default=True,
                      help="Do not use reduced redundancy storage.")
    parser.add_option("-p", "--public", dest="make_public",
                      action="store_true", default=False,
                      help="Make uploaded files public.")
    (options, args) = parser.parse_args()
    # Expect <file_to_transfer> <bucket_name> [<s3_key_name>]; stop here
    # instead of falling through into main() with the wrong arity.
    if not 2 <= len(args) <= 3:
        print(__doc__)
        sys.exit(1)
    kwargs = dict(use_rr=options.use_rr, make_public=options.make_public)
    main(*args, **kwargs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment