Skip to content

Instantly share code, notes, and snippets.

@cherkaskyb
Last active November 13, 2024 17:32
Show Gist options
  • Save cherkaskyb/576f492582c82b6ca976e46c9fe23d2e to your computer and use it in GitHub Desktop.
Save cherkaskyb/576f492582c82b6ca976e46c9fe23d2e to your computer and use it in GitHub Desktop.
import os
from datadog_api_client import Configuration, ApiClient
from datadog_api_client.exceptions import ApiAttributeError
from datadog_api_client.exceptions import ApiTypeError, ApiValueError
from datadog_api_client.v1.api.dashboards_api import DashboardsApi
from datadog_api_client.v1.api.monitors_api import MonitorsApi
from datadog_api_client.v1.model.formula_and_function_metric_data_source import FormulaAndFunctionMetricDataSource
from datadog_api_client.v2.api.metrics_api import MetricsApi
# General information:
# This is a POC script we used internally to scope out custom metrics that are suspected to be unused by
# Any Monitor or Dashboard.
# Important Notes:
# This script checks for custom metrics usage in Monitors and Dashboards.
# Custom metrics can still be in use by Notebooks and external API calls which are unchecked by this tool,
# and can be explored using other means.
# Usage:
# Set the DD_API_KEY, DD_APP_KEY, DD_SITE env vars with correct Datadog api keys
# To use it, api keys should have the following scopes:
# - timeseries_read
# - metric_read
# - monitor_read
# - dashboard_read scopes
class DatadogClient:
    """Thin wrapper around the Datadog v1/v2 API clients used by this script.

    Credentials come from the constructor arguments, falling back to the
    DD_API_KEY / DD_APP_KEY / DD_SITE environment variables (a KeyError is
    raised if a value is missing from both places).
    """

    # Suffixes Datadog appends to distribution/histogram sub-metrics; stripping
    # them lets us match the base metric name against queries.
    SUFFIXES_TO_REMOVE = ["avg", "median", "95percentile", "max", "sum", "count", "bucket"]

    def __init__(self, api_key=None, app_key=None, host=None):
        final_api_key = api_key or os.environ["DD_API_KEY"]
        final_app_key = app_key or os.environ["DD_APP_KEY"]
        final_host = host or os.environ["DD_SITE"]
        self.__configuration = Configuration(
            api_key={"apiKeyAuth": final_api_key, "appKeyAuth": final_app_key},
            host=final_host,
        )

    def get_monitors(self):
        """Return all monitors visible to the supplied credentials."""
        with ApiClient(self.__configuration) as api_client:
            return MonitorsApi(api_client).list_monitors()

    def get_dashboards(self):
        """Return the list of non-shared dashboards."""
        with ApiClient(self.__configuration) as api_client:
            response = DashboardsApi(api_client).list_dashboards(
                filter_shared=False,
            )
        return response.dashboards

    def get_metric_names(self):
        """Return the sanitized set of custom metric names in this org.

        Uses the datadog.estimated_usage.metrics.custom.by_metric usage metric,
        whose tags enumerate every custom metric name.
        """
        metric = "datadog.estimated_usage.metrics.custom.by_metric"
        with ApiClient(self.__configuration) as api_client:
            response = MetricsApi(api_client).list_tags_by_metric_name(
                metric_name=metric,
            )
        tags = response['data']['attributes']['tags']
        # Tags look like "metric_name:<name>". Split only on the first colon
        # (bug fix: split(':')[1] silently dropped anything after a second
        # colon in the tag value).
        metric_names = {tag.split(':', 1)[1] for tag in tags}
        return self._sanitize_metric_names(metric_names)

    def dashboard_details(self, dash_id):
        """Return the full definition (widgets included) of one dashboard."""
        with ApiClient(self.__configuration) as api_client:
            return DashboardsApi(api_client).get_dashboard(
                dashboard_id=dash_id,
            )

    def _sanitize_metric_names(self, metric_names):
        """Strip known aggregation suffixes, deduplicating the resulting names."""
        sanitized = set()
        for metric in metric_names:
            sanitized_metric = metric
            for suffix in self.SUFFIXES_TO_REMOVE:
                if metric.endswith(f".{suffix}"):
                    # Drop ".<suffix>" (suffix length plus the dot).
                    sanitized_metric = metric[:-(len(suffix) + 1)]
                    break
            sanitized.add(sanitized_metric)
        print(f"Sanitation, reduced metrics: {len(metric_names)} -> {len(sanitized)}")
        return sanitized
class DatadogWidgetParser:
    """Extracts raw metric query strings from dashboard widget definitions."""

    # Widget types whose definition carries a "requests" list holding queries.
    # (Bug fix: "timeseries" was listed twice.)
    TYPES_WITH_QUERIES = [
        "query_value",
        "timeseries",
        "toplist",
        "change",
        "distribution",
        "geomap",
        "query_table",
        "treemap",
    ]

    # This is unused - but keeping it here for future reference
    TYPES_WITHOUT_A_METRIC = [
        "free_text",
        "iframe",
        "image",
        "note",
        "alert_graph",
        "alert_value",
        "check_status",
        "hostmap",
        "topology_map",
        "trace_service",
        "manage_status",
        "slo",
        "slo_list",
    ]

    def extract_queries(self, widget):
        """Return all query strings used by *widget*, recursing into groups.

        Returns whatever was collected so far (possibly empty) if the widget's
        definition is missing an expected attribute.
        """
        queries = []
        try:
            widget_type = str(widget.definition.type)
            if widget_type in self.TYPES_WITH_QUERIES:
                for request in widget.definition.requests:
                    queries.extend(self._extract_request(request))
            elif widget_type == "group":
                print("Extracting sub group")
                queries.extend(self._extract_queries_from_subgroup(widget))
            elif widget_type == "scatterplot":
                # Bug fix: the original compared the widget object itself to
                # "scatterplot" (widget == "scatterplot"), so scatterplot
                # queries were never collected.
                for request in widget.definition.requests:
                    queries.append(request.x.q)
                    queries.append(request.y.q)
            else:
                # Types without queries
                pass
            return queries
        except ApiAttributeError as e:
            widget_keys = set(widget.definition.to_dict().keys())
            print(
                f"Failed parsing widget of type {widget.definition.type}," f" properties are {widget_keys}. error is: {e}")
            return queries

    @staticmethod
    def _extract_request(request):
        """Pull query strings out of a single request object.

        Handles the three shapes the API returns: a plain "q" string, a single
        "query" object, or a "queries" list. The API sometimes hands back
        plain dicts instead of model objects, so both access styles are
        supported (bug fix: dict-shaped requests previously yielded nothing
        because hasattr() was used on them).
        """
        queries = []
        if isinstance(request, dict):
            attrs = set(request.keys())
            get = request.get
        else:
            attrs = set(request.attribute_map.keys())
            get = lambda name: getattr(request, name, None)
        if not attrs.intersection({"q", "query", "queries"}):
            print(f"Could not extract any properties from request. available properties are {attrs}")
        q = get("q")
        if q is not None:
            queries.append(q)
        query = get("query")
        if query is not None:
            q_dict = query if isinstance(query, dict) else query.to_dict()
            if q_dict["data_source"] != str(FormulaAndFunctionMetricDataSource.METRICS):
                print(f"Skipping query of datasource: {q_dict['data_source']}")
                return queries
            queries.append(q_dict["query"])
        for sub_query in get("queries") or []:
            # The api sometimes returns dicts instead of query definition objects :facepalm:
            q_dict = sub_query if isinstance(sub_query, dict) else sub_query.to_dict()
            if q_dict["data_source"] != str(FormulaAndFunctionMetricDataSource.METRICS):
                print(f"Skipping query of datasource: {q_dict['data_source']}")
                continue
            # Bug fix: the original appended q["query"] here, which raises on
            # model objects; use the normalized dict instead.
            queries.append(q_dict["query"])
        return queries

    def _extract_queries_from_subgroup(self, widget):
        """Flatten the queries of every widget nested inside a group widget."""
        queries = []
        for sub_group_widget in widget.definition.widgets:
            queries.extend(self.extract_queries(sub_group_widget))
        return queries
class UnusedMetricsFinder:
    """Cross-references custom metric names against monitor and dashboard queries."""

    def __init__(self):
        self.client = DatadogClient()

    def find(self):
        """Return the list of custom metrics not referenced by any monitor or dashboard.

        Metrics used only by Notebooks or external API calls will still be
        reported here, since those sources are not scanned.
        """
        all_custom_metrics = self.client.get_metric_names()
        print(f"Found {len(all_custom_metrics)} Custom metrics")
        monitor_queries = self._get_used_metrics_in_monitors()
        dashboards_queries = self._get_used_metrics_in_dashboards()
        unused_suspects = self._check_all_metrics(all_custom_metrics, dashboards_queries, monitor_queries)
        print(f"{len(unused_suspects)}/{len(all_custom_metrics)} Custom Metrics are suspected as unused")
        return unused_suspects

    def _get_used_metrics_in_dashboards(self):
        """Collect every widget query from every dashboard, tolerating per-dashboard failures."""
        queries = []
        failures = []
        parser = DatadogWidgetParser()
        dashboards = self.client.get_dashboards()
        print(f"Processing {len(dashboards)} dashboards")
        for dash in dashboards:
            dash_id = dash.id
            try:
                print(f"processing dashboard: {dash_id}")
                details = self.client.dashboard_details(dash_id)
                for widget in details.widgets:
                    queries.extend(parser.extract_queries(widget))
            except (ApiValueError, ApiTypeError) as e:
                # Best effort: record the failure and keep scanning the rest.
                print(f"Could not fetch dashboard. error is: {e}")  # noqa
                failures.append(dash_id)
        print(f"Found {len(queries)} queries in dashboards")
        print(f"Failed processing {len(failures)} dashboards with ids: {failures}")
        return queries

    def _get_used_metrics_in_monitors(self):
        """Collect the query string of every monitor."""
        queries = [monitor.query for monitor in self.client.get_monitors()]
        print(f"Found {len(queries)} Monitor queries")
        return queries

    def _check_all_metrics(self, all_custom_metrics, dashboards_queries, monitor_queries):
        """Return the metrics that appear in neither query list.

        (Fix: dropped the `count` accumulator the original incremented but
        never read.)
        """
        unused_suspects = []
        for metric in all_custom_metrics:
            used_in_dashboards = self._is_substring_in_any_element(metric, dashboards_queries)
            used_in_monitors = self._is_substring_in_any_element(metric, monitor_queries)
            if not used_in_monitors and not used_in_dashboards:
                unused_suspects.append(metric)
        return unused_suspects

    @staticmethod
    def _is_substring_in_any_element(metric, queries):
        # Check whether the metric is a sub-query in any monitor / dashboard query
        return any(metric in query for query in queries)
if __name__ == "__main__":
    # Run the scan and dump each suspected-unused metric on its own line.
    suspected_unused = UnusedMetricsFinder().find()
    for metric_name in suspected_unused:
        print(metric_name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment