Created
November 12, 2011 10:14
-
-
Save doryokujin/1360341 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Tail four MongoDB shard logs. Each line is split by the regexp into the
# fields time, type, verb, object and message, and emitted with the tag "mongo".
<source>
  type tail
  format /^(?<time>[^ ]* [^ ]*[ ]*[^ ]* [^ ]*) \[(?<type>[^\]]*)\] (?<verb>[^ ]*) (?<object>[^ ]*) (?<message>.*)$/
  time_format %a %b %e %H:%M:%S
  path /log/mongo/delta6/shard05/mongo.log,/log/mongo/delta6/shard11/mongo.log,/log/mongo/delta6/shard17/mongo.log,/log/mongo/delta6/shard23/mongo.log
  tag mongo
</source>

# Send the parsed events to the mongo_metrics output plugin. The database,
# collection, host, port and <metrics> parameters below are the ones that
# plugin reads; the built-in "file" output does not use them.
<match mongo**>
  type mongo_metrics
  time_slice_wait 1m
  buffer_path /home/doryokujin/buffer.log
  database test
  # NOTE(review): this names the target collection "capped"; it does not set
  # the plugin's separate 'capped' option. Confirm which one was intended.
  collection capped
  host localhost
  port 27017
  <metrics>
    name metrics_test
    # Partition timestamps are truncated to the second.
    partition_time s
    partition_key type
    # One metrics key per listed record field.
    key $type,$verb,$object
    # value 1
    # float
  </metrics>
</match>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Fluent | |
class BasicMetricsOutput < TimeSlicedOutput | |
# Creates the output with an empty list of metrics definitions; entries are
# appended by #add_metrics. The msgpack and time libraries are loaded here,
# when the plugin is instantiated.
def initialize
  super
  require 'msgpack'
  require 'time'
  @metrics = []
end
# Applies the plugin configuration and registers one metrics definition for
# every child element named 'metrics'.
#
# @param conf the configuration element; must respond to #elements, each of
#   which responds to #name
def configure(conf)
  super
  metrics_sections = conf.elements.select { |element| element.name == 'metrics' }
  metrics_sections.each { |section| add_metrics(section) }
end
# Builds a Metrics object from one <metrics> section and appends it to the
# list of definitions applied by #format.
#
# @param conf the <metrics> configuration section
# @return [Array] the updated list of metrics definitions
def add_metrics(conf)
  metrics = Metrics.new
  metrics.configure(conf)
  @metrics << metrics
end
# One <metrics> section: converts an event into a msgpack-encoded metrics
# document carrying a name, a time partition, a key hash and a value hash.
class Metrics
  # strftime pattern for each accepted 'partition_time' setting.
  PARTITION_TIME_FORMATS = {
    'd' => "%Y-%m-%d",
    'h' => "%Y-%m-%d %H",
    'm' => "%Y-%m-%d %H:%M",
    's' => "%Y-%m-%d %H:%M:%S"
  }.freeze

  def initialize
    @float = false
  end

  # Reads the section parameters and prepares the key, value and partition
  # procs used by #format.
  #
  # Parameters read: 'name', 'partition_key' and 'key' (all required),
  # 'value' (default "1"), 'constant_key_name' (default "_constant_"),
  # 'float' (presence enables float conversion) and 'partition_time'
  # (d, h, m or s; default 'h').
  #
  # @raise [ConfigError] when a required parameter is missing or
  #   'partition_time' is not one of d, h, m, s
  def configure(conf)
    @name = conf['name']
    raise ConfigError, "'name' parameter is required on <metrics> directive" unless @name

    @partition_key = conf['partition_key']
    raise ConfigError, "'partition_key' parameter is required on <metrics> directive" unless @partition_key

    key = conf['key']
    raise ConfigError, "'key' parameter is required on <metrics> directive" unless key

    value = conf['value'] || '1'
    constant_key_name = conf['constant_key_name'] || "_constant_"
    # true when the 'float' parameter is present, nil otherwise.
    is_float = conf['float'] ? true : nil

    @key_proc = BasicMetricsOutput.create_get_value_proc_combined(key, is_float, constant_key_name)
    @value_proc = BasicMetricsOutput.create_get_value_proc_combined(value, is_float, constant_key_name)

    partition_time = conf['partition_time'] || 'h'
    time_format = PARTITION_TIME_FORMATS[partition_time]
    unless time_format
      raise ConfigError, "Unexpected partition_time parameter #{partition_time.dump}: expected d, h, m or s"
    end

    @partition_proc = lambda { |time, _record| Time.at(time).strftime(time_format) }
  end

  # Builds the metrics document for one event.
  #
  # @param tag the event tag (not used)
  # @param time the event time, as accepted by Time.at
  # @param record [Hash] the event record
  # @return [String, nil] the msgpack-encoded document, or nil when the key
  #   proc or the value proc returns nil for this record
  def format(tag, time, record)
    key = @key_proc.call(record)
    value = @value_proc.call(record)
    return nil unless key && value

    {
      "time" => time,
      "name" => @name,
      "partition" => { "key" => @partition_key, "time" => @partition_proc.call(time, record) },
      "key" => key,
      "value" => value
    }.to_msgpack
  end
end
# Serializes one event for the buffer by concatenating the output of every
# configured metrics definition.
#
# @param tag the event tag
# @param time the event time
# @param record [Hash] the event record
# @return [String] the concatenated data; empty when no definition produced
#   output for this record
def format(tag, time, record)
  # String.new gives a mutable binary buffer, rather than appending binary
  # msgpack data to a string literal.
  @metrics.each_with_object(String.new) do |metrics, out|
    data = metrics.format(tag, time, record)
    out << data if data
  end
end
private | |
# Builds a proc that evaluates a comma-separated list of metrics expressions
# against a record.
#
# @param expr [String] comma-separated expressions; each one is stripped and
#   passed to create_get_value_proc
# @param is_float truthy to convert integer-looking values to Float
# @param constant_key_name [String] hash key used for constant expressions
# @return [Proc] called with a record; returns a Hash of name => value, or
#   nil when any of the values is nil
def self.create_get_value_proc_combined(expr, is_float, constant_key_name)
  procs = expr.split(',').map do |term|
    create_get_value_proc(term.strip, is_float, constant_key_name)
  end

  lambda do |record|
    # Build the hash from [name, value] pairs directly: flattening the pairs
    # first would also flatten Array values and corrupt the pairing.
    values = Hash[procs.map { |pr| pr.call(record) }]
    values.value?(nil) ? nil : values
  end
end
# Builds a proc that evaluates a single metrics expression against a record.
#
# An expression starting with '$' names a record field. Any other expression
# starting with a non-word character is rejected. Everything else is a
# constant, converted once with to_f or to_i.
#
# @param expr [String] the expression, e.g. "$verb" or "1"
# @param is_float truthy to produce Float instead of Integer values
# @param constant_key_name [String] name returned for constant expressions
# @return [Proc] called with a record; returns a [name, value] pair
# @raise [ConfigError] when the expression starts with an unknown symbol
def self.create_get_value_proc(expr, is_float, constant_key_name)
  match = /\A([^\w])(.*)\Z/.match(expr)

  unless match
    # Constant expression.
    constant = is_float ? expr.to_f : expr.to_i
    return lambda { |_record| [constant_key_name, constant] }
  end

  sigil = match[1]
  key = match[2]
  raise ConfigError, "Unknown metrics expression #{expr.dump}" unless sigil == '$'

  lambda do |record|
    val = record[key]
    # Only strings are parsed; numbers, nil and other objects pass through
    # unchanged. Whole-string anchors keep multi-line text from being
    # mistaken for a number.
    if val.is_a?(String)
      if val =~ /\A[+-]?\d+\Z/
        val = is_float ? val.to_f : val.to_i
      elsif val =~ /\A[+-]?\d+\.?\d*\Z/
        val = val.to_f
      end
    end
    [key, val]
  end
end
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Fluent | |
class MongoMetricsOutput < BasicMetricsOutput | |
Plugin.register_output('mongo_metrics', self) | |
# Creates the output and loads the mongo driver when the plugin is
# instantiated.
def initialize
  super
  require 'mongo'
end
# Reads the MongoDB connection and collection settings.
#
# Parameters read: 'database' (required), 'collection' (optional; when given,
# every record goes to that one collection), 'host' (default 'localhost'),
# 'port' (default 27017), 'capped' (presence requests a capped collection),
# 'cap_size' (default '1000m') and 'cap_max'.
#
# @param conf the plugin configuration; must support #[] and #has_key?
# @raise [ConfigError] when 'database' is missing
def configure(conf)
  super

  # Mongo
  @database = conf['database']
  raise ConfigError, "'database' parameter is required on mongo_metrics output" unless @database

  @collection = conf['collection']
  @use_common_coll = !@collection.nil?
  @host = conf['host'] || 'localhost'
  # Configuration values arrive as strings; normalise the port to an Integer.
  @port = (conf['port'] || 27017).to_i

  # Always a Hash so it can be handed to the driver even when not capped.
  @collection_opts = {}
  if conf.has_key?('capped')
    @collection_opts[:capped] = true
    @collection_opts[:size] = Config.size_value(conf['cap_size'] || '1000m')
    @collection_opts[:max] = Config.size_value(conf['cap_max']) if conf.has_key?('cap_max')
  end
end
# Opens the shared collection before the output starts, when a 'collection'
# was configured. Otherwise @coll is reset to nil and #write picks the
# collection per record.
def start
  @coll =
    if @use_common_coll
      get_collection(@collection)
    else
      nil
    end
  super
end
# Stops the output, then closes the connection behind the current collection.
# @coll is nil when no shared collection is configured and nothing has been
# written yet, so there is nothing to close in that case.
def shutdown
  super
  @coll.db.connection.close if @coll
end
# Writes every record of a buffered chunk to MongoDB.
#
# Each unpacked record is a Hash with the keys "time", "name",
# "partition" ({"key", "time"}), "key" and "value", as produced by
# Metrics#format. Without a shared collection, a record is stored in the
# collection named by its "name" field.
#
# @param chunk the buffer chunk; must respond to #open and yield an IO
def write(chunk)
  chunk.open do |io|
    begin
      MessagePack::Unpacker.new(io).each do |record|
        name = record["name"]
        # Switch collection when none is open yet, or when collections are
        # chosen per record and this record belongs to another one.
        if @coll.nil? || (!@use_common_coll && @coll.name != name)
          @coll = get_collection(name)
        end
        @coll.insert(record)
        # Disabled alternative: increment a counter per partition instead of
        # inserting one document per event (needs the :upsert option).
        #
        #   @coll.update({
        #     "partition_key" => record["partition"]["key"],
        #     "partition_time"=> record["partition"]["time"]
        #   }, {
        #     "$inc" => { key => record["field"]["value"] }
        #   }, {
        #     :upsert => true,
        #     :multi => true
        #   })
      end
    rescue EOFError
      # EOFError always occurs when the end of the chunk is reached.
    end
  end
end
private | |
# Returns the named collection, creating it with the configured collection
# options when it does not exist yet.
#
# The database handle is opened once and reused, so switching collections
# does not open a new connection each time.
#
# @param collection [String] the collection name
# @return the collection object returned by the driver
def get_collection(collection)
  @db ||= Mongo::Connection.new(@host, @port).db(@database)
  if @db.collection_names.include?(collection)
    @db.collection(collection)
  else
    # @collection_opts may be unset; fall back to no options.
    @db.create_collection(collection, @collection_opts || {})
  end
end
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment