Last active
August 29, 2015 14:11
-
-
Save kiyoto/54305ed8cdb8ff916cbf to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Fluent | |
class IfFilter < Filter | |
Fluent::Plugin.register_filter('if', self) | |
config_param :if, :string | |
def configure(conf) | |
super | |
@placeholder_expander = RubyPlaceholderExpander.new(log) | |
then_conf = conf.elements.find { |element| element.name == 'then' } | |
unless then_conf | |
raise ConfigError, "<then> directive is required" | |
end | |
unless then_conf.has_key?("@type") | |
raise ConfigError, "<then> directive requires @type" | |
end | |
@then_filter = Plugin.new_filter(then_conf["@type"]) | |
@then_filter.router = router | |
@then_filter.configure(then_conf) | |
else_conf = conf.elements.find { |element| element.name == 'else' } | |
@else_filter = nil | |
if else_conf | |
unless else_conf.has_key?("@type") | |
raise ConfigError, "<else> directive requires @type" | |
end | |
@else_filter = Plugin.new_filter(else_conf["@type"]) | |
@else_filter.router = router | |
@else_filter.configure(else_conf) | |
end | |
end | |
def filter_stream(tag, es) | |
then_events = [] | |
else_events = [] | |
es_markers = [] | |
es.each do |time, record| | |
@placeholder_expander.prepare_placeholders(time, record) | |
cond = @placeholder_expander.expand(@if) | |
log.info "cond is #{cond} with type = #{cond.class}" | |
log.info "if #{@if}" | |
if cond | |
then_events.push([time, record]) | |
es_markers << :then | |
else | |
else_events.push([time, record]) | |
es_markers << :else | |
end | |
end | |
then_events = @then_filter.filter_stream(tag, ArrayEventStream.new(then_events)).to_a | |
if @else_filter | |
else_events = @else_filter.filter_stream(tag, ArrayEventStream.new(else_events)).to_a | |
end | |
new_es = MultiEventStream.new | |
es_markers.each do |marker| | |
case marker | |
when :then | |
time, record = then_events.pop | |
new_es.add(time, record) | |
when :else | |
time, record = else_events.pop | |
new_es.add(time, record) | |
end | |
end | |
new_es | |
end | |
# Heavily inspired by record_transformer | |
class RubyPlaceholderExpander | |
attr_reader :placeholders, :log | |
def initialize(log) | |
@log = log | |
end | |
# Get placeholders as a struct | |
# | |
# @param [Time] time the time | |
# @param [Hash] record the record | |
def prepare_placeholders(time, record) | |
struct = UndefOpenStruct.new(record) | |
struct.time = Time.at(time) | |
@placeholders = struct | |
end | |
# Replace placeholders in a string | |
# | |
# @param [String] str the string to be replaced | |
def expand(str) | |
str.gsub!(/\$\{([^}]+)\}/, '\1') | |
begin | |
eval str, @placeholders.instance_eval { binding } | |
rescue => e | |
log.warn "failed to expand `#{str}`", :error_class => e.class, :error => e.message | |
log.warn_backtrace | |
nil | |
end | |
end | |
class UndefOpenStruct < OpenStruct | |
(Object.instance_methods).each do |m| | |
undef_method m unless m.to_s =~ /^__|respond_to_missing\?|object_id|public_methods|instance_eval|method_missing|define_singleton_method|respond_to\?|new_ostruct_member/ | |
end | |
end | |
end | |
end | |
end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<source> | |
@type dummy | |
tag test.hello.world | |
</source> | |
<filter test.**> | |
@type if | |
if ${Random.rand > 0.5} | |
<then> | |
@type record_transformer | |
<record> | |
__branch then | |
</record> | |
</then> | |
<else> | |
@type record_transformer | |
<record> | |
__branch else | |
</record> | |
</else> | |
</filter> | |
<match test.**> | |
type stdout | |
</match> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment