@Lokey92
Last active December 8, 2021 14:58
Elastic Consulting - Using watcher to alert on transform outages.
# Scenario: An Elastic stack developer built a transform and would like to monitor potential failures on it automatically using built-in alert features.
# Solution: An advanced watch can periodically send an API request to check whether a transform is down.
# NOTE: This tutorial assumes the user knows how to create a transform, but there is an example available in the next steps to follow along if not.
# To use this tutorial easily, copy and paste the contents of this file into Dev Tools (Found under Management in the Kibana sidebar).
# More information on Elastic Watcher/Transform features may be found at the below links:
# https://www.elastic.co/guide/en/elasticsearch/reference/current/transforms.html
# https://www.elastic.co/guide/en/kibana/current/watcher-ui.html
# For this tutorial, I created a simple continuous transform that uses the logs-* index (standard data stream).
# Running the below should return an acknowledgement ("acknowledged" : true) in Dev Tools.
# If there's another transform in mind, ignore this and use that instead.
PUT _transform/test-watcher-alert
{
"source": {
"index": [
"logs-*"
]
},
"latest": {
"unique_key": [
"host.ip",
"host.name"
],
"sort": "@timestamp"
},
"description": "Baseline used in a Gist tutorial.",
"frequency": "1m",
"dest": {
"index": "test-watcher-alert"
},
"sync": {
"time": {
"field": "@timestamp",
"delay": "60s"
}
},
"retention_policy": {
"time": {
"field": "@timestamp",
"max_age": "7d"
}
},
"settings": {
"max_page_search_size": 500
}
}
# After creating the transform, its stats can be checked using the below API:
GET _transform/test-watcher-alert/_stats
# The return contains several metrics that can be used to determine whether the transform is functioning.
# This is how the return looks (the sample below was captured from a transform whose id is 'test'):
{
"count" : 1,
"transforms" : [
{
"id" : "test",
"state" : "started",
"stats" : {
"pages_processed" : 17070,
"documents_processed" : 2510733,
"documents_indexed" : 51255,
"documents_deleted" : 17,
"trigger_count" : 29353,
"index_time_in_ms" : 114520,
"index_total" : 8535,
"index_failures" : 0,
"search_time_in_ms" : 164233,
"search_total" : 17070,
"search_failures" : 0,
"processing_time_in_ms" : 370,
"processing_total" : 17070,
"delete_time_in_ms" : 20184,
"exponential_avg_checkpoint_duration_ms" : 99.71199383696583,
"exponential_avg_documents_indexed" : 5.999999999999996,
"exponential_avg_documents_processed" : 728.9774102464496
},
"checkpointing" : {
"last" : {
"checkpoint" : 8535,
"timestamp_millis" : 1638831269341,
"time_upper_bound_millis" : 1638831209341
},
"operations_behind" : 13848,
"changes_last_detected_at" : 1638831269341
}
}
]
}
# For the present use case, it is pertinent to focus on the 'state' field from stats. This is where a user can quickly determine if there are issues.
# There are three 'healthy' states to keep in mind: started, starting and indexing.
# There are four 'unhealthy' states as well: aborting, failed, stopping and stopped.
# Now it's time to build a watch using the above assumptions to send an alert when the transform is 'unhealthy'.
# (ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/get-transform-stats.html#get-transform-stats)
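# The healthy/unhealthy split above can also be sketched outside of Watcher. Below is a minimal Python sketch, assuming the stats response has
# already been parsed into a dict. The names HEALTHY_STATES and unhealthy_transforms are hypothetical and not part of any Elastic API.

```python
# Hypothetical helper; the healthy states mirror the ones listed in this tutorial.
HEALTHY_STATES = {"started", "starting", "indexing"}


def unhealthy_transforms(stats_response):
    """Return (id, state) pairs for every transform that is not in a healthy state."""
    return [
        (t["id"], t["state"])
        for t in stats_response.get("transforms", [])
        if t["state"] not in HEALTHY_STATES
    ]


# Trimmed-down copies of the stats response shown above.
running = {"count": 1, "transforms": [{"id": "test", "state": "started"}]}
stopped = {"count": 1, "transforms": [{"id": "test", "state": "stopped"}]}

print(unhealthy_transforms(running))  # []
print(unhealthy_transforms(stopped))  # [('test', 'stopped')]
```

# Checking for "not healthy" rather than listing the unhealthy states means any state outside the healthy set is reported.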
# Starting out with watches, it's wise to simulate them first using a 'simple' input and a conditional transform. Below are notes on the components used in the simulated watch:
# 'trigger': I set a standard trigger to make it run every minute. This isn't a concern in a simulation but will be needed when actually creating the watch.
# (ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/trigger-schedule.html)
# 'input.simple': The return from the previous stats API request was placed in its entirety under the simple input, including the count.
# (ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/input-simple.html)
# 'transform': A simple Painless conditional statement to check if the transform is healthy. The logic is: IF 'state' IS NOT 'started' AND NOT 'starting' AND NOT 'indexing' THEN return true.
# That leaves only the 'unhealthy' states to return a 'true', which triggers the alert.
# (ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/transform-script.html)
# One last note on the transform: you may notice it references fields like 'ctx.payload.transforms[0].state'. Being familiar with the Painless context is important.
# The 'transforms' field is an array of objects, as indicated by the enclosing square brackets ( "transforms": [ ... ] ), and [0] selects the first object in that array.
# The array element must be selected this way before its nested fields, such as 'state', can be retrieved.
# (ref: https://www.elastic.co/guide/en/elasticsearch/painless/current/painless-watcher-transform-context.html)
# 'action': Performs a simple logging action that prints the payload produced by the transform.
# Since the transform is a boolean conditional statement, the log output will only return 'true' or 'false'.
# (ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/actions-logging.html)
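# To illustrate the 'transforms[0]' reference described above, here is a small Python sketch that parses a trimmed copy of the stats response
# and applies the same boolean logic as the Painless script. The variable names are made up for illustration.

```python
import json

# Trimmed copy of the stats response, with the state set to 'stopped'.
raw = '{"count": 1, "transforms": [{"id": "test", "state": "stopped"}]}'
payload = json.loads(raw)

# "transforms" is a list, so the first entry must be selected before its fields.
first = payload["transforms"][0]

# Same boolean logic as the Painless script used in the watch.
alert = (
    first["state"] != "started"
    and first["state"] != "starting"
    and first["state"] != "indexing"
)

print(type(payload["transforms"]).__name__, first["state"], alert)
```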
# With the above in mind, try running this command to see the response. Note that the state in this example is set to 'stopped':
POST _watcher/watch/_execute?filter_path=**.result.actions.logging
{
"watch": {
"trigger": {
"schedule": {
"interval": "1m"
}
},
"input": {
"simple": {
"count": 1,
"transforms": [
{
"id": "test",
"state": "stopped",
"stats": {
"pages_processed": 17070,
"documents_processed": 2510733,
"documents_indexed": 51255,
"documents_deleted": 17,
"trigger_count": 29353,
"index_time_in_ms": 114520,
"index_total": 8535,
"index_failures": 0,
"search_time_in_ms": 164233,
"search_total": 17070,
"search_failures": 0,
"processing_time_in_ms": 370,
"processing_total": 17070,
"delete_time_in_ms": 20184,
"exponential_avg_checkpoint_duration_ms": 99.71199383696583,
"exponential_avg_documents_indexed": 5.999999999999996,
"exponential_avg_documents_processed": 728.9774102464496
},
"checkpointing": {
"last": {
"checkpoint": 8535,
"timestamp_millis": 1638831269341,
"time_upper_bound_millis": 1638831209341
},
"operations_behind": 13848,
"changes_last_detected_at": 1638831269341
}
}
]
}
},
"transform": {
"script": """
return ctx.payload.transforms[0].state != 'started' && ctx.payload.transforms[0].state != 'starting' && ctx.payload.transforms[0].state != 'indexing'
"""
},
"actions": {
"my_logger": {
"logging": {
"text": "Watch payload = {{ctx.payload}}",
"level": "debug"
}
}
}
}
}
# Since the example was set to 'stopped', it returns a true value to indicate that the alert was triggered.
# This is a good baseline. Now it's time to flesh out the idea.
{
"watch_record" : {
"result" : {
"actions" : [
{
"logging" : {
"logged_text" : "Watch payload = {_value=true}"
}
}
]
}
}
}
# For the next few steps, if the transform created earlier is still running, make sure it's stopped to simulate an outage.
# (ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/stop-transform.html)
POST _transform/test-watcher-alert/_stop
# Now that the basic idea is down, it's time to use the actual transform API request. To do this an http input is required, replacing the simple input.
# The http input passes a REST API request to the Elasticsearch host and returns a response.
# For this example, users will need to change the following fields:
# 'input.http.request.host' - Set to the Elasticsearch endpoint hostname.
# 'input.http.request.port' - Set to the Elasticsearch endpoint port.
# 'input.http.request.auth.basic.username' - This example uses basic REST API authorization. I used the 'elastic' superuser account and password for this example.
# 'input.http.request.auth.basic.password' - The password of the username used.
# 'input.http.request.path' - Set this to the stats path of the transform, e.g. my example transform id is 'test-watcher-alert', which makes the path '_transform/test-watcher-alert/_stats'.
# (ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/input-http.html)
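# If the same watch is needed for several transforms, the fields listed above can be filled in programmatically. This is a minimal Python sketch,
# assuming the request layout used in this tutorial. The function name build_http_input and the host/password values are placeholders, not real settings.

```python
import json


def build_http_input(host, port, transform_id, username, password):
    """Build the 'input' section of a watch that polls a transform's stats."""
    return {
        "http": {
            "request": {
                "scheme": "https",
                "host": host,
                "port": port,
                "method": "get",
                "path": f"_transform/{transform_id}/_stats",
                "auth": {"basic": {"username": username, "password": password}},
            }
        }
    }


# Placeholder values for illustration only; substitute your own endpoint and credentials.
watch_input = build_http_input(
    "es.example.com", 9243, "test-watcher-alert", "elastic", "changeme"
)
print(json.dumps(watch_input, indent=2))
```

# The printed JSON can be pasted under "input" in the watch body.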
# Another change made from the last iteration is using the 'condition' component instead of 'transform'.
# It uses the same script as before. The reason why I switched it up will be made clear in the next few steps.
# (ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/condition-script.html)
POST _watcher/watch/_execute?filter_path=**.result.actions.logging
{
"watch": {
"trigger": {
"schedule": {
"interval": "1m"
}
},
"input": {
"http": {
"request": {
"scheme": "https",
"host": "ecs-fleet-server-test.es.us-east-1.aws.found.io",
"port": 9243,
"method": "get",
"path": "_transform/test-watcher-alert/_stats",
"params": {},
"auth": {
"basic": {
"username": "elastic",
"password": "testpass1234"
}
},
"headers": {}
}
}
},
"condition": {
"script": {
"source": """
return ctx.payload.transforms[0].state != 'started' && ctx.payload.transforms[0].state != 'starting' && ctx.payload.transforms[0].state != 'indexing'
""",
"lang": "painless"
}
},
"actions": {
"my_logger": {
"logging": {
"text": "Watch payload = {{ctx.payload}}",
"level": "debug"
}
}
}
}
}
# Running this command with the proper API settings should return something along the lines of the below. It now looks like a REST API return:
{
"watch_record" : {
"result" : {
"actions" : [
{
"logging" : {
"logged_text" : "Watch payload = {_headers={x-elastic-product=[Elasticsearch], date=[Tue, 07 Dec 2021 23:36:19 GMT], x-cloud-request-id=[wDmo-LgNTN6k0B-mKmyZVA], content-type=[application/json; charset=UTF-8], x-found-handling-cluster=[e72a51735a57434bae753ff90d3003a0], x-found-handling-instance=[instance-0000000001]}, count=1, transforms=[{id=test, state=stopped, stats={pages_processed=17070, documents_processed=2510733, documents_indexed=51255, documents_deleted=17, trigger_count=29353, index_time_in_ms=114520, index_total=8535, index_failures=0, search_time_in_ms=164233, search_total=17070, search_failures=0, processing_time_in_ms=370, processing_total=17070, delete_time_in_ms=20184, exponential_avg_checkpoint_duration_ms=99.71199383696583, exponential_avg_documents_indexed=5.999999999999996, exponential_avg_documents_processed=728.9774102464496}, checkpointing={last={checkpoint=8535, timestamp_millis=1638831269341, time_upper_bound_millis=1638831209341}, operations_behind=13951, changes_last_detected_at=1638831269341}}], _status_code=200}"
}
}
]
}
}
}
# A bit different than using the simple input method, but no real change to the methodology. Most of the extra content shown is header fluff that won't be used.
# Now to add the transform back, but this time it's not used as a conditional statement but as a way of delivering payloads to the alert output.
# In the transform I created two payload fields. These fields are then referenced in the action output:
# 'result' - Retrieves the JSON object embedded in the http return and puts it into its own payload field. This is to get rid of the [0] array reference in actions.
# 'time' - The second field is more for flair than anything else. I used the ctx.trigger.triggered_time metadata, which gives the UTC time of when the watch was triggered.
# In the Painless script context I tacked on a couple of modifiers to set the time zone to MST (America/Denver), then changed it to a better 'human' format.
# (ref: https://www.elastic.co/guide/en/elasticsearch/painless/master/painless-datetime.html#_datetime_zone)
# (ref: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/time/format/DateTimeFormatter.html)
# SIDE NOTE: The return on this transform is in brackets [ 'key': value, ... ], which is the Painless syntax for initializing a map-type object. It allows multiple named payload outputs, so keep that in mind for future reference if multiple outputs/payloads are needed for a message.
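# The time zone shift and 'human' formatting described above can be tried outside of Painless. This is a rough Python equivalent, assuming
# Python 3.9+ with a time zone database available on the system. The function name format_triggered_time and the sample timestamp are made up for illustration.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo  # Python 3.9+; needs a time zone database on the system


def format_triggered_time(triggered_utc, zone="America/Denver"):
    """Shift a UTC timestamp to `zone` and render it RFC 1123 style."""
    local = triggered_utc.astimezone(ZoneInfo(zone))
    # Build the string by hand: strftime's %d zero-pads the day, while the
    # logged output in this tutorial shows an unpadded day ("7 Dec").
    return f"{local:%a}, {local.day} {local:%b %Y %H:%M:%S %z}"


# Hypothetical trigger time (23:39:14 UTC on 7 Dec 2021).
triggered = datetime(2021, 12, 7, 23, 39, 14, tzinfo=timezone.utc)
print(format_triggered_time(triggered))
```

# Using a zone ID such as 'America/Denver' instead of a fixed offset lets the time zone database handle daylight saving changes.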
# In the action I created a simple message output using the transform payloads, like so:
POST _watcher/watch/_execute?filter_path=**.result.actions.logging
{
"watch": {
"trigger": {
"schedule": {
"interval": "1m"
}
},
"input": {
"http": {
"request": {
"scheme": "https",
"host": "ecs-fleet-server-test.es.us-east-1.aws.found.io",
"port": 9243,
"method": "get",
"path": "_transform/test-watcher-alert/_stats",
"params": {},
"auth": {
"basic": {
"username": "elastic",
"password": "testpass1234"
}
},
"headers": {}
}
}
},
"condition": {
"script": {
"source": """
return ctx.payload.transforms[0].state != 'started' && ctx.payload.transforms[0].state != 'starting' && ctx.payload.transforms[0].state != 'indexing'
""",
"lang": "painless"
}
},
"transform": {
"script": """
return [
'result' : ctx.payload.transforms[0],
'time': ctx.trigger.triggered_time.withZoneSameInstant(ZoneId.of('America/Denver')).format(DateTimeFormatter.RFC_1123_DATE_TIME)
]
"""
},
"actions": {
"my_logger": {
"logging": {
"text": "ALERT: Transform ID '{{ctx.payload.result.id}}' is in state '{{ctx.payload.result.state}}' as of {{ctx.payload.time}}",
"level": "error"
}
}
}
}
}
# This is the logging result. Notice how it is formatted:
{
"watch_record" : {
"result" : {
"actions" : [
{
"logging" : {
"logged_text" : "ALERT: Transform ID 'test' is in state 'stopped' as of Tue, 7 Dec 2021 16:39:14 -0700"
}
}
]
}
}
}
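# To recap how the condition, transform and action fit together, here is a minimal Python simulation of the flow, assuming the order
# condition, then transform, then action. The names run_watch and HEALTHY_STATES are hypothetical and only mimic the behaviour; this is not how Watcher is implemented.

```python
HEALTHY_STATES = {"started", "starting", "indexing"}


def run_watch(payload, triggered_time):
    """Mimic the watch flow: condition first, then transform, then the action."""
    first = payload["transforms"][0]
    # Condition: stop here when the transform is healthy, so no action runs.
    if first["state"] in HEALTHY_STATES:
        return None
    # Transform: replace the payload with only the fields the action needs.
    new_payload = {"result": first, "time": triggered_time}
    # Action: render the message from the transformed payload.
    return "ALERT: Transform ID '{id}' is in state '{state}' as of {time}".format(
        id=new_payload["result"]["id"],
        state=new_payload["result"]["state"],
        time=new_payload["time"],
    )


stopped = {"count": 1, "transforms": [{"id": "test", "state": "stopped"}]}
started = {"count": 1, "transforms": [{"id": "test", "state": "started"}]}

print(run_watch(stopped, "Tue, 7 Dec 2021 16:39:14 -0700"))
print(run_watch(started, "Tue, 7 Dec 2021 16:39:14 -0700"))  # None, nothing is logged
```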
# If you want to put this alert into action, use the below API request.
# (ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-put-watch.html)
PUT _watcher/watch/test-transform-alert
{
"trigger": {
"schedule": {
"interval": "1m"
}
},
"input": {
"http": {
"request": {
"scheme": "https",
"host": "ecs-fleet-server-test.es.us-east-1.aws.found.io",
"port": 9243,
"method": "get",
"path": "_transform/test-watcher-alert/_stats",
"params": {},
"auth": {
"basic": {
"username": "elastic",
"password": "testpass1234"
}
},
"headers": {}
}
}
},
"condition": {
"script": {
"source": """
return ctx.payload.transforms[0].state != 'started' && ctx.payload.transforms[0].state != 'starting' && ctx.payload.transforms[0].state != 'indexing'
""",
"lang": "painless"
}
},
"transform": {
"script": """
return [
'result' : ctx.payload.transforms[0],
'time': ctx.trigger.triggered_time.withZoneSameInstant(ZoneId.of('America/Denver')).format(DateTimeFormatter.RFC_1123_DATE_TIME)
]
"""
},
"actions": {
"my_logger": {
"logging": {
"text": "ALERT: Transform ID '{{ctx.payload.result.id}}' is in state '{{ctx.payload.result.state}}' as of {{ctx.payload.time}}",
"level": "error"
}
}
}
}