@nukemberg
Created February 1, 2012 11:44
S3 multithreaded bucket copy
#! /usr/bin/python
import threading, Queue
import boto, sys, time
import argparse
import logging
parser = argparse.ArgumentParser(description="Multithreaded mass copier for Amazon S3")
parser.add_argument("-s", help="Source bucket", dest="src_bucket", type=str, required=True)
parser.add_argument("-d", help="Destination bucket", dest="dst_bucket", type=str, required=True)
parser.add_argument("-n", help="Number of copier threads", dest="num_threads", type=int, default=10)
parser.add_argument("-l", help="Logging level", dest="log_level", type=str, default="WARN")
parser.add_argument("-L", help="Logging file, default is STDOUT", dest="log_dest", type=str, default="STDOUT")
options = parser.parse_args()
logger = logging.getLogger("S3 MultiThreadedCopy")
formatter = logging.Formatter('%(name)-2s: %(levelname)-8s %(message)s')
if options.log_dest == "STDOUT":
    log_dst = logging.StreamHandler(sys.stdout)
else:
    # Log to a file when a path other than STDOUT is given
    log_dst = logging.FileHandler(options.log_dest)
log_dst.setFormatter(formatter)
log_dst.setLevel(getattr(logging, options.log_level.upper()))
logger.addHandler(log_dst)
logger.setLevel(getattr(logging, options.log_level.upper()))
logger.info("Connecting to S3")
s3 = boto.connect_s3()
logger.warn("copying from %s to %s" % (options.src_bucket, options.dst_bucket))
def get_q_worker(dst_bucket, q):
    # Build a worker that drains keys from the queue and copies them to dst_bucket
    def thread_worker():
        while not q.empty():
            try:
                k = q.get(True, 1)
            except Queue.Empty:
                logger.warn("Queue is empty")
                return
            logger.info("Copying %s" % k.name)
            try:
                k.copy(dst_bucket, k.name)
            except Exception as e:
                logger.error("Error while copying %s: %s" % (k.name, e))
            q.task_done()
    return thread_worker

def fill_queue(src_bucket, q):
    # Enqueue every key from the source bucket listing
    for k in src_bucket.list():
        q.put(k)
q = Queue.Queue()
src_bucket = s3.get_bucket(options.src_bucket)
# List the source bucket in a background thread, feeding keys into the queue
src_listing = threading.Thread(target=lambda: fill_queue(src_bucket, q))
src_listing.start()
workers = []
logger.info("Waiting for keys from source bucket listing...")
while q.empty():
    time.sleep(1)
# Start the copier threads once the first keys have arrived
for i in range(options.num_threads):
    worker = threading.Thread(target=get_q_worker(options.dst_bucket, q))
    logger.info("starting worker %d" % i)
    worker.daemon = True
    worker.start()
    workers.append(worker)
src_listing.join()
logger.info("Done listing source")
# Report queue size until the copiers have drained it
while not q.empty():
    print "Q size is %d" % q.qsize()
    time.sleep(5)
q.join()
print "Done"
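A hypothetical invocation, assuming the script is saved as s3_bucket_copy.py (the filename and bucket names are placeholders) and that boto can find AWS credentials through its usual sources such as environment variables or ~/.boto:

    python s3_bucket_copy.py -s my-source-bucket -d my-destination-bucket -n 20 -l INFO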