Example Kubecost report with idle cost distribution: a Python script that pulls allocation and asset data from the Kubecost API, distributes each node's idle cost across the workloads running on that node, and writes a daily report to S3 as Parquet or JSON.
import json
import logging
import os
import sys
from datetime import datetime, timedelta
from typing import Dict, List
from urllib.parse import urljoin

import boto3
import pandas as pd
import pytz
import requests
from dateutil.parser import parse
# Environment variables with default values for local testing
DESTINATION_S3_BUCKET = os.getenv("DESTINATION_S3_BUCKET", None)
DESTINATION_S3_BUCKET_PREFIX = os.getenv("DESTINATION_S3_BUCKET_PREFIX", "kubecost")
WINDOW = os.getenv("WINDOW", "1")
STEP = os.getenv("STEP", "1h")
# Number of splits to divide the window into.
# This is useful when the window is too large to be processed in a single request.
SPLITS = int(os.getenv("SPLITS", "4"))
ENVIRONMENT = os.getenv("ENVIRONMENT", "local")
REGION = os.getenv("REGION", "us-east-1")
AWS_ACCOUNT_ID = os.getenv("AWS_ACCOUNT_ID", "000000000")
KUBECOST_ENDPOINT = os.getenv("KUBECOST_ENDPOINT", "http://localhost:9090")
OUTPUT_FORMAT = os.getenv("OUTPUT_FORMAT", "parquet")
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
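# Illustrative configuration for a non-local run (all values below are
# placeholders, not taken from a real environment):
#   DESTINATION_S3_BUCKET=my-kubecost-reports
#   KUBECOST_ENDPOINT=http://kubecost-cost-analyzer.kubecost:9090   # typical in-cluster service address
#   ENVIRONMENT=prod REGION=eu-west-1 AWS_ACCOUNT_ID=123456789012
#   WINDOW=1 STEP=1h SPLITS=4 OUTPUT_FORMAT=parquet
# With nothing set, the defaults above write a local parquet file against a
# Kubecost instance reachable on localhost:9090 (e.g. via port-forward).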
# Basic input validation
def validate_parameters():
    try:
        window_value = int(WINDOW)
    except ValueError:
        raise ValueError("WINDOW should be an integer.")
    if window_value <= 0:
        raise ValueError("WINDOW should be a positive integer.")
    if STEP not in ["1h", "1d"]:
        raise ValueError("STEP should be either '1h' or '1d'.")
    if OUTPUT_FORMAT not in ["json", "parquet"]:
        raise ValueError("OUTPUT_FORMAT should be either 'json' or 'parquet'.")
    log_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
    if LOG_LEVEL not in log_levels:
        raise ValueError(f"LOG_LEVEL should be one of: {', '.join(log_levels)}.")


logging.basicConfig(
    level=LOG_LEVEL,
    format="%(asctime)s :: %(levelname)s :: %(filename)s :: %(funcName)s :: %(message)s",
)
logger = logging.getLogger(__name__)
def split_window(start_date: datetime, end_date: datetime, splits: int) -> List[tuple]:
    """
    Splits the given window into smaller intervals. We need this because of
    Kubecost API limitations on the size of a single request.

    Args:
        start_date (datetime): The start datetime of the window.
        end_date (datetime): The end datetime of the window.
        splits (int): The number of intervals to split the window into.

    Returns:
        List[tuple]: A list of tuples, each containing the start and end datetime of a smaller interval.
    """
    total_duration = (end_date - start_date).total_seconds()
    split_duration = total_duration / splits
    # Generate the start and end times for each split
    intervals = []
    for i in range(splits):
        split_start = start_date + timedelta(seconds=split_duration * i)
        split_end = start_date + timedelta(seconds=split_duration * (i + 1))
        intervals.append((split_start, split_end))
    return intervals
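# Worked example (dates are illustrative): splitting the 24-hour window
# 2024-10-16T00:00:00Z .. 2024-10-17T00:00:00Z with splits=4 yields four
# 6-hour intervals:
#   (00:00-06:00), (06:00-12:00), (12:00-18:00), (18:00-24:00)
# fetch_data() below then issues one Kubecost API request per interval.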
def fetch_data(
    endpoint: str,
    window: str,
    step: str,
    splits: int,
    model_type: str = "assets",
    timeout: int = 300,
) -> List[Dict[str, str]]:
    """
    Fetch data from the Kubecost API for the given window, splitting the window
    into `splits` sub-windows and aggregating the results into a single list.
    """
    start_date_str, end_date_str = window.split(",")
    start_date = parse(start_date_str)
    end_date = parse(end_date_str)
    if splits < 1:
        raise ValueError("Number of splits must be at least 1.")
    windows = split_window(start_date, end_date, splits)
    aggregated_data = []
    for start, end in windows:
        window_str = f"{start.strftime('%Y-%m-%dT%H:%M:%SZ')},{end.strftime('%Y-%m-%dT%H:%M:%SZ')}"
        api_url = urljoin(endpoint, f"/model/{model_type}")
        try:
            params = {"window": window_str, "accumulate": "false", "step": step}
            if model_type == "allocation":
                params.update(
                    {
                        "idleByNode": "true",
                        "splitIdle": "true",
                        "shareSplit": "weighted",
                    }
                )
            # Build a human-readable URL purely for logging purposes
            full_url = (
                f"{api_url}?{requests.utils.unquote(requests.compat.urlencode(params))}"
            )
            logging.info(f"Sending GET request to: {full_url}")
            response = requests.get(
                api_url, params=params, verify=False, timeout=timeout
            )
            response.raise_for_status()
            response_data = response.json()
            # Check for warnings in the response
            if "warning" in response_data:
                warning_message = response_data["warning"]
                logging.error(f"API Warning: {warning_message}")
                sys.exit(-1)
            data = response_data.get("data", [])
            if isinstance(data, list):
                aggregated_data.extend(data)
            else:
                logging.warning(f"Data from window {window_str} is not a list.")
        except Exception as e:
            logging.error(
                f"Failed to fetch the data from {api_url} with error: " + str(e)
            )
            sys.exit(-1)
    return aggregated_data
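# Example of the request fetch_data() issues for the allocation model
# (endpoint and dates are illustrative; parameters match the code above):
#   GET http://localhost:9090/model/allocation
#       ?window=2024-10-16T00:00:00Z,2024-10-16T06:00:00Z
#       &accumulate=false&step=1h&idleByNode=true&splitIdle=true&shareSplit=weighted
# The script expects the response body to look like {"data": [...]}, where each
# list element is one step (hour or day) mapping allocation names to records.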
def log_aws_identity() -> None:
    """
    Log AWS STS GetCallerIdentity information.

    Retrieves the AWS STS GetCallerIdentity information for the current session
    and logs the account, ARN, and user ID. Logs an error if AWS credentials
    are not available or invalid.
    """
    try:
        # Uncomment the following lines for local usage with a named profile:
        # session = boto3.Session(profile_name='dev_env')
        # client = session.client('sts')
        # response = client.get_caller_identity()
        sts_client = boto3.client("sts")
        response = sts_client.get_caller_identity()
        logging.info("AWS STS GetCallerIdentity:")
        logging.info(f"Account: {response['Account']}")
        logging.info(f"ARN: {response['Arn']}")
        logging.info(f"User: {response['UserId']}")
    except Exception as e:
        logging.error(f"AWS credentials not available: {e}")
def add_utility_columns(df):
    # Extend the data with useful context columns.
    df["env"] = ENVIRONMENT
    df["region"] = REGION
    df["aws_account"] = AWS_ACCOUNT_ID
    df["kubecost_endpoint"] = KUBECOST_ENDPOINT
    return df


def convert_fields_to_str(df):
    # Convert JSON fields to str for Firebolt readability
    for column in ["properties", "window", "pvs", "rawAllocationOnly"]:
        df[column] = df[column].apply(lambda x: json.dumps(x))
    return df


def drop_broken_columns(df):
    if "proportionalAssetResourceCosts" in df:
        df = df.drop(columns=["proportionalAssetResourceCosts"])
    if "sharedCostBreakdown" in df:
        df = df.drop(columns=["sharedCostBreakdown"])
    return df
def filter_node_idle_rows(df):
    # Filter only per-node idle cost entries, a few examples:
    # "name":"eks_multitenant\/ip-10-20-154-228.eu-west-1.compute.internal\/__idle__"                                    <- matches
    # "name":"eks_multitenant\/ip-10-20-154-228.eu-west-1.compute.internal\/kube-system\/ebs-csi-node-262jx\/liveness-probe"  <- no match (not idle)
    # "name":"eks_multitenant\/pvc-25e04c99-495d-4a45-bf45-560d23954117\/__idle__"                                       <- no match (PV idle, not node idle)
    # "name":"eks_multitenant\/ip-10-10-13-70.ec2.internal\/__idle__"                                                    <- matches
    regex_pattern = r"eks_multitenant.*(?:compute\.internal|ec2\.internal).*__idle__"
    mask = df["name"].str.contains(regex_pattern)
    filtered_df = df[mask].copy()
    return filtered_df
def distribute_idle_costs(df_step: pd.DataFrame, step_duration: int) -> pd.DataFrame:
    """
    Distribute the idle costs on a per-node basis across workloads running on that node, excluding daemonsets.

    Args:
        df_step (pd.DataFrame): A DataFrame with an hourly/daily breakdown of costs.
        step_duration (int): The step duration in minutes (60 for "1h", 1440 for "1d").

    Returns:
        pd.DataFrame: The modified DataFrame with distributed idle costs.
    """
    df_step["distributed_idle_cost"] = 0.0
    workloads = df_step[
        # Exclude rows without a node property, mostly "__unmounted__"
        (
            df_step["properties"].apply(
                lambda x: "node" in x if isinstance(x, dict) else False
            )
        )
        &
        # Exclude daemonsets
        (
            df_step["properties"].apply(
                lambda x: x.get("controllerKind") if isinstance(x, dict) else None
            )
            != "daemonset"
        )
        &
        # Exclude per-node __idle__ rows
        (~df_step["name"].str.contains("/__idle__"))
    ]
    df_idle = filter_node_idle_rows(df_step)
    for _, idle_row in df_idle.iterrows():
        node = idle_row["properties"].get("node", None)
        if not node:
            logging.warning(f"{idle_row['name']} doesn't have a `node` property")
            continue
        # Filter workloads that run on the current node
        node_workloads = workloads[
            workloads["properties"].apply(
                lambda x: x.get("node") if isinstance(x, dict) else None
            )
            == node
        ].copy()
        if node_workloads.empty:
            logging.debug(f"No workloads found for Node: {node}")
        # Calculate allocation for CPU and memory based on max(request, usage)
        # https://docs.kubecost.com/using-kubecost/navigating-the-kubecost-ui/cost-allocation/efficiency-idle#idle
        node_workloads["cpu_allocation"] = node_workloads[
            ["cpuCoreRequestAverage", "cpuCoreUsageAverage"]
        ].max(axis=1)
        node_workloads["memory_allocation"] = node_workloads[
            ["ramByteRequestAverage", "ramByteUsageAverage"]
        ].max(axis=1)
        # Calculate the impact for CPU and memory separately
        cpu_total_cost_impact = node_workloads["cpu_allocation"].sum()
        memory_total_cost_impact = node_workloads["memory_allocation"].sum()
        # Get the relative impact for CPU and memory for each workload
        node_workloads["cpu_impact"] = (
            node_workloads["cpu_allocation"] / cpu_total_cost_impact
            if cpu_total_cost_impact != 0
            else 0
        )
        node_workloads["memory_impact"] = (
            node_workloads["memory_allocation"] / memory_total_cost_impact
            if memory_total_cost_impact != 0
            else 0
        )
        # Total cost impact is the max between CPU impact and memory impact
        node_workloads["cost_impact"] = node_workloads[
            ["cpu_impact", "memory_impact"]
        ].max(axis=1)
        # Adjust cost impact by workload run duration
        node_workloads["adjusted_cost_impact"] = node_workloads["cost_impact"] * (
            node_workloads["minutes"] / step_duration
        )
        # Normalize the adjusted cost impacts to ensure they sum to 1
        total_adjusted_impact = node_workloads["adjusted_cost_impact"].sum()
        node_workloads["normalized_cost_impact"] = (
            node_workloads["adjusted_cost_impact"] / total_adjusted_impact
            if total_adjusted_impact != 0
            else 0
        )
        # Distribute idle costs among node workloads based on normalized impact
        node_workloads["distributed_idle_cost"] = (
            node_workloads["normalized_cost_impact"] * idle_row["totalCost"]
        )
        for nw_index, nw_row in node_workloads.iterrows():
            df_step.at[nw_index, "distributed_idle_cost"] = nw_row[
                "distributed_idle_cost"
            ]
        node_idle_cost = idle_row["totalCost"]
        sum_distributed_costs = node_workloads["distributed_idle_cost"].sum()
        diff = round(node_idle_cost - sum_distributed_costs, 5)
        # Log debug info if the difference is greater than 0
        if diff > 0:
            logging.debug(f"Node: {node}")
            logging.debug(f"Node Idle cost: {node_idle_cost}")
            logging.debug(f"Sum of distributed costs: {sum_distributed_costs}")
            logging.debug(f"Difference: {diff}")
    return df_step
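# Worked example of the distribution above (all numbers are illustrative).
# Suppose one node has an idle cost of $1.00 for a 1h step (step_duration=60)
# and two eligible workloads:
#   A: cpu_allocation=max(2.0, 1.5)=2.0, memory_allocation=max(4GiB, 3GiB)=4GiB, minutes=60
#   B: cpu_allocation=max(1.0, 0.5)=1.0, memory_allocation=max(2GiB, 1GiB)=2GiB, minutes=30
# cpu_impact:    A=2/3, B=1/3;  memory_impact: A=4/6=2/3, B=2/6=1/3
# cost_impact (max of the two):       A=2/3,   B=1/3
# adjusted by runtime (minutes/60):   A=2/3,   B=1/6
# normalized (sum=5/6):               A=0.8,   B=0.2
# distributed_idle_cost:              A=$0.80, B=$0.20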
def enrich_from_assets(df_step, df_assets_step):
    """
    Enriches the provided allocations dataset with node labels collected from assets.

    Args:
        df_step, df_assets_step (pd.DataFrame): DataFrames with an hourly/daily breakdown of costs.

    Returns:
        pd.DataFrame: The modified DataFrame with labels added to node idle entries.
    """
    # Predefined Karpenter labels we're interested in
    selected_karpenter_labels = [
        "label_dedicated",
        "label_karpenter_sh_capacity_type",  # spot/on-demand
        "label_karpenter_sh_provisioner_name",
    ]
    df_idle = filter_node_idle_rows(df_step)
    # Filter df_assets_step for entries of type "Node"
    nodes = df_assets_step[df_assets_step["type"] == "Node"]
    node_data_dict = {}
    for _, node_row in nodes.iterrows():
        node_data_name = node_row["properties"].get("name")
        node_data_dict[node_data_name] = node_row
    for index, row in df_idle.iterrows():
        # Example: "name":"eks_multitenant/ip-10-10-13-231.ec2.internal/__idle__"
        node_name = row["name"].split("/")[1]
        node_data = node_data_dict.get(node_name, None)
        if node_data is not None:
            node_labels = node_data["labels"]
            if node_labels:  # Check that node_labels is not None
                karpenter_data = {
                    k: node_labels[k]
                    for k in selected_karpenter_labels
                    if k in node_labels
                }
                df_step.loc[index, "properties"].update(karpenter_data)
            else:
                logging.warning(f"Labels for Node {node_name} are None.")
        else:
            logging.warning(
                f"No corresponding Node found in assets for Idle record {node_name}."
            )
    return df_step
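# Illustrative effect of the enrichment on a node idle row's "properties"
# field (label values are made up):
#   before: {..., "node": "ip-10-10-13-231.ec2.internal"}
#   after:  {..., "node": "ip-10-10-13-231.ec2.internal",
#            "label_karpenter_sh_capacity_type": "spot",
#            "label_karpenter_sh_provisioner_name": "default"}
# This is what later lets idle cost be attributed to a Karpenter provisioner
# in store_data_by_window().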
def save_to_s3(df, output_filename, ct):
    # Save data based on the desired output format
    if OUTPUT_FORMAT == "parquet":
        df.to_parquet(output_filename, compression="gzip")
    elif OUTPUT_FORMAT == "json":
        df.to_json(output_filename, orient="records", date_format="iso", indent=4)
    else:
        logging.error(f"Unsupported output format: {OUTPUT_FORMAT}")
        sys.exit(1)
    if DESTINATION_S3_BUCKET:
        # Upload the result file to S3
        s3 = boto3.client("s3", region_name=REGION)
        key = f"{DESTINATION_S3_BUCKET_PREFIX}/year={ct.year}/month={ct.month}/day={ct.day}/region={REGION}/{output_filename}"
        logging.info(
            f"Uploading file {output_filename} into s3://{DESTINATION_S3_BUCKET}/{key}"
        )
        try:
            s3.upload_file(output_filename, DESTINATION_S3_BUCKET, key)
        except Exception as e:
            logging.error(
                f"Failed to upload the file into s3://{DESTINATION_S3_BUCKET}/{key} with error: "
                + str(e)
            )
            sys.exit(1)
        os.remove(output_filename)
    else:
        logging.warning("S3 bucket was not provided, creating report locally")
def store_data_by_window(days_from_today, step):
    window, by_day = get_24_hour_window(days_from_today)
    df_day = pd.DataFrame()
    assets_df_day = pd.DataFrame()
    ingestion_time = datetime.now(pytz.utc)  # ingestion time
    # Get allocations
    data = fetch_data(
        KUBECOST_ENDPOINT, window, step=step, splits=SPLITS, model_type="allocation"
    )
    # Get assets
    assets_data = fetch_data(
        KUBECOST_ENDPOINT, window, step=step, splits=SPLITS, model_type="assets"
    )
    # Number of chunks depends on the step and the window.
    chunks_len = len(data)
    # Convert step into total minutes for idle cost distribution
    if step == "1h":
        step_duration = 60
    elif step == "1d":
        step_duration = 1440
    else:
        raise ValueError(f"Unsupported step: {step}")
    for chunk in range(chunks_len):
        log_prefix = f"window:{window}, {chunk+1}/{len(data)}"
        logging.info(f"{log_prefix}. Creating dataframe from response['data'] field")
        data_per_chunk = data[chunk]
        assets_data_per_chunk = assets_data[chunk]
        if not data_per_chunk:
            logging.warning(
                f"{log_prefix}. Skipping chunk {chunk}/{chunks_len}, because it's empty"
            )
            continue
        if not assets_data_per_chunk:
            logging.warning(
                f"{log_prefix}. Assets are missing for chunk {chunk}/{chunks_len}"
            )
        df_step = pd.DataFrame(list(data_per_chunk.values()))
        df_assets_chunk = pd.DataFrame(list(assets_data_per_chunk.values()))
        logging.info(f"{log_prefix}. Enriching data from assets")
        df_step = enrich_from_assets(df_step, df_assets_chunk)
        logging.info(
            f"{log_prefix}. Distributing idle costs across non-idle allocations"
        )
        df_step = distribute_idle_costs(df_step, step_duration)
        logging.info(
            f"{log_prefix}. Converting json fields into str, for readability"
        )
        df_step = convert_fields_to_str(df_step)
        logging.info(
            f"{log_prefix}. Adding metadata {ingestion_time}, {ENVIRONMENT}, {REGION}, {AWS_ACCOUNT_ID}, {KUBECOST_ENDPOINT}"
        )
        df_step["ingestion_ts"] = ingestion_time
        logging.info(f"{log_prefix}. Adding utility columns")
        df_step = add_utility_columns(df_step)
        logging.info(f"{log_prefix}. Dropping broken columns")
        df_step = drop_broken_columns(df_step)
        df_day = pd.concat([df_day, df_step]).reset_index(drop=True)
        assets_df_day = pd.concat([assets_df_day, df_assets_chunk]).reset_index(
            drop=True
        )
    file_extension = OUTPUT_FORMAT
    output_filename = f"kubecost_{ENVIRONMENT}_{AWS_ACCOUNT_ID}_{by_day.strftime('%Y%m%d')}.{file_extension}"
    logging.info(f"Saving data for window: {window} to {output_filename}")
    save_to_s3(df_day, output_filename, by_day)
    # Total idle cost
    mask = df_day["name"].str.contains("__idle__")
    total_idle_cost = df_day[mask]["totalCost"].sum()
    # Total cost (including idle)
    total_cost = df_day["totalCost"].sum()
    # Total distributed idle cost
    total_distributed_idle_cost = df_day["distributed_idle_cost"].sum()
    # Total cost excluding idle
    non_idle_cost = total_cost - total_idle_cost
    # Total cost of assets for the same period
    assets_total_cost = assets_df_day["totalCost"].sum()
    # Provisioner-assigned idle costs
    mask = (df_day["name"].str.contains("__idle__")) & (
        df_day["properties"].str.contains("label_karpenter_sh_provisioner_name")
    )
    total_provisioner_idle_cost = df_day[mask]["totalCost"].sum()
    logging.info("Total Idle Cost: %s", total_idle_cost)
    logging.info("Total distributed Idle Cost: %s", total_distributed_idle_cost)
    logging.info(
        "Total provisioner assigned Idle Cost: %s", total_provisioner_idle_cost
    )
    logging.info("Total Cost: %s", total_cost)
    logging.info("Total Non-Idle Cost: %s", non_idle_cost)
    logging.info("Total Assets Cost: %s", assets_total_cost)
def get_24_hour_window(days):
    """
    Creates a 24-hour window starting at today minus `days` days.

    Args:
        days (int): How many days before today the window should start.

    Returns:
        window (str): Window in the format that Kubecost expects.
        start_date (datetime): The start date.
    """
    date_today = datetime.now(pytz.utc)
    start_date = date_today - timedelta(days=days)
    end_date = date_today - timedelta(days=days - 1)
    start_date_fmt = start_date.strftime("%Y-%m-%dT00:00:00Z")
    end_date_fmt = end_date.strftime("%Y-%m-%dT00:00:00Z")
    window = f"{start_date_fmt},{end_date_fmt}"
    return window, start_date
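# Example (dates are illustrative): if today is 2024-10-17 (UTC), then
# get_24_hour_window(1) returns
#   ("2024-10-16T00:00:00Z,2024-10-17T00:00:00Z", start_date)
# i.e. the whole previous calendar day. The window string is truncated to
# midnight on both ends, while the returned start_date keeps the current time.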
def main():
    # Validate input
    validate_parameters()
    if DESTINATION_S3_BUCKET:
        log_aws_identity()
    store_data_by_window(int(WINDOW), STEP)


if __name__ == "__main__":
    main()
Pinned dependencies:
boto3==1.26.146
botocore==1.29.146
certifi==2023.7.22
charset-normalizer==3.1.0
idna==3.7
jmespath==1.0.1
numpy==1.24.3
pandas==2.0.2
pyarrow==14.0.1
python-dateutil==2.8.2
pytz==2023.3
requests==2.32.0
s3transfer==0.6.1
six==1.16.0
tzdata==2023.3
urllib3==1.26.19