Skip to content

Instantly share code, notes, and snippets.

@repeatedly
Created November 10, 2015 08:25
Show Gist options
  • Save repeatedly/11b785850d65bf08e398 to your computer and use it in GitHub Desktop.
Save repeatedly/11b785850d65bf08e398 to your computer and use it in GitHub Desktop.
Faster in_tail implementation for regexp format
module Fluent
require 'fluent/plugin/in_tail'
require 'fluent/parser'
class FastRegexpTailInput < TailInput
Plugin.register_input('fast_regexp_tail', self)
include TextParser::TypeConverter
# Sets up regexp-based parsing from the plugin configuration.
#
# Looks up the parser registered for conf['format'], reads its 'format'
# and 'time_format' patterns, and stores:
#   @regexp      - the 'format' pattern (its capture names define the fields)
#   @time_parser - a TextParser::TimeParser built from 'time_format'
#   @schema      - a PageSchema with one field per capture name of @regexp
#
# conf - configuration object; only conf['format'] is read here.
def configure_parser(conf)
  parser_patterns = TextParser.lookup(conf['format']).patterns

  @regexp = parser_patterns['format']
  @time_parser = TextParser::TimeParser.new(parser_patterns['time_format'])

  # Capture names become the (symbol) members of the record struct.
  field_names = @regexp.names.map(&:to_sym)
  @schema = PageSchema.new(*field_names)
end
# Parses a batch of tailed lines into a single event stream.
#
# For each line: the trailing newline is removed in place, the line is
# matched against @regexp, the captures are loaded into a record struct,
# the configured type converters (if any) are applied, and the record's
# `time` field is parsed with @time_parser before the event is appended.
#
# Lines that do not match @regexp are logged and skipped. Any error raised
# while processing a single line (for example by the time parser or a type
# converter) is also logged and only that line is skipped, so one bad line
# does not throw away the events already parsed from the same batch.
#
# lines        - Array of String lines; each element is mutated by chomp!.
# tail_watcher - not used by this method.
#
# Returns the event stream created by @schema, containing the parsed events.
def parse_singleline(lines, tail_watcher)
  es = @schema.new_event_steram
  lines.each do |line|
    begin
      line.chomp!
      m = @regexp.match(line)
      unless m
        log.warn "pattern not match: #{line.inspect}"
        next
      end

      record = es.struct.new(*m.captures)
      if @type_converters
        @type_converters.each do |key, converter|
          record[key] = converter.call(record[key])
        end
      end

      # The event is only appended once the time has parsed successfully,
      # so a failure here never leaves a half-written event in the stream.
      time = @time_parser.parse(record.time)
      es.add(time, record)
    rescue => e
      log.warn "failed to parse line: #{line.inspect}", :error => e.to_s
      log.debug_backtrace(e.backtrace)
    end
  end
  es
end
# Holds the record layout for one parser configuration: a Struct class with
# one member per field, plus a factory for event streams that use it.
class PageSchema
  # keys - Symbols naming the members of the record struct, in order.
  def initialize(*keys)
    @struct = Struct.new(*keys) do
      # Serializes the record as a msgpack map of member name => value.
      #
      # arg - an existing MessagePack::Packer to write into, or anything
      #       else (including nil), which is passed to MessagePack::Packer.new.
      #
      # Returns the packer that was written to.
      def to_msgpack(arg = nil)
        packer = arg.is_a?(MessagePack::Packer) ? arg : MessagePack::Packer.new(arg)
        packer.write_map_header(length)
        each_pair { |k, v| packer.write(k).write(v) }
        packer
      end
    end
  end

  # Returns a new, empty PageEventStream bound to this schema's struct class.
  def new_event_stream
    PageEventStream.new(@struct)
  end

  # The method was originally published under this misspelled name; keep it
  # working for existing callers.
  alias_method :new_event_steram, :new_event_stream
end
# Event stream that stores its events msgpack-encoded in a single packer:
# each event is written as the time followed by the array of the record's
# field values, and records are rebuilt from that array on iteration.
class PageEventStream < EventStream
  # The Struct class used to build records for this stream.
  attr_reader :struct

  # struct - Struct class whose members match the stored value arrays.
  def initialize(struct)
    @packer = MessagePack::Packer.new
    @struct = struct
  end

  # Appends one event.
  #
  # time   - the event time, written as-is to the packer.
  # struct - the record; only its values (not member names) are stored.
  def add(time, struct)
    @packer.write(time).write(struct.values)
  end

  # True when the packer's buffer holds no data.
  def empty?
    buffer = @packer.buffer
    buffer.empty?
  end

  # Yields (time, record) for every stored event, rebuilding each record
  # from its stored values with the stream's struct class.
  #
  # NOTE(review): the unpacker is handed the packer's own buffer, so
  # iterating presumably drains it and the stream can be walked only once -
  # confirm against the msgpack Buffer/Unpacker behaviour.
  def each(&block)
    reader = MessagePack::Unpacker.new(@packer.buffer)
    loop do
      event_time = reader.read
      field_values = reader.read
      yield event_time, struct.new(*field_values)
    end
  rescue EOFError
    # EOFError from the unpacker marks the end of the stored events.
    nil
  end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment