Last active September 1, 2021 19:41
# Read the Exim main log from disk.
input {
  file {
    path => "/path/to/exim/mainlog"
    # Read the file from its first line rather than only tailing new lines.
    start_position => 'beginning'
    # The sincedb (read-position bookkeeping) is written to /dev/null, so no
    # position is persisted; combined with start_position => 'beginning' the
    # whole log is re-read every time Logstash starts.
    sincedb_path => "/dev/null"
  }
}
# Parse Exim log lines, classify each event into an exim_msg_state, enrich
# selected events with data looked up from Elasticsearch, and drop lines that
# are of no interest.
filter {
  # Strip out lines we don't need using the regexp in the patterns file.
  # Grok patterns such as %{EXIM_EXCLUDE_TERMS} are not expanded inside
  # conditionals, so the pattern is matched by a grok filter that tags the
  # event, and the event is then dropped on that tag. tag_on_failure is
  # emptied so that a non-match here does not add _grokparsefailure, which
  # is checked further down for the main parse.
  grok {
    patterns_dir => "/etc/logstash/patterns/"
    match => [ "message", "%{EXIM_EXCLUDE_TERMS}" ]
    add_tag => [ "exim_exclude" ]
    tag_on_failure => [ ]
  }
  if "exim_exclude" in [tags] {
    drop { }
  }

  # Really, really dirty hack to workaround bug in grok code
  # which won't handle multiple matches on the same field:
  # copy the message into five scratch fields, one per grok expression.
  mutate {
    add_field => {
      "message_1" => "%{message}"
      "message_2" => "%{message}"
      "message_3" => "%{message}"
      "message_4" => "%{message}"
      "message_5" => "%{message}"
    }
  }

  grok {
    patterns_dir => "/etc/logstash/patterns/"
    # Try every expression below instead of stopping at the first match.
    break_on_match => false
    keep_empty_captures => true
    # message_1: common prefix (syslog base, Exim date, pid, message id, flags).
    match => [
      "message_1", "(%{SYSLOGBASE} )(%{EXIM_DATE:exim_date} )(%{EXIM_PID:exim_pid} )(%{EXIM_MSGID:exim_msg_id} )(%{EXIM_FLAGS:exim_flags} )(%{GREEDYDATA})"
    ]
    # message_2: message arrival ("<=") lines.
    match => [
      "message_2", "(%{EXIM_MSGID} )(<= )(%{NOTSPACE:env_sender} )(%{EXIM_REMOTE_HOST} )?(%{EXIM_INTERFACE} )?(%{EXIM_PROTOCOL} )?(X=%{NOTSPACE:tls_info} )?(%{EXIM_MSG_SIZE} )?(%{EXIM_HEADER_ID} )?(%{EXIM_SUBJECT})"
    ]
    # message_3: delivery ("=>" / "->") lines.
    match => [
      "message_3", "(%{EXIM_MSGID} )([=-]> )(%{NOTSPACE:env_rcpt} )(<%{NOTSPACE:env_rcpt_outer}> )?(R=%{NOTSPACE:exim_router} )(T=%{NOTSPACE:exim_transport} )(%{EXIM_REMOTE_HOST} )(X=%{NOTSPACE:tls_info} )?(QT=%{EXIM_QT:exim_qt})"
    ]
    # message_4: "Completed" lines.
    match => [
      "message_4", "(%{SYSLOGBASE} )(%{EXIM_DATE:exim_date} )(%{EXIM_PID:exim_pid} )(%{EXIM_MSGID:exim_msg_id} )(Completed )(QT=%{EXIM_QT:exim_qt})"
    ]
    # message_5: rejection lines; captures the rejection reason.
    # (The interface group previously read "%EXIM_INTERFACE}", which matched
    # literal text rather than referencing the pattern.)
    match => [
      "message_5", "(%{SYSLOGBASE} )(%{EXIM_DATE:exim_date} )(%{EXIM_PID:exim_pid} )(%{EXIM_MSGID:exim_msg_id} )?(%{EXIM_REMOTE_HOST} )?(%{EXIM_INTERFACE} )?(F=<%{NOTSPACE:env_sender}> )?(.+(rejected after DATA|rejected \(but fed to sa-learn\)|rejected [A-Z]+ (or [A-Z]+ %{NOTSPACE}?|<%{NOTSPACE:env_rcpt}>)?): (?<exim_rej_reason>.+))"
    ]
  }

  # Set the event timestamp from the parsed "timestamp" field.
  date {
    match => [ "timestamp", "MMM dd HH:mm:ss", "MMM d HH:mm:ss", "ISO8601" ]
  }

  # Drop anything grok tagged as a parse failure.
  if "_grokparsefailure" in [tags] {
    drop { }
  }

  # Default state; overwritten below when the line can be classified.
  mutate {
    add_field => { "exim_msg_state" => "not_defined" }
  }

  # Classify the event from the Exim flag, falling back to message text.
  if [exim_flags] == "<=" {
    mutate {
      update => [ "exim_msg_state", "received" ]
    }
  } else if [exim_flags] == "=>" {
    mutate {
      update => [ "exim_msg_state", "delivered" ]
    }
  } else if [exim_flags] == "->" {
    mutate {
      update => [ "exim_msg_state", "delivered" ]
    }
  } else if [exim_flags] == ">>" {
    mutate {
      update => [ "exim_msg_state", "cutthrough_delivery" ]
    }
  } else if [exim_flags] == "*>" {
    mutate {
      update => [ "exim_msg_state", "suppressed_delivery" ]
    }
  } else if [exim_flags] == "==" {
    mutate {
      update => [ "exim_msg_state", "deferred" ]
    }
  } else if [exim_flags] == "**" {
    mutate {
      update => [ "exim_msg_state", "failed" ]
    }
  } else if "Completed QT=" in [message] {
    mutate {
      update => [ "exim_msg_state", "completed" ]
    }
  } else if [message] =~ /(rejected after DATA|rejected \(but fed to sa-learn\))/ {
    mutate {
      update => [ "exim_msg_state", "rejected_after_data" ]
    }
  } else if " rejected " in [message] {
    mutate {
      update => [ "exim_msg_state", "rejected_smtp_transaction" ]
    }
  }

  # Derive host_type from the logging host's name.
  if [logsource] =~ /mx-/ {
    mutate {
      add_field => { "host_type" => "MX" }
    }
  } else if [logsource] =~ /mta-/ {
    mutate {
      add_field => { "host_type" => "MTA" }
    }
  }

  # Ignore feeding sa-learn
  if [message] =~ /R=feed_sa_learn T=feed_sa_learn/ {
    drop { }
  }

  # Look back through ES table for existence of previous entry
  # for this exim_msg_id being rejected & fed to Spam Assassin.
  # If entry is found, drop this log line (it's irrelevant).
  # NOTE(review): the exim_msg_state2 check and the query_failed cleanup are
  # nested inside this conditional because the lookup here is what copies
  # exim_msg_state into exim_msg_state2 -- confirm against the original layout.
  if [exim_msg_state] == "completed" and [host_type] == "MX" {
    elasticsearch {
      query => 'exim_msg_id:"%{exim_msg_id}" AND exim_msg_state:"rejected_after_data"'
      # Copy the found document's exim_msg_state into exim_msg_state2.
      fields => [ "exim_msg_state", "exim_msg_state2" ]
      #sort => "@timestamp:desc, ignore_unmapped:true"
      sort => "ignore_unmapped:true"
      fail_on_error => "false"
    }
    if [exim_msg_state2] == "rejected_after_data" {
      drop { }
    }
    mutate {
      remove_field => [ "query_failed" ]
    }
  }

  # Look back through ES table for the incoming message and
  # extract the envelope sender. Permits later stats creation
  # listing deliveries per env_sender
  if [exim_msg_state] == "delivered" {
    elasticsearch {
      query => 'exim_msg_id:"%{exim_msg_id}" AND exim_msg_state:"received"'
      #fields => [ "env_sender", "env_sender" ]
      fields => [ "env_sender", "env_sender", "remote_host", "remote_host", "remote_hostname", "remote_hostname" ]
      sort => "ignore_unmapped:true"
      fail_on_error => "false"
    }
    mutate {
      remove_field => [ "query_failed" ]
    }
  }

  # Not interested in non-actioned retries
  if [exim_flags] == "==" and "retry time not reached" in [message] {
    drop { }
  }

  # Remove the really, really dirty hack to workaround bug in grok code
  # which won't handle multiple matches on the same field
  mutate {
    remove_field => [ "message_1","message_2","message_3","message_4","message_5" ]
  }
}
# Index the processed events into Elasticsearch on the local host.
output {
  elasticsearch {
    host => localhost
    # The document type is taken from the event's exim_msg_state field.
    index_type => "%{[exim_msg_state]}"
    # One index per day, named from the event timestamp.
    index => "exim-%{+YYYY.MM.dd}"
    # NOTE(review): presumably kept this small so that the Elasticsearch
    # look-backs in the filter stage can see recently indexed events --
    # confirm before raising it.
    flush_size => 2
  }
}
Copy link

mcnewton commented Jul 4, 2014


This wouldn't quite work on our logs; we don't currently include the exim timestamp in the syslog entry (probably should do so, but they're secondary to the mainlog...), or the queue time. I've patched the regexes to not need to include those if they aren't there. The result is here if it's helpful:

Thanks for publishing these - I've been thinking about trying logstash + friends for a while for exim, and for a complete ELK newcomer, this made it so much easier to begin!


Copy link

This one got me for way too long.

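Line 12 of the config (the `if [message] =~ /%{EXIM_EXCLUDE_TERMS}/` conditional) isn't working on Logstash 1.4.2.
Grok patterns aren't expanded inside conditionals, so that regexp never matches as intended.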

Copy link

@Temal, did you solve this issue? thanks

Copy link

@greem, can you share any ideas on how to send these logs to a remote Logstash instance using logstash-forwarder?

especially this part:

# Output section quoted from the configuration above.
output {
  elasticsearch {
    host => localhost
    index_type => "%{[exim_msg_state]}"
    index => "exim-%{+YYYY.MM.dd}"
    flush_size => 2
  }
}

