s3 multithreaded bucket copy

A listing thread feeds keys from the source bucket into a Queue while a pool of daemon worker threads drains it, copying each key to the destination bucket.
#!/usr/bin/python
import threading, Queue
import boto, sys, time
import argparse
import logging

parser = argparse.ArgumentParser(description="Multithreaded mass copier for Amazon S3")
parser.add_argument("-s", help="Source bucket", dest="src_bucket", type=str, required=True)
parser.add_argument("-d", help="Destination bucket", dest="dst_bucket", type=str, required=True)
parser.add_argument("-n", help="Number of copier threads", dest="num_threads", type=int, default=10)
parser.add_argument("-l", help="Logging level", dest="log_level", type=str, default="WARN")
parser.add_argument("-L", help="Logging file, default is STDOUT", dest="log_dest", type=str, default="STDOUT")
options = parser.parse_args()

logger = logging.getLogger("S3 MultiThreadedCopy")
formatter = logging.Formatter('%(name)-2s: %(levelname)-8s %(message)s')
if options.log_dest == "STDOUT":
    log_dst = logging.StreamHandler(sys.stdout)
else:
    # Bug fix: the original passed a filename to StreamHandler via the
    # misspelled options.log_dst; a log file needs a FileHandler, and the
    # formatter should apply to both destinations.
    log_dst = logging.FileHandler(options.log_dest)
log_dst.setFormatter(formatter)
log_dst.setLevel(getattr(logging, options.log_level.upper()))
logger.addHandler(log_dst)
logger.setLevel(getattr(logging, options.log_level.upper()))
logger.info("Connecting to S3") | |
s3 = boto.connect_s3() | |
logger.warn("copying from %s to %s" % (options.src_bucket, options.dst_bucket)) | |

def get_q_worker(dst_bucket, q):
    """Build a worker that drains keys from q and copies them into dst_bucket."""
    def thread_worker():
        while not q.empty():
            try:
                k = q.get(True, 1)  # block up to 1s for the lister to catch up
            except Queue.Empty:
                logger.warn("Queue is empty")
                return
            logger.info("Copying %s" % k.name)
            try:
                k.copy(dst_bucket, k.name)
            except Exception as e:
                logger.error("Error while copying %s: %s" % (k.name, e))
            q.task_done()
    return thread_worker

def fill_queue(src_bucket, q):
    """Producer: list every key in the source bucket onto the work queue."""
    for k in src_bucket.list():
        q.put(k)

q = Queue.Queue()
src_bucket = s3.get_bucket(options.src_bucket)
# List the source bucket in a background thread so copying can start
# before the (potentially long) listing finishes.
src_listing = threading.Thread(target=lambda: fill_queue(src_bucket, q))
src_listing.start()

workers = []
logger.info("Waiting for keys from source bucket listing...")
while q.empty():
    time.sleep(1)
for i in range(options.num_threads):
    worker = threading.Thread(target=get_q_worker(options.dst_bucket, q))
    logger.info("starting worker %d" % i)
    worker.daemon = True
    worker.start()
    workers.append(worker)

src_listing.join()
logger.info("Done listing source")
while not q.empty():
    print "Q size is %d" % q.qsize()
    time.sleep(5)
q.join()  # blocks until every queued copy has been task_done()'d
print "Done"
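
One caveat with the worker loop above: a thread exits whenever the queue looks empty, which can happen while the listing thread is still producing keys; the script guards against that only with the startup wait and the 1-second get() timeout. A more robust shutdown is the sentinel ("poison pill") pattern. The sketch below is illustrative rather than part of the gist; it uses the same Python 2 Queue module, and all names (SENTINEL, worker, the stand-in work items) are made up:

    import threading, Queue

    SENTINEL = object()  # unique marker meaning "no more work will arrive"

    def worker(q):
        while True:
            item = q.get()  # block indefinitely; no timeout races
            if item is SENTINEL:
                q.task_done()
                return  # the producer is finished, so this worker may exit
            # ... copy the key here, as thread_worker does above ...
            q.task_done()

    q = Queue.Queue()
    threads = [threading.Thread(target=worker, args=(q,)) for _ in range(10)]
    for t in threads:
        t.start()

    for item in ["key-1", "key-2"]:  # stand-in for src_bucket.list()
        q.put(item)
    for _ in threads:
        q.put(SENTINEL)  # one sentinel per worker

    for t in threads:
        t.join()  # returns only after every worker has seen its sentinel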