@repeatedly
Last active August 29, 2015 14:06
Fluentd v0.12 Filter API

The basic idea is described here: https://github.com/fluent/fluentd/wiki/filter_label

Currently, <copy **> and <filter_set> are not implemented yet.

Implement Filter

The base Filter class is below:

module Fluent
  class Filter
    def filter(tag, time, record)
      # for plugins that mutate the record
      # must return the mutated record
    end

    def filter_stream(tag, es)
      # default implementation.
      new_es = MultiEventStream.new
      es.each { |time, record|
        new_es.add(time, filter(tag, time, record))
      }
      new_es
    end
  end
end

filter_stream has a default implementation, so there are two ways to implement a filter.

  • Override the filter method

If you want to mutate individual records, this approach is better. The filter method should return the mutated record. The default filter_stream calls filter for each event and adds the returned record to the new EventStream.

module Fluent
  class MutateRecordFilter < Filter
    Plugin.register_filter('mutate_record', self)

    def filter(tag, time, record)
      # mutate the record here, then return it
      record
    end
  end if defined?(Filter) # Avoid 'uninitialized constant Fluent::Filter' at Fluentd v0.10
end

An example filter implementation is here: https://gist.github.com/repeatedly/cb16d5667350c8a0e2c9
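As a rough illustration of the filter approach, the sketch below runs without Fluentd installed. The Fluent::MultiEventStream and Fluent::Filter classes here are simplified stand-ins written for this example (a real plugin uses the ones Fluentd provides), and AddTagFilter with its 'tag' key is a hypothetical name, not an existing plugin.

```ruby
# Stand-ins for Fluentd's classes so the sketch runs on its own.
# In a real plugin these come from Fluentd and must not be redefined.
module Fluent
  class MultiEventStream
    include Enumerable

    def initialize
      @events = []
    end

    def add(time, record)
      @events << [time, record]
    end

    def each(&block)
      @events.each(&block)
    end
  end

  class Filter
    # Same default as the base class shown earlier: call filter once per event.
    def filter_stream(tag, es)
      new_es = MultiEventStream.new
      es.each { |time, record| new_es.add(time, filter(tag, time, record)) }
      new_es
    end
  end

  # Hypothetical filter: stores the event tag in each record under 'tag'.
  class AddTagFilter < Filter
    def filter(tag, time, record)
      # Return a new record and leave the input record untouched.
      record.merge({ 'tag' => tag })
    end
  end
end

es = Fluent::MultiEventStream.new
es.add(1_400_000_000, { 'message' => 'hello' })

result = Fluent::AddTagFilter.new.filter_stream('logs.event', es).to_a
p result
```

Only filter is overridden here; the inherited filter_stream builds the new event stream from the returned records.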

  • Override the filter_stream method

If you want to mutate the event stream as a whole, overriding filter_stream is better. filter_stream should return an EventStream.

module Fluent
  class MutateStreamFilter < Filter
    Plugin.register_filter('mutate_stream', self)

    def filter_stream(tag, es)
      new_es = MultiEventStream.new
      # mutate / grep / etc event stream
      new_es
    end
  end if defined?(Filter) # Avoid 'uninitialized constant Fluent::Filter' at Fluentd v0.10
end

An example filter_stream implementation is here: https://gist.github.com/repeatedly/0ed3e2b4016b4045640a
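A grep-like filter is one case where filter_stream fits, because the default filter_stream adds one record for every incoming event and so cannot drop events. The sketch below is self-contained: the Fluent classes are minimal stand-ins written for this example, and KeepWarnFilter and the 'level' key are hypothetical.

```ruby
# Stand-ins for Fluentd's classes so the sketch runs on its own.
module Fluent
  class MultiEventStream
    include Enumerable

    def initialize
      @events = []
    end

    def add(time, record)
      @events << [time, record]
    end

    def each(&block)
      @events.each(&block)
    end
  end

  class Filter
  end

  # Hypothetical grep-like filter: keeps only records whose 'level' is WARN.
  class KeepWarnFilter < Filter
    def filter_stream(tag, es)
      new_es = MultiEventStream.new
      es.each { |time, record|
        new_es.add(time, record) if record['level'] == 'WARN'
      }
      new_es
    end
  end
end

es = Fluent::MultiEventStream.new
es.add(1, { 'level' => 'INFO', 'message' => 'started' })
es.add(2, { 'level' => 'WARN', 'message' => 'disk almost full' })

kept = Fluent::KeepWarnFilter.new.filter_stream('debug.app', es).to_a
p kept
```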

Deploy Filter

Deployment is the same as for existing plugins, but the file name prefix is filter_. Put your filter plugin at fluent/plugin/filter_xxx.rb in your gem.

Filter Configuration

The <filter xxx> section is new. <filter> is similar to <match>, but the filtered record is passed on to the next <filter> or <match>.

With the following configuration, a record received with the logs.event tag flows through the add_metadata filter and then the file output.

<source>
  type forward
</source>

<filter debug.**>
  type grep
  input_key k
  regexp WARN
</filter>

<filter logs.**>
  type add_metadata
  include_tag_key
</filter>

<match logs.**>
  type file
  # ...
</match>

<match debug.**>
  type stdout
</match>

<match **>
  type null
</match>
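The processing flow for the configuration above can be traced with a toy model. This is not Fluentd's routing code: tag_match? is a simplified helper written for this sketch that only understands the `**` and `prefix.**` patterns used here, and it assumes that sections are evaluated in file order and that the first matching <match> ends the flow.

```ruby
# Toy model only. A pattern such as "logs.**" is treated as
# "the tag is logs, or the tag starts with logs.".
def tag_match?(pattern, tag)
  return true if pattern == '**'

  prefix = pattern.sub(/\.\*\*\z/, '')
  tag == prefix || tag.start_with?(prefix + '.')
end

# Sections in the same order as the configuration above.
SECTIONS = [
  [:filter, 'debug.**', 'grep'],
  [:filter, 'logs.**',  'add_metadata'],
  [:match,  'logs.**',  'file'],
  [:match,  'debug.**', 'stdout'],
  [:match,  '**',       'null']
].freeze

# Collect every matching filter in order and stop at the first matching output.
def flow_for(tag)
  flow = []
  SECTIONS.each do |kind, pattern, plugin|
    next unless tag_match?(pattern, tag)

    flow << plugin
    break if kind == :match
  end
  flow
end

p flow_for('logs.event')
p flow_for('debug.app')
p flow_for('other')
```

Under these assumptions, a logs.event record passes through add_metadata and ends at file, a debug.app record passes through grep and ends at stdout, and any other tag falls through to null.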

Labels group filter / output plugins and route events.

Label Configuration

See the following configuration. A record received via forward is sent to <label @foo>, not to the top-level <match **> stdout. A record received via http, on the other hand, is sent to <match **>, not to <label @foo>.

You can clean up tag management with label.

<source>
  type forward
  @label @foo # specify Label
</source>

<source>
  type http
  # If @label is not specified, the input plugin sends records to the top level.
</source>

<match **>
  type stdout
</match>

<label @foo>
  <filter **>
    type add_metadata
    include_tag_key
  </filter>
  <match logs.**>
    type stdout
    output_type hash
  </match>
</label>

Implement Label

Please replace Engine.emit with router.emit. Since v0.12, Input / Output plugins have a router attribute. Engine.emit can't handle the @label parameter.
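The change inside a plugin is a one-line swap. The sketch below shows its shape without requiring Fluentd: RecordingRouter is a stand-in that only records what was emitted, and SampleInput and on_message are hypothetical names, not part of Fluentd. In a real v0.12 plugin the router attribute is provided by Fluentd.

```ruby
# Stand-in router for this sketch; it just remembers emitted events.
class RecordingRouter
  attr_reader :emitted

  def initialize
    @emitted = []
  end

  def emit(tag, time, record)
    @emitted << [tag, time, record]
  end
end

# Hypothetical plugin-like class, not a real Fluentd input.
class SampleInput
  attr_accessor :router

  def on_message(tag, time, record)
    # Before: Engine.emit(tag, time, record)
    # After: emit through the plugin's own router so @label can be applied.
    router.emit(tag, time, record)
  end
end

input = SampleInput.new
input.router = RecordingRouter.new
input.on_message('logs.event', 1_400_000_000, { 'message' => 'hello' })
p input.router.emitted
```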

Label symbol

A label's name should start with @ to distinguish it from plugin parameters.

@label @foo # OK
label @foo # NG
label foo # NG

Limitation

A label can't contain input plugins, so you can't put <source> inside <label @xxx>.
