@123BLiN
Created October 17, 2024 08:22
Example kubecost report with Idle cost distribution
import os
import sys
import logging
from datetime import datetime, timedelta
from dateutil.parser import parse
import requests
import pandas as pd
import boto3
from urllib.parse import urljoin
import pytz
import json
from typing import List, Dict
# Environment variables with default values for local testing
DESTINATION_S3_BUCKET = os.getenv("DESTINATION_S3_BUCKET", None)
DESTINATION_S3_BUCKET_PREFIX = os.getenv("DESTINATION_S3_BUCKET_PREFIX", "kubecost")
WINDOW = os.getenv("WINDOW", 1)
STEP = os.getenv("STEP", "1h")
# Number of splits to divide the window into
# This is useful when the window is too large to be processed in a single request
SPLITS = int(os.getenv("SPLITS", 4))
ENVIRONMENT = os.getenv("ENVIRONMENT", "local")
REGION = os.getenv("REGION", "us-east-1")
AWS_ACCOUNT_ID = os.getenv("AWS_ACCOUNT_ID", "000000000")
KUBECOST_ENDPOINT = os.getenv("KUBECOST_ENDPOINT", "http://localhost:9090")
OUTPUT_FORMAT = os.getenv("OUTPUT_FORMAT", "parquet")
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")

# Basic input validation
def validate_parameters():
    try:
        window_value = int(WINDOW)
    except ValueError:
        raise ValueError("WINDOW should be an integer.")
    if window_value <= 0:
        raise ValueError("WINDOW should be a positive integer.")
    if STEP not in ["1h", "1d"]:
        raise ValueError("STEP should be either '1h' or '1d'.")
    if OUTPUT_FORMAT not in ["json", "parquet"]:
        raise ValueError("OUTPUT_FORMAT should be either 'json' or 'parquet'.")
    log_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
    if LOG_LEVEL not in log_levels:
        raise ValueError(f"LOG_LEVEL should be one of: {', '.join(log_levels)}.")


logging.basicConfig(
    level=LOG_LEVEL,
    format="%(asctime)s :: %(levelname)s :: %(filename)s :: %(funcName)s :: %(message)s",
)
logger = logging.getLogger(__name__)

def split_window(start_date: datetime, end_date: datetime, splits: int) -> List[tuple]:
    """
    Splits the given window into smaller intervals. We need this because of Kubecost API limitations.

    Args:
        start_date (datetime): The start datetime of the window.
        end_date (datetime): The end datetime of the window.
        splits (int): The number of intervals to split the window into.

    Returns:
        List[tuple]: A list of tuples, each containing the start and end datetime of a smaller interval.
    """
    total_duration = (end_date - start_date).total_seconds()
    split_duration = total_duration / splits
    # Generate the start and end times for each split
    intervals = []
    for i in range(splits):
        split_start = start_date + timedelta(seconds=split_duration * i)
        split_end = start_date + timedelta(seconds=split_duration * (i + 1))
        intervals.append((split_start, split_end))
    return intervals
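
# Illustrative sketch (added commentary, not part of the original gist): a 24-hour
# window with splits=4 becomes four 6-hour (start, end) tuples. The DEBUG_SPLIT_EXAMPLE
# flag is hypothetical and only keeps this demo from running by default.
if os.getenv("DEBUG_SPLIT_EXAMPLE"):
    for sub_start, sub_end in split_window(
        parse("2024-10-16T00:00:00Z"), parse("2024-10-17T00:00:00Z"), 4
    ):
        logging.info("sub-window: %s -> %s", sub_start, sub_end)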

def fetch_data(
    endpoint: str,
    window: str,
    step: str,
    splits: int,
    model_type: str = "assets",
    timeout: int = 300,
) -> List[Dict[str, str]]:
    start_date_str, end_date_str = window.split(",")
    start_date = parse(start_date_str)
    end_date = parse(end_date_str)
    if splits < 1:
        raise ValueError("Number of splits must be at least 1.")
    windows = split_window(start_date, end_date, splits)
    aggregated_data = []
    for start, end in windows:
        window_str = f"{start.strftime('%Y-%m-%dT%H:%M:%SZ')},{end.strftime('%Y-%m-%dT%H:%M:%SZ')}"
        api_url = urljoin(endpoint, f"/model/{model_type}")
        try:
            params = {"window": window_str, "accumulate": "false", "step": step}
            if model_type == "allocation":
                params.update(
                    {
                        "idleByNode": "true",
                        "splitIdle": "true",
                        "shareSplit": "weighted",
                    }
                )
            full_url = (
                f"{api_url}?{requests.utils.unquote(requests.compat.urlencode(params))}"
            )
            logging.info(f"Sending GET request to: {full_url}")
            response = requests.get(
                api_url, params=params, verify=False, timeout=timeout
            )
            response.raise_for_status()
            response_data = response.json()
            # Check for warnings in the response
            if "warning" in response_data:
                warning_message = response_data["warning"]
                logging.error(f"API Warning: {warning_message}")
                sys.exit(-1)
            data = response_data.get("data", [])
            if isinstance(data, list):
                aggregated_data.extend(data)
            else:
                logging.warning(f"Data from window {window_str} is not a list.")
        except Exception as e:
            logging.error(
                f"Failed to fetch the data from {api_url} with error: " + str(e)
            )
            sys.exit(-1)
    return aggregated_data
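
# For reference (derived from the parameters above, not an extra feature): with
# model_type="allocation" and step="1h", the request sent to Kubecost looks like
#   GET <endpoint>/model/allocation?window=2024-10-16T00:00:00Z,2024-10-16T06:00:00Z
#       &accumulate=false&step=1h&idleByNode=true&splitIdle=true&shareSplit=weighted
# idleByNode/splitIdle make idle cost show up as one __idle__ row per node rather than a
# single cluster-wide idle entry, which is what distribute_idle_costs() below relies on.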

def log_aws_identity() -> None:
    """
    Print AWS STS GetCallerIdentity information.

    This function retrieves the AWS STS GetCallerIdentity information for the current session
    and logs the account, ARN, and user information using the logging module.
    """
    try:
        # commented lines for local computer usage
        # session = boto3.Session(profile_name='dev_env')
        # client = session.client('sts')
        # response = client.get_caller_identity()
        sts_client = boto3.client("sts")
        response = sts_client.get_caller_identity()
        logging.info("AWS STS GetCallerIdentity:")
        logging.info(f"Account: {response['Account']}")
        logging.info(f"ARN: {response['Arn']}")
        logging.info(f"User: {response['UserId']}")
    except Exception as e:
        logging.error(f"AWS credentials not available: {e}")

def add_utility_columns(df):
    # extend the data with useful metadata columns
    df["env"] = ENVIRONMENT
    df["region"] = REGION
    df["aws_account"] = AWS_ACCOUNT_ID
    df["kubecost_endpoint"] = KUBECOST_ENDPOINT
    return df


def convert_fields_to_str(df):
    # convert json fields to str for Firebolt readability
    for column in ["properties", "window", "pvs", "rawAllocationOnly"]:
        df[column] = df[column].apply(lambda x: json.dumps(x))
    return df


def drop_broken_columns(df):
    if "proportionalAssetResourceCosts" in df:
        df = df.drop(columns=["proportionalAssetResourceCosts"])
    if "sharedCostBreakdown" in df:
        df = df.drop(columns=["sharedCostBreakdown"])
    return df


def filter_node_idle_rows(df):
    # Keep only per-node Idle cost entries. Example names seen in the data
    # (only the node __idle__ rows, i.e. the first and last, match the pattern):
    # "name":"eks_multitenant/ip-10-20-154-228.eu-west-1.compute.internal/__idle__",
    # "name":"eks_multitenant/ip-10-20-154-228.eu-west-1.compute.internal/kube-system/ebs-csi-node-262jx/liveness-probe",
    # "name":"eks_multitenant/pvc-25e04c99-495d-4a45-bf45-560d23954117/__idle__",
    # "name":"eks_multitenant/ip-10-10-13-70.ec2.internal/__idle__"
    regex_pattern = r"eks_multitenant.*(?:compute.internal|ec2.internal).*__idle__"
    mask = df["name"].str.contains(regex_pattern)
    filtered_df = df[mask].copy()
    return filtered_df

def distribute_idle_costs(df_step: pd.DataFrame, step_duration: int) -> pd.DataFrame:
    """
    Distribute the idle costs on a per-node basis across workloads running on that node, excluding daemonsets.

    Args:
        df_step (pd.DataFrame): A DataFrame with an hourly/daily breakdown of costs.
        step_duration (int): The step length in minutes, used to weight workloads by how long they ran.

    Returns:
        pd.DataFrame: The modified DataFrame with distributed idle costs.
    """
    df_step["distributed_idle_cost"] = 0.0
    workloads = df_step[
        # Exclude rows without a node property, mostly "__unmounted__"
        (
            df_step["properties"].apply(
                lambda x: "node" in x if isinstance(x, dict) else False
            )
        )
        &
        # Exclude daemonsets
        (
            df_step["properties"].apply(
                lambda x: x.get("controllerKind") if isinstance(x, dict) else None
            )
            != "daemonset"
        )
        &
        # Exclude per-node __idle__ rows
        (~df_step["name"].str.contains("/__idle__"))
    ]
    df_idle = filter_node_idle_rows(df_step)
    for _, idle_row in df_idle.iterrows():
        node = idle_row["properties"].get("node", None)
        if not node:
            logging.warning(f"{idle_row['name']} doesn't have a `node` property")
            continue
        # Filter workloads that run on the current node
        node_workloads = workloads[
            workloads["properties"].apply(
                lambda x: x.get("node") if isinstance(x, dict) else None
            )
            == node
        ].copy()
        if node_workloads.empty:
            logging.debug(f"No workloads found for Node: {node}")
        # Calculate allocation for CPU and memory based on max(request, usage)
        # https://docs.kubecost.com/using-kubecost/navigating-the-kubecost-ui/cost-allocation/efficiency-idle#idle
        node_workloads["cpu_allocation"] = node_workloads[
            ["cpuCoreRequestAverage", "cpuCoreUsageAverage"]
        ].max(axis=1)
        node_workloads["memory_allocation"] = node_workloads[
            ["ramByteRequestAverage", "ramByteUsageAverage"]
        ].max(axis=1)
        # Calculate the impact for CPU and memory separately
        cpu_total_cost_impact = node_workloads["cpu_allocation"].sum()
        memory_total_cost_impact = node_workloads["memory_allocation"].sum()
        # Get the max impact between CPU and memory for each workload
        node_workloads["cpu_impact"] = (
            node_workloads["cpu_allocation"] / cpu_total_cost_impact
            if cpu_total_cost_impact != 0
            else 0
        )
        node_workloads["memory_impact"] = (
            node_workloads["memory_allocation"] / memory_total_cost_impact
            if memory_total_cost_impact != 0
            else 0
        )
        # Total cost impact is the max between CPU impact and memory impact
        node_workloads["cost_impact"] = node_workloads[
            ["cpu_impact", "memory_impact"]
        ].max(axis=1)
        # Adjust cost impact by workload run duration
        node_workloads["adjusted_cost_impact"] = node_workloads["cost_impact"] * (
            node_workloads["minutes"] / step_duration
        )
        # Normalize the adjusted cost impacts to ensure they sum to 1
        total_adjusted_impact = node_workloads["adjusted_cost_impact"].sum()
        node_workloads["normalized_cost_impact"] = (
            node_workloads["adjusted_cost_impact"] / total_adjusted_impact
            if total_adjusted_impact != 0
            else 0
        )
        # Distribute idle costs among node workloads based on normalized impact
        node_workloads["distributed_idle_cost"] = (
            node_workloads["normalized_cost_impact"] * idle_row["totalCost"]
        )
        for nw_index, nw_row in node_workloads.iterrows():
            df_step.at[nw_index, "distributed_idle_cost"] = nw_row[
                "distributed_idle_cost"
            ]
        node_idle_cost = idle_row["totalCost"]
        sum_distributed_costs = node_workloads["distributed_idle_cost"].sum()
        diff = round(node_idle_cost - sum_distributed_costs, 5)
        # Log debug info if the difference is greater than 0
        if diff > 0:
            logging.debug(f"Node: {node}")
            logging.debug(f"Node Idle cost: {node_idle_cost}")
            logging.debug(f"Sum of distributed costs: {sum_distributed_costs}")
            logging.debug(f"Difference: {diff}")
    return df_step
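
# Worked example of the distribution above (illustrative numbers, not from real data):
# a node has $1.00 of idle cost and two eligible workloads in a 1h step (step_duration=60).
#   A: max(cpu_impact, memory_impact) = 0.6, ran 60 min -> adjusted = 0.6 * 60/60 = 0.60
#   B: max(cpu_impact, memory_impact) = 0.4, ran 30 min -> adjusted = 0.4 * 30/60 = 0.20
# Normalizing 0.60 and 0.20 over their sum (0.80) gives 0.75 and 0.25, so A is assigned
# $0.75 and B $0.25 of the node's idle cost.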

def enrich_from_assets(df_step, df_assets_step):
    """
    Enriches the provided allocations dataset with node labels collected from assets.

    Args:
        df_step, df_assets_step (pd.DataFrame): DataFrames with an hourly/daily breakdown of costs.

    Returns:
        pd.DataFrame: The modified DataFrame with labels added to node Idle entities.
    """
    # Predefined Karpenter labels we're interested in
    selected_karpenter_labels = [
        "label_dedicated",
        "label_karpenter_sh_capacity_type",  # spot/on-demand
        "label_karpenter_sh_provisioner_name",
    ]
    df_idle = filter_node_idle_rows(df_step)
    # Filter df_assets_step for entries of type "Node"
    nodes = df_assets_step[df_assets_step["type"] == "Node"]
    node_data_dict = {}
    for _, node_row in nodes.iterrows():
        node_data_name = node_row["properties"].get("name")
        node_data_dict[node_data_name] = node_row
    for index, row in df_idle.iterrows():
        # Example: "name":"eks_multitenant/ip-10-10-13-231.ec2.internal/__idle__"
        node_name = row["name"].split("/")[1]
        node_data = node_data_dict.get(node_name, None)
        if node_data is not None:
            node_labels = node_data["labels"]
            if node_labels:  # Check that node_labels is not None
                karpenter_data = {
                    k: node_labels[k]
                    for k in selected_karpenter_labels
                    if k in node_labels
                }
                df_step.loc[index, "properties"].update(karpenter_data)
            else:
                logging.warning(f"Labels for Node {node_name} are None.")
        else:
            logging.warning(
                f"No corresponding Node found in assets for Idle record {node_name}."
            )
    return df_step
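
# After enrichment, a per-node __idle__ row's "properties" dict may carry the selected
# Karpenter labels copied from the matching Node asset, e.g. (illustrative values):
#   {"node": "ip-10-10-13-231.ec2.internal",
#    "label_karpenter_sh_capacity_type": "spot",
#    "label_karpenter_sh_provisioner_name": "default"}
# These labels are what the provisioner-level idle-cost summary in
# store_data_by_window() filters on.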

def save_to_s3(df, output_filename, ct):
    # Save data based on the desired output format
    if OUTPUT_FORMAT == "parquet":
        df.to_parquet(output_filename, compression="gzip")
    elif OUTPUT_FORMAT == "json":
        df.to_json(output_filename, orient="records", date_format="iso", indent=4)
    else:
        logging.error(f"Unsupported output format: {OUTPUT_FORMAT}")
        sys.exit(1)
    if DESTINATION_S3_BUCKET:
        # upload the result file to S3
        s3 = boto3.client("s3", region_name=REGION)
        key = f"{DESTINATION_S3_BUCKET_PREFIX}/year={ct.year}/month={ct.month}/day={ct.day}/region={REGION}/{output_filename}"
        logging.info(
            f"Uploading file {output_filename} into s3://{DESTINATION_S3_BUCKET}/{key}"
        )
        try:
            s3.upload_file(output_filename, DESTINATION_S3_BUCKET, key)
        except Exception as e:
            logging.error(
                f"Failed to upload the file into s3://{DESTINATION_S3_BUCKET}/{key} with error: "
                + str(e)
            )
            sys.exit(1)
        os.remove(output_filename)
    else:
        logging.warning("S3 bucket was not provided, creating report locally")

def store_data_by_window(days_from_today, step):
    window, by_day = get_24_hour_window(days_from_today)
    df_day = pd.DataFrame()
    assets_df_day = pd.DataFrame()
    ingestion_time = datetime.now(pytz.utc)  # ingestion time
    # Get allocations
    data = fetch_data(
        KUBECOST_ENDPOINT, window, step=step, splits=SPLITS, model_type="allocation"
    )
    # Get assets
    assets_data = fetch_data(
        KUBECOST_ENDPOINT, window, step=step, splits=SPLITS, model_type="assets"
    )
    # Number of chunks, depends on the step and the window.
    chunks_len = len(data)
    # Convert the step into total minutes for idle cost distribution
    if step == "1h":
        step_duration = 60
    elif step == "1d":
        step_duration = 1440
    for chunk in range(chunks_len):
        log_prefix = f"window:{window}, {chunk+1}/{len(data)}"
        logging.info(f"{log_prefix}. Creating dataframe from response['data'] field")
        data_per_chunk = data[chunk]
        assets_data_per_chunk = assets_data[chunk]
        if not data_per_chunk:
            logging.warning(
                f"{log_prefix}. Skipping chunk {chunk}/{chunks_len}, because it's empty"
            )
            continue
        if not assets_data_per_chunk:
            logging.warning(
                f"{log_prefix}. Assets are missing for chunk {chunk}/{chunks_len}"
            )
        df_step = pd.DataFrame(list(data_per_chunk.values()))
        df_assets_chunk = pd.DataFrame(list(assets_data_per_chunk.values()))
        logging.info(f"{log_prefix}. Enriching data from assets")
        df_step = enrich_from_assets(df_step, df_assets_chunk)
        logging.info(
            f"{log_prefix}. Distributing idle costs across non-idle allocations"
        )
        df_step = distribute_idle_costs(df_step, step_duration)
        logging.info(
            f"{log_prefix}. Converting json fields to str for readability"
        )
        df_step = convert_fields_to_str(df_step)
        logging.info(
            f"{log_prefix}. Adding metadata {ingestion_time}, {ENVIRONMENT}, {REGION}, {AWS_ACCOUNT_ID}, {KUBECOST_ENDPOINT}"
        )
        df_step["ingestion_ts"] = ingestion_time
        logging.info(f"{log_prefix}. Adding utility columns")
        df_step = add_utility_columns(df_step)
        logging.info(f"{log_prefix}. Dropping broken columns")
        df_step = drop_broken_columns(df_step)
        df_day = pd.concat([df_day, df_step]).reset_index(drop=True)
        assets_df_day = pd.concat([assets_df_day, df_assets_chunk]).reset_index(
            drop=True
        )
    file_extension = OUTPUT_FORMAT
    output_filename = f"kubecost_{ENVIRONMENT}_{AWS_ACCOUNT_ID}_{by_day.strftime('%Y%m%d')}.{file_extension}"
    logging.info(f"Saving data for window: {window} to {output_filename}")
    save_to_s3(df_day, output_filename, by_day)
    # Total Idle Cost
    mask = df_day["name"].str.contains("__idle__")
    total_idle_cost = df_day[mask]["totalCost"].sum()
    # Total Cost (including Idle)
    total_cost = df_day["totalCost"].sum()
    # Total distributed Idle Cost
    total_distributed_idle_cost = df_day["distributed_idle_cost"].sum()
    # Total Cost excluding Idle
    non_idle_cost = total_cost - total_idle_cost
    # Total Cost of assets for the same period
    assets_total_cost = assets_df_day["totalCost"].sum()
    # Provisioner-assigned idle costs
    mask = (df_day["name"].str.contains("__idle__")) & (
        df_day["properties"].str.contains("label_karpenter_sh_provisioner_name")
    )
    total_provisioner_idle_cost = df_day[mask]["totalCost"].sum()
    logging.info("Total Idle Cost: %s", total_idle_cost)
    logging.info("Total distributed Idle Cost: %s", total_distributed_idle_cost)
    logging.info(
        "Total provisioner assigned Idle Cost: %s", total_provisioner_idle_cost
    )
    logging.info("Total Cost: %s", total_cost)
    logging.info("Total Non-Idle Cost: %s", non_idle_cost)
    logging.info("Total Assets Cost: %s", assets_total_cost)

def get_24_hour_window(days):
    """
    Creates a 24-hour window starting ${days} days before today.

    Args:
        days (int): How many days before today the window should start.

    Returns:
        window (str): Window in the format that Kubecost expects.
        start_date (datetime): The start date.
    """
    date_today = datetime.now(pytz.utc)
    start_date = date_today - timedelta(days=days)
    end_date = date_today - timedelta(days=days - 1)
    start_date_fmt = start_date.strftime("%Y-%m-%dT00:00:00Z")
    end_date_fmt = end_date.strftime("%Y-%m-%dT00:00:00Z")
    window = f"{start_date_fmt},{end_date_fmt}"
    return window, start_date
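
# Illustrative sketch (added, not in the original gist): with days=1 and today being
# 2024-10-17 UTC, get_24_hour_window(1) returns
#   ("2024-10-16T00:00:00Z,2024-10-17T00:00:00Z", <datetime on 2024-10-16>)
# i.e. the full previous UTC day, which is what main() requests with the default WINDOW=1.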

def main():
    # Validate input
    validate_parameters()
    if DESTINATION_S3_BUCKET:
        log_aws_identity()
    store_data_by_window(int(WINDOW), STEP)


if __name__ == "__main__":
    main()

Pinned dependencies for the script above (requirements file):
boto3==1.26.146
botocore==1.29.146
certifi==2023.7.22
charset-normalizer==3.1.0
idna==3.7
jmespath==1.0.1
numpy==1.24.3
pandas==2.0.2
pyarrow==14.0.1
python-dateutil==2.8.2
pytz==2023.3
requests==2.32.0
s3transfer==0.6.1
six==1.16.0
tzdata==2023.3
urllib3==1.26.19