Skip to content

Instantly share code, notes, and snippets.

@choplin
Created July 30, 2012 09:05
Show Gist options
  • Save choplin/3205750 to your computer and use it in GitHub Desktop.
Save choplin/3205750 to your computer and use it in GitHub Desktop.
Fluent output plugin for MessagePack RPC
module Fluent
class MessagePackRPCOutput < Fluent::BufferedOutput
Fluent::Plugin.register_output('msgpack_rpc', self)
config_param :host , :string , :default => 'localhost'
config_param :port , :integer
config_param :method , :string
config_param :unit , :string , :default => 'chunk'
def initialize
super
require 'msgpack/rpc'
@available_unit = {'chunk'=>:chunk,'record'=>:record}
end
def configure(conf)
super
end
def start
super
@client = initialize_client
if @available_unit.has_key?(@unit) then
@unit = @available_unit[@unit]
else
raise
end
end
def shutdown
super
if [email protected]? then
@client.close
end
end
def format(tag, time, record)
record.to_msgpack
end
def write(chunk)
case @unit
when :chunk
@client.call(@method, chunk.read)
when :record
#TODO
end
end
private
def initialize_client
begin
cli = MessagePack::RPC::Client.new(@host, @port)
#connection test
res = cli.call(@method, MessagePack.pack('test'))
rescue
shutdown
raise
end
cli
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment