generalized batch mode for logstash outputs
# TOTALLY UNTESTED. Just a sketch.
module BatchAccumulator
  def batch_initialize(options={})
    if ! self.class.method_defined?(:flush)
      raise ArgumentError, "Any class including BatchAccumulator must define a flush() method."
    end

    @batch_events ||= 50
    @batch_timeout ||= 5

    # events we've accumulated
    batch_clear_pending
    @pending_mutex = Mutex.new

    # count of events which are being flushed
    # see batch_flush
    @outgoing_count = 0

    @last_flush = Time.now.to_i
    @flush_thread = Thread.new do
      # periodically force a flush so events don't sit in the buffer forever
      while sleep(@batch_timeout) do
        batch_flush(:force => true)
      end
    end
    @flush_mutex = Mutex.new

    @batch_initialized = true
  end
  def batch_clear_pending
    @pending = Hash.new { |h, k| h[k] = [] }
    @pending_count = 0
  end
  # save an event for later delivery
  # events are grouped by the (optional) group parameter you provide
  # groups of events, plus the group name, are passed to your flush() method
  def batch_receive(event, group=nil)
    batch_initialize if ! @batch_initialized

    # block if we've accumulated too many events
    # (sleep briefly so we don't spin a core while waiting for the flusher)
    while (@pending_count + @outgoing_count) > @batch_events do
      sleep 0.01
    end

    @pending_mutex.synchronize do
      @pending[group] << event
      @pending_count += 1
    end

    batch_flush
  end
  def batch_flush(options={})
    force = options[:force] || options[:final]
    final = options[:final]

    # final flush will wait for the lock, so we are sure to flush out all buffered events
    if options[:final]
      @flush_mutex.lock
    elsif ! @flush_mutex.try_lock # failed to get lock, another flush already in progress
      return
    end

    begin
      time_since_last_flush = Time.now.to_i - @last_flush

      if (force && @pending_count > 0) ||
         (@pending_count >= @batch_events) ||
         (time_since_last_flush >= @batch_timeout && @pending_count > 0)

        # swap the pending buffer out under the mutex so receivers can keep accumulating
        outgoing = nil
        @pending_mutex.synchronize do
          outgoing = @pending
          @outgoing_count = @pending_count
          batch_clear_pending
        end

        @logger.debug("Flushing output",
                      :outgoing_count => @outgoing_count,
                      :time_since_last_flush => time_since_last_flush,
                      :outgoing_events => outgoing,
                      :batch_timeout => @batch_timeout,
                      :force => force,
                      :final => final)

        outgoing.each do |group, events|
          begin
            group.nil? ? flush(events) : flush(events, group)
            outgoing.delete(group)
            @outgoing_count -= events.size
          rescue => e
            # NOTE: the including class is expected to provide an identity() method for logging
            @logger.warn("Failed to flush backlog of events",
                         :outgoing_count => @outgoing_count,
                         :identity => identity, :exception => e,
                         :backtrace => e.backtrace)
            if self.class.method_defined?(:on_flush_error)
              on_flush_error e
            end
            sleep 1
            retry
          end
          @last_flush = Time.now.to_i
        end
      end
    ensure
      @flush_mutex.unlock
    end
  end
end
require "logstash/outputs/base" | |
require "logstash/namespace" | |
# send events to a redis database using RPUSH | |
# | |
# For more information about redis, see <http://redis.io/> | |
class LogStash::Outputs::Redis < LogStash::Outputs::Base | |
config_name "redis" | |
plugin_status "beta" | |
# Name is used for logging in case there are multiple instances. | |
# TODO: delete | |
config :name, :validate => :string, :default => 'default', | |
:deprecated => true | |
# The hostname(s) of your redis server(s). Ports may be specified on any | |
# hostname, which will override the global port config. | |
# | |
# For example: | |
# | |
# "127.0.0.1" | |
# ["127.0.0.1", "127.0.0.2"] | |
# ["127.0.0.1:6380", "127.0.0.1"] | |
config :host, :validate => :array, :default => ["127.0.0.1"] | |
# Shuffle the host list during logstash startup. | |
config :shuffle_hosts, :validate => :boolean, :default => true | |
# The default port to connect on. Can be overridden on any hostname. | |
config :port, :validate => :number, :default => 6379 | |
# The redis database number. | |
config :db, :validate => :number, :default => 0 | |
# Redis initial connection timeout in seconds. | |
config :timeout, :validate => :number, :default => 5 | |
# Password to authenticate with. There is no authentication by default. | |
config :password, :validate => :password | |
# The name of the redis queue (we'll use RPUSH on this). Dynamic names are | |
# valid here, for example "logstash-%{@type}" | |
# TODO: delete | |
config :queue, :validate => :string, :deprecated => true | |
# The name of a redis list or channel. Dynamic names are | |
# valid here, for example "logstash-%{@type}". | |
# TODO set required true | |
config :key, :validate => :string, :required => false | |
# Either list or channel. If redis_type is list, then we will RPUSH to key. | |
# If redis_type is channel, then we will PUBLISH to key. | |
# TODO set required true | |
config :data_type, :validate => [ "list", "channel" ], :required => false | |
# Set to true if you want redis to batch up values and send 1 RPUSH command | |
# instead of one command per value to push on the list. Note that this only | |
# works with data_type="list" mode right now. | |
# | |
# If true, we send an RPUSH every "batch_events" events or | |
# "batch_timeout" seconds (whichever comes first). | |
config :batch, :validate => :boolean, :default => false | |
# If batch is set to true, the number of events we queue up for an RPUSH. | |
config :batch_events, :validate => :number, :default => 50 | |
# If batch is set to true, the maximum amount of time between RPUSH commands | |
# when there are pending events to flush. | |
config :batch_timeout, :validate => :number, :default => 5 | |
  def register
    require 'redis'

    # TODO remove after setting key and data_type to true
    if @queue
      if @key or @data_type
        raise RuntimeError.new(
          "Cannot specify queue parameter and key or data_type"
        )
      end
      @key = @queue
      @data_type = 'list'
    end

    if not @key or not @data_type
      raise RuntimeError.new(
        "Must define queue, or key and data_type parameters"
      )
    end
    # end TODO

    if @batch
      if @data_type != "list"
        raise RuntimeError.new(
          "batch is not supported with data_type #{@data_type}"
        )
      end
      # mix the batching behaviour into this instance
      # (Module#include isn't callable from an instance method, so use extend)
      extend BatchAccumulator
    end

    @redis = nil
    if @shuffle_hosts
      @host.shuffle!
    end
    @host_idx = 0
  end # def register
  def receive(event)
    return unless output?(event)

    if @batch
      batch_receive(event.to_json, event.sprintf(@key))
      return
    end

    event_key_and_payload = [event.sprintf(@key), event.to_json]

    begin
      @redis ||= connect
      if @data_type == 'list'
        @redis.rpush(*event_key_and_payload)
      else
        @redis.publish(*event_key_and_payload)
      end
    rescue => e
      @logger.warn("Failed to send event to redis", :event => event,
                   :identity => identity, :exception => e,
                   :backtrace => e.backtrace)
      sleep 1
      @redis = nil
      retry
    end
  end # def receive
  # called by BatchAccumulator with an array of buffered events for a given key
  def flush(events, key)
    @redis ||= connect
    @redis.rpush(key, events)
  end

  def on_flush_error(e)
    # reconnect before the flush is retried
    @redis = connect
  end

  def teardown
    if @batch
      batch_flush(:final => true)
    end
    if @data_type == 'channel' and @redis
      @redis.quit
      @redis = nil
    end
  end

  private

  def connect
    @current_host, @current_port = @host[@host_idx].split(':')
    @host_idx = @host_idx + 1 >= @host.length ? 0 : @host_idx + 1

    if not @current_port
      @current_port = @port
    end

    params = {
      :host => @current_host,
      :port => @current_port,
      :timeout => @timeout,
      :db => @db
    }
    @logger.debug(params)

    if @password
      params[:password] = @password.value
    end

    Redis.new(params)
  end # def connect

  # A string used to identify a redis instance in log messages
  def identity
    @name || "redis://#{@password}@#{@current_host}:#{@current_port}/#{@db} #{@data_type}:#{@key}"
  end
end
Is this batch capability something you're planning as a general feature for any output? (e.g. batch up 50 log records and post them as one message to an HTTP REST API)
I may need something like this!
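A minimal, untested sketch of that idea, assuming the BatchAccumulator module above: any output that defines a flush(events, group) method can extend it, so an HTTP output could buffer events and POST them as a single JSON array. The class name, the url argument, and the use of Ruby's standard Net::HTTP, JSON, and Logger libraries below are my assumptions for illustration, not part of the gist.

require "net/http"
require "uri"
require "json"
require "logger"

# Hypothetical (untested) output that reuses BatchAccumulator to POST batched events.
class BatchedHttpOutput
  def initialize(url, batch_events = 50, batch_timeout = 5)
    @uri = URI.parse(url)
    @batch_events = batch_events     # picked up by batch_initialize via ||=
    @batch_timeout = batch_timeout
    @logger = Logger.new($stderr)    # the module expects a @logger
    extend BatchAccumulator          # per-instance mixin, as in register() above
  end

  # hand each event to the accumulator; no grouping needed here
  def receive(event)
    batch_receive(event)
  end

  # BatchAccumulator hands us an array of buffered events
  def flush(events, group = nil)
    Net::HTTP.post(@uri, JSON.generate(events),
                   "Content-Type" => "application/json")
  end

  # used by the module's warn logging when a flush fails
  def identity
    "http://#{@uri.host}:#{@uri.port}#{@uri.path}"
  end
end

# out = BatchedHttpOutput.new("http://example.com/ingest")
# 100.times { |i| out.receive("message" => i) }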