Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save piyush01123/df365ffb33233813966870b253bc98ba to your computer and use it in GitHub Desktop.
Save piyush01123/df365ffb33233813966870b253bc98ba to your computer and use it in GitHub Desktop.
Real-time anomaly detection for a large industrial facility

Anomaly detection for a large industrial facility

About: This document contains summary of a real-time anomaly detection system developed by me (and team) for Anadarko's crude oil extraction facilities during my time at Quantiphi.

Requirements

  • Our client has facilities of 2 types. For both, we want to have models that can do anomaly detection in real time.
  • For type I we have some samples marked as anomalies, so we can either train a supervised ML model (with the challenge of an extreme bias dataset) or we can use the model trained for type II dataset but with some finetuning using the annotation. We go with the 2nd route.
  • For type II we do not have any marked samples and we have to train a model in unsupervised setting
  • We want high recall with decent precision since false negatives incur higher costs (replacing entire components) whereas false positives incur marginal costs (extra component checks)
  • Dashboard showing plot in real time and email and SMS alert systems.

GKE-Downsampler

Few Thousand sensors dump their readings each second to client's on premise OpenTSDB server. This server is accessible only within thei virtual network.

Why OpenTSBB?

  • Open-Source Distributed, scalable Time Series Database software written on top of HBase.
  • Features/Concepts provided by OpenTSDB API: Epoch(time), Metric, Tag, Downsample, Aggregation, Interpolation

gke-apa-opentsdb module makes a ReST call to openTSDB api to get data based on a window size to insert it to BigQuery. Here is the structure of this module:

import requests
from urllib3.util.retry import Retry
import pandas as pd

def tsdb(tsuids_dict, tsdb_address, tsdb_start, tsdb_end, tsdb_downsample):
    api_endpoint = tsdb_address + "/api/query/"
    df = pd.DataFrame()
    for key in tsuids_dict:
        params = {
            'start': tsdb_start,
            'end': tsdb_end,
            'ms': 'true',
            'show_tsuids': 'true',
            'tsuid': tsdb_downsample + ":" + tsuids_dict[key]
        }
        s = requests.Session()
        s.params = params
        s.verify = False
        retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
        s.mount(tsdb_address, HTTPAdapter(max_retries=retries))
        r = s.get(api_endpoint, timeout=5)
        response = r.json()
        for single_tag_data in response:
            data = pd.DataFrame(list(single_tag_data['dps'].items()), columns=['timestamp', 'value','metric','tsuid','quality'])
            df.append(data)
    return df


tsuids_dict = get_tsuids_dict() # Use args.tag_dataset to form this dictionary 
df = tsdb(tsuids_dict, hostname, start_ts, end_ts, sampling_method)
push_to_bq(df, credentials, table, bq_project_id, tags)

We use it like so:

python downsampler.py --opentsdb-hostname $OPENTSDB_HOSTNAME project_id $PROJECT_ID config_table_name $CONFIG_TABLE_NAME config_dataset $CONFIG_DATASET tag_dataset $TAG_DATASET window_size $WINDOW_SIZE

Side Note: Variable values are coming from environment (can check using env command) which is defined in Dockerfile used to spin the instance

This module is designed to be invoked from the DAG using a GKEPodOperator in GCP Composer/Airflow.

ML part: Anomaly detection using reconstruction.

  • Our approach was discussed in this PPT

  • It can be done in both supervised and unsupervised manner.

  • Underlying method remains same: Autoencoder is trained for reconstruction.

  • We train 3 different types of autoencoder: Dense, LSTM and PCA.

  • Measure reconstruction error vector and Mahalanobis Distance or MD

  • Mahalanobis distance: $$D = \sqrt{(X-\mu)^T \Sigma^{-1} (X-\mu)}$$

  • $\mu$ and $\sigma$ is given by domain expert or can be calculated from retrospective data

  • Let us Define the metric $$F_\beta = \frac{(1+\beta^2)PR}{\beta^2P+R}$$ where P is precision ie TP/(TP+FP) and R is recall ie TP/(TP+FN). Higher the value of $\beta$, higher is the importance of recall wrt precision. For example, $\beta = 2$ makes recall twice as important as precision while $\beta=0.5$ does the opposite. $\beta=1$ gives us F1 Score and $\beta=0$ gives us precision.

  • Supervised: If we know Y variable then we can calculate the threshold that maximizes $F_\beta$ for some $\beta$. Usually we want low precision and high recall so $\beta=1/.05$ is fine for thresholding.

  • In Unsupervised, we dont have Y so we cant calculate $F_\beta$. Here we just use K standard deviations away from mean as threshold for MD assuming guassian distribution of MD with K=3.

Models

Here is the structure of the models re-written in PyTorch for easier understanding. In the actual project, this was all done in tensorflow==1.9.0 which is now legacy.

Notice that all the models have exactly the same API so that other persons involved in the project can use it without getting into the details of the model. Here is a demo of the API:

import torch

input = torch.randn(num_samples, input_size)
model = ChosenAutoencoder(input_size, hidden_sizes)
reconstructed_output, latent_layer = model(input)

Defining the models concisely:

import torch.nn as nn


class DenseAutoencoder(nn.Module):
    def __init__(self, input_size, hidden_sizes):
        super().__init__()
        encoder_layers = [nn.Linear(input_size, hidden_sizes[0]), nn.ReLU()]
        for i in range(len(hidden_sizes) - 1):
            encoder_layers.extend([nn.Linear(hidden_sizes[i], hidden_sizes[i+1]), nn.ReLU()])
        self.encoder = nn.Sequential(*encoder_layers)
        decoder_layers = [nn.Linear(hidden_sizes[-1], input_size), nn.ReLU()]
        for i in range(len(hidden_sizes) - 1, 0, -1):
            decoder_layers.extend([nn.Linear(hidden_sizes[i], hidden_sizes[i-1]), nn.ReLU()])
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        latent = self.encoder(x)
        reconstructed = self.decoder(latent)
        return reconstructed, latent


class LSTMAutoencoder(nn.Module):
    def __init__(self, input_size, hidden_sizes, num_layers=1):
        super().__init__()

        encoder_layers = []
        for i in range(len(hidden_sizes)):
            if i == 0:
                encoder_layers.append(nn.LSTM(input_size, hidden_sizes[i], num_layers, batch_first=True))
            else:
                encoder_layers.append(nn.LSTM(hidden_sizes[i-1], hidden_sizes[i], num_layers, batch_first=True))
            encoder_layers.append(nn.ReLU())
        self.encoder = nn.Sequential(*encoder_layers)

        decoder_layers = []
        for i in range(len(hidden_sizes)-1, -1, -1):
            if i == len(hidden_sizes)-1:
                decoder_layers.append(nn.LSTM(hidden_sizes[i], input_size, num_layers, batch_first=True))
            else:
                decoder_layers.append(nn.LSTM(hidden_sizes[i+1], hidden_sizes[i], num_layers, batch_first=True))
            decoder_layers.append(nn.ReLU())
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        # Encoder
        _, (hidden_state, cell_state) = self.encoder(x)

        # Decoder
        decoded_output, _ = self.decoder(hidden_state, cell_state)

        return decoded_output, cell_state

# Note: For this model, we can either use the (mean, cov) of the batch as given below 
# or we can use predefined statistics (given by client)
class PCAAutoencoder(nn.Module):
    def __init__(self, input_size, latent_size):
        super().__init__()
        self.encoder = nn.Linear(input_size, latent_size, bias=False)
        self.decoder = nn.Linear(latent_size, input_size, bias=False)

    def forward(self, x):
        x = x.view(x.size(0), -1)
        covariance_matrix = torch.matmul(x.T, x) / (x.size(0) - 1)
        eigvals, eigvecs = torch.symeig(covariance_matrix, eigenvectors=True)
        top_k_eigvecs = eigvecs[:, -self.encoder.out_features:]
        self.encoder.weight.data = top_k_eigvecs.T
        self.decoder.weight.data = top_k_eigvecs
        latent = self.encoder(x)
        reconstructed = self.decoder(latent)
        return reconstructed, latent

Training

Training these models is fairly straightforward. We just minimize the reconstruction error.

chosen_autoencoder = ChosenAutoencoder()
criterion = nn.MSELoss()
optimizer = optim.Adam(chosen_autoencoder.parameters(), lr=0.001)
for inputs in dataloader:
    outputs, encoded = chosen_autoencoder(inputs)
    loss = criterion(outputs, inputs)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

Tuning thresholds in supervised setting

Here we tune the reconstruction error thresholds that will maximizes $F_\beta$ score. Following code shows how to do this and get a dictionary of potential threshold values and the number of samples w

import torch
from sklearn.metrics import fbeta_score
from collections import defaultDict

beta_value = args.beta_value

chosen_autoencoder = ChosenAutoencoder()
criterion = nn.MSELoss()
thresholds = torch.linspace(0, 1, 100)
voting_dict = defaultDict(int)

for inputs, y_true in dataloader: # y_true makes this approach supervised
    outputs, encoded = chosen_autoencoder(inputs)
    loss = criterion(outputs, inputs)    
    best_fbeta_score = 0
    best_threshold = 0
    for threshold in thresholds:
        y_pred = (loss > threshold).float()
        fbeta_score = fbeta_score(y_true.numpy(), y_pred.numpy(), beta=beta)
        if fbeta_score > best_fbeta_score:
            best_fbeta_score = fbeta_score
            best_threshold = threshold.item()
    voting_dict[best_threshold] += 1

overall_best_threshold = max(voting_dict, key=lambda k: voting_dict[k])

Tuning thresholds in unsupervised setting

chosen_autoencoder = ChosenAutoencoder()
criterion = nn.MSELoss(reduction=None)

chosen_autoencoder.eval()
total_samples = 0
overall_mean = torch.zeros(args.n_dim)
overall_std = torch.zeros(args.n_dim)

with torch.no_grad():
    for input in data_loader: # Here we do not have y_true in dataloader
        outputs, encoded = chosen_autoencoder(inputs)
        loss = criterion(outputs, inputs)    
        batch_size, _ = input.shape()
        total_samples += batch_size
        overall_mean += loss.sum(dim=0)
        overall_std += (loss**2).sum(dim=0)

overall_mean /= total_samples
overall_std = torch.sqrt(overall_std / total_samples - overall_mean)

# Step 2: Set threshold as K standard deviations away from overall mean
overall_best_threshold = args.K * overall_std

Training Schedule

We have 3 DAGs

  • Weekly for HPTune training for optimizing value of hidden layers, dropout rate etc.
  • Daily for Training for optimizing model weights
  • Hourly for adjusting thresholds used in further calculation of anomaly

Inference

Once we have our thresholds, finding whether a dataset is anomalous or non-anomalous is straightforward:

chosen_autoencoder.eval()
with torch.no_grad():
    for input in data_loader:
        outputs, encoded = chosen_autoencoder(inputs)
        loss = criterion(outputs, inputs)
        anomaly_status = (loss>threhold)
        if anomaly_status:
           send_email_and_sms_alert()

Inference module that runs in real time which ingests data in real time, runs inference for current time step and plots on Grafana application and then forgets about the data point. These two applications (Inference and Grafana) are running each in their own container on a Kubernetes cluster with pods etc specified in a YAML.

Here is a snippet to post anomaly status to client's opentsdb server which is also running a grafana instance.

import requests
import time

opentsdb_url = args.opentsdb_url
metric_name = "anomaly_status"

while True:

    # Build the OpenTSDB data point
    data_point = {
        "metric": metric_name,
        "timestamp": int(time.time()),
        "value": int(anomaly_status),
    }
    requests.post(opentsdb_url, json=data_point)
    time.sleep(args.interval)

Automating a grafana server using OpenTSDB requires some work on devops front. Specifically you need to provision machines using terraform. Spin up instance using kubectl with this docker image and configuring grafana to map the source URL to the OpenTSDB server.

Demo of Grafana and OpenTSDB

Here is a simulation of how we are doing the last part. We run HBase, OpenTSDB and Grafana by following docker run commands:

docker run -d --name hbase harisekhon/hbase
docker run -d -p 4242:4242 --name opentsdb -e HBASE_HOST=hbase petergrace/opentsdb-docker:latest
docker run -d -p 3000:3000 --name grafana -e GF_SECURITY_ADMIN_PASSWORD=admin -e GF_SECURITY_ADMIN_USER=admin --link opentsdb:opentsdb grafana/grafana

Notice that OpenTSDB refers to hbase container name in HBASE_HOST environment variable (-e). This is necessary to run OpenTSDB. Also when running grafana, --link argument links the container named opentsdb to the grafana container and creates an alias opentsdb within the grafana container. This is important for creating grafana dashboard because we need to add OpenTSDB address as data source and grafana will able to access it only if we link it or in another words they are within the same docker network.

Further we can use the following python code to test the setup:

import requests, json, time, random
while True:
    for metric in ['a','b','c']:
        payload = {"metric": f"random_metric_{metric}", "timestamp": int(time.time()), \
                   "value": random.randint(1, 100), "tags": {"source": "python_script"}}
        response = requests.post("http://localhost:4242/api/put", data=json.dumps(payload))
        print(response)
    time.sleep(1)

Now go the grafana dashboard (http://ip-of-host-running-grafana:3000) and add OpenTSDB as data source. In the URL field enter http://ip-of-host-running-opentsdb:4242. Now create a dashboard and add "queries" and in the metric field look for random_metric_a and so on. You can go to dashboard settings and turn on auto-refresh and in the dashboard panel edit time range to last 5 minutes. Now you should be able to see a live graph on Grafana.

We replicate a similar setting for our usecase where we send the anomaly status and the reconstruction error value of the live datapoint to OpenTSDB to be viewable on grafana dashboard.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment