Created
February 12, 2019 01:58
-
-
Save alexklibisz/986527d9ca8707df69e396ee24893d09 to your computer and use it in GitHub Desktop.
Using a thread pool to speed up S3 downloads in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
import shutil
from concurrent.futures.thread import ThreadPoolExecutor
from pprint import pprint
from time import time

import boto3
import botocore
# NOTE(review): `shell` is not referenced in the visible script — confirm it is needed.
import shell as shell
# Source S3 bucket to download from.
bucket = "aft-vbi-pds"
# Key prefix selecting which objects in the bucket are downloaded.
prefix = "bin-images/000"
# Local directory the downloaded files are saved to.
out_dir = "/tmp/bin-images"
def s3obj_to_file(s3_pointer):
    """Download one S3 object and save it to a file under `out_dir`.

    Args:
        s3_pointer: An S3 pointer, i.e. one entry of the "Contents" list
            returned by calling `list_objects`. Only its "Key" is read.

    The local file name is the object key with the leading `prefix` removed,
    placed inside `out_dir`. Relies on the module-level `s3` client and the
    `bucket`, `prefix` and `out_dir` settings.
    """
    key = s3_pointer["Key"]
    # Strip the prefix only where it leads the key; str.replace would also
    # remove any later occurrence and mangle the file name.
    relative = key[len(prefix):] if key.startswith(prefix) else key
    # Drop a leading "/" so os.path.join does not treat it as an absolute path.
    fname = os.path.join(out_dir, relative.lstrip("/"))
    # The remainder of the key may contain "/" separators, so make sure the
    # parent directory exists. exist_ok keeps this safe when several threads
    # try to create the same directory at once.
    os.makedirs(os.path.dirname(fname), exist_ok=True)
    body = s3.get_object(Bucket=bucket, Key=key)["Body"]
    try:
        data = body.read()
    finally:
        # Close the response stream even if the read fails.
        body.close()
    with open(fname, "wb") as fp:
        fp.write(data)
def cleanup():
    """Reset `out_dir` to an empty directory, discarding any previous contents."""
    # Use the standard library instead of interpolating the path into a shell
    # command, which breaks on paths containing spaces or shell metacharacters.
    # ignore_errors mirrors `rm -rf`: a missing directory is not an error.
    shutil.rmtree(out_dir, ignore_errors=True)
    # Creates intermediate directories and tolerates an existing one, like `mkdir -p`.
    os.makedirs(out_dir, exist_ok=True)
# Number of parallel downloads. The S3 connection pool and the thread pool are
# sized identically so that every worker thread can hold its own connection.
# os.cpu_count() may return None, so fall back to a single CPU.
n_workers = (os.cpu_count() or 1) * 5

# Setup the s3 client. If you don't give it this type of config, it will print warnings at runtime.
# The config is just the number of parallel downloads that the s3 client can handle.
cfg = botocore.client.Config(max_pool_connections=n_workers)
s3 = boto3.client("s3", config=cfg)

# Get the list of objects. Note that this does not actually download the data, just pointers to the objects.
# "Contents" is missing from the response when nothing matches the prefix, hence the empty default.
# NOTE: a single list_objects call returns at most 1000 keys; larger listings need pagination.
objects = s3.list_objects(Bucket=bucket, Prefix=prefix).get("Contents", [])
print("Found %d objects" % (len(objects)))

# Make directory where they get saved.
cleanup()

# Download them serially (i.e. using a regular for-loop).
t0 = time()
for obj in objects:
    s3obj_to_file(obj)
print("Elapsed time serial: %.3lf seconds" % (time() - t0))
print("Found %d objects in the output directory" % (len(os.listdir(out_dir))))
os.system("du -hs %s" % out_dir)

# Clean up again to make sure we're not cheating...
cleanup()

# Now do the same thing but using a threadpool.
t0 = time()
with ThreadPoolExecutor(max_workers=n_workers) as tpex:
    # Consume the result iterator: map() only re-raises an exception from a
    # worker when its result is retrieved, so an unconsumed map() would hide
    # failed downloads.
    list(tpex.map(s3obj_to_file, objects))
print("Elapsed time with threads: %.3lf seconds" % (time() - t0))
print("Found %d objects in the output directory" % (len(os.listdir(out_dir))))
os.system("du -hs %s" % out_dir)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This is the output from running on my MacBook Pro with 8 cores (40 threads).
Found 99 objects
Elapsed time serial: 7.937 seconds
Found 99 objects in the output directory
5.5M /tmp/bin-images
Elapsed time with threads: 1.331 seconds
Found 99 objects in the output directory
5.5M /tmp/bin-images
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment