from argparse import Namespace
from collections.abc import Iterable
from datetime import datetime, timedelta
from hashlib import sha256
from logging import Logger, getLogger
from os import getenv
from pathlib import Path
from re import escape, sub

from docker import DockerClient
from docker.errors import APIError
from pytz import UTC

from runner.compose import (
    ComposeSpec,
    Variable,
    login,
    logout,
    normalize_spec,
    reconcile_schema,
    stack_deploy,
)

logger = getLogger(__name__)


def main(settings: Namespace):
    """
    Deploy the stack described by the given settings.

    Logs in, expands all secrets and configs declared in the compose spec,
    deploys the stack, prunes outdated secrets and configs, and logs out again.

    :param settings: Application settings.
    """
    runner_version = getenv("__DEPLOYMENT_RUNNER_VERSION", "1.0.0")
    client = DockerClient(
        base_url=settings.docker_host,
        version=settings.docker_api_version or "auto",
        user_agent=f"matchory-deployment-runner/{runner_version} (linux-shell)",
    )

    login(client, settings)

    # Always log out again, even if the deployment or the pruning fails, so a
    # failed run does not leave the session logged in.
    try:
        compose_spec = normalize_spec(settings)

        for section in ("secrets", "configs"):
            entries = compose_spec.get(section) or {}

            # Only existing keys are reassigned, so mutating while iterating
            # is safe here.
            for name, entry in entries.items():
                entries[name] = _process_variable(name, entry, settings)

        compose_spec = reconcile_schema(compose_spec)
        stack_deploy(settings, compose_spec)
        prune_variables(compose_spec, client, settings)
    finally:
        logout(client, settings)


def _process_variable(name: str, variable: Variable, settings: Namespace) -> Variable:
    """
    Process a variable.

    Resolves the value of the variable from its file or the environment, labels
    it with the service, version, and a content hash, and rewrites its name to
    ``<service>-<name>-<hash>``.

    :param name: Name of the variable.
    :param variable: Variable metadata as defined in the compose spec.
    :param settings: Application settings.
    :return: Modified variable metadata.
    :raises VariableNotDefinedError: If neither an existing file nor an
                                     environment variable provides a value.
    """
    variant_upper = name.upper()
    variant_prefix = f"{settings.env_var_prefix}_{variant_upper}"

    # Keep track of the environment variable names we actually looked up, so
    # the error message names the right ones.
    if "environment" in variable:
        candidates = (variable["environment"],)
        env_variable = getenv(variable["environment"])
    else:
        candidates = (variant_upper, variant_prefix)
        env_variable = getenv(variant_upper, getenv(variant_prefix))

    path = Path(variable["file"]) if "file" in variable else None

    # Make sure we retain the ability to add secrets and configs from local files, not
    # just environment variables. If the file specified in the variable exists, we
    # assume it takes precedence over any environment variable with the same name.
    if path:
        if not path.exists():
            logger.debug("Expanding %s to %s", name, path)

            if env_variable is None:
                raise VariableNotDefinedError(name, candidates)

            path.write_text(env_variable.strip(), encoding="utf-8")
        else:
            logger.info(
                "Skipping secret expansion for %s: Existing file takes precedence "
                "over variable defined in environment",
                name,
            )
    elif not env_variable:
        raise VariableNotDefinedError(name, candidates)

    # Calculate the hash of the variable file: It will stay the same for subsequent
    # deployments if the actual value didn't change between them, which means we don't
    # have to invalidate a secret or config pointlessly.
    payload_hash = sha256(
        path.read_bytes() if path else env_variable.encode("utf-8"),
    ).hexdigest()[:7]

    if "labels" not in variable:
        variable["labels"] = {}

    # By storing these values as labels, we can query for matching variables later on:
    # This enables us to prune outdated versions automatically.
    variable["labels"]["com.matchory.service"] = settings.service
    variable["labels"]["com.matchory.version"] = settings.version
    variable["labels"]["com.matchory.hash"] = payload_hash

    # Again, compatibility with legacy deployment runner versions. The service
    # and version are escaped, so characters like the dots in "1.2.3" are
    # matched literally instead of as regex wildcards.
    base_name = variable["name"]

    if base_name.startswith(settings.service):
        base_name = sub(
            pattern=rf"^{escape(settings.service)}[_-]",
            repl="",
            string=base_name,
        )

    if base_name.endswith(settings.version):
        base_name = sub(
            pattern=rf"[_-]{escape(settings.version)}$",
            repl="",
            string=base_name,
        )

    variable["name"] = f"{settings.service}-{base_name}-{payload_hash}"

    return variable


def prune_variables(
    compose_spec: ComposeSpec,
    client: DockerClient,
    settings: Namespace,
):
    """
    Prune outdated secrets and configs of the deployed service.

    :param compose_spec: Compose spec that has just been deployed.
    :param client: Docker client to query and remove variables with.
    :param settings: Application settings.
    """
    prune_logger = logger.getChild("pruning")

    _prune_secrets(compose_spec, client, prune_logger, settings)
    _prune_configs(compose_spec, client, prune_logger, settings)


def _prune_secrets(
    compose_spec: ComposeSpec,
    client: DockerClient,
    prune_logger: Logger,
    settings: Namespace,
):
    """
    Prune all secrets of the service that are not part of the compose spec.

    :param compose_spec: Compose spec that has just been deployed.
    :param client: Docker client to query and remove secrets with.
    :param prune_logger: Logger to report pruning progress to.
    :param settings: Application settings.
    """
    _prune_resources(
        "secret",
        client.secrets,
        compose_spec.get("secrets"),
        prune_logger,
        settings,
    )


def _prune_configs(
    compose_spec: ComposeSpec,
    client: DockerClient,
    prune_logger: Logger,
    settings: Namespace,
):
    """
    Prune all configs of the service that are not part of the compose spec.

    :param compose_spec: Compose spec that has just been deployed.
    :param client: Docker client to query and remove configs with.
    :param prune_logger: Logger to report pruning progress to.
    :param settings: Application settings.
    """
    _prune_resources(
        "config",
        client.configs,
        compose_spec.get("configs"),
        prune_logger,
        settings,
    )


def _prune_resources(
    kind: str,
    collection,
    spec_entries,
    prune_logger: Logger,
    settings: Namespace,
):
    """
    Prune outdated and legacy resources of one kind for the current service.

    :param kind: Singular resource kind used in log messages, that is "secret"
                 or "config".
    :param collection: Docker client collection providing ``list(filters=...)``
                       (``client.secrets`` or ``client.configs``).
    :param spec_entries: Mapping of the resources declared in the compose spec;
                         may be missing or empty.
    :param prune_logger: Logger to report pruning progress to.
    :param settings: Application settings.
    """
    service = settings.service
    prune_logger.debug("Pruning %ss for service %s", kind, service)

    spec_names = {entry["name"] for entry in (spec_entries or {}).values()}
    resources = collection.list(
        filters={
            "label": f"com.matchory.service={service}",
        },
    )
    total = len(resources)

    if total:
        prune_logger.debug(
            "Checking %d %s(s) for service %s",
            total,
            kind,
            service,
        )

    # Extracts the name used in the compose spec from "<service>-<name>-<hash>".
    name_pattern = rf"^{escape(service)}[_-](.+)[_-].[^_-]+$"
    rotation_deadline = datetime.now(tz=UTC) - timedelta(days=30)

    for index, resource in enumerate(resources, start=1):
        prune_logger.debug(
            "Checking %s %d/%d: %s",
            kind,
            index,
            total,
            resource.name,
        )
        labels = resource.attrs["Spec"]["Labels"]

        if "com.matchory.hash" not in labels:
            prune_logger.warning(
                "Found invalid %s '%s': Missing hash label",
                kind,
                resource.name,
            )
            resource.remove()

            continue

        spec_name = sub(pattern=name_pattern, repl=r"\1", string=resource.name)

        if resource.name not in spec_names:
            prune_logger.debug(
                "Pruning outdated version %s of %s %s: %s",
                labels["com.matchory.hash"],
                kind,
                spec_name,
                resource.name,
            )
            resource.remove()

            # The resource is gone: a rotation reminder would be misleading.
            continue

        created_at = parse_date_string(resource.attrs["CreatedAt"])

        if created_at < rotation_deadline:
            prune_logger.warning(
                "%s '%s' has been in use for more than 30 days and should "
                "be rotated!",
                kind.capitalize(),
                spec_name,
            )

    labelled_names = {resource.name for resource in resources}
    legacy_resources = [
        resource
        for resource in collection.list(
            filters={
                "name": service,
            },
        )
        if resource.name not in labelled_names
        and _may_belong_to_service(resource, service)
    ]

    if legacy_resources:
        prune_logger.debug(
            "Pruning %d legacy %s(s) for service %s",
            len(legacy_resources),
            kind,
            service,
        )

    for resource in legacy_resources:
        prune_logger.info("Pruning legacy %s %s", kind, resource.name)

        try:
            resource.remove()
        except APIError:
            prune_logger.exception(
                "Failed to prune legacy %s %s",
                kind,
                resource.name,
            )


def _may_belong_to_service(resource, service: str) -> bool:
    """
    Check whether a resource found by name is not labelled for another service.

    NOTE(review): The Docker "name" filter is not an exact match, so a lookup
    for "api" presumably also yields resources of a service called
    "api-gateway" - confirm against the Docker Engine API documentation.

    :param resource: Secret or config returned by the Docker client.
    :param service: Name of the service being deployed.
    :return: Whether the resource carries no service label, or ours.
    """
    labels = resource.attrs.get("Spec", {}).get("Labels") or {}

    return labels.get("com.matchory.service", service) == service


def parse_date_string(date_string: str) -> datetime:
    """
    Parse a date string provided by Docker Swarm.

    Accepts timestamps like ``2023-01-31T12:34:56.123456789Z``, where the
    fractional part is optional and may have any number of digits. Digits
    beyond microsecond precision are truncated.

    :param date_string: Date string to parse.
    :return: Parsed, timezone-aware datetime in UTC.
    :raises ValueError: If the date string does not have the expected format.
    """
    timestamp = date_string.strip()

    if timestamp.endswith(("Z", "z")):
        timestamp = timestamp[:-1]

    seconds, _, fraction = timestamp.partition(".")

    # The timestamp is in UTC already: attach the timezone instead of
    # converting from the local timezone of the machine.
    date = datetime.strptime(seconds, "%Y-%m-%dT%H:%M:%S").replace(tzinfo=UTC)

    if not fraction:
        return date

    # The fraction is a decimal fraction of a second: ".5" is 500000 µs, and
    # nine digits are nanoseconds, which we truncate to microseconds.
    microseconds = int(fraction[:6].ljust(6, "0"), 10)

    return date + timedelta(microseconds=microseconds)


class VariableNotDefinedError(RuntimeError):
    """
    Raised if a variable cannot be resolved from the build environment.
    """

    # Names of the environment variables that were checked for a value.
    variants: Iterable[str]

    # Name of the variable as declared in the compose spec.
    name: str

    def __init__(self, name: str, variants: Iterable[str]):
        """
        Create a new error.

        :param name: Name of the variable as declared in the compose spec.
        :param variants: Names of the environment variables that were checked.
        """
        # A bare string would otherwise be treated as one variant per character.
        if isinstance(variants, str):
            variants = (variants,)

        # Materialize the variants, so one-shot iterables survive being
        # rendered more than once.
        self.variants = tuple(variants)
        self.name = name

        super().__init__(name, self.variants)

    def __str__(self):
        """
        Retrieve the error message.

        :return: Human-readable description of the missing variable.
        """
        return (
            f"Variable '{self.name}' is undefined: {self._variants} in the build "
            "environment. A deployment variable must be declared in the repository "
            "variables, deployment environment, or workspace variables. Consult the "
            "following reference for detailed instructions: "
            "https://support.atlassian.com/bitbucket-cloud/docs/variables-and-secrets"
        )

    @property
    def _variants(self):
        """
        Describe the checked variants as a sentence fragment.

        :return: Fragment suitable for embedding in the error message.
        """
        variants = list(self.variants)

        if not variants:
            return "No suitable variants are defined"

        if len(variants) == 1:
            return f"'{variants[0]}' is not defined"

        if len(variants) == 2:
            first, second = variants

            return f"Neither '{first}' nor '{second}' are defined"

        names = "', '".join(variants)

        return f"None of '{names}' are defined"