About: This document contains summary of a real-time anomaly detection system developed by me (and team) for Anadarko's crude oil extraction facilities during my time at Quantiphi.
- Our client has facilities of 2 types. For both, we want to have models that can do anomaly detection in real time.
- For type I we have some samples marked as anomalies, so we can either train a supervised ML model (with the challenge of an extreme bias dataset) or we can use the model trained for type II dataset but with some finetuning using the annotation. We go with the 2nd route.
- For type II we do not have any marked samples and we have to train a model in unsupervised setting
- We want high recall with decent precision since false negatives incur higher costs (replacing entire components) whereas false positives incur marginal costs (extra component checks)
- Dashboard showing plot in real time and email and SMS alert systems.
Few Thousand sensors dump their readings each second to client's on premise OpenTSDB server. This server is accessible only within thei virtual network.
Why OpenTSBB?
- Open-Source Distributed, scalable Time Series Database software written on top of HBase.
- Features/Concepts provided by OpenTSDB API: Epoch(time), Metric, Tag, Downsample, Aggregation, Interpolation
gke-apa-opentsdb
module makes a ReST call to openTSDB api to get data based on a window size to insert it to BigQuery. Here is the structure of this module:
import requests
from urllib3.util.retry import Retry
import pandas as pd
def tsdb(tsuids_dict, tsdb_address, tsdb_start, tsdb_end, tsdb_downsample):
api_endpoint = tsdb_address + "/api/query/"
df = pd.DataFrame()
for key in tsuids_dict:
params = {
'start': tsdb_start,
'end': tsdb_end,
'ms': 'true',
'show_tsuids': 'true',
'tsuid': tsdb_downsample + ":" + tsuids_dict[key]
}
s = requests.Session()
s.params = params
s.verify = False
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
s.mount(tsdb_address, HTTPAdapter(max_retries=retries))
r = s.get(api_endpoint, timeout=5)
response = r.json()
for single_tag_data in response:
data = pd.DataFrame(list(single_tag_data['dps'].items()), columns=['timestamp', 'value','metric','tsuid','quality'])
df.append(data)
return df
tsuids_dict = get_tsuids_dict() # Use args.tag_dataset to form this dictionary
df = tsdb(tsuids_dict, hostname, start_ts, end_ts, sampling_method)
push_to_bq(df, credentials, table, bq_project_id, tags)
We use it like so:
python downsampler.py --opentsdb-hostname $OPENTSDB_HOSTNAME project_id $PROJECT_ID config_table_name $CONFIG_TABLE_NAME config_dataset $CONFIG_DATASET tag_dataset $TAG_DATASET window_size $WINDOW_SIZE
Side Note: Variable values are coming from environment (can check using env
command) which is defined in Dockerfile
used to spin the instance
This module is designed to be invoked from the DAG using a GKEPodOperator in GCP Composer/Airflow.
-
Our approach was discussed in this PPT
-
It can be done in both supervised and unsupervised manner.
-
Underlying method remains same: Autoencoder is trained for reconstruction.
-
We train 3 different types of autoencoder: Dense, LSTM and PCA.
-
Measure reconstruction error vector and Mahalanobis Distance or MD
-
Mahalanobis distance:
$$D = \sqrt{(X-\mu)^T \Sigma^{-1} (X-\mu)}$$ -
$\mu$ and$\sigma$ is given by domain expert or can be calculated from retrospective data -
Let us Define the metric
$$F_\beta = \frac{(1+\beta^2)PR}{\beta^2P+R}$$ where P is precision ie TP/(TP+FP) and R is recall ie TP/(TP+FN). Higher the value of$\beta$ , higher is the importance of recall wrt precision. For example,$\beta = 2$ makes recall twice as important as precision while$\beta=0.5$ does the opposite.$\beta=1$ gives us F1 Score and$\beta=0$ gives us precision. -
Supervised: If we know Y variable then we can calculate the threshold that maximizes
$F_\beta$ for some$\beta$ . Usually we want low precision and high recall so$\beta=1/.05$ is fine for thresholding. -
In Unsupervised, we dont have Y so we cant calculate
$F_\beta$ . Here we just use K standard deviations away from mean as threshold for MD assuming guassian distribution of MD with K=3.
Here is the structure of the models re-written in PyTorch for easier understanding. In the actual project, this was all done in tensorflow==1.9.0
which is now legacy.
Notice that all the models have exactly the same API so that other persons involved in the project can use it without getting into the details of the model. Here is a demo of the API:
import torch
input = torch.randn(num_samples, input_size)
model = ChosenAutoencoder(input_size, hidden_sizes)
reconstructed_output, latent_layer = model(input)
Defining the models concisely:
import torch.nn as nn
class DenseAutoencoder(nn.Module):
def __init__(self, input_size, hidden_sizes):
super().__init__()
encoder_layers = [nn.Linear(input_size, hidden_sizes[0]), nn.ReLU()]
for i in range(len(hidden_sizes) - 1):
encoder_layers.extend([nn.Linear(hidden_sizes[i], hidden_sizes[i+1]), nn.ReLU()])
self.encoder = nn.Sequential(*encoder_layers)
decoder_layers = [nn.Linear(hidden_sizes[-1], input_size), nn.ReLU()]
for i in range(len(hidden_sizes) - 1, 0, -1):
decoder_layers.extend([nn.Linear(hidden_sizes[i], hidden_sizes[i-1]), nn.ReLU()])
self.decoder = nn.Sequential(*decoder_layers)
def forward(self, x):
latent = self.encoder(x)
reconstructed = self.decoder(latent)
return reconstructed, latent
class LSTMAutoencoder(nn.Module):
def __init__(self, input_size, hidden_sizes, num_layers=1):
super().__init__()
encoder_layers = []
for i in range(len(hidden_sizes)):
if i == 0:
encoder_layers.append(nn.LSTM(input_size, hidden_sizes[i], num_layers, batch_first=True))
else:
encoder_layers.append(nn.LSTM(hidden_sizes[i-1], hidden_sizes[i], num_layers, batch_first=True))
encoder_layers.append(nn.ReLU())
self.encoder = nn.Sequential(*encoder_layers)
decoder_layers = []
for i in range(len(hidden_sizes)-1, -1, -1):
if i == len(hidden_sizes)-1:
decoder_layers.append(nn.LSTM(hidden_sizes[i], input_size, num_layers, batch_first=True))
else:
decoder_layers.append(nn.LSTM(hidden_sizes[i+1], hidden_sizes[i], num_layers, batch_first=True))
decoder_layers.append(nn.ReLU())
self.decoder = nn.Sequential(*decoder_layers)
def forward(self, x):
# Encoder
_, (hidden_state, cell_state) = self.encoder(x)
# Decoder
decoded_output, _ = self.decoder(hidden_state, cell_state)
return decoded_output, cell_state
# Note: For this model, we can either use the (mean, cov) of the batch as given below
# or we can use predefined statistics (given by client)
class PCAAutoencoder(nn.Module):
def __init__(self, input_size, latent_size):
super().__init__()
self.encoder = nn.Linear(input_size, latent_size, bias=False)
self.decoder = nn.Linear(latent_size, input_size, bias=False)
def forward(self, x):
x = x.view(x.size(0), -1)
covariance_matrix = torch.matmul(x.T, x) / (x.size(0) - 1)
eigvals, eigvecs = torch.symeig(covariance_matrix, eigenvectors=True)
top_k_eigvecs = eigvecs[:, -self.encoder.out_features:]
self.encoder.weight.data = top_k_eigvecs.T
self.decoder.weight.data = top_k_eigvecs
latent = self.encoder(x)
reconstructed = self.decoder(latent)
return reconstructed, latent
Training these models is fairly straightforward. We just minimize the reconstruction error.
chosen_autoencoder = ChosenAutoencoder()
criterion = nn.MSELoss()
optimizer = optim.Adam(chosen_autoencoder.parameters(), lr=0.001)
for inputs in dataloader:
outputs, encoded = chosen_autoencoder(inputs)
loss = criterion(outputs, inputs)
optimizer.zero_grad()
loss.backward()
optimizer.step()
Here we tune the reconstruction error thresholds that will maximizes
import torch
from sklearn.metrics import fbeta_score
from collections import defaultDict
beta_value = args.beta_value
chosen_autoencoder = ChosenAutoencoder()
criterion = nn.MSELoss()
thresholds = torch.linspace(0, 1, 100)
voting_dict = defaultDict(int)
for inputs, y_true in dataloader: # y_true makes this approach supervised
outputs, encoded = chosen_autoencoder(inputs)
loss = criterion(outputs, inputs)
best_fbeta_score = 0
best_threshold = 0
for threshold in thresholds:
y_pred = (loss > threshold).float()
fbeta_score = fbeta_score(y_true.numpy(), y_pred.numpy(), beta=beta)
if fbeta_score > best_fbeta_score:
best_fbeta_score = fbeta_score
best_threshold = threshold.item()
voting_dict[best_threshold] += 1
overall_best_threshold = max(voting_dict, key=lambda k: voting_dict[k])
chosen_autoencoder = ChosenAutoencoder()
criterion = nn.MSELoss(reduction=None)
chosen_autoencoder.eval()
total_samples = 0
overall_mean = torch.zeros(args.n_dim)
overall_std = torch.zeros(args.n_dim)
with torch.no_grad():
for input in data_loader: # Here we do not have y_true in dataloader
outputs, encoded = chosen_autoencoder(inputs)
loss = criterion(outputs, inputs)
batch_size, _ = input.shape()
total_samples += batch_size
overall_mean += loss.sum(dim=0)
overall_std += (loss**2).sum(dim=0)
overall_mean /= total_samples
overall_std = torch.sqrt(overall_std / total_samples - overall_mean)
# Step 2: Set threshold as K standard deviations away from overall mean
overall_best_threshold = args.K * overall_std
We have 3 DAGs
- Weekly for HPTune training for optimizing value of hidden layers, dropout rate etc.
- Daily for Training for optimizing model weights
- Hourly for adjusting thresholds used in further calculation of anomaly
Once we have our thresholds, finding whether a dataset is anomalous or non-anomalous is straightforward:
chosen_autoencoder.eval()
with torch.no_grad():
for input in data_loader:
outputs, encoded = chosen_autoencoder(inputs)
loss = criterion(outputs, inputs)
anomaly_status = (loss>threhold)
if anomaly_status:
send_email_and_sms_alert()
Inference module that runs in real time which ingests data in real time, runs inference for current time step and plots on Grafana application and then forgets about the data point. These two applications (Inference and Grafana) are running each in their own container on a Kubernetes cluster with pods etc specified in a YAML.
Here is a snippet to post anomaly status to client's opentsdb server which is also running a grafana instance.
import requests
import time
opentsdb_url = args.opentsdb_url
metric_name = "anomaly_status"
while True:
# Build the OpenTSDB data point
data_point = {
"metric": metric_name,
"timestamp": int(time.time()),
"value": int(anomaly_status),
}
requests.post(opentsdb_url, json=data_point)
time.sleep(args.interval)
Automating a grafana server using OpenTSDB requires some work on devops front. Specifically you need to provision machines using terraform. Spin up instance using kubectl
with this docker image and configuring grafana to map the source URL to the OpenTSDB server.
Here is a simulation of how we are doing the last part. We run HBase, OpenTSDB and Grafana by following docker run commands:
docker run -d --name hbase harisekhon/hbase
docker run -d -p 4242:4242 --name opentsdb -e HBASE_HOST=hbase petergrace/opentsdb-docker:latest
docker run -d -p 3000:3000 --name grafana -e GF_SECURITY_ADMIN_PASSWORD=admin -e GF_SECURITY_ADMIN_USER=admin --link opentsdb:opentsdb grafana/grafana
Notice that OpenTSDB refers to hbase container name in HBASE_HOST
environment variable (-e
). This is necessary to run OpenTSDB. Also when running grafana, --link
argument links the container named opentsdb
to the grafana container and creates an alias opentsdb
within the grafana container. This is important for creating grafana dashboard because we need to add OpenTSDB address as data source and grafana will able to access it only if we link it or in another words they are within the same docker network.
Further we can use the following python code to test the setup:
import requests, json, time, random
while True:
for metric in ['a','b','c']:
payload = {"metric": f"random_metric_{metric}", "timestamp": int(time.time()), \
"value": random.randint(1, 100), "tags": {"source": "python_script"}}
response = requests.post("http://localhost:4242/api/put", data=json.dumps(payload))
print(response)
time.sleep(1)
Now go the grafana dashboard (http://ip-of-host-running-grafana:3000) and add OpenTSDB as data source. In the URL field enter http://ip-of-host-running-opentsdb:4242. Now create a dashboard and add "queries" and in the metric field look for random_metric_a
and so on. You can go to dashboard settings and turn on auto-refresh and in the dashboard panel edit time range to last 5 minutes. Now you should be able to see a live graph on Grafana.
We replicate a similar setting for our usecase where we send the anomaly status and the reconstruction error value of the live datapoint to OpenTSDB to be viewable on grafana dashboard.