Last active
June 12, 2018 23:55
-
-
Save danthedaniel/d788634739390552094873676364da28 to your computer and use it in GitHub Desktop.
Vultr Auto Snapshot (Python 3.6+)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Automatically create a snapshot for a Vultr server, discarding old snapshots | |
to keep only the newest N snapshots. | |
Supports Python 3.6+. | |
""" | |
import logging | |
import argparse | |
import signal | |
import sys | |
from datetime import datetime | |
from typing import Callable, List | |
from functools import partial, wraps | |
from time import sleep | |
from daemonize import Daemonize | |
from vultr import Vultr, VultrError | |
# File-system locations used by the daemon and the file logger.
PID_FILE = "/tmp/autosnapshot.pid"
LOG_FILE = "/tmp/autosnapshot.log"

# Seconds between runs when --delay is not given on the command line.
DEFAULT_DELAY = 24 * 60 * 60  # Once per day

# Module-wide logger: every record (DEBUG and up) is sent both to LOG_FILE
# and to stdout. Propagation is disabled so ancestor loggers do not also
# handle these records.
logger = logging.getLogger(__name__)
logger.propagate = False
logger.setLevel(logging.DEBUG)

formatter = logging.Formatter(
    fmt="[%(levelname)s] %(asctime)s : %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

# File handler: appends formatted records to LOG_FILE.
logfile = logging.FileHandler(LOG_FILE, mode="a")
logfile.setLevel(logging.DEBUG)
logfile.setFormatter(formatter)
logger.addHandler(logfile)

# Console handler: no formatter is attached, so it uses logging's default.
stdout = logging.StreamHandler(sys.stdout)
stdout.setLevel(logging.DEBUG)
logger.addHandler(stdout)

# Descriptor of the open log file; passed to Daemonize via `keep_fds`.
logger_fd = logfile.stream.fileno()
def retry(err_msg: str, num_tries: int = 3) -> Callable[[Callable], Callable]:
    """Build a decorator that retries a function and logs when it gives up.

    The wrapped function is called up to ``num_tries`` times. A ``VultrError``
    is logged as a warning and the call is retried after an exponentially
    growing delay (1s, 2s, 4s, ...). Any other ``Exception`` is logged as a
    warning and ends the attempts immediately. Neither kind of exception is
    propagated to the caller.

    Args:
        err_msg: Message logged at ERROR level once the call has failed.
        num_tries: Maximum number of attempts.

    Returns:
        A decorator. The decorated function returns the wrapped function's
        result on success, or ``None`` (after logging ``err_msg``) on failure.
    """
    def _decorator(func: Callable) -> Callable:
        # Not every callable has __name__ (e.g. functools.partial objects);
        # fall back to repr so the logging below can never raise itself.
        func_name = getattr(func, "__name__", repr(func))

        @wraps(func)
        def _wrapper(*args, **kwargs):
            for attempt in range(num_tries):
                try:
                    return func(*args, **kwargs)
                except VultrError as e:
                    logger.warning(f"VultrError {e} occurred in {func_name}")
                    # Back off only when another attempt will follow; sleeping
                    # after the final failure would just delay the error log.
                    if attempt < num_tries - 1:
                        sleep(2 ** attempt)
                except Exception as e:
                    # Non-API errors are not expected to be transient.
                    logger.warning(f"Exception {e} occurred in {func_name}")
                    break

            logger.error(err_msg)
            return None

        return _wrapper

    return _decorator
@retry("Could not destroy the snapshot.")
def remove_snapshot(vultr: Vultr, snapshot_id: str) -> None:
    """Destroy the snapshot identified by ``snapshot_id``.

    The call goes through the ``retry`` decorator, so failures are logged
    rather than raised.
    """
    logger.debug(f"Destroying snapshot {snapshot_id}")
    snapshot_api = vultr.snapshot
    snapshot_api.destroy(snapshot_id)
@retry("Could not get the list of snapshots.")
def get_snapshots(vultr: Vultr) -> List[dict]:
    """Return the snapshot records reported by the Vultr API as a list.

    ``vultr.snapshot.list()`` is treated as a mapping whose values are the
    snapshot records (presumably keyed by snapshot id -- confirm against the
    vultr client).

    Returns:
        A list of snapshot dicts. Because the call goes through the ``retry``
        decorator, ``None`` is returned instead when the listing fails.
    """
    logger.debug("Getting list of snapshots")
    # Materialize the dict view so the result matches the List[dict]
    # annotation and supports len()/indexing for callers.
    return list(vultr.snapshot.list().values())
@retry("Could not complete snapshot removals.")
def remove_old_snapshots(vultr: Vultr, max_to_keep: int) -> None:
    """Remove the oldest snapshots so that at most ``max_to_keep`` remain.

    Snapshots are ordered by their ``date_created`` field and deleted
    oldest-first. The call goes through the ``retry`` decorator, so a failure
    here is logged rather than raised.

    Args:
        vultr: API client used to list and destroy snapshots.
        max_to_keep: Number of newest snapshots to leave untouched.
    """
    def date_created(snapshot: dict) -> datetime:
        return datetime.strptime(snapshot["date_created"], "%Y-%m-%d %H:%M:%S")

    # TODO: There should be a way to filter this by subid
    listed = get_snapshots(vultr)
    if listed is None:
        # get_snapshots returns None when its retries are exhausted. Fail with
        # an explicit message instead of an incidental TypeError from sorted().
        raise RuntimeError("the snapshot list is unavailable")

    snapshots = sorted(listed, key=date_created)
    num_to_remove = max(len(snapshots) - max_to_keep, 0)

    for snapshot in snapshots[:num_to_remove]:
        remove_snapshot(vultr, snapshot["SNAPSHOTID"])
@retry("Could not create a snapshot.")
def create_snapshot(vultr: Vultr, subid: str) -> None:
    """Ask the Vultr API to create a snapshot of the server ``subid``.

    The call goes through the ``retry`` decorator, so failures are logged
    rather than raised.
    """
    logger.debug("Creating a new snapshot.")
    snapshot_api = vultr.snapshot
    snapshot_api.create(subid)
def sigterm_handler(_signo, _stack_frame):
    """Signal handler: log the shutdown and exit with status 0.

    Both arguments are the ones supplied by the ``signal`` module and are
    ignored.
    """
    logger.info("Shutting down. SIGTERM.")
    raise SystemExit(0)
def main(keep: int, delay: int, continuous: bool, apikey: str, subid: str) -> None:
    """Take a snapshot and prune old ones, once or in an endless loop.

    Args:
        keep: Number of snapshots to retain after pruning.
        delay: Seconds to sleep between iterations in continuous mode.
        continuous: When true, repeat forever; otherwise run one iteration.
        apikey: Vultr API key used to build the client.
        subid: ID of the server to snapshot.
    """
    signal.signal(signal.SIGTERM, sigterm_handler)
    logger.info("Auto Snapshot started.")
    client = Vultr(apikey)

    keep_running = True
    while keep_running:
        try:
            create_snapshot(client, subid)
            remove_old_snapshots(client, keep)
        except Exception as e:
            # One failed iteration must not stop a continuous run.
            logger.error(f"Iteration raised an exception {e}.")

        keep_running = continuous
        if keep_running:
            sleep(delay)
# Command-line entry point: parse the options, then run `main` either in the
# foreground or as a daemon.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Automatically make Vultr snapshots.")
    parser.add_argument("apikey", action="store", type=str,
                        help="Vultr API key.")
    parser.add_argument("subid", action="store", type=int,
                        help="ID of the server to snapshot.")
    parser.add_argument("--keep", action="store", type=int, default=5,
                        help="Number of snapshots to keep. Defaults to 5.")
    parser.add_argument("--delay", action="store", type=int, default=DEFAULT_DELAY,
                        help="Number of seconds between runs. Defaults to 24 hours.")
    parser.add_argument("-d", dest="daemonize", action="store_true",
                        help="Daemonize the application.")
    parser.add_argument("-c", dest="continuous", action="store_true",
                        help="Run the application continuously.")
    args = parser.parse_args()

    # Apply main's arguments in advance because Daemonize does not support
    # passing arguments to its `action` argument.
    target = partial(
        main,
        keep=args.keep,
        delay=args.delay,
        continuous=args.continuous,
        apikey=args.apikey,
        subid=args.subid
    )

    if args.daemonize:
        daemon = Daemonize(
            app="autosnapshot",
            pid=PID_FILE,
            action=target,
            # Keep the log file descriptor open in the daemon.
            keep_fds=[logger_fd]
        )
        daemon.start()
    else:
        # Run in the foreground only when not daemonizing. Daemonize already
        # invokes `target` as its action, so calling it again after start()
        # would run main a second time whenever start() returns.
        target()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
vultr==1.0.1 | |
daemonize==2.4.7 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment