|
import boto3 |
|
import os, sys |
|
import json |
|
from botocore.exceptions import ClientError |
|
from collections import defaultdict |
|
import logging |
|
|
|
from botocore import UNSIGNED |
|
from botocore.client import Config |
|
import hashlib |
|
|
|
|
|
def get_logger(name):
    '''Return a logger configured from the LOG_LEVEL environment variable.

    ``logging.basicConfig`` is a no-op after the first call, so invoking
    this repeatedly is safe and always returns the same logger instance
    for a given ``name``.

    :param name: logger name, typically ``__name__`` of the caller.
    :return: a configured ``logging.Logger``.
    '''
    logging.basicConfig(
        format='%(levelname)-8s %(message)s',
    )
    logger = logging.getLogger(name)
    # LOG_LEVEL may be a level name string (e.g. "DEBUG"); setLevel accepts
    # both strings and ints, defaulting to INFO when the variable is unset.
    logger.setLevel(os.getenv('LOG_LEVEL', logging.INFO))
    return logger
|
|
|
|
|
# Module-level logger shared by all code in this module.
logger = get_logger(__name__)
|
|
|
|
|
class ObjectStore:
    '''Thin wrapper around an S3-compatible object store.

    Connection parameters come from environment variables:
    ``S3_ENDPOINT_URL``, ``S3_REGION_NAME``, ``S3_BUCKET``,
    ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY``. When no
    credentials are supplied, requests are sent unsigned (anonymous
    access, suitable for public read-only buckets).
    '''

    def __init__(self) -> None:
        '''Initialize S3 client'''
        self.config = {
            'endpoint-url': os.getenv("S3_ENDPOINT_URL", "https://ncsa.osn.xsede.org"),
            'region-name': os.getenv("S3_REGION_NAME", ""),
            'bucket': os.getenv("S3_BUCKET", "phy240006-bucket01"),
            'aws_access_key_id': os.getenv("AWS_ACCESS_KEY_ID", ''),
            'aws_secret_access_key': os.getenv("AWS_SECRET_ACCESS_KEY", ''),
        }
        if not (self.config['aws_access_key_id'] and self.config['aws_secret_access_key']):
            # No (complete) credentials: fall back to anonymous, unsigned requests.
            self.client = boto3.client(
                "s3",
                endpoint_url=self.config['endpoint-url'],
                region_name=self.config['region-name'],
                config=Config(signature_version=UNSIGNED),
            )
        else:
            self.client = boto3.client(
                "s3",
                endpoint_url=self.config['endpoint-url'],
                region_name=self.config['region-name'],
                aws_access_key_id=self.config['aws_access_key_id'],
                aws_secret_access_key=self.config['aws_secret_access_key'],
            )

    def store_folder(self, src_dir="", bucket_root_path=""):
        '''Recursively upload every file under ``src_dir``, mirroring the
        local directory layout beneath ``bucket_root_path`` in the bucket.

        :param src_dir: local directory to walk.
        :param bucket_root_path: key prefix for the uploaded objects.
        '''
        for dirpath, _dirnames, filenames in os.walk(src_dir):
            # os.path.relpath instead of str.replace: replace() would also
            # rewrite accidental re-occurrences of src_dir deeper in the path.
            rel_dir = os.path.relpath(dirpath, src_dir)
            if rel_dir == '.':
                rel_dir = ''
            for filename in filenames:
                self.put_object(
                    path=os.path.join(bucket_root_path, rel_dir, filename),
                    file_path=os.path.join(dirpath, filename),
                )

    def put_object(self, path="", data="", file_path="", json_output=True):
        '''Upload either in-memory data or a local file to ``path``.

        :param path: destination object key.
        :param data: in-memory payload; serialized as JSON when
            ``json_output`` is true, otherwise sent as-is.
        :param file_path: local file to upload (used only when ``data``
            is falsy).
        :param json_output: whether to JSON-encode ``data``.

        Note: if both ``data`` and ``file_path`` are falsy this is a no-op.
        '''
        if data:
            logger.debug(f'''Uploading data object to object store: "{path}"''')
            body = json.dumps(data, indent=2) if json_output else data
            self.client.put_object(Body=body, Bucket=self.config['bucket'], Key=path)
        elif file_path:
            logger.debug(f'''Uploading file to object store: "{path}"''')
            self.client.upload_file(file_path, self.config['bucket'], path)

    def get_object(self, path=""):
        '''Fetch an object; return ``None`` if the key does not exist.

        :param path: object key.
        :raises ClientError: for any error other than a missing key.
        '''
        try:
            obj = self.client.get_object(
                Bucket=self.config['bucket'],
                Key=path)
        except ClientError as ex:
            if ex.response['Error']['Code'] == 'NoSuchKey':
                obj = None
            else:
                raise
        return obj

    def download_object(self, path="", file_path=""):
        '''Download object ``path`` to local file ``file_path``.'''
        self.client.download_file(
            self.config['bucket'],
            path,
            file_path,
        )

    def create_folder(self, folder_path):
        '''Create an empty "folder" marker object (key ending in ``/``).

        :param folder_path: folder key; surrounding slashes are normalized.
        :return: the raw ``put_object`` response.
        '''
        response = self.client.put_object(
            Bucket=self.config['bucket'],
            Body='',
            Key=f'''{folder_path.strip('/')}/'''
        )
        # Use the module logger rather than print() for consistency.
        logger.debug(response)
        return response

    def delete_directory(self, root_path):
        '''Delete every object whose key starts with ``root_path``.

        Uses a paginator so prefixes with more than 1000 keys are fully
        deleted (the previous ``list_objects`` call silently truncated).
        '''
        paginator = self.client.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=self.config['bucket'], Prefix=root_path):
            for obj in page.get('Contents', []):
                self.client.delete_object(Bucket=self.config['bucket'], Key=obj['Key'])

    def list_directory(self, root_path, full_object=False):
        '''List all objects under ``root_path`` (handles pagination).

        :param root_path: key prefix; surrounding slashes are stripped.
        :param full_object: return full object dicts instead of just keys.
        :return: list of keys (default) or list of object dicts; empty
            list when nothing matches.
        '''
        paginator = self.client.get_paginator('list_objects_v2')
        objects = []
        for page in paginator.paginate(
            Bucket=self.config['bucket'],
            Prefix=root_path.strip('/'),
        ):
            objects.extend(page.get('Contents', []))
        if full_object:
            return objects
        return [obj['Key'] for obj in objects]

    def list_directory_tree(self, root_path):
        '''Return the objects under ``root_path`` as a nested dict tree.

        Each path component becomes a nesting level; leaf file names are
        present as (empty) nodes.
        '''
        tree = lambda: defaultdict(tree)
        files_tree = tree()
        # Reuse list_directory so large prefixes are fully paginated.
        for obj in self.list_directory(root_path, full_object=True):
            parts = obj['Key'].split('/')
            current_level = files_tree
            for part in parts[:-1]:
                current_level = current_level[part]
            # Bug fix: the original evaluated the bare expression
            # ``parts[-1]`` without subscripting the tree, so leaf files
            # never appeared. Touching the defaultdict creates the node.
            current_level[parts[-1]]
        return files_tree

    def object_info(self, path):
        '''Return metadata for ``path`` or ``{}`` if it does not exist.

        A trailing ``/`` marks a "directory": the prefix listing response
        is returned instead of a HEAD result.

        :raises ClientError: for errors other than a missing key.
        '''
        if path.endswith('/'):
            response = self.client.list_objects_v2(
                Bucket=self.config['bucket'],
                Prefix=path.strip('/'),
            )
            return response if 'Contents' in response else {}
        try:
            response = self.client.head_object(
                Bucket=self.config['bucket'],
                Key=path.strip('/'),
            )
        except ClientError as ex:
            # head_object reports missing keys as a bare '404' while other
            # APIs use 'NoSuchKey'; treat both as "not found".
            if ex.response['Error']['Code'] in ('NoSuchKey', '404'):
                response = {}
            else:
                logger.error(ex.response['Error']['Code'])
                raise
        return response

    def object_exists(self, path):
        '''Return True if ``path`` exists (as an object or a prefix).'''
        return bool(self.object_info(path))

    def copy_directory(self, src_path, dst_root_path):
        '''Copy every object under ``src_path`` beneath ``dst_root_path``.

        Robustness fixes: an empty prefix no longer raises ``KeyError``
        (missing ``Contents`` is now tolerated), and pagination is handled
        so prefixes with more than 1000 keys are fully copied.
        '''
        paginator = self.client.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=self.config['bucket'], Prefix=src_path):
            for obj in page.get('Contents', []):
                logger.debug(obj)
                dst_rel_path = obj['Key'].replace(src_path, '').strip('/')
                dst_path = os.path.join(dst_root_path, dst_rel_path)
                logger.debug(f'dst_path: {dst_path}')
                self.client.copy_object(
                    CopySource={
                        'Bucket': self.config['bucket'],
                        'Key': obj['Key'],
                    },
                    Bucket=self.config['bucket'],
                    Key=dst_path,
                )

    def md5_checksum(self, file_path):
        '''Return the hex MD5 digest of a local file, read in 1 MiB chunks.

        https://stackoverflow.com/a/58239738
        '''
        m = hashlib.md5()
        with open(file_path, 'rb') as f:
            for data in iter(lambda: f.read(1024 * 1024), b''):
                m.update(data)
        hexdigest = m.hexdigest()
        logger.debug(f'calculated md5 checksum: {hexdigest}')
        return hexdigest

    def etag_checksum(self, file_path, chunk_size=512 * 1024 * 1024):
        '''Compute the S3 multipart-upload ETag for a local file.

        The ETag of a multipart upload is the MD5 of the concatenated
        per-part MD5 digests, suffixed with ``-<part count>``.

        https://stackoverflow.com/a/58239738

        :param chunk_size: multipart part size used for the upload.
        '''
        md5s = []
        with open(file_path, 'rb') as f:
            for data in iter(lambda: f.read(chunk_size), b''):
                md5s.append(hashlib.md5(data).digest())
        md5sum = hashlib.md5(b"".join(md5s))
        etag_checksum = f'{md5sum.hexdigest()}-{len(md5s)}'
        logger.debug(f'calculated etag checksum: {etag_checksum}')
        return etag_checksum

    def etag_compare(self, file_path, etag_source, chunk_size=512 * 1024 * 1024):
        '''Return True if a local file matches a remote ETag.

        Multipart ETags (containing ``-``) are compared against the
        multipart checksum; plain ETags against the file's MD5.

        https://stackoverflow.com/a/58239738
        '''
        etag_source = etag_source.strip('"')
        logger.debug(f'source etag checksum: {etag_source}')
        if '-' in etag_source and etag_source == self.etag_checksum(file_path, chunk_size=chunk_size):
            return True
        if '-' not in etag_source and etag_source == self.md5_checksum(file_path):
            return True
        return False