Work in progress
If you install Elasticsearch as-is, each node will by default have the following roles:
- Data
- Master
- Ingest
These roles can be decoupled and run separately, which means that you can scale the different node types to fit your needs.
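As a sketch of how that decoupling looks (settings as in Elasticsearch 6.x; verify the names against your version), a dedicated ingest node would be configured in elasticsearch.yml like this:
node.master: false
node.data: false
node.ingest: true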
List max_file_descriptors:
GET /_nodes/stats/process?filter_path=**.max_file_descriptors
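The response only contains the filtered fields and looks something like this (the node id and value below are made up; yours will differ):
{
  "nodes": {
    "fKoWnbjYTPSkBbZS0yLLRw": {
      "process": {
        "max_file_descriptors": 65536
      }
    }
  }
}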
The allocation explain API gives more detail on shard allocation status:
GET /_cluster/allocation/explain
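Without a request body the API explains the first unassigned shard it finds; you can also ask about a specific shard, roughly like this (the index name is a placeholder):
GET /_cluster/allocation/explain
{
  "index": "my-index",
  "shard": 0,
  "primary": true
}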
We'll use Artifactory logs throughout this guide.
Pipelines pre-process documents before indexing. The ingest node type in Elasticsearch includes a subset of Logstash functionality, and ingest pipelines are part of that.
At the end of this guide I'll use this pipeline when shipping the log data with Filebeat. The reason for creating the pipeline in the first place is that when Filebeat reads a log that it doesn't have a module for, it sends a JSON document containing the raw log entry in a `message` field. We need to parse that field to extract the interesting parts that we'll use in dashboards and visualizations.
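As a rough sketch, such a document looks something like this (abbreviated; the exact metadata fields vary between Filebeat versions):
{
  "@timestamp": "2018-09-06T08:17:31.000Z",
  "source": "/logs/access.log",
  "offset": 0,
  "message": "2018-09-06 08:17:31,105 [ACCEPTED DOWNLOAD] pip-remote-cache:0f/14/... for anonymous/192.168.1.10."
}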
The examples below can be executed (copy-pasted) in Dev Tools within Kibana. The example below includes the pipeline and three example documents containing fake log entries that we want to parse; the pipeline simulation API is a nice way of testing pipelines. This pipeline uses the `grok`, `date` and `remove` processors:
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "rtf access log pipeline",
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            "%{TIMESTAMP_ISO8601:rtf_timestamp} \\[%{DATA:rtf_action_resp} %{DATA:rtf_action_type}\\] %{DATA:rtf_repo_name}\\:%{DATA:rtf_file_path}\\s+.* %{USERNAME:username}\/%{IP:client_ip}",
            "%{TIMESTAMP_ISO8601:rtf_timestamp} \\[%{DATA:rtf_action_resp} %{DATA:rtf_action_type}\\] %{DATA:rtf_repo_name}\\:%{DATA:rtf_file_path}\\s+.* %{GREEDYDATA:other}",
            "%{TIMESTAMP_ISO8601:rtf_timestamp} \\[%{DATA:rtf_action_resp} %{DATA:rtf_action_type}\\] for %{USERNAME:username}\/%{IP:client_ip}"
          ],
          "on_failure": [
            {
              "set": {
                "field": "_index",
                "value": "failed-{{ _index }}"
              }
            }
          ]
        }
      },
      {
        "date": {
          "field": "rtf_timestamp",
          "target_field": "@timestamp",
          "formats": [ "yyyy-MM-dd HH:mm:ss,SSS" ],
          "timezone": "Europe/Stockholm",
          "on_failure": [
            {
              "set": {
                "field": "_index",
                "value": "failed-{{ _index }}"
              }
            }
          ]
        }
      },
      {
        "remove": {
          "field": "message"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "_doc",
      "_id": "1",
      "_source": {
        "message": "2018-09-06 08:17:31,105 [ACCEPTED DOWNLOAD] pip-remote-cache:0f/14/e3112808b727f72df9531fc2f00b84d4966e66001748b6883a21c767e902/smmap2-2.0.4-py2.py3-none-any.whl for anonymous/192.168.1.10."
      }
    },
    {
      "_index": "index",
      "_type": "_doc",
      "_id": "2",
      "_source": {
        "message": "2018-09-05 12:00:15,178 [ACCEPTED DELETE] auto-trashcan:something.local/a/b/c/file.jar for _system_."
      }
    },
    {
      "_index": "index",
      "_type": "_doc",
      "_id": "3",
      "_source": {
        "message": "2018-09-10 15:27:10,330 [DENIED LOGIN] for userid1234/1.2.3.4."
      }
    }
  ]
}
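If one of the patterns doesn't behave as expected, the same request can be sent with the verbose flag, which shows the output of every processor step:
POST _ingest/pipeline/_simulate?verbose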
So what's going on within the pipeline block above?
- The `grok` processor has three different patterns that are tried when parsing the incoming data; if any of the patterns matches, the document will be indexed accordingly. If the matching fails for some reason, the document will instead be stored in an index with a name like `failed-filebeat-2018.09.10`. This way it's easy to keep track of errors and e.g. add alerting when parsing fails (a search example follows after this list).
- The `date` processor will take the `rtf_timestamp` field and use it as the `@timestamp` of the indexed document. If this parsing fails it sends the document to the error index, as with the `grok` processor.
- The `remove` processor removes the `message` field that contains the raw log entry; we don't need to index it since we've successfully parsed it.
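To check whether any documents have ended up in a failure index, a simple search will do (assuming the `failed-` prefix set in the `on_failure` blocks above):
GET failed-*/_search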
An example of a resulting document, as it would now be indexed in Elasticsearch, looks like this:
{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "_doc",
        "_id": "1",
        "_source": {
          "rtf_timestamp": "2018-09-06 08:17:31,105",
          "rtf_file_path": "0f/14/e3112808b727f72df9531fc2f00b84d4966e66001748b6883a21c767e902/smmap2-2.0.4-py2.py3-none-any.whl",
          "rtf_action_resp": "ACCEPTED",
          "@timestamp": "2018-09-06T08:17:31.105+02:00",
          "rtf_action_type": "DOWNLOAD",
          "rtf_repo_name": "pip-remote-cache",
          "client_ip": "192.168.1.10",
          "username": "anonymous"
        },
        "_ingest": {
          "timestamp": "2018-09-06T20:53:22.588333Z"
        }
      }
    }
  ]
}
To start using the pipeline in Elasticsearch you'll need to create it; this was done through Dev Tools in Kibana:
PUT _ingest/pipeline/rtf-access-log-pipeline
{
  "description": "rtf access log pipeline",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{TIMESTAMP_ISO8601:rtf_timestamp} \\[%{DATA:rtf_action_resp} %{DATA:rtf_action_type}\\] %{DATA:rtf_repo_name}\\:%{DATA:rtf_file_path}\\s+.* %{USERNAME:username}\/%{IP:client_ip}",
          "%{TIMESTAMP_ISO8601:rtf_timestamp} \\[%{DATA:rtf_action_resp} %{DATA:rtf_action_type}\\] %{DATA:rtf_repo_name}\\:%{DATA:rtf_file_path}\\s+.* %{GREEDYDATA:other}",
          "%{TIMESTAMP_ISO8601:rtf_timestamp} \\[%{DATA:rtf_action_resp} %{DATA:rtf_action_type}\\] for %{USERNAME:username}\/%{IP:client_ip}"
        ],
        "on_failure": [
          {
            "set": {
              "field": "_index",
              "value": "failed-{{ _index }}"
            }
          }
        ]
      }
    },
    {
      "date": {
        "field": "rtf_timestamp",
        "target_field": "@timestamp",
        "formats": [ "yyyy-MM-dd HH:mm:ss,SSS" ],
        "timezone": "Europe/Stockholm",
        "on_failure": [
          {
            "set": {
              "field": "_index",
              "value": "failed-{{ _index }}"
            }
          }
        ]
      }
    },
    {
      "remove": {
        "field": "message"
      }
    }
  ]
}
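To verify that the pipeline was stored, fetch it back by name:
GET _ingest/pipeline/rtf-access-log-pipeline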
To start sending some real data into Elasticsearch from the Artifactory `access.log`, now that we have a pipeline ready to parse the log entries, I did the following:
- Copied the `access.log` (latest log) from the Artifactory logs directory
- Created a directory to keep all relevant files in the same place
- Created a bare-minimum `filebeat.yml` configuration file. Note that we add a `pipeline` parameter with the name of the pipeline we created in the previous section:
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /logs/access.log
  pipeline: "rtf-access-log-pipeline"
setup.template.enabled: false
output.elasticsearch:
  enabled: true
  hosts: ["HOSTNAME_OR_IP_OF_ELASTICSEARCH:9200"]
- Next we run the following command from the working directory containing our log and configuration file:
docker run --rm -v $PWD/filebeat.yml:/usr/share/filebeat/filebeat.yml -v $PWD/access.log:/logs/access.log docker.elastic.co/beats/filebeat:6.4.0
When the container runs, Filebeat will read the whole log through its input, and as the data is sent to Elasticsearch it will be parsed with the provided pipeline. If you re-run this container it will do the same procedure again, which is quite nice for back-filling Elasticsearch with log data.
When you run Filebeat to ship live logs it's good to know that there's a state file (the registry) that Filebeat uses internally to keep track of which log entries have already been shipped.
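If you'd rather have a re-run resume where the previous one stopped, that state file needs to survive the container. A sketch of that, assuming the official 6.x image keeps its registry under /usr/share/filebeat/data:
docker run --rm -v $PWD/filebeat.yml:/usr/share/filebeat/filebeat.yml -v $PWD/access.log:/logs/access.log -v $PWD/data:/usr/share/filebeat/data docker.elastic.co/beats/filebeat:6.4.0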
TODO:
- Fix the `grok` pattern so that it works for all kinds of messages in the `access.log`
- Test the `stdin` input of Filebeat (see the sketch after this list)
- Give the parsed fields searchable and descriptive names, e.g. `artifactory.repo_name`
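For the `stdin` item, a minimal sketch of what I expect the configuration to look like (untested; `filebeat-stdin.yml` is a hypothetical file name, and the same pipeline is assumed):
filebeat.inputs:
- type: stdin
  pipeline: "rtf-access-log-pipeline"
setup.template.enabled: false
output.elasticsearch:
  enabled: true
  hosts: ["HOSTNAME_OR_IP_OF_ELASTICSEARCH:9200"]
The log would then be fed through a pipe, e.g. cat access.log | docker run -i --rm -v $PWD/filebeat-stdin.yml:/usr/share/filebeat/filebeat.yml docker.elastic.co/beats/filebeat:6.4.0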