Created
October 4, 2011 06:39
-
-
Save doryokujin/1261025 to your computer and use it in GitHub Desktop.
NoSQLAggregatedOutput #fluent
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This configuration runs on host4.
<source>
  type redis_dumper
  tag reduce_redis
  host host1
  port 6003
  delay 10
  time_suffix s
  key key1,key2,key3
  value val1
  reducer_id 0
  reduce_num 2
</source>
<match reduce**>
  type redis_aggregator
  host localhost
  port 6003
  time_suffix s
  key key1,key2,key3
  value val1
</match>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# (M): Mapper, (R): Reducer
*.log --> host1(M)->
                    ->(R)host4(M)->
*.log --> host2(M)->               ->(R)host6 --> output
                    ->(R)host5(M)->
*.log --> host3(M)->
{"rounded_timestamp":"1317708362","tag":"dump_redis","key1":"a","key2":"b","key3":"c","event_time":"2s","route":"host3,host4,host6","fluent_counter":"2","val1":"100"}
{"rounded_timestamp":"1317708362","tag":"dump_redis","key1":"a","key2":"b","key3":"d","event_time":"2s","route":"host2,host5,host6","fluent_counter":"3","val1":"10"}
{"rounded_timestamp":"1317708362","tag":"dump_redis","key1":"a","key2":"e","key3":"c","event_time":"2s","route":"host1,host4,host6","fluent_counter":"4","val1":"20"}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Fluent
  # Base class for buffered outputs that pre-aggregate events before storing
  # them in a NoSQL backend. Each event is reduced (in #format) to:
  #   - a composite key: rounded timestamp, tag, and the configured key fields,
  #   - integer counters to increment ('fluent_counter' plus the 'value' fields),
  #   - bookkeeping fields ('event_time', 'rounded_timestamp', 'route').
  # Subclasses persist those entries by overriding #write.
  #
  # Parameters:
  #   key         - (required) comma-separated record fields forming the key.
  #                 They are sorted, so their order in the config is irrelevant.
  #   time_suffix - (required) aggregation unit: 's', 'm', 'h' or 'd'.
  #   value       - (optional) comma-separated record fields summed as integers.
  #   separator   - (optional) key separator; defaults to "\34" (octal 34,
  #                 i.e. the ASCII field separator 0x1C).
  class NoSQLAggregatedOutput < Fluent::BufferedOutput
    Fluent::Plugin.register_output('nosql_aggregator', self)

    # Units accepted by time_suffix; must match the branches of #get_time_parts.
    TIME_SUFFIXES = %w[s m h d].freeze

    def initialize
      super
      require 'msgpack'
      require 'date'
    end

    # Reads and validates the plugin parameters.
    #
    # @param conf [Hash-like] plugin configuration section
    # @raise [ConfigError] if 'key' or 'time_suffix' is missing, or if
    #   'time_suffix' is not one of TIME_SUFFIXES
    def configure(conf)
      super

      # Check presence before splitting: calling #split on a missing value
      # raised NoMethodError, which hid the intended ConfigError.
      raise ConfigError, "'key' parameter is required on Redis Aggregator output" unless conf['key']
      @key_list = conf['key'].split(',').sort

      @time_suffix = conf['time_suffix']
      raise ConfigError, "'time_suffix' parameter is required on Redis Aggregator output" unless @time_suffix
      # Reject unsupported units at startup rather than on the first event.
      unless TIME_SUFFIXES.include?(@time_suffix)
        raise ConfigError, "'time_suffix' parameter value must include ['s','m','h','d']"
      end

      # Record/opts field that carries the rounded timestamp.
      @group_key = 'rounded_timestamp'
      # Fields to aggregate; each value is converted with #to_i in #format.
      @value_list = conf.has_key?('value') ? conf['value'].split(',') : []
      # Separator used to join the parts of the aggregation key.
      @sep = conf['separator'] || "\34"
      # NoSQL connection settings; subclasses assign real values.
      @host = nil
      @port = nil
      # Name of this host, appended to each record's 'route'. #chomp removes
      # only a trailing newline; #chop would also drop a real character.
      @localhost = `hostname`.chomp
      # Flush once per aggregation unit (default is 60, see
      # OutputThread.flush_interval).
      # NOTE(review): relies on BufferedOutput exposing @writer — confirm
      # against the fluentd version in use.
      @writer.flush_interval = Fluent::Config.time_value("1" + @time_suffix)
    end

    def start
      super
    end

    def shutdown
      super
    end

    # Converts one event into the msgpack-encoded chunk entry consumed by #write.
    #
    # @param tag [String] event tag; becomes the second part of the key
    # @param event [#time, #record] the event to aggregate
    # @return [String] msgpack of { 'key' => String, 'values' => Hash, 'opts' => Hash }
    def format(tag, event)
      record = event.record.dup

      # opts holds bookkeeping fields stored alongside the counters:
      #   'event_time' - time part + @time_suffix, e.g. '3s', '30m', '14h', '29d'.
      #   @group_key   - timestamp rounded down to the @time_suffix unit.
      #   'route'      - hosts the record passed through; 'host1,host3,host5'
      #                  means host1 -> host3 -> host5.
      # Values already present in the record (set upstream) take precedence.
      event_time, rounded_timestamp = get_time_parts(Time.at(event.time))
      opts = {}
      opts['event_time'] = record.has_key?('event_time') ? record['event_time'] : event_time.to_s + @time_suffix
      opts[@group_key] = record.has_key?(@group_key) ? record[@group_key] : rounded_timestamp
      opts['route'] = record.has_key?('route') ? record['route'] + ',' + @localhost : @localhost

      # Key is "rounded_timestamp<@sep>tag<@sep>value of key1<@sep>...", with
      # the key fields in @key_list (sorted) order.
      # NOTE(review): the timestamp here is always computed locally, even when
      # opts[@group_key] was taken from the record — confirm this is intended.
      key = [rounded_timestamp, tag, *@key_list.map { |field| record[field] }].join(@sep)

      # Counters to aggregate: 'fluent_counter' counts records (carried over
      # from upstream when present), plus each field listed in @value_list.
      values = {}
      values['fluent_counter'] = record.has_key?('fluent_counter') ? record['fluent_counter'].to_i : 1
      @value_list.each { |vkey| values[vkey] = record[vkey].to_i }

      { 'key' => key, 'values' => values, 'opts' => opts }.to_msgpack
    end

    # Persists one buffer chunk; subclasses override this with a concrete
    # NoSQL store. Each chunk entry is the map built by #format, and for each
    # entry an implementation should:
    #   1. set the 'opts' fields on entry['key'], and
    #   2. increment each counter in entry['values'] on entry['key'].
    def write(chunk)
      super
    end

    private

    # Extracts the component of a time selected by @time_suffix, together with
    # the epoch seconds of that time truncated to the start of the unit.
    #
    # @param time_obj [Time]
    # @return [Array(Integer, Integer)] [component, rounded epoch seconds];
    #   e.g. for 'm' the component is the minute, and the timestamp is the
    #   start of that minute
    # @raise [ConfigError] if @time_suffix is not one of 's', 'm', 'h', 'd'
    def get_time_parts(time_obj)
      t = time_obj
      case @time_suffix.to_s
      when "s"
        parts = t.sec
        rounded_time_obj = t
      when "m"
        parts = t.min
        rounded_time_obj = t - t.sec
      when "h"
        parts = t.hour
        rounded_time_obj = t - t.sec - 60 * t.min
      when "d"
        parts = t.day
        # NOTE(review): subtracting the elapsed wall-clock seconds reaches local
        # midnight except on DST-transition days, where it is off by the shift.
        rounded_time_obj = t - t.sec - 60 * t.min - 60 * 60 * t.hour
      else
        raise ConfigError, "'time_suffix' parameter value must include ['s','m','h','d']"
      end
      [parts, rounded_time_obj.to_i]
    end
  end
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#require 'fluent/plugin/out_nosql_aggregator' | |
module Fluent
  # NoSQLAggregatedOutput backed by Redis. Each aggregated entry is stored as
  # a Redis hash named entry['key']: the 'opts' fields are set with HMSET and
  # each counter in 'values' is incremented with HINCRBY.
  #
  # Parameters (in addition to NoSQLAggregatedOutput's):
  #   host - Redis host, default 'localhost'.
  #   port - Redis port, default 6379.
  #   db   - Redis database index, default 0.
  class RedisAggregatedOutput < NoSQLAggregatedOutput
    Fluent::Plugin.register_output('redis_aggregator', self)

    def initialize
      super
      require 'redis'
    end

    # @param conf [Hash-like] plugin configuration section
    def configure(conf)
      super
      @host = conf['host'] || 'localhost'
      # Apply defaults before #to_i: nil.to_i is 0 (never nil), so the former
      # `conf['port'].to_i || 6379` silently defaulted the port to 0.
      @port = (conf['port'] || 6379).to_i
      @db = (conf['db'] || 0).to_i
    end

    def start
      @redis = Redis.new(:host => @host, :port => @port, :db => @db, :thread_safe => true)
      super
    end

    def shutdown
      super
      # Guard against a start that failed before the client was created.
      @redis.quit if @redis
    end

    # Writes every entry of the chunk to Redis in a single pipeline (batched
    # round trips; this is not a MULTI/EXEC transaction).
    #
    # @param chunk buffer chunk whose entries were produced by #format
    def write(chunk)
      @redis.pipelined { |pipeline|
        # Newer redis-rb yields a pipeline object and requires commands to go
        # through it; older versions yield nothing and queue commands issued
        # on the client itself.
        target = pipeline || @redis
        chunk.open { |io|
          begin
            MessagePack::Unpacker.new(io).each { |record|
              # Set the bookkeeping fields.
              target.mapped_hmset(record['key'], record['opts'])
              # Increment each aggregated counter.
              record['values'].each { |vkey, vvalue|
                target.hincrby(record['key'], vkey, vvalue.to_i)
              }
            }
          rescue EOFError
            # Reaching the end of the chunk may raise EOFError; treat it as
            # normal termination.
          end
        }
      }
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment