@dtrailin
Created July 31, 2024 18:31
import csv
import os
import subprocess
import time
from io import StringIO
from typing import Dict, List

from datadog import initialize, statsd
from kubernetes import client, config

SLEEP_TIME_SECONDS = 300  # 5 minutes


def initialize_datadog() -> None:
    options = {'statsd_host': os.environ.get('DD_AGENT_HOST')}
    initialize(**options)

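# DD_AGENT_HOST above is assumed to point at a reachable DogStatsD endpoint, typically the
# node-local Datadog agent. In Kubernetes this is commonly injected via the downward API,
# roughly (illustrative manifest snippet, not part of this script):
#   env:
#     - name: DD_AGENT_HOST
#       valueFrom:
#         fieldRef:
#           fieldPath: status.hostIP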
def parse_node_utilization_percentages(csv_string: str) -> Dict[str, int]:
    data = {}
    string_io = StringIO(csv_string)
    csv_reader = csv.reader(string_io)
    next(csv_reader)  # Skip header
    next(csv_reader)  # Skip second line
    for row in csv_reader:
        data[row[0]] = int(row[3])
    return data

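# The parser above assumes the CSV produced by `kubectl resource-capacity -o csv` has a
# header row, a second row that is skipped (presumably a cluster-wide summary), and then one
# row per node with the node name in column 0 and an integer utilization percentage in
# column 3. Illustrative shape only; the exact column layout depends on the kube-capacity
# plugin version:
#   NODE,...,...,<UTILIZATION %>,...
#   *,...,...,...,...
#   node-a,...,...,42,...
#   node-b,...,...,87,...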
def get_nodes_to_defrag(node_usage: Dict[str, int], nodepool: client.V1NodeList,
                        node_pack_percentage: int = 90, defrag_ratio: int = 10) -> List[str]:
    builder_nodes = []
    replica_nodes = []
    for node in nodepool.items:
        if (node.metadata.labels or {}).get("pinecone.io/dedicated") == "builders":
            builder_nodes.append((node.metadata.name, node_usage.get(node.metadata.name, 0)))
        else:
            replica_nodes.append((node.metadata.name, node_usage.get(node.metadata.name, 0)))

    nodes_to_defrag = []
    for node_group in (builder_nodes, replica_nodes):
        total_nodes = len(node_group)
        # Keep only nodes below the packing threshold, least utilized first
        filtered_nodes = [node for node in node_group if node[1] < node_pack_percentage]
        filtered_nodes.sort(key=lambda x: x[1])  # sort by usage
        num_nodes_to_defrag = total_nodes // defrag_ratio
        nodes_to_defrag += [node[0] for node in filtered_nodes[:num_nodes_to_defrag]]
    return nodes_to_defrag

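# Worked example of the selection in get_nodes_to_defrag above: with defrag_ratio=10 and 25
# replica nodes, at most 25 // 10 = 2 nodes are picked per cycle, and only the two
# least-utilized nodes sitting below node_pack_percentage are chosen. Builder and replica
# pools are handled independently, so each pool defrags at most 1/defrag_ratio of its nodes.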
def execute_command(command: List[str]) -> None:
    try:
        subprocess.run(command, check=True)
    except subprocess.CalledProcessError as e:
        print(f"Error executing command: {' '.join(command)}")
        print(e)

def taint_nodes(nodes_to_defrag: List[str]) -> None:
    for node in nodes_to_defrag:
        print(f"Tainting node {node}")
        statsd.gauge('defrag.node_tainted', 1, tags=[f"node:{node}"])
        command = ["kubectl", "taint", "nodes", node, "defrag:NoSchedule", "--overwrite"]
        execute_command(command)

def restart_replicas(nodes_to_defrag: List[str], core_api: client.CoreV1Api, namespace_exclusion_list: List[str]) -> None:
    replicas_to_restart = set()
    all_pods = core_api.list_pod_for_all_namespaces(watch=False)
    for pod in all_pods.items:
        node_name = pod.spec.node_name
        labels = pod.metadata.labels or {}  # labels can be None for unlabeled pods
        if node_name in nodes_to_defrag and "database.pinecone.io/version" in labels and pod.metadata.namespace not in namespace_exclusion_list:
            pod_name = pod.metadata.name
            namespace = pod.metadata.namespace
            deployment_name = pod_name.split("-")[0] + "-" + pod_name.split("-")[1]
            replicas_to_restart.add((deployment_name, namespace))
    statsd.gauge('defrag.replicas_to_restart_count', len(replicas_to_restart))
    for deployment_name, namespace in replicas_to_restart:
        print(f"Restarting deployment {deployment_name} in namespace {namespace}")
        statsd.gauge('defrag.deployment_restarted', 1, tags=[f"deployment:{deployment_name}", f"namespace:{namespace}"])
        command = ["kubectl", "rollout", "restart", "deployment", deployment_name, "-n", namespace]
        execute_command(command)

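# restart_replicas above derives the Deployment name from an assumed pod naming convention
# of "<name>-<id>-<replicaset hash>-<pod suffix>", taking the first two dash-separated
# tokens. For example (hypothetical names), a pod "writer-0a1b2c-6f9d7c4b58-x2k9p" maps to
# the deployment "writer-0a1b2c". Deployments whose base names contain a different number
# of dashes would be mapped incorrectly.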
def untaint_nodes(nodes_to_defrag: List[str]) -> None:
    for node in nodes_to_defrag:
        print(f"Untainting node {node}")
        # The trailing "-" removes the defrag taint from the node
        command = ["kubectl", "taint", "nodes", node, "defrag:NoSchedule-", "--overwrite"]
        execute_command(command)

def main() -> None:
    initialize_datadog()
    print("Starting defragging...")
    config.load_incluster_config()
    core_api = client.CoreV1Api()

    # Get list of nodes
    nodes = core_api.list_node()
    num_nodes = len(nodes.items)
    print(f"Number of nodes: {num_nodes}")
    statsd.gauge('defrag.nodes_at_start', num_nodes)

    # Load environment variables
    namespace_exclusion_list = os.environ.get('NAMESPACE_EXCLUSION_LIST', '').split(',')
    defrag_ratio = int(os.environ.get('DEFRAG_RATIO', 10))
    node_pack_percentage = int(os.environ.get('NODE_PACK_PERCENTAGE', 90))

    # Limit defrag operations to nodes that don't already have taints and get them in sorted order
    nodes_to_defrag_process = subprocess.run(
        ['bash', '-c', "kubectl resource-capacity --no-taint -o csv"],
        capture_output=True,
    )
    if nodes_to_defrag_process.returncode == 0:
        node_utilization = parse_node_utilization_percentages(nodes_to_defrag_process.stdout.decode('utf-8'))
        nodes = core_api.list_node()
        nodes_to_defrag = get_nodes_to_defrag(node_utilization, nodes,
                                              node_pack_percentage=node_pack_percentage,
                                              defrag_ratio=defrag_ratio)
        print(f"Nodes to defrag: {nodes_to_defrag}")
        statsd.gauge('defrag.nodes_to_defrag', len(nodes_to_defrag))
        taint_nodes(nodes_to_defrag)
        restart_replicas(nodes_to_defrag, core_api, namespace_exclusion_list)
        # This delay is to allow the replicas to restart before untainting the nodes
        print(f"Restarted replicas. Sleeping for {SLEEP_TIME_SECONDS // 60} minutes...")
        time.sleep(SLEEP_TIME_SECONDS)
        untaint_nodes(nodes_to_defrag)
    else:
        print("Error getting nodes to defrag.")


if __name__ == "__main__":
    main()