Skip to content

Instantly share code, notes, and snippets.

@jmaitrehenry
Created November 1, 2017 19:59
Show Gist options
  • Save jmaitrehenry/788c5355068ebc49123661557cad775c to your computer and use it in GitHub Desktop.
Save jmaitrehenry/788c5355068ebc49123661557cad775c to your computer and use it in GitHub Desktop.
# encoding: utf-8
require "logstash/namespace"
require "logstash/inputs/base"
require "logstash/inputs/threadable"
# This input will read events from a Redis instance; it supports both Redis channels and lists.
# The list command (BLPOP) used by Logstash is supported in Redis v1.3.1+, and
# the channel commands used by Logstash are found in Redis v1.3.8+.
# While you may be able to make these Redis versions work, the best performance
# and stability will be found in more recent stable versions. Versions 2.6.0+
# are recommended.
#
# For more information about Redis, see <http://redis.io/>
#
# `batch_count` note: If you use the `batch_count` setting, you *must* use a Redis version 2.6.0 or
# newer. Anything older does not support the operations used by batching.
#
module LogStash module Inputs class Redis < LogStash::Inputs::Threadable
# class LogStash::Inputs::Redis < LogStash::Inputs::Threadable
config_name "redis"
default :codec, "json"
# The `name` configuration is used for logging in case there are multiple instances.
# This feature has no real function and will be removed in future versions.
config :name, :validate => :string, :default => "default", :deprecated => true
# The hostname of your Redis server.
config :host, :validate => :string, :default => "127.0.0.1"
# The port to connect on.
config :port, :validate => :number, :default => 6379
# The Redis database number.
config :db, :validate => :number, :default => 0
config :master, :validate => :string, :default => "mymaster"
config :sentinel_hosts, :validate => :array
# Initial connection timeout in seconds.
config :timeout, :validate => :number, :default => 5
# Password to authenticate with. There is no authentication by default.
config :password, :validate => :password
# The name of the Redis queue (we'll use BLPOP against this).
# TODO: remove soon.
config :queue, :validate => :string, :deprecated => true
# The name of a Redis list or channel.
# TODO: change required to true
config :key, :validate => :string, :required => false
# Specify either list or channel. If `redis\_type` is `list`, then we will BLPOP the
# key. If `redis\_type` is `channel`, then we will SUBSCRIBE to the key.
# If `redis\_type` is `pattern_channel`, then we will PSUBSCRIBE to the key.
# TODO: change required to true
config :data_type, :validate => [ "list", "channel", "pattern_channel" ], :required => false
# The number of events to return from Redis using EVAL.
config :batch_count, :validate => :number, :default => 1
public
# public API
# use to store a proc that can provide a redis instance or mock
def add_external_redis_builder(builder) #callable
@redis_builder = builder
self
end
# use to apply an instance directly and bypass the builder
def use_redis(instance)
@redis = instance
self
end
def new_redis_instance
@redis_builder.call
end
def register
require 'redis'
# TODO remove after setting key and data_type to true
if @queue
if @key || @data_type
raise RuntimeError.new(
"Cannot specify queue parameter and key or data_type"
)
end
@key = @queue
@data_type = 'list'
end
if !@key || !@data_type
raise RuntimeError.new(
"Must define queue, or key and data_type parameters"
)
end
# end TODO
@redis_builder ||= method(:internal_redis_builder)
# just switch on data_type once
if @data_type == 'list' || @data_type == 'dummy'
@run_method = method(:list_runner)
@teardown_method = method(:list_teardown)
elsif @data_type == 'channel'
@run_method = method(:channel_runner)
@teardown_method = method(:subscribe_teardown)
elsif @data_type == 'pattern_channel'
@run_method = method(:pattern_channel_runner)
@teardown_method = method(:subscribe_teardown)
end
# TODO(sissel, boertje): set @identity directly when @name config option is removed.
@identity = identity
@logger.info("Registering Redis", :identity => @identity)
end # def register
def identity
if @sentinel_hosts
return "redis-sentinel://#{@password} #{$sentinel_hosts} #{@db} #{@data_type}:#{@key}"
end
@identity = @name != 'default' ? @name : "redis://#{@password}@#{@host}:#{@port}/#{@db} #{@data_type}:#{@key}"
end
def run(output_queue)
@run_method.call(output_queue)
rescue LogStash::ShutdownSignal
# ignore and quit
end # def run
def teardown
@shutdown_requested = true
@teardown_method.call
end
# private methods -----------------------------
private
def batched?
@batch_count > 1
end
# private
def is_list_type?
@data_type == 'list'
end
# private
def redis_params
{
:host => @host,
:port => @port,
:timeout => @timeout,
:db => @db,
:password => @password.nil? ? nil : @password.value
}
end
# private
def internal_redis_builder
if @sentinel_hosts
@logger.info('Connecting to sentinel')
params = redis_params
hosts = []
for sentinel_host in @sentinel_hosts
host, port = sentinel_host.split(":")
unless port
port = @sentinel_port
end
hosts.push({:host => host, :port => port})
end
params[:host] = @master
params[:sentinels] = hosts
params[:role] = :master
else
params = redis_params
end
::Redis.new(params)
end
# private
def connect
redis = new_redis_instance
load_batch_script(redis) if batched? && is_list_type?
redis
end # def connect
# private
def load_batch_script(redis)
#A Redis Lua EVAL script to fetch a count of keys
#in case count is bigger than current items in queue whole queue will be returned without extra nil values
redis_script = <<EOF
local i = tonumber(ARGV[1])
local res = {}
local length = redis.call('llen',KEYS[1])
if length < i then i = length end
while (i > 0) do
local item = redis.call("lpop", KEYS[1])
if (not item) then
break
end
table.insert(res, item)
i = i-1
end
return res
EOF
@redis_script_sha = redis.script(:load, redis_script)
end
# private
def queue_event(msg, output_queue)
begin
@codec.decode(msg) do |event|
decorate(event)
output_queue << event
end
rescue LogStash::ShutdownSignal => e
# propagate up
raise(e)
rescue => e # parse or event creation error
@logger.error("Failed to create event", :message => msg, :exception => e, :backtrace => e.backtrace);
end
end
# private
def shutting_down?
@shutdown_requested
end
# private
def running?
!@shutdown_requested
end
# private
def list_teardown
return if @redis.nil? || [email protected]?
@redis.quit rescue nil
@redis = nil
end
# private
def list_runner(output_queue)
while running?
begin
@redis ||= connect
list_listener(@redis, output_queue)
rescue ::Redis::BaseError => e
@logger.warn("Redis connection problem", :exception => e)
# Reset the redis variable to trigger reconnect
@redis = nil
sleep 1
end
end
end
# private
def list_listener(redis, output_queue)
item = redis.blpop(@key, 0, :timeout => 1)
return unless item # from timeout or other conditions
# blpop returns the 'key' read from as well as the item result
# we only care about the result (2nd item in the list).
queue_event(item.last, output_queue)
# If @batch_count is 1, there's no need to continue.
return if !batched?
begin
redis.evalsha(@redis_script_sha, [@key], [@batch_count-1]).each do |item|
queue_event(item, output_queue)
end
# Below is a commented-out implementation of 'batch fetch'
# using pipelined LPOP calls. This in practice has been observed to
# perform exactly the same in terms of event throughput as
# the evalsha method. Given that the EVALSHA implementation uses
# one call to Redis instead of N (where N == @batch_count) calls,
# I decided to go with the 'evalsha' method of fetching N items
# from Redis in bulk.
#redis.pipelined do
#error, item = redis.lpop(@key)
#(@batch_count-1).times { redis.lpop(@key) }
#end.each do |item|
#queue_event(item, output_queue) if item
#end
# --- End commented out implementation of 'batch fetch'
rescue ::Redis::CommandError => e
if e.to_s =~ /NOSCRIPT/ then
@logger.warn("Redis may have been restarted, reloading Redis batch EVAL script", :exception => e);
load_batch_script(redis)
retry
else
raise e
end
end
end
# private
def subscribe_teardown
return if @redis.nil? || [email protected]?
# if its a SubscribedClient then:
# it does not have a disconnect method (yet)
if @redis.client.is_a?(::Redis::SubscribedClient)
@redis.client.unsubscribe
else
@redis.client.disconnect
end
@redis = nil
end
# private
def redis_runner
begin
@redis ||= connect
yield
rescue ::Redis::BaseError => e
@logger.warn("Redis connection problem", :exception => e)
# Reset the redis variable to trigger reconnect
@redis = nil
sleep 1
retry
end
end
# private
def channel_runner(output_queue)
redis_runner do
channel_listener(output_queue)
end
end
# private
def channel_listener(output_queue)
@redis.subscribe(@key) do |on|
on.subscribe do |channel, count|
@logger.info("Subscribed", :channel => channel, :count => count)
end
on.message do |channel, message|
queue_event(message, output_queue)
end
on.unsubscribe do |channel, count|
@logger.info("Unsubscribed", :channel => channel, :count => count)
end
end
end
def pattern_channel_runner(output_queue)
redis_runner do
pattern_channel_listener(output_queue)
end
end
# private
def pattern_channel_listener(output_queue)
@redis.psubscribe @key do |on|
on.psubscribe do |channel, count|
@logger.info("Subscribed", :channel => channel, :count => count)
end
on.pmessage do |pattern, channel, message|
queue_event(message, output_queue)
end
on.punsubscribe do |channel, count|
@logger.info("Unsubscribed", :channel => channel, :count => count)
end
end
end
# end
end end end # Redis Inputs LogStash
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment