Skip to content

Instantly share code, notes, and snippets.

@doryokujin
Created September 29, 2011 19:36
Show Gist options
  • Save doryokujin/1251708 to your computer and use it in GitHub Desktop.
Save doryokujin/1251708 to your computer and use it in GitHub Desktop.
fluent-plugin-redis
module Fluent
class RedisOutput < Fluent::BufferedOutput
Fluent::Plugin.register_output('redis', self)
def initialize
super
require 'redis'
require 'msgpack'
end
def configure(conf)
super
@host = conf.has_key?('host') ? conf['host'] : 'localhost'
@port = conf.has_key?('port') ? conf['port'] : 6379
end
def start
super
@redis = Redis.new(:host => @host, :port => @port)
end
def shutdown
@redis.quit
end
def format(tag, event)
# event.record[:identifier]=[tag,event.time].join(".")
# event.record.to_msgpack
identifier=[tag,event.time].join(".")
[ identifier, event.record ].to_msgpack
end
def write(chunk)
@redis.pipelined {
chunk.open { |io|
begin
MessagePack::Unpacker.new(io).each { |record|
# identifier = record["identifier"].to_s
# record.delete("identifier")
# @redis.mapped_hmset identifier, record
@redis.mapped_hmset record[0], record[1]
}
rescue EOFError
# EOFError always occured when reached end of chunk.
end
}
}
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment