Skip to content

Instantly share code, notes, and snippets.

@danthedaniel
Last active June 12, 2018 23:55
Show Gist options
  • Save danthedaniel/d788634739390552094873676364da28 to your computer and use it in GitHub Desktop.
Save danthedaniel/d788634739390552094873676364da28 to your computer and use it in GitHub Desktop.
Vultr Auto Snapshot (Python 3.6+)
#!/usr/bin/env python3
"""
Automatically create a snapshot for a Vultr server, discarding old snapshots
to keep only the newest N snapshots.
Supports Python 3.6+.
"""
import logging
import argparse
import signal
import sys
from datetime import datetime
from typing import Callable, List
from functools import partial, wraps
from time import sleep
from daemonize import Daemonize
from vultr import Vultr, VultrError
# Fixed file locations used by the daemon, and the default pause between runs.
PID_FILE = "/tmp/autosnapshot.pid"
LOG_FILE = "/tmp/autosnapshot.log"
DEFAULT_DELAY = 60 * 60 * 24  # Once per day

# Set up logging: one module logger that does not propagate to the root
# logger and writes every record (DEBUG and up) to two handlers.
logger = logging.getLogger(__name__)
logger.propagate = False
logger.setLevel(logging.DEBUG)

# Only the log file gets the "[LEVEL] timestamp : message" layout; the
# stdout handler is left with logging's default (message-only) format.
formatter = logging.Formatter(
    fmt="[%(levelname)s] %(asctime)s : %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

logfile = logging.FileHandler(LOG_FILE, mode="a")
logfile.setLevel(logging.DEBUG)
logfile.setFormatter(formatter)

stdout = logging.StreamHandler(stream=sys.stdout)
stdout.setLevel(logging.DEBUG)

logger.addHandler(logfile)
logger.addHandler(stdout)

# File descriptor of the open log file; handed to Daemonize as `keep_fds`
# in the script entry point.
logger_fd = logfile.stream.fileno()
def retry(err_msg: str, num_tries: int = 3) -> Callable[[Callable], Callable]:
    """Build a decorator that retries a function, logging an error if it fails.

    The decorated function is attempted up to ``num_tries`` times:

    * a ``VultrError`` is logged as a warning and the call is retried after
      an exponentially growing delay (1s, 2s, 4s, ...);
    * any other ``Exception`` is logged as a warning and ends the attempts
      immediately.

    Exceptions are never propagated to the caller. When no attempt succeeds,
    ``err_msg`` is logged at error level and the wrapper returns ``None``.

    Args:
        err_msg: Message logged at error level once all attempts have failed.
        num_tries: Maximum number of attempts.

    Returns:
        A decorator producing the retrying wrapper.
    """
    def _decorator(func: Callable) -> Callable:
        @wraps(func)
        def _wrapper(*args, **kwargs):
            last_attempt = num_tries - 1
            for attempt in range(num_tries):
                try:
                    return func(*args, **kwargs)
                except VultrError as e:
                    logger.warning(f"VultrError {e} occured in {func.__name__}")
                    # Back off only when another attempt will follow;
                    # sleeping after the final failure merely delays the
                    # error report below.
                    if attempt < last_attempt:
                        sleep(2 ** attempt)
                except Exception as e:
                    logger.warning(f"Exception {e} occured in {func.__name__}")
                    break

            logger.error(err_msg)
            return None
        return _wrapper
    return _decorator
@retry("Could not destroy the snapshot.")
def remove_snapshot(vultr: Vultr, snapshot_id: str) -> None:
    """Destroy a single snapshot through the Vultr client.

    Args:
        vultr: Client whose ``snapshot.destroy`` method is called.
        snapshot_id: Identifier of the snapshot to destroy.
    """
    logger.debug(f"Destroying snapshot {snapshot_id}")
    snapshot_api = vultr.snapshot
    snapshot_api.destroy(snapshot_id)
@retry("Could not get the list of snapshots.")
def get_snapshots(vultr: Vultr) -> List[dict]:
    """Return the snapshot records reported by ``vultr.snapshot.list()``.

    Args:
        vultr: Client whose ``snapshot.list`` method is called.

    Returns:
        The values of the mapping returned by ``vultr.snapshot.list()``,
        as a list.
    """
    logger.debug("Getting list of snapshots")
    # Materialise the dict view so the result really is the annotated
    # List[dict] (indexable, sized) rather than a live `dict_values` view.
    return list(vultr.snapshot.list().values())
@retry("Could not complete snapshot removals.")
def remove_old_snapshots(vultr: Vultr, max_to_keep: int) -> None:
    """Destroy the oldest snapshots until at most ``max_to_keep`` remain.

    Snapshots are ordered by their ``date_created`` field (parsed as
    ``YYYY-MM-DD HH:MM:SS``) and removed oldest first.

    Args:
        vultr: Client used to list and destroy snapshots.
        max_to_keep: Number of newest snapshots to leave in place.
    """
    # TODO: There should be a way to filter this by subid
    oldest_first = sorted(
        get_snapshots(vultr),
        key=lambda snap: datetime.strptime(
            snap["date_created"], "%Y-%m-%d %H:%M:%S"
        ),
    )

    surplus = len(oldest_first) - max_to_keep
    if surplus <= 0:
        # Already at or below the limit: nothing to destroy.
        return

    for stale in oldest_first[:surplus]:
        remove_snapshot(vultr, stale["SNAPSHOTID"])
@retry("Could not create a snapshot.")
def create_snapshot(vultr: Vultr, subid: str) -> None:
    """Request a new snapshot of a server through the Vultr client.

    Args:
        vultr: Client whose ``snapshot.create`` method is called.
        subid: ID of the server to snapshot.
    """
    logger.debug("Creating a new snapshot.")
    snapshot_api = vultr.snapshot
    snapshot_api.create(subid)
def sigterm_handler(_signo, _stack_frame):
    """Signal handler: log the shutdown, then exit with status 0.

    Both arguments (signal number and current stack frame) are unused.
    """
    logger.info("Shutting down. SIGTERM.")
    raise SystemExit(0)
def main(keep: int, delay: int, continuous: bool, apikey: str, subid: str) -> None:
    """Create a new snapshot and prune old ones, once or on a schedule.

    Args:
        keep: Number of snapshots to leave in place after pruning.
        delay: Seconds to sleep between cycles when running continuously.
        continuous: When true, repeat the cycle forever; otherwise run once.
        apikey: Vultr API key used to build the client.
        subid: ID of the server to snapshot.
    """
    signal.signal(signal.SIGTERM, sigterm_handler)
    logger.info("Auto Snapshot started.")
    client = Vultr(apikey)

    run_again = True
    while run_again:
        try:
            create_snapshot(client, subid)
            remove_old_snapshots(client, keep)
        except Exception as e:
            # A failed cycle is logged and must not stop later cycles.
            logger.error(f"Iteration raised an exception {e}.")

        run_again = continuous
        if run_again:
            sleep(delay)
if __name__ == "__main__":
    # Command-line entry point: parse the options, then run `main` either in
    # the foreground or as a daemon.
    parser = argparse.ArgumentParser(
        description="Automatically make Vultr snapshots.")
    parser.add_argument("apikey", action="store", type=str,
                        help="Vultr API key.")
    parser.add_argument("subid", action="store", type=int,
                        help="ID of the server to snapshot.")
    parser.add_argument("--keep", action="store", type=int, default=5,
                        help="Number of snapshots to keep. Defaults to 5.")
    parser.add_argument("--delay", action="store", type=int, default=DEFAULT_DELAY,
                        help="Number of seconds between runs. Defaults to 24 hours.")
    parser.add_argument("-d", dest="daemonize", action="store_true",
                        help="Daemonize the application.")
    parser.add_argument("-c", dest="continuous", action="store_true",
                        help="Run the application continuously.")
    args = parser.parse_args()

    # Apply main's arguments in advance because Daemonize does not support
    # passing arguments to its `action` argument.
    target = partial(
        main,
        keep=args.keep,
        delay=args.delay,
        continuous=args.continuous,
        apikey=args.apikey,
        subid=args.subid
    )

    if args.daemonize:
        daemon = Daemonize(
            app="autosnapshot",
            pid=PID_FILE,
            action=target,
            keep_fds=[logger_fd]
        )
        daemon.start()
    else:
        # Run in the foreground only when not daemonizing. Daemonize.start()
        # already invokes `action`; calling target() again once it returns
        # would run a second snapshot cycle in the daemon process.
        target()
vultr==1.0.1
daemonize==2.4.7
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment