Skip to content

Instantly share code, notes, and snippets.

@cgswong
Last active August 29, 2015 14:23
Show Gist options
  • Save cgswong/9bcf609d445494fd1a35 to your computer and use it in GitHub Desktop.
Save cgswong/9bcf609d445494fd1a35 to your computer and use it in GitHub Desktop.
Logstash message queue config
# #####################################################################
# DESC: Logstash configuration file. Typically forwarding logs to
# Elasticsearch instance.
# #####################################################################
# Where to get input
input {
# Get input from Redis queue
redis {
data_type => "list"
host => "127.0.0.1"
key => "logstash"
port => 6379
tags => ["redis"]
}
# Get input from Kafka queue
kafka {
consumer_threads => 1
consumer_restart_sleep_ms => 100
decorate_events => true
group_id => "logs"
topic_id => "logstash"
zk_connect => ["localhost:2181"]
}
}
# Some Filtering
filter {
# SYSLOG filter
if [type] == "syslog" {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
add_field => [ "received_at", "%{@timestamp}" ]
add_field => [ "received_from", "%{host}" ]
}
syslog_pri { }
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
if !("_grokparsefailure" in [tags]) {
mutate {
replace => [ "message", "%{syslog_message}" ]
}
mutate {
remove_field => [ "syslog_message" ]
}
}
# Remove spurious fields that have names changed or been aggregated
mutate {
remove_field => [ "syslog_hostname", "syslog_timestamp" ]
}
}
# Apache Access Log filter
if [type] == "apache-access" {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
# Nginx Access Log filter
if [type] == "nginx-access" {
grok {
match => { "message" => "%{NGINXACESS}" }
}
}
# Tomcat filter
if [type] == "tomcat" and [message] !~ /(.+)/ {
drop { }
}
# Docker filter
if [type] == "docker" {
json {
source => "message"
}
mutate {
rename => [ "log", "message" ]
}
date {
match => [ "time", "ISO8601" ]
}
}
}
# Where to send output
output {
# Send output to standard output device/interface
stdout {
codec => rubydebug
}
# Parse failed syslog messages
if [type] == "syslog" and "_grokparsefailure" in [tags] {
file { path => "/var/log/failed_syslog_events-%{+YYYY-MM-dd}" }
}
# Send output to Elasticsearch over HTTP interface.
elasticsearch {
protocol => 'http'
host => "localhost:9200"
}
# Send output to Kafka topic
kafka {
codec => plain {
format => "%{message}"
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment