Skip to content

Instantly share code, notes, and snippets.

@frsyuki
Created September 28, 2011 07:28
Show Gist options
  • Save frsyuki/1247235 to your computer and use it in GitHub Desktop.
Save frsyuki/1247235 to your computer and use it in GitHub Desktop.
module Fluent
class MongoOutput < Fluent::BufferedOutput
Fluent::Plugin.register_output('mongo', self)
def initialize
super
require 'mongo'
require 'msgpack'
end
def configure(conf)
super
raise ConfigError, "'database' parameter is required on file output" unless @database = conf['database']
raise ConfigError, "'collection' parameter is required on file output" unless @collection = conf['collection']
@host = conf.has_key?('host') ? conf['host'] : 'localhost'
@port = conf.has_key?('port') ? conf['port'] : 27017
end
def start
super
@collection = Mongo::Connection.new(@host, @port).db(@database).collection(@collection)
end
def shutdown
# Mongo::Connection checks alive or closed myself.
@collection.db.connection.close
end
def format(tag, event)
event.record.to_msgpack
end
def write(chunk)
records = []
MessagePack::Unpacker.new.feed_each(chunk.read) {|record|
records << record
}
@collection.insert(records)
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment