Skip to content

Instantly share code, notes, and snippets.

@infoankitp
Last active January 6, 2019 08:17
Show Gist options
  • Save infoankitp/fb1929e87bf8bd65599e0e2f08445870 to your computer and use it in GitHub Desktop.
Save infoankitp/fb1929e87bf8bd65599e0e2f08445870 to your computer and use it in GitHub Desktop.
An AWS Lambda function that downloads the images referenced by a list of links and uploads them to S3 as a gzipped tar archive.
import time
import boto3
import urllib
import datetime
import os
import json
import requests
import traceback
from PIL import Image
from io import BytesIO
import shutil
import tarfile
class ImageDownloaderLambda:
    """Download the images listed in an S3 manifest and upload them as a tar.gz.

    The manifest is a text file: its first line is the base name of the
    archive to produce (``write_tar_file`` parses the part before the first
    ``.`` as a Unix timestamp), and every following line is a JSON object with
    a ``thumbnail`` URL and a ``num`` identifier.

    URLs that could not be saved as images are collected in
    ``non_image_response`` and uploaded as a separate error report.
    """

    # Media types treated as images (compared after dropping header parameters).
    _IMAGE_MEDIA_TYPES = ("image/jpeg", "image/png", "image/svg")

    def __init__(self):
        """Set up local paths, S3 locations and the boto3 S3 handles."""
        # NOTE(review): there is no trailing separator, so manifests land at
        # "/tmp/input_images<basename>"; kept as-is because download and read
        # both build the path the same way (see _local_input_path).
        self.input_url = "/tmp/input_images"
        self.output_url = "/tmp/"
        self.error_output_url = "/tmp/"
        self.AWS_BUCKET_NAME = "*******BUCKET NAME*******"
        self.s3_output_path = "lambda_test/output_files/Images_Zipped/"
        self.s3_error_output_path = "lambda_test/output_files/Errors_Zipped/"
        # NOTE(review): credentials are hard-coded placeholders. Presumably a
        # Lambda execution role could supply them instead (omit the two
        # keyword arguments below) -- confirm against the deployment setup.
        self.AWS_ACCESS_KEY_ID = "********YOUR ACCESS KEY********"
        self.AWS_SECRET_ACCESS_KEY = "******YOUR ACCESS KEY******"
        self.s3 = boto3.resource(
            's3',
            aws_access_key_id=self.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=self.AWS_SECRET_ACCESS_KEY,
        )
        self.s3_client = boto3.client(
            's3',
            aws_access_key_id=self.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=self.AWS_SECRET_ACCESS_KEY,
        )
        self.non_image_response = []
        self.image_file_list = []

    @staticmethod
    def _media_type(content_type):
        """Return the bare, lower-cased media type of a Content-Type value."""
        return content_type.split(";", 1)[0].strip().lower()

    def _record_failure(self, url, num):
        """Remember a URL that did not yield a saved image."""
        self.non_image_response.append({"thumbnail": url, "num": num})

    def _local_input_path(self, key):
        """Return the local path used for the manifest stored under *key*."""
        return self.input_url + key.split("/")[-1]

    def _tar_s3_key(self, key):
        """Return the dated S3 key for archive *key*.

        Raises:
            ValueError: if the part of *key* before the first "." is not an
                integer Unix timestamp.
        """
        timestamp = int(key.split(".")[0])
        time_value = datetime.datetime.fromtimestamp(
            timestamp, tz=datetime.timezone.utc
        )
        s3_path = "{}/{}/{}/{}".format(
            time_value.year, time_value.month, time_value.day, key
        )
        return self.s3_output_path + s3_path

    def get_request(self, url, num):
        """Fetch *url* and, if it is an accepted image, save it locally.

        The image is written to ``output_url`` as ``<num>.<subtype>`` and its
        file name is appended to ``image_file_list``. Every other outcome
        (non-200 status, non-image content type, or any exception) is recorded
        in ``non_image_response``.

        Returns:
            The image subtype used as file extension, or ``None`` when nothing
            was saved.
        """
        file_type = None
        try:
            resp = requests.get(url, timeout=3)
            if resp.status_code != 200:
                print(resp.status_code)
                print(resp.text)
                self._record_failure(url, num)
                return None

            header = resp.headers.get('content-type', '')
            print("Getting Image for Num : {}".format(num))
            media_type = self._media_type(header)
            if media_type not in self._IMAGE_MEDIA_TYPES:
                print(header)
                self._record_failure(url, num)
                return None

            print("Found Image for Num : {}".format(num))
            # Opening the payload is only a sanity check that it parses as an
            # image; an unreadable payload raises and is recorded below.
            Image.open(BytesIO(resp.content))
            extension = media_type.split("/")[-1]
            img_file_name = str(num) + "." + extension
            with open(self.output_url + img_file_name, "wb") as f:
                f.write(resp.content)
            # Register the file only once it exists on disk, so the tar step
            # never meets a missing entry.
            self.image_file_list.append(img_file_name)
            file_type = extension
        except Exception as err:
            print(err)
            print(traceback.format_exc())
            self._record_failure(url, num)
        return file_type

    def make_tarfile(self, output_filename, file_list):
        """Pack *file_list* (names relative to ``output_url``) into a tar.gz.

        Returns:
            ``True`` if an archive was written, ``False`` when *file_list* is
            empty and no archive was created.
        """
        if not file_list:
            return False
        with tarfile.open(self.output_url + output_filename, "w:gz") as tar:
            for file in file_list:
                tar.add(self.output_url + file, file)
        print("Tar File Formed!!!", output_filename)
        return True

    def process_image(self, data):
        """Download one manifest record; never raises."""
        try:
            self.get_request(data['thumbnail'], data['num'])
        except Exception as err:
            # Reached e.g. when the record lacks 'thumbnail' or 'num'.
            print(err)
            print(traceback.format_exc())
        return

    def write_non_image_response(self, file_name):
        """Write the failure list to a local file and upload it to S3.

        One entry is written per line; the report is uploaded under
        ``s3_error_output_path`` using the base name of *file_name*.
        """
        filename = file_name.split("/")[-1]
        content = "".join("%s\n" % (item,) for item in self.non_image_response)
        # Mode "w" truncates any report left over from an earlier run, so no
        # explicit removal is needed.
        with open(self.error_output_url + filename, "w") as f:
            f.write(content)
        k = self.s3_error_output_path + filename
        self.s3_client.put_object(Bucket=self.AWS_BUCKET_NAME, Key=k, Body=content)
        print("Uploaded Error File on S3!!!", file_name)
        return

    def process_data(self, key):
        """Run the whole pipeline for the manifest stored under S3 *key*.

        Downloads the manifest, fetches every listed image, uploads the
        resulting archive (if any image was saved) and uploads the error
        report.
        """
        self.download_list_file(key)
        record_no = 1
        with open(self._local_input_path(key), 'r') as _file:
            # The first line names the archive; the rest are JSON records.
            file_name = _file.readline().strip() or None
            if file_name is None:
                print("Manifest {} has no archive name on its first line".format(key))
                return
            for line in _file:
                line = line.strip()
                if not line:
                    continue
                try:
                    image_data = json.loads(line)
                except ValueError as err:
                    # One malformed record should not abort the whole batch.
                    print(err)
                    record_no += 1
                    continue
                print("Downloading Images for {} Num".format(record_no))
                self.process_image(image_data)
                record_no += 1

        tmp_file_name = file_name + ".tar.gz"
        # Upload only when an archive was actually produced.
        if self.make_tarfile(tmp_file_name, self.image_file_list):
            self.write_tar_file(tmp_file_name)
        print("No. of Non Image Responses : "+str(len(self.non_image_response)))
        self.write_non_image_response(file_name)

    def write_tar_file(self, key):
        """Upload the local archive *key* to its dated location on S3.

        Raises:
            ValueError: if *key* does not start with an integer timestamp.
        """
        k = self._tar_s3_key(key)
        with open(self.output_url + key, "rb") as f:
            # Hand over the file object so the archive is not read into memory.
            self.s3_client.put_object(Bucket=self.AWS_BUCKET_NAME, Key=k, Body=f)
        print("Uploaded Image Tar on S3!!! ", key)

    def download_list_file(self, key):
        """Download the manifest *key* from the bucket to local storage.

        The error is logged and re-raised: continuing without the manifest
        would only fail later, or read a stale file from an earlier run.
        """
        try:
            self.s3.Bucket(self.AWS_BUCKET_NAME).download_file(
                key, self._local_input_path(key)
            )
        except Exception as e:
            print(e)
            print(traceback.format_exc())
            raise
def lambda_handler(event, context):
    """AWS Lambda entry point: process the manifest of every S3 record in *event*.

    Each record's object key is URL-decoded and handed to a fresh
    ``ImageDownloaderLambda``, so the image and failure lists of one manifest
    never leak into the next.

    Args:
        event: S3 notification event; ``event['Records']`` is iterated.
        context: Lambda context object (unused).
    """
    # A bare ``import urllib`` does not guarantee the ``parse`` submodule is
    # loaded; import it explicitly instead of relying on other packages.
    import urllib.parse

    print("got event " + str(event))
    start_time = time.time()
    for record in event['Records']:
        key = urllib.parse.unquote_plus(record['s3']['object']['key'])
        print("key is %s" % key)
        collect_image = ImageDownloaderLambda()
        collect_image.process_data(key)
    print("--- %s total execution time ---" % (time.time() - start_time))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment