Created
May 22, 2019 10:40
-
-
Save suvojit-0x55aa/26f022bf31b6b045db43ff9bc34b162c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import division, print_function | |
import concurrent.futures as cf | |
from glob import glob | |
import logging | |
import os | |
from time import time, sleep | |
import boto3 | |
class AWSUploader(object):
    '''
    Upload local files to an S3 bucket, skipping keys that already exist.

    The object key is always the basename of the local path, so two files
    with the same name in different directories map to the same key.

    Params:
        aws_bucket: A valid bucket name to be uploaded to.
        max_workers: If None, every upload runs synchronously in the
                     calling thread. Otherwise each instance maintains its
                     own ThreadPoolExecutor with that many workers and
                     uploads are queued on it.
    '''

    def __init__(self, aws_bucket, max_workers):
        self.max_workers = max_workers
        self.aws_bucket = aws_bucket
        self.log = logging.getLogger(__name__)
        # Always define the attribute so it can be tested without hasattr().
        self.executor = None
        if self.max_workers is not None:
            self.executor = cf.ThreadPoolExecutor(
                max_workers=self.max_workers)
            print('Initiating {} with {} size ThreadPool, AWS bucket {}'.format(
                self.__class__.__name__, self.max_workers, self.aws_bucket))

    def uploadCallback(self, so_far):
        '''
        Progress hook passed to boto3's upload_file as ``Callback``.

        Params:
            so_far: Byte count handed over by boto3 for this invocation.
                    NOTE(review): boto3 documents this as the bytes moved
                    since the previous call, not a running total, despite
                    the parameter name -- confirm before relying on it.
        '''
        print('{} bytes transferred'.format(so_far))

    def _uploadToAWS(self, filepath):
        '''
        Upload one file unless an object with the same key already exists.

        Params:
            filepath: Path of the local file; its basename becomes the key.

        Returns:
            'file uploaded succesfully' when the key was already present or
            the upload completed; None when anything failed (the error is
            printed, never raised, so pool workers do not die silently).
        '''
        key_name = os.path.basename(filepath)
        try:
            started = time()
            # Fail fast before any network round trip; without this a
            # missing local file was reported as success whenever the key
            # already existed in the bucket.
            if not os.path.isfile(filepath):
                raise IOError('local file not found: {}'.format(filepath))

            # A fresh session/resource per call keeps worker threads from
            # sharing boto3 objects (boto3 advises against sharing
            # resources across threads). Credentials come from the
            # environment / default boto3 credential chain.
            session = boto3.session.Session()
            s3 = session.resource('s3')
            bucket = s3.Bucket(self.aws_bucket)

            try:
                # HEAD request: checks presence without opening a body
                # stream that would never be read or closed.
                bucket.Object(key_name).load()
                already_present = True
            except s3.meta.client.exceptions.ClientError:
                # The service could not confirm the key (404, or 403 for
                # write-only credentials): attempt the upload. Any other
                # error (no credentials, no network) skips straight to the
                # outer handler instead of a doomed upload attempt.
                already_present = False

            if already_present:
                print("Key exist: {} | Time taken: {}".format(
                    key_name, time() - started))
            else:
                bucket.upload_file(
                    filepath, key_name, Callback=self.uploadCallback)
                print("AWS Upload with key: {} | Time taken: {}".format(
                    key_name, time() - started))
            return 'file uploaded succesfully'
        except Exception as e:
            # Top-level boundary for one upload: report and return None.
            print('{} upload failed for {}'.format(e, key_name))

    def uploadToAWS(self, filepath):
        '''
        Wrapper function to call the uploader based on availability of
        Thread Pool Executor.

        Returns:
            With a thread pool: the concurrent.futures.Future of the queued
            upload (its result is what _uploadToAWS returns).
            Without one: the return value of _uploadToAWS directly.
        '''
        if self.max_workers is not None:
            return self.executor.submit(self._uploadToAWS, filepath)
        return self._uploadToAWS(filepath)

    def shutdown(self, wait=True):
        '''
        Stop accepting uploads and release the thread pool, if there is one.

        Params:
            wait: When True (default) block until every queued upload has
                  finished. Submitting after shutdown raises RuntimeError.
        '''
        if self.executor is not None:
            self.executor.shutdown(wait=wait)
if __name__ == '__main__':
    # Demo entry point: push every JPEG under /static/image to the bucket
    # named 'bucket'. max_workers=None makes each upload run synchronously
    # in this thread.
    aws_uploader = AWSUploader('bucket', None)
    for image_path in glob('/static/image/*.jpg'):
        aws_uploader.uploadToAWS(image_path)

    # Wait for queued uploads deterministically instead of sleeping a fixed
    # five seconds: a sleep is dead time in synchronous mode and may be too
    # short (or too long) when a thread pool is in use.
    executor = getattr(aws_uploader, 'executor', None)
    if executor is not None:
        executor.shutdown(wait=True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment