Last active
July 8, 2024 09:55
-
-
Save tkyaji/307b638975de8b565cb3e4e331fad398 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import time | |
import argparse | |
import boto3 | |
import urllib.parse | |
def normalize_file_size(file_bytes):
    """Scale a byte count to the largest binary unit it fills.

    Args:
        file_bytes: Size in bytes.

    Returns:
        A ``(value, unit)`` tuple where ``unit`` is ``'GB'``, ``'MB'``,
        ``'KB'`` or ``'B'``. Scaled values are rounded to two decimals; a
        count below 1024 is returned unchanged with unit ``'B'``.
    """
    # Largest unit first. The comparison is inclusive so that exactly
    # 1024 bytes is reported as 1.0 KB instead of 1024 B (and likewise at
    # the MB and GB boundaries).
    for unit, divisor in (('GB', 1024 ** 3), ('MB', 1024 ** 2), ('KB', 1024)):
        if file_bytes >= divisor:
            return round(file_bytes / divisor, 2), unit
    return file_bytes, 'B'
def get_file_size_str(file_bytes):
    """Return *file_bytes* as a human-readable string such as ``'1.5 KB'``.

    The value and unit come from ``normalize_file_size`` and are joined by a
    single space.
    """
    return '{} {}'.format(*normalize_file_size(file_bytes))
def get_file_size_per_unit(file_bytes, unit):
    """Express *file_bytes* in the requested unit.

    Args:
        file_bytes: Size in bytes.
        unit: ``'GB'``, ``'MB'`` or ``'KB'``; any other value means "bytes".

    Returns:
        The size divided by the unit's byte count and rounded to two
        decimals, or *file_bytes* unchanged when the unit is not recognised.
    """
    known_units = (
        ('GB', 1024 * 1024 * 1024),
        ('MB', 1024 * 1024),
        ('KB', 1024),
    )
    for label, bytes_per_unit in known_units:
        if unit == label:
            return round(file_bytes / bytes_per_unit, 2)
    return file_bytes
def download_progress_callback_main(key, downloaded, file_size, file_size_u, file_unit, download_start_tm):
    """Redraw the one-line download progress bar on stdout.

    The line ends with a carriage return instead of a newline, so each call
    overwrites the previous one in the terminal.

    Args:
        key: Object key shown at the start of the line.
        downloaded: Bytes transferred so far.
        file_size: Total object size in bytes.
        file_size_u: Total size already converted to *file_unit*.
        file_unit: Unit label (``'GB'``, ``'MB'``, ``'KB'`` or ``'B'``) used
            for both the transferred and the total amount.
        download_start_tm: ``time.time()`` value taken when the download
            started; used to print the elapsed seconds.
    """
    bar_width = 50
    downloaded_u = get_file_size_per_unit(downloaded, file_unit)
    if file_size:
        # Clamp so the bar keeps its width even if the transferred count is
        # negative or overshoots the reported total.
        done = min(bar_width, max(0, int((bar_width * downloaded) / file_size)))
    else:
        # An empty object has nothing left to transfer; drawing a full bar
        # also avoids dividing by zero.
        done = bar_width
    print((' {0} : [{1}>{2}] | {3}{4} / {5}{6} | {7}sec' + (' ' * 5)).format(
        key,
        '=' * done,
        ' ' * (bar_width - done),
        downloaded_u,
        file_unit,
        file_size_u,
        file_unit,
        int(time.time() - download_start_tm),
    ), end='\r')
def main():
    """Download one object from an S3-compatible endpoint with a progress bar.

    Command-line options:
        --url          Object URL of the form ``scheme://host/bucket/key``.
        --access-key   Access key handed to the S3 client.
        --secret-key   Secret key handed to the S3 client.
        -o/--out-file  Destination path; defaults to the last segment of the
                       object key.

    Exits with status 2 (through ``parser.error``) when the URL does not
    contain both a bucket and a key or no output filename can be derived, and
    with status 1 when the download itself fails.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--url', required=True)
    parser.add_argument('--access-key', required=True)
    parser.add_argument('--secret-key', required=True)
    parser.add_argument('-o', '--out-file')
    args = parser.parse_args()

    # The endpoint is scheme + host; the first path segment is the bucket and
    # everything after it is the object key.
    url_parts = urllib.parse.urlsplit(args.url)
    endpoint_url = urllib.parse.urlunparse((url_parts.scheme, url_parts.netloc, '', '', '', ''))
    bucket_name, _, key = url_parts.path.lstrip('/').partition('/')
    if not bucket_name or not key:
        parser.error('--url must have the form scheme://host/bucket/key')

    out_file = args.out_file
    if out_file is None:
        out_file = key.rsplit('/', 1)[-1]
    if not out_file:
        # Happens when the key ends with '/', leaving no file name to use.
        parser.error('cannot derive an output file name from the URL; use --out-file')

    try:
        s3_client = boto3.client(
            's3',
            endpoint_url=endpoint_url,
            aws_access_key_id=args.access_key,
            aws_secret_access_key=args.secret_key,
        )

        # Fetch the total size up front so the progress bar has a denominator.
        head = s3_client.head_object(Bucket=bucket_name, Key=key)
        file_size = head.get('ContentLength')
        file_size_u, file_unit = normalize_file_size(file_size)

        download_start_tm = time.time()
        downloaded = 0

        def download_progress_callback(chunk):
            # Each call passes the byte count of one increment; keep a
            # running total for the progress display.
            nonlocal downloaded
            downloaded += chunk
            download_progress_callback_main(key, downloaded, file_size, file_size_u, file_unit, download_start_tm)

        s3_client.download_file(Bucket=bucket_name, Key=key, Filename=out_file, Callback=download_progress_callback)
    except Exception as ex:
        # Top-level boundary: report any failure and signal it to the caller
        # through a non-zero exit status.
        print(ex, file=sys.stderr)
        print('\nFailed to download.', args.url)
        sys.exit(1)

    print('\nSuccessfully downloaded.', f'=> {out_file}')
# Run the downloader only when this file is executed as a script.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment