Skip to content

Instantly share code, notes, and snippets.

@tjws052009
Last active August 3, 2021 02:20
Show Gist options
  • Save tjws052009/8576b2efeeac3ce86a551ef5a5a534a7 to your computer and use it in GitHub Desktop.
Save tjws052009/8576b2efeeac3ce86a551ef5a5a534a7 to your computer and use it in GitHub Desktop.
A bit of EQL

A little bit of EQL

Notes on environment

  • 7.12.0

Resources

Steps

1. Load Kibana sample web logs

2. Get top ips to use as reference

GET kibana_sample_data_logs/_search
{
  "size": 0,
  "aggs": {
    "listip": {
      "terms": {
        "field": "ip",
        "size": 10
      }
    }
  }
}

# response
{
  ...
   "aggregations" : {
    "listip" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 13732,
      "buckets" : [
        {
          "key" : "30.156.16.163",
          "doc_count" : 117
        },
        {
          "key" : "164.85.94.243",
          "doc_count" : 29
        },
        {
          "key" : "50.184.59.162",
          "doc_count" : 26
        },
  ...
}

I used the top result, "30.156.16.163".

3. Create another index with the same mapping as the sample web logs

GET kibana_sample_data_logs/_mappings

PUT kibana_app_logs
{
  "mappings": {
    "properties" : {
      "@timestamp" : {
        "type" : "alias",
        "path" : "timestamp"
      },
      "agent" : {
        "type" : "text",
        "fields" : {
          "keyword" : {
            "type" : "keyword",
            "ignore_above" : 256
          }
        }
      },
      "bytes" : {
        "type" : "long"
      },
      "clientip" : {
        "type" : "ip"
      },
      "event" : {
        "properties" : {
          "dataset" : {
            "type" : "keyword"
          }
        }
      },
      "extension" : {
        "type" : "text",
        "fields" : {
          "keyword" : {
            "type" : "keyword",
            "ignore_above" : 256
          }
        }
      },
      "geo" : {
        "properties" : {
          "coordinates" : {
            "type" : "geo_point"
          },
          "dest" : {
            "type" : "keyword"
          },
          "src" : {
            "type" : "keyword"
          },
          "srcdest" : {
            "type" : "keyword"
          }
        }
      },
      "host" : {
        "type" : "text",
        "fields" : {
          "keyword" : {
            "type" : "keyword",
            "ignore_above" : 256
          }
        }
      },
      "index" : {
        "type" : "text",
        "fields" : {
          "keyword" : {
            "type" : "keyword",
            "ignore_above" : 256
          }
        }
      },
      "ip" : {
        "type" : "ip"
      },
      "machine" : {
        "properties" : {
          "os" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "ram" : {
            "type" : "long"
          }
        }
      },
      "memory" : {
        "type" : "double"
      },
      "message" : {
        "type" : "text",
        "fields" : {
          "keyword" : {
            "type" : "keyword",
            "ignore_above" : 256
          }
        }
      },
      "phpmemory" : {
        "type" : "long"
      },
      "referer" : {
        "type" : "keyword"
      },
      "request" : {
        "type" : "text",
        "fields" : {
          "keyword" : {
            "type" : "keyword",
            "ignore_above" : 256
          }
        }
      },
      "response" : {
        "type" : "text",
        "fields" : {
          "keyword" : {
            "type" : "keyword",
            "ignore_above" : 256
          }
        }
      },
      "tags" : {
        "type" : "text",
        "fields" : {
          "keyword" : {
            "type" : "keyword",
            "ignore_above" : 256
          }
        }
      },
      "timestamp" : {
        "type" : "date"
      },
      "url" : {
        "type" : "text",
        "fields" : {
          "keyword" : {
            "type" : "keyword",
            "ignore_above" : 256
          }
        }
      },
      "utc_time" : {
        "type" : "date"
      }
    }
  }
}

4. Sample 3 of the documents from the kibana sample data and add it to the newly created index, but make a few changes to some of the fields (timestamp, machine.os and event.dataset in my case)

  • I updated the timestamp to make sure these events occur sequentially (30 mins)
  • I updated the machine os so that I can test how to query changes in a field
  • I updated event.dataset to let the index have a different event category. (I will use event.dataset as the event category field in EQL)
POST _reindex?max_docs=3
{
  "source": {
    "index": "kibana_sample_data_logs",
    "query": {
      "term": {
        "ip": {
          "value": "30.156.16.163"
        }
      }
    }
  },
  "dest": {
    "index": "kibana_app_logs"
  },
  "script": {
    "lang": "painless", 
    "source": """
      ctx._id = java.util.UUID.randomUUID().toString();
      ctx._source.event.dataset = "app_logs";
      ctx._source.timestamp = ZonedDateTime.parse(ctx._source.timestamp).plusMinutes(15);
      ctx._source.machine.os = "win xp"
    """
  }
}

POST _reindex?max_docs=3
{
  "source": {
    "index": "kibana_sample_data_logs",
    "query": {
      "term": {
        "ip": {
          "value": "30.156.16.163"
        }
      }
    }
  },
  "dest": {
    "index": "kibana_app_logs"
  },
  "script": {
    "lang": "painless", 
    "source": """
      ctx._id = java.util.UUID.randomUUID().toString();
      ctx._source.event.dataset = "app_logs";
      ctx._source.timestamp = ZonedDateTime.parse(ctx._source.timestamp).plusMinutes(30);
      ctx._source.machine.os = "win 7"
    """
  }
}

POST _reindex?max_docs=3
{
  "source": {
    "index": "kibana_sample_data_logs",
    "query": {
      "term": {
        "ip": {
          "value": "30.156.16.163"
        }
      }
    }
  },
  "dest": {
    "index": "kibana_app_logs"
  },
  "script": {
    "lang": "painless", 
    "source": """
      ctx._id = java.util.UUID.randomUUID().toString();
      ctx._source.event.dataset = "app_logs";
      ctx._source.timestamp = ZonedDateTime.parse(ctx._source.timestamp).plusMinutes(45);
      ctx._source.machine.os = "win 8"
    """
  }
}

5. Run some EQL on both fields

One thing that helped me understand EQL better is understanding what the event category field is. This is a keyword field that is used throughout the EQL. In the EQL syntax xxxx where yyyy == "zzzz", the xxxx corresponds to the event category field and defaults to event.category if not specified.

Query any two events that happen sequentially in a 1 hour span, but in the order of

  • event.dataset == "sample_web_logs"
  • event.dataset == "app_logs"

=> This should return 3 results.

GET kibana_sample_data_logs,kibana_app_logs/_eql/search
{
  "event_category_field": "event.dataset", 
  "query": """
    sequence by ip with maxspan = 1h
      [sample_web_logs where true]
      [app_logs where true]
  """
}

Same as above, but now in a time span of 10mins.

=> This should return no results.

GET kibana_sample_data_logs,kibana_app_logs/_eql/search
{
  "event_category_field": "event.dataset", 
  "query": """
    sequence by ip with maxspan = 10m
      [sample_web_logs where true]
      [app_logs where true]
  """
}

Same as the first EQL, but now asking for the order of the events to be the opposite

  • event.dataset == "app_logs"
  • event.dataset == "sample_web_logs"

=> Given that my sample data were all more then 1hours away, this should return no results.

GET kibana_sample_data_logs,kibana_app_logs/_eql/search
{
  "event_category_field": "event.dataset", 
  "query": """
    sequence by ip with maxspan = 1h
      [app_logs where true]
      [sample_web_logs where true]
  """
}

Now I query for 2 events in sequence over a 1hour period in the following order with these conditions

  • event.dataset == "sample_web_logs" && machines.os == "win xp"
  • event.dataset == "app_logs"

=> My 3 original sample data had 1 document with "win xp", 1 document with "win 8", and 1 document with "win 7". Therefore, should return 1 result, the first event should be the one with machine.os == "win xp"

GET kibana_sample_data_logs,kibana_app_logs/_eql/search
{
  "event_category_field": "event.dataset", 
  "query": """
    sequence by ip with maxspan = 1h
      [sample_web_logs where machine.os == "win xp"]
      [app_logs where true]
  """
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment