Last active
November 13, 2024 17:32
-
-
Save cherkaskyb/576f492582c82b6ca976e46c9fe23d2e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from datadog_api_client import Configuration, ApiClient | |
from datadog_api_client.exceptions import ApiAttributeError | |
from datadog_api_client.exceptions import ApiTypeError, ApiValueError | |
from datadog_api_client.v1.api.dashboards_api import DashboardsApi | |
from datadog_api_client.v1.api.monitors_api import MonitorsApi | |
from datadog_api_client.v1.model.formula_and_function_metric_data_source import FormulaAndFunctionMetricDataSource | |
from datadog_api_client.v2.api.metrics_api import MetricsApi | |
# General information: | |
# This is a POC script we used internally to scope out custom metrics that are suspected to be unused by | |
# Any Monitor or Dashboard. | |
# Important Notes: | |
# This script checks for custom metrics usage in Monitors and Dashboards.
# Custom metrics can still be in use by Notes and external API calls which are unchecked by this tool,
# and can be explored using other means. | |
# Usage: | |
# Set the DD_API_KEY, DD_APP_KEY, DD_SITE env vars with correct Datadog api keys | |
# To use it, api keys should have the following scopes: | |
# - timeseries_read | |
# - metric_read | |
# - monitor_read | |
# - dashboard_read scopes | |
class DatadogClient:
    """Thin wrapper around the Datadog API clients used by this script.

    Credentials come from the constructor arguments or, when omitted, from the
    DD_API_KEY / DD_APP_KEY / DD_SITE environment variables (a missing required
    variable raises KeyError).
    """

    # Datadog appends these aggregation suffixes to distribution/histogram
    # metrics; they are stripped so the base metric name can be matched
    # against monitor/dashboard queries.
    SUFFIXES_TO_REMOVE = ["avg", "median", "95percentile", "max", "sum", "count", "bucket"]

    def __init__(self, api_key=None, app_key=None, host=None):
        final_api_key = api_key or os.environ["DD_API_KEY"]
        final_app_key = app_key or os.environ["DD_APP_KEY"]
        final_host = host or os.environ["DD_SITE"]
        self.__configuration = Configuration(api_key={"apiKeyAuth": final_api_key, "appKeyAuth": final_app_key},
                                             host=final_host)

    def get_monitors(self):
        """Return all monitors visible to the configured account."""
        with ApiClient(self.__configuration) as api_client:
            api_instance = MonitorsApi(api_client)
            response = api_instance.list_monitors()
            return response

    def get_dashboards(self):
        """Return summaries of all non-shared dashboards."""
        with ApiClient(self.__configuration) as api_client:
            api_instance = DashboardsApi(api_client)
            response = api_instance.list_dashboards(
                filter_shared=False,
            )
            return response.dashboards

    def get_metric_names(self):
        """Return the set of custom metric names, aggregation suffixes stripped.

        Reads the datadog.estimated_usage.metrics.custom.by_metric usage
        metric, whose tags have the form "metric_name:<name>".
        """
        metric = "datadog.estimated_usage.metrics.custom.by_metric"
        with ApiClient(self.__configuration) as api_client:
            api_instance = MetricsApi(api_client)
            response = api_instance.list_tags_by_metric_name(
                metric_name=metric,
            )
            tags = response['data']['attributes']['tags']
            # Split only on the first ":" so a value containing ":" survives intact.
            metric_names = set(map(lambda x: x.split(':', 1)[1], tags))
            metric_names = self._sanitize_metric_names(metric_names)
            return metric_names

    def dashboard_details(self, dash_id):
        """Return the full definition (widgets included) of one dashboard."""
        with ApiClient(self.__configuration) as api_client:
            api_instance = DashboardsApi(api_client)
            response = api_instance.get_dashboard(
                dashboard_id=dash_id
            )
            return response

    def _sanitize_metric_names(self, metric_names):
        """Strip known aggregation suffixes (e.g. ".avg", ".count") from each name."""
        sanitized = set()
        for metric in metric_names:
            sanitized_metric = metric
            for suffix in self.SUFFIXES_TO_REMOVE:
                if metric.endswith(f".{suffix}"):
                    sanitized_metric = metric[:-(len(suffix) + 1)]
                    break
            sanitized.add(sanitized_metric)
        print(f"Sanitization, reduced metrics: {len(metric_names)} -> {len(sanitized)}")
        return sanitized
class DatadogWidgetParser:
    """Extracts metric query strings from Datadog dashboard widgets."""

    # Widget types whose definition carries a "requests" list holding queries.
    TYPES_WITH_QUERIES = [
        "query_value",
        "timeseries",
        "toplist",
        "change",
        "distribution",
        "geomap",
        "query_table",
        "treemap",
        "timeseries",
    ]

    # This is unused - but keeping it here for future reference
    TYPES_WITHOUT_A_METRIC = [
        "free_text",
        "iframe",
        "image",
        "note",
        "alert_graph",
        "alert_value",
        "check_status",
        "hostmap",
        "topology_map",
        "trace_service",
        "manage_status",
        "slo",
        "slo_list",
    ]

    def extract_queries(self, widget):
        """Return the list of query strings used by *widget*.

        Group widgets are traversed recursively; widget types without queries
        yield an empty list. On a parsing failure the error is logged and the
        queries collected so far are returned.
        """
        queries = []
        try:
            widget_type = str(widget.definition.type)
            if widget_type in self.TYPES_WITH_QUERIES:
                for request in widget.definition.requests:
                    queries.extend(self._extract_request(request))
            elif widget_type == "group":
                print("Extracting sub group")
                queries.extend(self._extract_queries_from_subgroup(widget))
            elif widget_type == "scatterplot":
                # Bug fix: compare the widget *type*; the original compared the
                # widget object itself to "scatterplot", so this branch never ran.
                for request in widget.definition.requests:
                    queries.append(request.x.q)
                    queries.append(request.y.q)
            else:
                # Types without queries
                pass
            return queries
        except ApiAttributeError as e:
            widget_keys = set(widget.definition.to_dict().keys())
            print(
                f"Failed parsing widget of type {widget.definition.type}," f" properties are {widget_keys}. error is: {e}")
            return queries

    @staticmethod
    def _extract_request(request):
        """Pull query strings out of a single widget request.

        Requests expose queries under "q", "query" or "queries"; queries with
        a non-metric data source (logs, APM, ...) are skipped.
        """
        queries = []
        if isinstance(request, dict):
            attrs = set(request.keys())
        else:
            attrs = set(request.attribute_map.keys())
        if not any(map(lambda a: a in ["q", "query", "queries"], attrs)):
            print(f"Could not extract any properties from request. available properties are {attrs}")
        if hasattr(request, "q"):
            queries.append(request.q)
        if hasattr(request, "query"):
            q = request.query
            if q.data_source != FormulaAndFunctionMetricDataSource.METRICS:
                print(f"Skipping query of datasource: {q.data_source}")
                return queries
            queries.append(q.query)
        if hasattr(request, "queries"):
            for q in request.queries:
                q_dict = q
                # The api sometimes returns dicts instead of query definition objects :facepalm:
                if not isinstance(q, dict):
                    q_dict = q.to_dict()
                if q_dict["data_source"] != str(FormulaAndFunctionMetricDataSource.METRICS):
                    print(f"Skipping query of datasource: {q_dict['data_source']}")
                    continue
                # Bug fix: read from the normalized dict; model objects are not
                # reliably subscriptable, so q["query"] could raise here.
                queries.append(q_dict["query"])
        return queries

    def _extract_queries_from_subgroup(self, widget):
        """Recursively collect queries from every widget inside a group widget."""
        queries = []
        for sub_group_widget in widget.definition.widgets:
            queries.extend(self.extract_queries(sub_group_widget))
        return queries
class UnusedMetricsFinder:
    """Finds custom metrics referenced by no monitor or dashboard query."""

    def __init__(self):
        self.client = DatadogClient()

    def _get_used_metrics_in_dashboards(self):
        """Collect every query string from every widget of every dashboard.

        Dashboards that fail to fetch or parse are reported and skipped.
        """
        queries = []
        failures = []
        parser = DatadogWidgetParser()
        dashboards = self.client.get_dashboards()
        print(f"Processing {len(dashboards)} dashboards")
        for dash in dashboards:
            dash_id = dash.id
            try:
                print(f"processing dashboard: {dash_id}")
                details = self.client.dashboard_details(dash_id)
                for widget in details.widgets:
                    widget_queries = parser.extract_queries(widget)
                    queries.extend(widget_queries)
            except (ApiValueError, ApiTypeError) as e:
                print(f"Could not fetch dashboard. error is: {e}")  # noqa
                failures.append(dash_id)
        print(f"Found {len(queries)} queries in dashboards")
        print(f"Failed processing {len(failures)} dashboards with ids: {failures}")
        return queries

    def _get_used_metrics_in_monitors(self):
        """Collect the query string of every monitor."""
        queries = [monitor.query for monitor in self.client.get_monitors()]
        print(f"Found {len(queries)} Monitor queries")
        return queries

    def find(self):
        """Return the custom metrics suspected unused by any monitor/dashboard."""
        all_custom_metrics = self.client.get_metric_names()
        print(f"Found {len(all_custom_metrics)} Custom metrics")
        monitor_queries = self._get_used_metrics_in_monitors()
        dashboards_queries = self._get_used_metrics_in_dashboards()
        unused_suspects = self._check_all_metrics(all_custom_metrics, dashboards_queries, monitor_queries)
        print(f"{len(unused_suspects)}/{len(all_custom_metrics)} Custom Metrics are suspected as unused")
        return unused_suspects

    def _check_all_metrics(self, all_custom_metrics, dashboards_queries, monitor_queries):
        """Return the metrics appearing in neither query list.

        Note: the original kept a `count` accumulator that was never read;
        it has been removed.
        """
        unused_suspects = []
        for metric in all_custom_metrics:
            used_in_dashboards = self._is_substring_in_any_element(metric, dashboards_queries)
            used_in_monitors = self._is_substring_in_any_element(metric, monitor_queries)
            if not used_in_monitors and not used_in_dashboards:
                unused_suspects.append(metric)
        return unused_suspects

    @staticmethod
    def _is_substring_in_any_element(metric, queries):
        # Check whether the metric is a sub-query in any monitor / dashboard query
        return any(metric in query for query in queries)
if __name__ == "__main__":
    # Script entry point: print each suspected-unused custom metric, one per line.
    unused_metrics = UnusedMetricsFinder().find()
    for unused_metric in unused_metrics:
        print(unused_metric)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment