@csghone
Created November 23, 2020 09:47
tag_s3_objects_for_glacier
#!/usr/bin/env python3
# PLEASE USE WITH CAUTION - NOT TESTED COMPLETELY
# Actually tagging is intentionally commented out.
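# Example invocation (a sketch; the bucket, prefix, and pattern values are placeholders):
#   python3 tag_s3_objects_for_glacier -b my-bucket -p logs/archive/ -d 20200101 \
#       -s 2000000 --patterns .log .csv
# By default this is a dry run that only writes JSON report files;
# --disable_dry_run is meant to enable tagging once the call below is uncommented.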
from __future__ import print_function
import os
import sys
import argparse
import traceback
import datetime
import logging
import logging.handlers
import subprocess
import json
import re
import boto3
# Use these two lines in all files
logger = logging.getLogger(__name__)
logger.propagate = True
S3 = boto3.client("s3")
# Call setup_logging() only in file with def main()
# LOG_FORMATTER and def setup_logging() can be moved to a common file for reuse.
LOG_FORMATTER = logging.Formatter(
    "%(asctime)s.%(msecs)03d - %(name)s - %(levelname)s - " +
    "%(lineno)s - %(funcName)s - " +
    "%(message)s",
    "%Y%m%d %H:%M:%S")
def setup_logging(inp_file, level=logging.INFO, enable_console=True):
    file_log_handler = logging.handlers.RotatingFileHandler(
        "__" + os.path.basename(inp_file) + ".main__" + ".log",
        maxBytes=1000000,
        backupCount=5)
    console_log_handler = logging.StreamHandler()
    root_logger = logging.getLogger()
    root_logger.addHandler(file_log_handler)
    if enable_console:
        root_logger.addHandler(console_log_handler)
    root_logger.setLevel(level)
    for handler in root_logger.handlers:
        handler.setFormatter(fmt=LOG_FORMATTER)
    logging.getLogger('boto3').setLevel(logging.WARNING)
    logging.getLogger('botocore').setLevel(logging.WARNING)
    logging.getLogger('urllib3').setLevel(logging.WARNING)
def execute_shell(command, err_msg=None, ignore_err=False):
    logger.info("Running command: %s", command)
    retval = subprocess.call(command, shell=True)
    if retval != 0 and not ignore_err:
        logger.error("Error: %s : %s", retval, command)
        if "timeout" in command and retval == 124:
            logger.error("Timeout occurred")
        if err_msg is not None:
            logger.error(err_msg)
    return retval
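# Note: execute_shell is not called anywhere in this script; it is a
# general-purpose helper kept alongside the reusable logging boilerplate.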
def get_dt_from_key(cur_key):
    date_matched = re.match(
        ".*([0-9]{4}[_,-]?[0-9]{2}[_,-]?[0-9]{2}?).*", cur_key)
    if date_matched is None:
        return None
    cur_key_dt = None
    for delimiter in ["_", "-", ""]:
        try:
            cur_key_dt = datetime.datetime.strptime(
                date_matched.group(1),
                "%Y{}%m{}%d".format(delimiter, delimiter)
            )
            break
        except ValueError:
            try:
                cur_key_dt = datetime.datetime.strptime(
                    date_matched.group(1),
                    "%d{}%m{}%Y".format(delimiter, delimiter)
                )
                break
            except ValueError:
                continue
    return cur_key_dt
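# Example (illustrative): get_dt_from_key("backups/2020-11-23/data.csv")
# matches "2020-11-23" and returns datetime(2020, 11, 23).
# Keys whose embedded date fits neither a year-first nor a day-first layout return None.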
def process(**kwargs):
    bucket_name = kwargs["bucket"]
    base_path = kwargs["path"]
    min_size = kwargs["min_size"]
    assert min_size > 1000000, "min_size must be larger than 1000000 bytes"
    if kwargs.get("max_date") is not None:
        max_dt = datetime.datetime.strptime(kwargs["max_date"], "%Y%m%d")
        max_datestring = max_dt.strftime("%Y%m%d")
    else:
        max_datestring = None
    dry_run = not kwargs.get("disable_dry_run")
    pattern_list = kwargs["patterns"]
    for key, value in kwargs.items():
        logger.info("Setting: %s : %s", key, value)
    skipped_files = []
    glacier_list = []
    no_change_list = []
    objs = S3.list_objects_v2(Bucket=bucket_name, Prefix=base_path)
    while True:
        for obj in objs.get("Contents", []):
            cur_key = obj["Key"]
            data_for_tagging = {
                "Key": obj["Key"],
                "VersionId": obj.get("VersionId")
            }
            cur_key_dt = get_dt_from_key(cur_key)
            if cur_key_dt is None:
                logger.info("Unable to extract date from key: %s", cur_key)
                skipped_files.append(data_for_tagging)
                continue
            pattern_matched = any(x in cur_key for x in pattern_list)
            cur_key_datestring = cur_key_dt.strftime("%Y%m%d")
            glacier_conditions = [
                pattern_matched,
                # No max_date means no date cutoff is applied
                max_datestring is None or cur_key_datestring < max_datestring,
                int(obj["Size"]) > min_size
            ]
            if all(glacier_conditions):
                glacier_list.append(data_for_tagging)
            else:
                logger.debug("Skipping path: %s", cur_key)
                no_change_list.append(data_for_tagging)
                continue
            tagset = {
                'TagSet': [
                    {
                        'Key': 'MY_STORAGE_TYPE',
                        'Value': 'GLACIER'
                    }
                ]
            }
            fxn_args = {}
            fxn_args["Bucket"] = bucket_name
            fxn_args["Key"] = obj["Key"]
            if obj.get("VersionId") is not None:
                fxn_args["VersionId"] = obj["VersionId"]
            fxn_args["Tagging"] = tagset
            logger.info("Moving to glacier: %s", fxn_args["Key"])
            #### if not dry_run:
            ####     obj = S3.put_object_tagging(**fxn_args)
        if objs.get("NextContinuationToken") is None:
            break
        objs = S3.list_objects_v2(
            Bucket=bucket_name,
            Prefix=base_path,
            ContinuationToken=objs["NextContinuationToken"]
        )
    with open("skipped_files.json", mode="w") as fptr:
        json.dump(skipped_files, fptr, indent=2)
    with open("glacier_list.json", mode="w") as fptr:
        json.dump(glacier_list, fptr, indent=2)
    with open("no_change_list.json", mode="w") as fptr:
        json.dump(no_change_list, fptr, indent=2)
def main():
    parser = argparse.ArgumentParser(description="Tag S3 Files")
    parser.add_argument(
        "-b",
        "--bucket",
        dest="bucket",
        help="Target bucket",
        type=str,
        required=True
    )
    parser.add_argument(
        "-p",
        "--path",
        dest="path",
        help="Base path",
        type=str,
        required=True
    )
    parser.add_argument(
        "-d",
        "--max_date",
        dest="max_date",
        help="Input date in %%Y%%m%%d format",
        type=str,
        default=None
    )
    parser.add_argument(
        "-s",
        "--min_size",
        dest="min_size",
        # Note: process() asserts min_size > 1000000, so the default below
        # must be overridden with a larger value on the command line.
        help="Size threshold (in bytes) for glacier",
        type=int,
        default=100000
    )
    parser.add_argument(
        "--patterns",
        dest="patterns",
        nargs="+",
        help="Patterns to search in key - matching keys will be moved to glacier",
        type=str,
        default=[]
    )
    parser.add_argument(
        "--disable_dry_run",
        dest="disable_dry_run",
        action="store_true",
        help="No action taken by default, add this to actually enable tagging",
        default=False
    )
    myargs = parser.parse_args()
    return process(**vars(myargs))
if __name__ == "__main__":
    setup_logging(__file__, level=logging.INFO)
    try:
        sys.exit(main())  # Ensure return value is passed to shell
    except Exception as error:  # pylint: disable=W0702, W0703
        exc_mesg = traceback.format_exc()
        logger.error("\n%s", exc_mesg)
        logger.error("Error: %s", error)
        sys.exit(-1)
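# Reminder: the S3.put_object_tagging call above is intentionally commented out,
# so even with --disable_dry_run this script currently only produces the
# skipped_files.json / glacier_list.json / no_change_list.json reports.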