@doryokujin / created October 4, 2011
NoSQLAggregatedOutput #fluent
# my host is host4
<source>
  type redis_dumper
  tag reduce_redis
  host host1
  port 6003
  delay 10
  time_suffix s
  key key1,key2,key3
  value val1
  reducer_id 0
  reduce_num 2
</source>

<match reduce**>
  type redis_aggregator
  host localhost
  port 6003
  time_suffix s
  key key1,key2,key3
  value val1
</match>
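The redis_dumper input plugin is not included in this gist, so how reducer_id and reduce_num split the key space is an assumption. A common scheme, sketched below purely for illustration, is to hash each aggregation key and claim the keys whose hash falls on this reducer's slot:

# Hypothetical sketch only; not part of this gist's code.
require 'zlib'

reducer_id = 0
reduce_num = 2

def owned_by_me?(key, reducer_id, reduce_num)
  # Zlib.crc32 is stable across processes, unlike String#hash.
  Zlib.crc32(key) % reduce_num == reducer_id
end

keys = ["1317708362\x1Cdump_redis\x1Ca\x1Cb\x1Cc",
        "1317708362\x1Cdump_redis\x1Ca\x1Cb\x1Cd"]
keys.each { |k| puts "#{k.inspect} -> mine? #{owned_by_me?(k, reducer_id, reduce_num)}" }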
# (M): Mapper, (R): Reducer
*.log --> host1(M)->
                    ->(R)host4(M)->
*.log --> host2(M)->               ->(R)host6 --> output
                    ->(R)host5(M)->
*.log --> host3(M)->
{"rounded_timestamp":"1317708362","tag":"dump_redis","key1":"a","key2":"b","key3":"c","event_time":"2s","route":"host3,host4,host6","fluent_counter":"2", "val1": "100"}
{"rounded_timestamp":"1317708362","tag":"dump_redis","key1":"a","key2":"b","key3":"d","event_time":"2s","route":"host2,host5,host6","fluent_counter":"3", "val1": "10"}
{"rounded_timestamp":"1317708362","tag":"dump_redis","key1":"a","key2":"e","key3":"c","event_time":"2s","route":"host1,host4,host6","fluent_counter":"4", "val1": "20"}
module Fluent
  class NoSQLAggregatedOutput < Fluent::BufferedOutput
    Fluent::Plugin.register_output('nosql_aggregator', self)

    def initialize
      super
      require 'msgpack'
      require 'date'
    end
    def configure(conf)
      super
      raise ConfigError, "'key' parameter is required on Redis Aggregator output" unless conf.has_key?('key')
      @key_list = conf['key'].split(',').sort
      raise ConfigError, "'time_suffix' parameter is required on Redis Aggregator output" unless @time_suffix = conf['time_suffix']
      # Map to { @group_key => rounded_timestamp }.
      @group_key = 'rounded_timestamp'
      # Value list to aggregate, so each value must be an integer.
      @value_list = conf.has_key?('value') ? conf['value'].split(',') : []
      # Key separator.
      @sep = conf['separator'] || "\34"
      # NoSQL connection settings, set by subclasses.
      @host = nil
      @port = nil
      # Used for recording which hosts the record came through.
      @localhost = `hostname`.chomp
      # Default is 60 [OutputThread.flush_interval]; flush once per time unit instead.
      @writer.flush_interval = Fluent::Config.time_value("1"+@time_suffix)
    end
    def start
      super
    end

    def shutdown
      super
    end
    def format(tag, event)
      record = event.record.dup
      #
      # Opts holds special fields used internally and for debugging.
      # opts [str] :event_time is a time + @time_suffix of the current process such as '3s', '30m', '14h' or '29d'.
      # opts [int] :@group_key is a timestamp rounded to the unit below @time_suffix.
      # opts [str] :route records where the record came from. 'host1,host3,host5' means the route `host1->host3->host5`.
      #
      opts = {}
      event_time, rounded_timestamp = get_time_parts(Time.at(event.time))
      opts['event_time'] = record.has_key?('event_time') ? record['event_time'] : event_time.to_s + @time_suffix
      opts[@group_key] = record.has_key?(@group_key) ? record[@group_key] : rounded_timestamp
      opts['route'] = record.has_key?('route') ? record['route']+','+@localhost : @localhost
      #
      # Key is a string of multiple map_key values joined by @sep; the rounded
      # timestamp and the tag are unshifted automatically as the 1st and 2nd elements.
      # EX. => "timestamp<@sep>tag<@sep>val1<@sep>val2"
      #
      key = @key_list.collect{|k| record[k]}.unshift(tag).unshift(rounded_timestamp).join(@sep)
      #
      # Values are the fields to aggregate, so they must be integers.
      # fluent_counter is a record counter, @value_list is a user-defined value list.
      #
      values = {}
      values['fluent_counter'] = record.has_key?('fluent_counter') ? record['fluent_counter'].to_i : 1
      @value_list.each{ |vkey| values[vkey] = record[vkey].to_i }
      { 'key'=>key, 'values'=>values, 'opts'=>opts }.to_msgpack
    end
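
    # Worked example (illustrative, consistent with the sample records above):
    # an incoming record {"key1"=>"a","key2"=>"b","key3"=>"c","val1"=>"100"}
    # at event.time = 1317708362 with time_suffix "s" yields
    #   key    = "1317708362\x1Cdump_redis\x1Ca\x1Cb\x1Cc"   (tag "dump_redis", default @sep)
    #   opts   = { 'event_time'=>'2s', 'rounded_timestamp'=>1317708362, 'route'=>@localhost }
    #   values = { 'fluent_counter'=>1, 'val1'=>100 }
    # and the msgpack'd hash { 'key'=>key, 'values'=>values, 'opts'=>opts } is buffered.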
    def write(chunk)
      #
      # You can choose any NoSQL here; a subclass must perform 2 tasks.
      # Each record of the chunk is { 'key'=>key, 'values'=>values, 'opts'=>opts }.
      # For the record identified by its 'key' field:
      # 1. Set the opts fields,
      # 2. Increment each value of the values fields.
      # See RedisAggregatedOutput below for a concrete implementation.
      #
      raise NotImplementedError, "write(chunk) must be implemented by a subclass"
    end
    private

    def get_time_parts(time_obj)
      t = time_obj.dup
      case @time_suffix.to_s
      when "s"
        parts = t.sec
        rounded_time_obj = t
      when "m"
        parts = t.min
        rounded_time_obj = t - t.sec
      when "h"
        parts = t.hour
        rounded_time_obj = t - t.sec - 60*t.min
      when "d"
        parts = t.day
        rounded_time_obj = t - t.sec - 60*t.min - 60*60*t.hour
      else
        raise ConfigError, "'time_suffix' parameter value must be one of ['s','m','h','d']"
      end
      return parts, rounded_time_obj.to_i
    end
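
    # e.g. with @time_suffix "s", get_time_parts(Time.at(1317708362)) returns
    # [2, 1317708362]: the seconds part ("2s" once suffixed) and the timestamp as-is.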
  end
end
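The Redis implementation below performs the two tasks with HMSET/HINCRBY. For comparison, here is a minimal sketch of what another backend could look like, assuming the mongo gem's pre-2.0 API (Mongo::Connection, collection.update with :upsert); the class name and the 'fluent'/'aggregates' database names are illustrative, not part of this gist:

# Hypothetical alternative backend, for illustration only.
module Fluent
  class MongoAggregatedOutput < NoSQLAggregatedOutput
    Fluent::Plugin.register_output('mongo_aggregator', self)

    def initialize
      super
      require 'mongo'
    end

    def configure(conf)
      super
      @host = conf['host'] || 'localhost'
      @port = (conf['port'] || 27017).to_i
    end

    def start
      super
      @collection = Mongo::Connection.new(@host, @port).db('fluent').collection('aggregates')
    end

    def write(chunk)
      chunk.open { |io|
        begin
          MessagePack::Unpacker.new(io).each { |record|
            # One upsert does both tasks: $set the opts fields, $inc the values.
            @collection.update({ '_id' => record['key'] },
                               { '$set' => record['opts'], '$inc' => record['values'] },
                               :upsert => true)
          }
        rescue EOFError
          # raised when the end of the chunk is reached
        end
      }
    end
  end
end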
#require 'fluent/plugin/out_nosql_aggregator'
module Fluent
  class RedisAggregatedOutput < NoSQLAggregatedOutput
    Fluent::Plugin.register_output('redis_aggregator', self)

    def initialize
      super
      require 'redis'
    end

    def configure(conf)
      super
      # Redis connection settings. Note: `conf['port'].to_i || 6379` would never
      # fall back (nil.to_i is 0, never nil), so apply the default before to_i.
      @host = conf['host'] || 'localhost'
      @port = (conf['port'] || 6379).to_i
      @db = (conf['db'] || 0).to_i
    end
    def start
      @redis = Redis.new(:host => @host, :port => @port, :db => @db, :thread_safe => true)
      super
    end

    def shutdown
      super
      @redis.quit
    end
    def write(chunk)
      # Pipeline the whole chunk so commands go out in one round trip.
      @redis.pipelined {
        chunk.open { |io|
          begin
            MessagePack::Unpacker.new(io).each { |record|
              # 1. Set the opts fields
              @redis.mapped_hmset record['key'], record['opts']
              # 2. Increment each field of values
              record['values'].each{ |vkey, vvalue|
                @redis.hincrby record['key'], vkey, vvalue.to_i
              }
            }
          rescue EOFError
            # EOFError is always raised when the end of the chunk is reached.
          end
        }
      }
    end
  end
end