tag_s3_objects_for_glacier
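A small boto3 script that walks an S3 bucket prefix and tags objects with MY_STORAGE_TYPE=GLACIER when the key matches one of the given patterns, carries an embedded date older than the cutoff, and exceeds a size threshold. It runs as a dry run by default: candidate keys are only categorized and written to JSON files for review.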
#!/usr/bin/env python3
# PLEASE USE WITH CAUTION - NOT TESTED COMPLETELY
# Actual tagging is intentionally commented out.
from __future__ import print_function
import os
import sys
import argparse
import traceback
import datetime
import logging
import logging.handlers
import subprocess
import json
import re

import boto3
# Use these two lines in all files
logger = logging.getLogger(__name__)
logger.propagate = True

S3 = boto3.client("s3")

# Call setup_logging() only in the file that defines main().
# LOG_FORMATTER and setup_logging() can be moved to a common file for reuse.
LOG_FORMATTER = logging.Formatter(
    "%(asctime)s.%(msecs)03d - %(name)s - %(levelname)s - " +
    "%(lineno)s - %(funcName)s - " +
    "%(message)s",
    "%Y%m%d %H:%M:%S")
def setup_logging(inp_file, level=logging.INFO, enable_console=True):
    file_log_handler = logging.handlers.RotatingFileHandler(
        "__" + os.path.basename(inp_file) + ".main__" + ".log",
        maxBytes=1000000,
        backupCount=5)
    console_log_handler = logging.StreamHandler()
    root_logger = logging.getLogger()
    root_logger.addHandler(file_log_handler)
    if enable_console:
        root_logger.addHandler(console_log_handler)
    root_logger.setLevel(level)
    for handler in root_logger.handlers:
        handler.setFormatter(fmt=LOG_FORMATTER)
    logging.getLogger('boto3').setLevel(logging.WARNING)
    logging.getLogger('botocore').setLevel(logging.WARNING)
    logging.getLogger('urllib3').setLevel(logging.WARNING)
def execute_shell(command, err_msg=None, ignore_err=False):
    logger.info("Running command: %s", command)
    retval = subprocess.call(command, shell=True)
    if retval != 0 and not ignore_err:
        logger.error("Error: %s : %s", retval, command)
        if "timeout" in command and retval == 124:
            logger.error("Timeout occurred")
        if err_msg is not None:
            logger.error(err_msg)
    return retval
def get_dt_from_key(cur_key):
    # Look for a date embedded in the key, delimited by "_", "-", or nothing
    date_matched = re.match(
        ".*([0-9]{4}[_-]?[0-9]{2}[_-]?[0-9]{2}).*", cur_key)
    if date_matched is None:
        return None
    cur_key_dt = None
    for delimiter in ["_", "-", ""]:
        try:
            cur_key_dt = datetime.datetime.strptime(
                date_matched.group(1),
                "%Y{}%m{}%d".format(delimiter, delimiter)
            )
            break
        except ValueError:
            try:
                cur_key_dt = datetime.datetime.strptime(
                    date_matched.group(1),
                    "%d{}%m{}%Y".format(delimiter, delimiter)
                )
                break
            except ValueError:
                continue
    return cur_key_dt
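# Illustration (hypothetical keys): get_dt_from_key("logs/2020-11-23/app.log")
# returns datetime(2020, 11, 23), while get_dt_from_key("logs/app.log")
# returns None because no date can be found in the key.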
def process(**kwargs):
    bucket_name = kwargs["bucket"]
    base_path = kwargs["path"]
    min_size = kwargs["min_size"]
    # Refuse to tag small objects; keep in sync with the argparse default
    assert min_size >= 1000000, "min_size must be at least 1000000 bytes"
    if kwargs.get("max_date") is not None:
        max_dt = datetime.datetime.strptime(kwargs["max_date"], "%Y%m%d")
        max_datestring = max_dt.strftime("%Y%m%d")
    else:
        max_datestring = None
    dry_run = not kwargs.get("disable_dry_run")
    pattern_list = kwargs["patterns"]
    for key, value in kwargs.items():
        logger.info("Setting: %s : %s", key, value)
    skipped_files = []
    glacier_list = []
    no_change_list = []
    objs = S3.list_objects_v2(Bucket=bucket_name, Prefix=base_path)
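    # list_objects_v2 returns at most 1000 keys per response; keep calling it
    # with NextContinuationToken until the whole prefix has been listed.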
    while True:
        for obj in objs.get("Contents", []):
            cur_key = obj["Key"]
            data_for_tagging = {
                "Key": obj["Key"],
                # list_objects_v2 does not return per-object VersionId,
                # so this stays None unless the listing code changes
                "VersionId": obj.get("VersionId")
            }
            cur_key_dt = get_dt_from_key(cur_key)
            if cur_key_dt is None:
                logger.info("Unable to extract date from key: %s", cur_key)
                skipped_files.append(data_for_tagging)
                continue
            pattern_matched = any(x in cur_key for x in pattern_list)
            cur_key_datestring = cur_key_dt.strftime("%Y%m%d")
            glacier_conditions = [
                pattern_matched,
                max_datestring is None or cur_key_datestring < max_datestring,
                int(obj["Size"]) > min_size
            ]
            if all(glacier_conditions):
                glacier_list.append(data_for_tagging)
            else:
                logger.debug("Skipping path: %s", cur_key)
                no_change_list.append(data_for_tagging)
                continue
            tagset = {
                'TagSet': [
                    {
                        'Key': 'MY_STORAGE_TYPE',
                        'Value': 'GLACIER'
                    }
                ]
            }
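            # Tagging by itself does not change the storage class; the
            # assumption here is that an S3 lifecycle rule filtered on this
            # tag performs the actual transition to GLACIER.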
            fxn_args = {}
            fxn_args["Bucket"] = bucket_name
            fxn_args["Key"] = obj["Key"]
            if obj.get("VersionId") is not None:
                fxn_args["VersionId"] = obj["VersionId"]
            fxn_args["Tagging"] = tagset
            logger.info("Moving to glacier: %s", fxn_args["Key"])
            # Tagging is intentionally left disabled; uncomment to act for real.
            # if not dry_run:
            #     obj = S3.put_object_tagging(**fxn_args)
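            # Caution: put_object_tagging replaces the object's entire tag
            # set, so any pre-existing tags on the object would be lost.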
        if objs.get("NextContinuationToken") is None:
            break
        objs = S3.list_objects_v2(
            Bucket=bucket_name,
            Prefix=base_path,
            ContinuationToken=objs["NextContinuationToken"]
        )
    # Write the three categorized key lists out for offline review
    with open("skipped_files.json", mode="w") as fptr:
        json.dump(skipped_files, fptr, indent=True)
    with open("glacier_list.json", mode="w") as fptr:
        json.dump(glacier_list, fptr, indent=True)
    with open("no_change_list.json", mode="w") as fptr:
        json.dump(no_change_list, fptr, indent=True)
def main():
    parser = argparse.ArgumentParser(description="Tag S3 Files")
    parser.add_argument(
        "-b",
        "--bucket",
        dest="bucket",
        help="Target bucket",
        type=str,
        required=True
    )
    parser.add_argument(
        "-p",
        "--path",
        dest="path",
        help="Base path",
        type=str,
        required=True
    )
    parser.add_argument(
        "-d",
        "--max_date",
        dest="max_date",
        help="Input date in %%Y%%m%%d format",
        type=str,
        default=None
    )
    parser.add_argument(
        "-s",
        "--min_size",
        dest="min_size",
        help="Size threshold in bytes for glacier (minimum 1000000)",
        type=int,
        default=1000000
    )
    parser.add_argument(
        "--patterns",
        dest="patterns",
        nargs="+",
        help="Patterns to search in key - matching keys will be moved to glacier",
        type=str,
        default=[]
    )
    parser.add_argument(
        "--disable_dry_run",
        dest="disable_dry_run",
        action="store_true",
        help="No action taken by default, add this to actually enable tagging",
        default=False
    )
    myargs = parser.parse_args()
    return process(**vars(myargs))
if __name__ == "__main__":
    setup_logging(__file__, level=logging.INFO)
    try:
        sys.exit(main())  # Ensure return value is passed to shell
    except Exception as error:  # pylint: disable=W0702, W0703
        exc_mesg = traceback.format_exc()
        logger.error("\n%s", exc_mesg)
        logger.error("Error: %s", error)
        sys.exit(-1)
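For reference, a dry-run invocation might look like the following (the bucket name, prefix, and pattern are placeholders):

python3 tag_s3_objects_for_glacier.py -b my-bucket -p logs/ -d 20200101 --patterns daily_dump

Because --disable_dry_run is omitted, the script only categorizes keys and writes skipped_files.json, glacier_list.json, and no_change_list.json for review; note that with no --patterns argument nothing would ever qualify, since an empty pattern list matches no keys.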