Created
May 7, 2024 14:29
-
-
Save mohag/eac4eeb5a4e6977df1b5a73375978696 to your computer and use it in GitHub Desktop.
AWS EKS inventory plugin for Ansible
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# Copyright (c) 2024 Ansible Project | |
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) | |
# Derived from the aws_rds inventory plugin | |
DOCUMENTATION = r""" | |
name: aws_eks | |
short_description: EKS cluster inventory source | |
description: | |
- Get clusters from Amazon Web Services EKS. | |
- Uses a YAML configuration file that ends with aws_eks.(yml|yaml). | |
options: | |
regions: | |
description: | |
- A list of AWS regions in which to describe EKS clusters. | |
default: [] | |
strict_permissions: | |
description: | |
- By default if an AccessDenied exception is encountered this plugin will fail. You can set strict_permissions to | |
False in the inventory config file which will allow the restrictions to be gracefully skipped. | |
type: bool | |
default: True | |
clusters_to_include: | |
description: | |
- Which clusters to include in the inventory. Set to ['all'] as a shorthand to include everything. (if blank, external clusters are not included) | |
See the "include" option in the U(https://docs.aws.amazon.com/eks/latest/APIReference/API_ListClusters.html#API_ListClusters_RequestSyntax) | |
API for possible values. | |
type: list | |
elements: str | |
default: | |
- all | |
statuses: | |
description: | |
- A list of desired states for instances/clusters to be added to inventory. Set to ['all'] as a shorthand to find everything. | |
See U(https://docs.aws.amazon.com/eks/latest/APIReference/API_Cluster.html#AmazonEKS-Type-Cluster-status) for possible statuses. | |
type: list | |
elements: str | |
default: | |
- all | |
hostvars_prefix: | |
description: | |
- The prefix for host variables names coming from AWS. | |
type: str | |
hostvars_suffix: | |
description: | |
- The suffix for host variables names coming from AWS. | |
type: str | |
notes: | |
- Ansible versions prior to 2.10 should use the fully qualified plugin name 'community.aws.aws_eks'. | |
extends_documentation_fragment: | |
- inventory_cache | |
- constructed | |
- amazon.aws.boto3 | |
- amazon.aws.common.plugins | |
- amazon.aws.region.plugins | |
- amazon.aws.assume_role.plugins | |
author: | |
- Gert van den Berg (@mohag) | |
""" | |
EXAMPLES = r""" | |
plugin: aws_eks | |
regions: | |
- af-south-1 | |
- eu-central-1 | |
- eu-north-1 | |
- us-east-1 | |
keyed_groups: | |
- key: tags | |
prefix: tag | |
- key: region | |
prefix: region | |
- key: status | |
prefix: status | |
hostvars_prefix: aws_ | |
hostvars_suffix: _eks | |
""" | |
try: | |
import botocore | |
except ImportError: | |
pass # will be captured by imported HAS_BOTO3 | |
from ansible.errors import AnsibleError | |
from ansible.module_utils._text import to_native | |
from ansible.module_utils.common.dict_transformations import camel_dict_to_snake_dict | |
from ansible_collections.amazon.aws.plugins.module_utils.botocore import is_boto3_error_code | |
from ansible_collections.amazon.aws.plugins.plugin_utils.inventory import AWSInventoryBase | |
def _find_clusters_with_valid_statuses(clusters, statuses): | |
if "all" in statuses: | |
return clusters | |
valid_clusters = [] | |
for cluster in clusters: | |
if cluster.get("status") in statuses: | |
valid_clusters.append(cluster) | |
return valid_clusters | |
def describe_resource_with_tags(func): | |
def describe_wrapper(connection, region, include, strict=False): | |
try: | |
results = func(connection=connection, region=region, include=include) | |
except is_boto3_error_code("AccessDenied") as e: # pylint: disable=duplicate-except | |
if not strict: | |
return [] | |
raise AnsibleError(f"Failed to query EKS: {to_native(e)}") | |
except ( | |
botocore.exceptions.BotoCoreError, | |
botocore.exceptions.ClientError, | |
) as e: # pylint: disable=duplicate-except | |
raise AnsibleError(f"Failed to query EKS: {to_native(e)}") | |
return results | |
return describe_wrapper | |
@describe_resource_with_tags | |
def _describe_eks_clusters(connection, region, include): | |
paginator = connection.get_paginator("list_clusters") | |
eks_clusters_list = paginator.paginate(include=include).build_full_result() | |
eks_clusters = [] | |
for cluster in eks_clusters_list["clusters"]: | |
cluster_info = connection.describe_cluster(name=cluster) | |
# Add the region for grouping later (we don't have an easy field to extract it from) | |
cluster_info["cluster"]["region"] = region | |
# TODO: If filters is implemented, they likely need to be checked here. EKS APIs does not have a filter option | |
eks_clusters.append(cluster_info["cluster"]) | |
return eks_clusters | |
class InventoryModule(AWSInventoryBase): | |
NAME = "community.aws.aws_eks" | |
INVENTORY_FILE_SUFFIXES = ("aws_eks.yml", "aws_eks.yaml") | |
def __init__(self): | |
super().__init__() | |
self.credentials = {} | |
def _populate(self, hosts): | |
group = "aws_eks" | |
self.inventory.add_group(group) | |
if hosts: | |
self._add_hosts(hosts=hosts, group=group) | |
self.inventory.add_child("all", group) | |
# This is for caching | |
def _populate_from_source(self, source_data): | |
hostvars = source_data.pop("_meta", {}).get("hostvars", {}) | |
for group in source_data: | |
if group == "all": | |
continue | |
self.inventory.add_group(group) | |
hosts = source_data[group].get("hosts", []) | |
for host in hosts: | |
self._populate_host_vars([host], hostvars.get(host, {}), group) | |
self.inventory.add_child("all", group) | |
def _format_inventory(self, hosts): | |
results = {"_meta": {"hostvars": {}}} | |
group = "aws_eks" | |
results[group] = {"hosts": []} | |
for host in hosts: | |
hostname = host["name"] | |
results[group]["hosts"].append(hostname) | |
h = self.inventory.get_host(hostname) | |
results["_meta"]["hostvars"][h.name] = h.vars | |
return results | |
def _add_hosts(self, hosts, group): | |
""" | |
:param hosts: a list of hosts to be added to a group | |
:param group: the name of the group to which the hosts belong | |
""" | |
for host in hosts: | |
hostname = host["name"] | |
host = camel_dict_to_snake_dict(host, ignore_list=["tags"]) | |
self.inventory.add_host(hostname, group=group) | |
hostvars_prefix = self.get_option("hostvars_prefix") | |
hostvars_suffix = self.get_option("hostvars_suffix") | |
new_vars = dict() | |
for hostvar, hostval in host.items(): | |
if hostvars_prefix: | |
hostvar = hostvars_prefix + hostvar | |
if hostvars_suffix: | |
hostvar = hostvar + hostvars_suffix | |
new_vars[hostvar] = hostval | |
self.inventory.set_variable(hostname, hostvar, hostval) | |
host.update(new_vars) | |
# Use constructed if applicable | |
strict = self.get_option("strict") | |
# Composed variables | |
self._set_composite_vars(self.get_option("compose"), host, hostname, strict=strict) | |
# Complex groups based on jinja2 conditionals, hosts that meet the conditional are added to group | |
self._add_host_to_composed_groups(self.get_option("groups"), host, hostname, strict=strict) | |
# Create groups based on variable values and add the corresponding hosts to it | |
self._add_host_to_keyed_groups(self.get_option("keyed_groups"), host, hostname, strict=strict) | |
def _get_all_eks_clusters(self, strict, statuses, include): | |
""" | |
:param regions: a list of regions in which to describe EKS clusters | |
:param strict: a boolean determining whether to fail or ignore 403 error codes | |
:param statuses: a list of statuses that the returned clusters should match | |
:return A list of cluster dictionaries | |
""" | |
all_clusters = [] | |
for connection, _region in self.all_clients("eks"): | |
all_clusters += _describe_eks_clusters(connection, _region, include=include, strict=strict) | |
sorted_clusters = list( | |
sorted(all_clusters, key=lambda x: x["name"]) | |
) | |
return _find_clusters_with_valid_statuses(sorted_clusters, statuses) | |
def parse(self, inventory, loader, path, cache=True): | |
super().parse(inventory, loader, path, cache=cache) | |
# get user specifications (and defaults) | |
strict_permissions = self.get_option("strict_permissions") | |
statuses = self.get_option("statuses") | |
include = self.get_option("clusters_to_include") | |
result_was_cached, cached_result = self.get_cached_result(path, cache) | |
if result_was_cached: | |
self._populate_from_source(cached_result) | |
return | |
results = self._get_all_eks_clusters( | |
strict_permissions, | |
statuses, | |
include, | |
) | |
self._populate(results) | |
# Update the cache once we're done | |
formatted_inventory = self._format_inventory(results) | |
self.update_cached_result(path, cache, formatted_inventory) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment