- Elasticsearch 7.12.0
# find an IP with enough documents to sample from, using a terms aggregation
GET kibana_sample_data_logs/_search
{
"size": 0,
"aggs": {
"listip": {
"terms": {
"field": "ip",
"size": 10
}
}
}
}
# response
{
...
"aggregations" : {
"listip" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 13732,
"buckets" : [
{
"key" : "30.156.16.163",
"doc_count" : 117
},
{
"key" : "164.85.94.243",
"doc_count" : 29
},
{
"key" : "50.184.59.162",
"doc_count" : 26
},
...
]
}
}
}
I used the top result, "30.156.16.163".
GET kibana_sample_data_logs/_mappings
# create the new index with the same mappings as the sample index
PUT kibana_app_logs
{
"mappings": {
"properties" : {
"@timestamp" : {
"type" : "alias",
"path" : "timestamp"
},
"agent" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"bytes" : {
"type" : "long"
},
"clientip" : {
"type" : "ip"
},
"event" : {
"properties" : {
"dataset" : {
"type" : "keyword"
}
}
},
"extension" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"geo" : {
"properties" : {
"coordinates" : {
"type" : "geo_point"
},
"dest" : {
"type" : "keyword"
},
"src" : {
"type" : "keyword"
},
"srcdest" : {
"type" : "keyword"
}
}
},
"host" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"index" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"ip" : {
"type" : "ip"
},
"machine" : {
"properties" : {
"os" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"ram" : {
"type" : "long"
}
}
},
"memory" : {
"type" : "double"
},
"message" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"phpmemory" : {
"type" : "long"
},
"referer" : {
"type" : "keyword"
},
"request" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"response" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"tags" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"timestamp" : {
"type" : "date"
},
"url" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"utc_time" : {
"type" : "date"
}
}
}
}
4. Sample 3 of the documents from the Kibana sample data and add them to the newly created index, but change a few of the fields (timestamp, machine.os, and event.dataset in my case):
- I updated the timestamp (offsetting each copy by 15, 30, and 45 minutes) to make sure these events occur sequentially.
- I updated machine.os so that I can test how to query changes in a field.
- I updated event.dataset to give the index a different event category. (I will use event.dataset as the event category field in EQL.)
POST _reindex?max_docs=3
{
"source": {
"index": "kibana_sample_data_logs",
"query": {
"term": {
"ip": {
"value": "30.156.16.163"
}
}
}
},
"dest": {
"index": "kibana_app_logs"
},
"script": {
"lang": "painless",
"source": """
ctx._id = java.util.UUID.randomUUID().toString(); // new random _id so repeated reindex runs create new documents instead of overwriting
ctx._source.event.dataset = "app_logs"; // new event category for the copies
ctx._source.timestamp = ZonedDateTime.parse(ctx._source.timestamp).plusMinutes(15); // shift 15 minutes later
ctx._source.machine.os = "win xp";
"""
}
}
POST _reindex?max_docs=3
{
"source": {
"index": "kibana_sample_data_logs",
"query": {
"term": {
"ip": {
"value": "30.156.16.163"
}
}
}
},
"dest": {
"index": "kibana_app_logs"
},
"script": {
"lang": "painless",
"source": """
ctx._id = java.util.UUID.randomUUID().toString();
ctx._source.event.dataset = "app_logs";
ctx._source.timestamp = ZonedDateTime.parse(ctx._source.timestamp).plusMinutes(30);
ctx._source.machine.os = "win 7";
"""
}
}
POST _reindex?max_docs=3
{
"source": {
"index": "kibana_sample_data_logs",
"query": {
"term": {
"ip": {
"value": "30.156.16.163"
}
}
}
},
"dest": {
"index": "kibana_app_logs"
},
"script": {
"lang": "painless",
"source": """
ctx._id = java.util.UUID.randomUUID().toString();
ctx._source.event.dataset = "app_logs";
ctx._source.timestamp = ZonedDateTime.parse(ctx._source.timestamp).plusMinutes(45);
ctx._source.machine.os = "win 8";
"""
}
}
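As a quick sanity check (my own verification step, not strictly required), the new index should now contain nine documents — three per reindex run, each run offset by a further 15 minutes — which can be confirmed by sorting on timestamp:
GET kibana_app_logs/_search
{
"size": 9,
"sort": [ { "timestamp": "asc" } ],
"_source": [ "timestamp", "machine.os", "event.dataset" ]
}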
One thing that helped me understand EQL better was learning what the event category field is. This is a keyword field that is used throughout EQL: in the EQL syntax xxxx where yyyy == "zzzz", the xxxx corresponds to the event category field, which defaults to event.category if not specified. (Similarly, EQL defaults to @timestamp as its timestamp field, which is why the mappings above keep the @timestamp alias pointing at timestamp.)
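For example, if these indices had followed ECS and populated event.category (they don't — the values "web" and "app" below are hypothetical), the event_category_field parameter could be omitted and the bracketed names would be matched against event.category:
GET kibana_sample_data_logs,kibana_app_logs/_eql/search
{
"query": """
sequence by ip with maxspan = 1h
[web where true]
[app where true]
"""
}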
Query any two events that happen sequentially within a 1-hour span, in this order:
- event.dataset == "sample_web_logs"
- event.dataset == "app_logs"
=> This should return 3 results.
GET kibana_sample_data_logs,kibana_app_logs/_eql/search
{
"event_category_field": "event.dataset",
"query": """
sequence by ip with maxspan = 1h
[sample_web_logs where true]
[app_logs where true]
"""
}
Same as above, but now with a time span of 10 minutes.
=> This should return no results.
GET kibana_sample_data_logs,kibana_app_logs/_eql/search
{
"event_category_field": "event.dataset",
"query": """
sequence by ip with maxspan = 10m
[sample_web_logs where true]
[app_logs where true]
"""
}
Same as the first EQL query, but asking for the events in the opposite order:
- event.dataset == "app_logs"
- event.dataset == "sample_web_logs"
=> Given that my sample documents were all more than 1 hour apart, this should return no results.
GET kibana_sample_data_logs,kibana_app_logs/_eql/search
{
"event_category_field": "event.dataset",
"query": """
sequence by ip with maxspan = 1h
[app_logs where true]
[sample_web_logs where true]
"""
}
Now I query for 2 events in sequence over a 1-hour period, in the following order and with these conditions:
- event.dataset == "sample_web_logs" && machine.os == "win xp"
- event.dataset == "app_logs"
=> Of my 3 original sample documents, 1 had "win xp", 1 had "win 8", and 1 had "win 7". Therefore this should return 1 result, and the first event should be the one with machine.os == "win xp".
GET kibana_sample_data_logs,kibana_app_logs/_eql/search
{
"event_category_field": "event.dataset",
"query": """
sequence by ip with maxspan = 1h
[sample_web_logs where machine.os == "win xp"]
[app_logs where true]
"""
}