Vector s3sync wrapper for exec source
#!/usr/bin/env python3

import argparse
import json
import logging
import os
import random
import signal
import subprocess
import sys
import time

sys.stdout.reconfigure(line_buffering=True)
sys.stderr.reconfigure(line_buffering=True)

logging.basicConfig(
    format='%(asctime)s.%(msecs)03d000Z %(levelname)s %(name)s::%(funcName)s: %(message)s',
    datefmt='%Y-%m-%dT%H:%M:%S')
lg = logging.getLogger('s3sync')
METRICS = {
    "total_executions": 0,
    "successful_executions": 0,
    "failed_executions": 0,
    "synced_files": 0,
    "synced_size": 0,
    "empty_files": 0,
    "deleted_files": 0,
    "execution_time": 0,
    "sync_time": 0,
    "cleanup_time": 0,
}
METRICS_TAGS = {}

SHUTDOWN = False
SHUTDOWN_SYNC = 0
def sigterm_handler(_signo, _stack_frame):
    global SHUTDOWN
    if SHUTDOWN is True:
        lg.warning("Signal {} received during clean shutdown, terminating immediately".format(_signo))
        sys.exit(0)
    else:
        lg.warning("Signal {} received, running clean shutdown".format(_signo))
        SHUTDOWN = True
def parse_args(args=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--region', help="S3 region", required=True)
    parser.add_argument('-w', '--workers', type=int, help="S3sync workers",
                        default=64)
    parser.add_argument('-i', '--interval', help="Sync interval", type=int,
                        default=10)
    parser.add_argument('-m', '--filter-mtime', help="Sync only files not changed for this many seconds", type=int, default=60)
    parser.add_argument('--delete-mtime', help="Delete files not changed for this many seconds, 0 = delete immediately after sync, -1 = never delete", type=int, default=600)
    parser.add_argument('-v', '--verbose', help="Enable verbose logging",
                        action="store_true", required=False)
    parser.add_argument('-d', '--debug', help="Enable debug logging",
                        action="store_true", required=False)
    parser.add_argument('source', help="Source directory")
    parser.add_argument('target', help="Target S3 bucket/prefix")
    return parser.parse_args(args)
def print_metrics():
    print(json.dumps({**METRICS, **METRICS_TAGS}))
    # Reset counters
    for k in METRICS.keys():
        METRICS[k] = 0
def delete_empty_path(path, basedir=None):
    if not os.path.isdir(path):
        # Not a directory (a file or nothing), so pick the parent in path
        path = os.path.dirname(path)
    if path == basedir:
        lg.debug("Skipping deletion of base directory {}".format(basedir))
        return
    try:
        os.rmdir(path)
        lg.debug("Deleted empty directory {}".format(path))
    except OSError:
        # Not empty, don't do anything
        lg.debug("{} not empty or doesn't exist, skipping deletion".format(path))
        return
    # Removed the directory, so process the parent the same way,
    # keeping basedir as the stop condition
    delete_empty_path(os.path.dirname(path), basedir)
def cleanup_path(path, filter_before_mtime):
    for root, dirs, files in os.walk(path, topdown=False):
        for name in files:
            file = os.path.join(root, name)
            stat = os.stat(file)
            if stat.st_mtime < filter_before_mtime:
                os.remove(file)
                lg.debug("Deleted old file {} (mtime -{} seconds)".format(file, int(time.time() - stat.st_mtime)))
                METRICS["deleted_files"] += 1
                delete_empty_path(file, path)
def main():
    args = parse_args()
    if args.verbose:
        lg.setLevel(logging.INFO)
    else:
        lg.setLevel(logging.WARNING)
    if args.debug:
        lg.setLevel(logging.DEBUG)

    global SHUTDOWN_SYNC
    METRICS_TAGS["source"] = args.source
    METRICS_TAGS["target"] = args.target

    # Initial sync syncs everything changed from 5 minutes ago until now
    t_last_synced = int(time.time()) - 5 * 60
    while True:
        t_start = time.time()
        if not os.path.isdir(args.source):
            lg.warning("Source directory {} does not exist, skipping sync".format(args.source))
            if SHUTDOWN is True:
                sys.exit(0)
            time.sleep(args.interval + random.randrange(0, int(args.interval / 2)))
            continue

        if SHUTDOWN is True:
            # Graceful shutdown, sync everything
            lg.info("Processing graceful shutdown, ignoring --filter-mtime and syncing everything")
            filter_mtime = int(time.time())
            SHUTDOWN_SYNC += 1
        else:
            filter_mtime = int(time.time()) - args.filter_mtime
        try:
            lg.info("Running s3sync from source fs://{}/ to s3://{}/".format(args.source, args.target))
            t_sync_start = time.time()
            # AWS credentials are required in the environment; fail fast if missing
            s3sync_args = ["--tk", os.environ["AWS_ACCESS_KEY_ID"],
                           "--ts", os.environ["AWS_SECRET_ACCESS_KEY"],
                           "--tr", args.region,
                           "-w", str(args.workers),
                           "--filter-before-mtime", str(filter_mtime),
                           "--s3-retry", "3",
                           "--s3-retry-sleep", str(random.randrange(1, 8)),
                           "--fs-disable-xattr",
                           "--sync-log",
                           "--sync-log-format", "json"]
            if args.delete_mtime != 0:
                # If we delete immediately, then always sync everything,
                # otherwise just what was changed since the last run
                s3sync_args.extend(["--filter-after-mtime", str(t_last_synced)])
            proc = subprocess.run(["s3sync", *s3sync_args,
                                   "fs://{}/".format(args.source),
                                   "s3://{}/".format(args.target)],
                                  capture_output=True, text=True)
METRICS["sync_time"] = int((time.time() - t_sync_start) * 1000) | |
METRICS["total_executions"] += 1 | |
if proc.returncode > 0: | |
lg.error("Process exited with return code {}, some files might fail to sync".format(proc.returncode)) | |
METRICS["failed_executions"] += 1 | |
if args.delete_mtime != 0: | |
# If we delete successfuly synced files, then just re-try | |
# sync failed files on next run. | |
# Otherwise, if we keep files and delete them later, we | |
# don't want to re-upload everything on next run, in such | |
# case, failed files are skipped (unless it's mtime is | |
# updated). | |
t_last_synced = filter_mtime | |
else: | |
METRICS["successful_executions"] += 1 | |
t_last_synced = filter_mtime | |
            for line in proc.stderr.splitlines():
                try:
                    line = json.loads(line)
                except Exception:
                    lg.exception("Failed to parse output line: {}".format(line))
                    continue
                if line.get("key"):
                    path = os.path.join(args.source, line["key"])
                    # Files of 10 bytes or less are counted as empty
                    if line["size"] > 10:
                        lg.debug("Synced file {}".format(path))
                        METRICS["synced_files"] += 1
                    else:
                        lg.debug("Synced empty file {}".format(path))
                        METRICS["empty_files"] += 1
                    if args.delete_mtime == 0:
                        lg.debug("Deleting synced file: {}".format(path))
                        os.unlink(path)
                        lg.debug("Deleting empty directory structure: {}".format(os.path.dirname(path)))
                        delete_empty_path(os.path.dirname(path), args.source)
                    METRICS["synced_size"] += line["size"]
                if line.get("level") == "error":
                    lg.error(line['msg'])
            if args.delete_mtime > 0 and not SHUTDOWN:
                # Clean up files not modified in the past args.delete_mtime seconds
                lg.info("Running cleanup (mtime -{} minutes)".format(args.delete_mtime / 60))
                t_cleanup_start = time.time()
                cleanup_path(args.source, int(time.time() - args.delete_mtime))
                METRICS["cleanup_time"] = int((time.time() - t_cleanup_start) * 1000)
            METRICS["execution_time"] = int((time.time() - t_start) * 1000)
        finally:
            print_metrics()

        if SHUTDOWN is True and SHUTDOWN_SYNC >= 3:
            lg.warning("Exiting gracefully after {} runs".format(SHUTDOWN_SYNC))
            sys.exit(0)
        time.sleep(args.interval + random.randrange(0, int(args.interval / 2)))
if __name__ == '__main__':
    signal.signal(signal.SIGTERM, sigterm_handler)
    signal.signal(signal.SIGINT, sigterm_handler)
    main()
To use a file sink instead of s3:
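The gist doesn't include the sink config itself, but a minimal sketch might look like this; the `logs` input name and the `/var/lib/vector/s3sync` spool directory are illustrative:

[sinks.s3sync_spool]
type = "file"
inputs = ["logs"]
# Spool directory; pass the same path to the wrapper as its `source` argument
path = "/var/lib/vector/s3sync/%Y-%m-%d-%H.log"
encoding.codec = "json"

The wrapper's --filter-mtime window (60 seconds by default) keeps files Vector is still appending to from being uploaded mid-write.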
To configure s3sync as an exec source, parse its logs, and expose its metrics:
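Again, the config isn't part of the gist, so this is only a sketch: the script path, bucket name, metric names, and listen address are all placeholders, and only two of the counters printed by print_metrics() are mapped here; the rest follow the same pattern.

[sources.s3sync]
type = "exec"
mode = "streaming"
# AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY must be present in Vector's environment
command = ["/usr/local/bin/s3sync-wrapper.py", "--region", "us-east-1",
           "/var/lib/vector/s3sync", "my-bucket/logs"]

[transforms.s3sync_parse]
type = "remap"
inputs = ["s3sync"]
# Keep only the JSON metrics lines printed by print_metrics();
# drop everything else (e.g. log output captured from stderr)
source = '''
parsed, err = parse_json(.message)
if err != null || !is_object(parsed) {
  abort
}
. = object!(parsed)
'''

[transforms.s3sync_metrics]
type = "log_to_metric"
inputs = ["s3sync_parse"]

[[transforms.s3sync_metrics.metrics]]
type = "counter"
field = "synced_files"
name = "s3sync_synced_files_total"
increment_by_value = true

[[transforms.s3sync_metrics.metrics]]
type = "gauge"
field = "sync_time"
name = "s3sync_sync_time_milliseconds"

[sinks.prometheus]
type = "prometheus_exporter"
inputs = ["s3sync_metrics"]
address = "0.0.0.0:9598"

Streaming mode restarts the command if it exits, which pairs well with the wrapper's own graceful-shutdown loop.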