Skip to content

Instantly share code, notes, and snippets.

@janaki-sasidhar
Created February 10, 2021 14:38
Show Gist options
  • Save janaki-sasidhar/a61853f7863c9fdd16b199c68a2660ac to your computer and use it in GitHub Desktop.
Save janaki-sasidhar/a61853f7863c9fdd16b199c68a2660ac to your computer and use it in GitHub Desktop.
import boto3,os
from pathlib import Path
from flask import jsonify,Flask,request
#initialize flask app
app=Flask(__name__)
#util functions
def get_bucket():
s3_resource = _get_s3_resource()
bucket = os.environ.get('S3_BUCKET')
return s3_resource.Bucket(bucket)
def _get_s3_resource():
if os.environ.get('S3_KEY') and os.environ.get('S3_SECRET'):
return boto3.resource(
's3',
aws_access_key_id= os.environ.get('S3_KEY'),
aws_secret_access_key=os.environ.get('S3_SECRET')
)
else:
return boto3.resource('s3')
@app.route('/upload_file',methods=['GET','POST'])
def upload_file():
if request.method=='GET':
file_path = request.args.get('file_path')
print('file path is ',file_path)
uploading_file_path = Path(file_path)
if uploading_file_path.exists() and uploading_file_path.is_file():
status = upload_file_to_s3(uploading_file_path)
if status=='ok':
return {"status":"ok"}
else:
return {"status":status}
else:
return {"status":"not a file"}
else:
return {"status":"Not a get request"}
def upload_file_to_s3(file_path):
my_bucket = get_bucket()
try:
file_full_path = str(file_path.resolve())
my_bucket.Object(file_path.name).put(Body=file_full_path, Key=('testing/'+file_path.name))
print('uploaded')
return "file_uploaded"
except Exception as e:
print(e)
return "file upload failed"
if __name__=='__main__':
app.run(debug=True,host='0.0.0.0')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment