Created
July 31, 2024 18:31
-
-
Save dtrailin/99fc6b0235ce577cb226588bceb5c4d9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv
import os
import subprocess
import time
from io import StringIO
from typing import Any, Dict, List

from datadog import initialize, statsd
from kubernetes import client, config
# Grace period between restarting replicas and untainting nodes, so the
# restarted pods have time to schedule onto other (untainted) nodes.
SLEEP_TIME_SECONDS: int = 300  # 5 minutes
def initialize_datadog() -> None:
    """Point the datadog statsd client at the agent named by DD_AGENT_HOST."""
    statsd_host = os.environ.get('DD_AGENT_HOST')
    initialize(statsd_host=statsd_host)
def parse_node_utilization_percentages(csv_string: str) -> Dict[str, int]:
    """Parse `kubectl resource-capacity -o csv` output into node -> utilization %.

    Args:
        csv_string: Raw CSV text. The first two lines (header plus a
            summary row) are skipped; column 0 is taken as the node name and
            column 3 as its utilization percentage.

    Returns:
        Mapping of node name to integer utilization percentage. Empty when
        the input contains no data rows.
    """
    data: Dict[str, int] = {}
    csv_reader = csv.reader(StringIO(csv_string))
    # Skip header and summary line; the default arg avoids StopIteration
    # when the command produced fewer than two lines of output.
    next(csv_reader, None)
    next(csv_reader, None)
    for row in csv_reader:
        # Skip blank or short rows instead of raising IndexError.
        if len(row) > 3:
            data[row[0]] = int(row[3])
    return data
def get_nodes_to_defrag(node_usage: Dict[str, int], nodepool: Any, node_pack_percentage: int = 90, defrag_ratio: int = 10) -> List[str]:
    """Select the least-utilized nodes in each pool to defragment.

    Nodes are split into "builder" nodes (label
    ``pinecone.io/dedicated=builders``) and replica nodes. Within each group,
    nodes under ``node_pack_percentage`` utilization are sorted ascending by
    usage and the lowest ``len(group) // defrag_ratio`` are selected.

    Args:
        node_usage: Node name -> utilization percentage; unknown nodes count as 0.
        nodepool: Kubernetes node list (any object with an ``.items`` sequence
            of nodes carrying ``metadata.name`` and ``metadata.labels``).
        node_pack_percentage: Only nodes below this usage are candidates.
        defrag_ratio: Defrag at most 1/defrag_ratio of each group per run.

    Returns:
        Builder node names to defrag, followed by replica node names.
    """
    builder_nodes = []
    replica_nodes = []
    for node in nodepool.items:
        name = node.metadata.name
        usage = node_usage.get(name, 0)
        # labels may be None on nodes with no labels; guard before lookup.
        labels = node.metadata.labels or {}
        if labels.get("pinecone.io/dedicated") == "builders":
            builder_nodes.append((name, usage))
        else:
            replica_nodes.append((name, usage))
    nodes_to_defrag: List[str] = []
    # Distinct loop variable: the original shadowed the `nodepool` parameter.
    for group in (builder_nodes, replica_nodes):
        candidates = [entry for entry in group if entry[1] < node_pack_percentage]
        candidates.sort(key=lambda entry: entry[1])  # least utilized first
        num_nodes_to_defrag = len(group) // defrag_ratio
        nodes_to_defrag += [name for name, _ in candidates[:num_nodes_to_defrag]]
    return nodes_to_defrag
def execute_command(command: List[str]) -> None:
    """Run *command* as a subprocess; print (never raise) on a non-zero exit."""
    try:
        subprocess.run(command, check=True)
    except subprocess.CalledProcessError as err:
        joined = ' '.join(command)
        print(f"Error executing command: {joined}")
        print(err)
def taint_nodes(nodes_to_defrag: List[str]) -> None:
    """Apply the defrag NoSchedule taint to every listed node, emitting a metric each."""
    for node in nodes_to_defrag:
        print(f"Tainting node {node}")
        statsd.gauge('defrag.node_tainted', 1, tags=[f"node:{node}"])
        taint_command = ["kubectl", "taint", "nodes", node, "defrag:NoSchedule", "--overwrite"]
        execute_command(taint_command)
def restart_replicas(nodes_to_defrag: List[str], core_api: client.CoreV1Api, namespace_exclusion_list: List[str]) -> None:
    """Rollout-restart deployments whose replica pods sit on nodes being defragged.

    Scans all pods in the cluster; a pod qualifies when it runs on one of
    ``nodes_to_defrag``, carries the ``database.pinecone.io/version`` label,
    and its namespace is not excluded. Each owning deployment is restarted
    once via ``kubectl rollout restart``.

    Args:
        nodes_to_defrag: Names of tainted nodes whose replicas should move.
        core_api: Kubernetes CoreV1Api client used to list pods.
        namespace_exclusion_list: Namespaces that must never be restarted.
    """
    target_nodes = set(nodes_to_defrag)  # O(1) membership per pod
    replicas_to_restart = set()
    all_pods = core_api.list_pod_for_all_namespaces(watch=False)
    for pod in all_pods.items:
        # Pods without labels come back with labels=None from the API; the
        # original `"x" in pod.metadata.labels` would raise TypeError there.
        labels = pod.metadata.labels or {}
        node_name = pod.spec.node_name
        if node_name in target_nodes and "database.pinecone.io/version" in labels and pod.metadata.namespace not in namespace_exclusion_list:
            pod_name = pod.metadata.name
            namespace = pod.metadata.namespace
            # assumes the deployment name is the first two dash-separated
            # segments of the pod name — TODO confirm naming convention.
            deployment_name = pod_name.split("-")[0] + "-" + pod_name.split("-")[1]
            replicas_to_restart.add((deployment_name, namespace))
    statsd.gauge('defrag.replicas_to_restart_count', len(replicas_to_restart))
    for deployment_name, namespace in replicas_to_restart:
        print(f"Restarting deployment {deployment_name} in namespace {namespace}")
        statsd.gauge('defrag.deployment_restarted', 1, tags=[f"deployment:{deployment_name}", f"namespace:{namespace}"])
        command = ["kubectl", "rollout", "restart", "deployment", deployment_name, "-n", namespace]
        execute_command(command)
def untaint_nodes(nodes_to_defrag: List[str]) -> None:
    """Remove the defrag NoSchedule taint from every listed node."""
    for node in nodes_to_defrag:
        print(f"Untainting node {node}")
        untaint_command = ["kubectl", "taint", "nodes", node, "defrag:NoSchedule-", "--overwrite"]
        execute_command(untaint_command)
def main() -> None:
    """Run one defrag cycle: taint under-utilized nodes, restart their replicas, untaint.

    Intended to run inside the cluster (uses in-cluster kube config). The
    order matters: nodes are tainted first so restarted pods cannot land
    back on them, then affected deployments are restarted, then after a
    fixed sleep the taints are removed.
    """
    initialize_datadog()
    print("Starting defragging...")
    # In-cluster service-account credentials; this script cannot run off-cluster.
    config.load_incluster_config()
    core_api = client.CoreV1Api()
    # Get list of nodes
    nodes = core_api.list_node()
    num_nodes = len(nodes.items)
    print(f"Number of nodes: {num_nodes}")
    statsd.gauge('defrag.nodes_at_start', num_nodes)
    # Load environment variables
    namespace_exclusion_list = os.environ.get('NAMESPACE_EXCLUSION_LIST', '').split(',')
    defrag_ratio = int(os.environ.get('DEFRAG_RATIO', 10))
    node_pack_percentage = int(os.environ.get('NODE_PACK_PERCENTAGE', 90))
    # Limit defrag operations to nodes that don't already have taints and get them in sorted order
    # NOTE(review): relies on the kubectl resource-capacity plugin being
    # installed in the image; the f-string has no placeholders.
    nodes_to_defrag_process = subprocess.run(['bash', '-c', f"kubectl resource-capacity --no-taint -o csv"], capture_output=True)
    if nodes_to_defrag_process.returncode == 0:
        node_utilization = parse_node_utilization_percentages(nodes_to_defrag_process.stdout.decode('utf-8'))
        # Re-list nodes so selection reflects the current (untainted) set.
        nodes = core_api.list_node()
        nodes_to_defrag = get_nodes_to_defrag(node_utilization, nodes, node_pack_percentage=node_pack_percentage, defrag_ratio=defrag_ratio)
        print(f"Nodes to defrag: {nodes_to_defrag}")
        statsd.gauge('defrag.nodes_to_defrag', len(nodes_to_defrag))
        taint_nodes(nodes_to_defrag)
        restart_replicas(nodes_to_defrag, core_api, namespace_exclusion_list)
        # This delay is to allow the replicas to restart before untainting the nodes
        print(f"Restarted replicas. Sleeping for {SLEEP_TIME_SECONDS/60} minutes...")
        time.sleep(SLEEP_TIME_SECONDS)
        untaint_nodes(nodes_to_defrag)
    else:
        # Best-effort: log and exit without tainting anything.
        print("Error getting nodes to defrag.")
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment