import pandas as pd
from prometheus_api_client import PrometheusConnect
from datetime import datetime

# Connect to Prometheus
prom = PrometheusConnect(url="http://prometheus.example.com:9090/", disable_ssl=True)
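# Optional sanity check; a minimal sketch assuming the library's
# check_prometheus_connection() helper, which returns True when the
# server responds:
if not prom.check_prometheus_connection():
    raise ConnectionError("Prometheus is not reachable at the configured URL")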
# Define labels to filter metrics
environment = 'production'
appcode = 'your_application_code'

# Define time ranges for the analysis
start_time_before = '2024-01-01T00:00:00Z'  # Baseline window before the incident
end_time_before = '2024-01-31T23:59:59Z'
start_time_during = '2024-02-01T00:00:00Z'  # Window during the incident
end_time_during = '2024-02-29T23:59:59Z'

# Convert the start/end strings to datetime objects
start_datetime_before = datetime.strptime(start_time_before, '%Y-%m-%dT%H:%M:%SZ')
end_datetime_before = datetime.strptime(end_time_before, '%Y-%m-%dT%H:%M:%SZ')
start_datetime_during = datetime.strptime(start_time_during, '%Y-%m-%dT%H:%M:%SZ')
end_datetime_during = datetime.strptime(end_time_during, '%Y-%m-%dT%H:%M:%SZ')
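# Alternative (hedged: prometheus_api_client.utils also exposes a
# parse_datetime() helper backed by dateparser, which also accepts relative
# strings such as "1 month ago"; strptime is kept above for explicitness):
#   from prometheus_api_client.utils import parse_datetime
#   start_datetime_before = parse_datetime('2024-01-01T00:00:00Z')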
# Define phrases that mark metrics to ignore
ignore_phrases = ['kafka_topics', 'consumer_offsets', 'other_unwanted_phrase']

# Define Prometheus queries for the reference metrics (the same selector
# serves both windows; only the queried time range differs)
problematic_metric_query = 'your_problematic_metric_name{environment="' + environment + '", appcode="' + appcode + '"}'
non_problematic_metric_query = 'your_non_problematic_metric_name{environment="' + environment + '", appcode="' + appcode + '"}'

# Fetch data for the reference metrics
problematic_data_before = prom.custom_query_range(problematic_metric_query, start_time=start_datetime_before, end_time=end_datetime_before, step='1h')
problematic_data_during = prom.custom_query_range(problematic_metric_query, start_time=start_datetime_during, end_time=end_datetime_during, step='1h')
non_problematic_data_before = prom.custom_query_range(non_problematic_metric_query, start_time=start_datetime_before, end_time=end_datetime_before, step='1h')
non_problematic_data_during = prom.custom_query_range(non_problematic_metric_query, start_time=start_datetime_during, end_time=end_datetime_during, step='1h')

# Convert data to DataFrames (each row holds a 'metric' label dict and a
# 'values' list of [timestamp, value] pairs)
problematic_df_before = pd.DataFrame(problematic_data_before)
problematic_df_during = pd.DataFrame(problematic_data_during)
non_problematic_df_before = pd.DataFrame(non_problematic_data_before)
non_problematic_df_during = pd.DataFrame(non_problematic_data_during)
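# Note: prometheus_api_client also ships a MetricRangeDataFrame helper that
# flattens a range-query result into one row per sample, which can be a more
# convenient shape than the raw list of dicts used here; a sketch:
#   from prometheus_api_client import MetricRangeDataFrame
#   tidy_df = MetricRangeDataFrame(problematic_data_before)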
# Define function to calculate statistical metrics
def calculate_metrics(data):
    metrics = {}
    if not data.empty:
        # Each row's 'values' entry is a list of [timestamp, value] pairs;
        # keep only the value part
        values_list = data['values'].apply(lambda x: [v[1] for v in x]).tolist()
        # Flatten the list of values
        values_flat = [item for sublist in values_list for item in sublist]
        # Prometheus returns sample values as strings (possibly "NaN");
        # coerce to numeric and drop anything that fails to parse
        valid_values = pd.to_numeric(pd.Series(values_flat), errors='coerce').dropna()
        # Calculate statistical metrics
        metrics['mean'] = valid_values.mean()
        metrics['std_dev'] = valid_values.std()
        metrics['quantile_95'] = valid_values.quantile(0.95)
    else:
        metrics['mean'] = metrics['std_dev'] = metrics['quantile_95'] = None
    return metrics
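# A minimal sketch of the input shape calculate_metrics expects (the values
# below are made up for illustration; timestamps are Unix seconds as returned
# by Prometheus):
#   sample = pd.DataFrame({'metric': [{'__name__': 'demo_metric'}],
#                          'values': [[[1706745600, '1.0'], [1706749200, '2.0']]]})
#   calculate_metrics(sample)  # -> {'mean': 1.5, 'std_dev': ~0.71, 'quantile_95': 1.95}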
# Calculate statistical metrics for the reference data
problematic_metrics_before = calculate_metrics(problematic_df_before)
problematic_metrics_during = calculate_metrics(problematic_df_during)
non_problematic_metrics_before = calculate_metrics(non_problematic_df_before)
non_problematic_metrics_during = calculate_metrics(non_problematic_df_during)

# Bail out early if either reference query returned no usable data
if problematic_metrics_during['mean'] is None or non_problematic_metrics_during['mean'] is None:
    raise ValueError("Reference queries returned no data; check the metric names and labels")

# Define Prometheus query for fetching all metrics that match the labels
query_all_metrics = '{environment="' + environment + '", appcode="' + appcode + '"}'

# Fetch all metrics based on labels and time range
all_metrics_data = prom.custom_query_range(query_all_metrics, start_time=start_datetime_during, end_time=end_datetime_during, step='1h')

# Convert data to a DataFrame
all_metrics_df = pd.DataFrame(all_metrics_data)
if all_metrics_df.empty:
    raise ValueError("The label-based query returned no series; nothing to analyse")

# The 'metric' column holds a label dict, which is not hashable and so cannot
# be grouped on directly; extract the metric name into its own column first
all_metrics_df['metric_name'] = all_metrics_df['metric'].apply(lambda m: m.get('__name__', ''))

# Calculate statistical metrics for each metric and compare with the reference metrics
for metric_name, metric_data in all_metrics_df.groupby('metric_name'):
    # Skip metrics whose name contains any of the ignore phrases
    if any(phrase in metric_name for phrase in ignore_phrases):
        continue
    metric_metrics = calculate_metrics(metric_data)
    # Skip metrics for which no samples were returned
    if metric_metrics['mean'] is None:
        continue
    # Flag metrics running hotter than the problematic reference, or colder
    # than the non-problematic reference, by more than three standard deviations
    if (metric_metrics['mean'] > (problematic_metrics_during['mean'] + 3 * problematic_metrics_during['std_dev'])) or \
       (metric_metrics['quantile_95'] > (problematic_metrics_during['quantile_95'] + 3 * problematic_metrics_during['std_dev'])):
        print("Potential anomaly detected in metric:", metric_name)
    elif (metric_metrics['mean'] < (non_problematic_metrics_during['mean'] - 3 * non_problematic_metrics_during['std_dev'])) or \
         (metric_metrics['quantile_95'] < (non_problematic_metrics_during['quantile_95'] - 3 * non_problematic_metrics_during['std_dev'])):
        print("Potential anomaly detected in metric:", metric_name)