@myouju
Last active February 25, 2016 09:10
Dynamically type-convert Apache logs with fluentd and load the data into Elasticsearch
{
  "template": "es-*",
  "mappings": {
    "fluentd": {
      "properties": {
        "geo_location": {
          "type": "geo_point"
        },
        "all_path": {
          "type": "string",
          "index": "analyzed"
        },
        "path": {
          "type": "string",
          "index": "analyzed"
        }
      },
      "dynamic_templates": [
        {
          "string_template": {
            "mapping": {
              "index": "not_analyzed",
              "type": "string"
            },
            "match_mapping_type": "string",
            "match": "*"
          }
        }
      ]
    }
  }
}
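The gist does not show how this template gets registered. One possible approach, sketched under assumptions: Elasticsearch versions that use the `"template"` key accept index templates through `PUT /_template/<name>`. The template name `es_apache` and the helper `build_template_request` are hypothetical, and the sketch only builds the request without sending it.

```ruby
require 'json'
require 'net/http'

# Hypothetical template name; the gist does not name the template.
TEMPLATE_NAME = 'es_apache'

# Build (but do not send) a PUT request that would register an index template.
def build_template_request(template_hash, name = TEMPLATE_NAME)
  req = Net::HTTP::Put.new("/_template/#{name}", 'Content-Type' => 'application/json')
  req.body = JSON.generate(template_hash)
  req
end

# A trimmed copy of the template above, just to exercise the helper.
template = {
  'template' => 'es-*',
  'mappings' => {
    'fluentd' => {
      'properties' => { 'geo_location' => { 'type' => 'geo_point' } }
    }
  }
}

req = build_template_request(template)
puts "#{req.method} #{req.path}"
# Sending it would need something like
#   Net::HTTP.start(host, 80) { |http| http.request(req) }
# with host set to your own Elasticsearch endpoint.
```

Registering the template before the first `es-*` index is created matters, because templates are applied only at index creation time.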
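# parse.rb: reads one JSON record per line from STDIN (fed by fluentd's
# exec_filter), converts numeric-looking strings to numbers, and writes each
# record back to STDOUT as JSON.
require 'json'

# True if the value is a base-10 integer string such as "200".
def integer_string?(is)
  Integer(is, 10)
  true
rescue ArgumentError, TypeError
  false
end

# True if the value parses as a float, such as "0.25".
def float_string?(fs)
  Float(fs)
  true
rescue ArgumentError, TypeError
  false
end

while str = STDIN.gets
  begin
    str.chomp!
    s = JSON.parse(str)

    # Turn the query string (e.g. "?a=1&b=2") into a hash of parameters.
    h = {}
    query = s['parameter'].to_s  # tolerate a missing "parameter" field
    unless query.empty?
      query.slice!(0)  # drop the first character (presumably the leading "?")
      query.split('&').each do |s_tmp|
        tmp = s_tmp.split('=')
        next if tmp[0].nil?
        h[tmp[0]] = tmp[1]
        # start_date/end_date get a trailing space appended.
        if (tmp[0] == 'start_date' || tmp[0] == 'end_date') && tmp[1]
          h[tmp[0]] = tmp[1] + ' '
        end
      end
    end
    s['parameter'] = h

    # "-" becomes nil; numeric-looking strings become Integer or Float.
    s.each do |key, value|
      next unless value.is_a?(String)
      if value == '-'
        s[key] = nil
      elsif integer_string?(value)
        s[key] = value.to_i
      elsif float_string?(value)
        s[key] = Float(value)
      end
    end

    # One record per line, flushed immediately so fluentd sees it right away.
    puts s.to_json
    STDOUT.flush
  rescue StandardError => e
    #res = "#{e} row: #{str}"
    #exec("echo '#{res}' >> /var/log/td-agent/error.log")
    STDERR.puts "#{e} row: #{str}"
    STDERR.flush
  end
end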
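To see what the conversion does to a single record, the rules can be exercised on a sample line. This is a minimal sketch of the same idea rather than the script itself: the helper `convert_value` is hypothetical, and the sample field names `status`, `response_time`, and `referer` are made up for illustration (only `remote_host` appears in the gist).

```ruby
require 'json'

# Hypothetical helper: "-" becomes nil, numeric-looking strings become
# numbers, and everything else is returned unchanged.
def convert_value(value)
  return nil if value == '-'
  return value unless value.is_a?(String)

  begin
    Integer(value, 10) # base 10 is this sketch's choice
  rescue ArgumentError
    begin
      Float(value)
    rescue ArgumentError
      value
    end
  end
end

# Sample record; status, response_time, and referer are made-up field names.
line = '{"remote_host":"203.0.113.5","status":"200","response_time":"0.25","referer":"-"}'
record = JSON.parse(line)
converted = record.each_with_object({}) { |(k, v), out| out[k] = convert_value(v) }
puts converted.to_json
```

Values that fail both conversions, such as an IP address, stay strings, so the `not_analyzed` string mapping in the template still applies to them.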
<source>
  type tail
  path /var/log/apache2/access_ltsv.log
  tag apache.filter
  format ltsv
  pos_file /var/log/td-agent/access_ltsv.pos
  time_key request_time
  time_format %Y-%m-%d %H:%M:%S %z
</source>
# The pattern must match the tag emitted by the tail source (apache.filter).
<match apache.filter>
  type exec_filter
  command /opt/td-agent/embedded/bin/ruby /etc/td-agent/parse.rb
  in_format json
  out_format json
  tag geo.apache
  time_key request_time
  time_format %Y-%m-%d %H:%M:%S %z
  buffer_type file
  buffer_path /var/log/td-agent/buffer/filter.access
  flush_interval 10
  <secondary>
    type file
    path /tmp/filter/error.log
  </secondary>
</match>
<match geo.apache>
  type geoip
  geoip_lookup_key remote_host
  <record>
    country_name ${country_name["remote_host"]}
    city ${city["remote_host"]}
    geo_location '{ "lat" : ${latitude["remote_host"]}, "lon" : ${longitude["remote_host"]} }'
  </record>
  skip_adding_null_record true
  flush_interval 5s
  remove_tag_prefix geo.
</match>
<match apache>
  type copy
  <store>
    type elasticsearch
    include_tag_key true
    tag_key @log_name
    host <amazon es host>
    port 80
    logstash_format true
    logstash_prefix es
    buffer_type file
    buffer_path /var/log/td-agent/buffer/es.jp
    flush_interval 10
    reload_connections false
  </store>
</match>
<match **>
  type stdout
</match>