Created November 10, 2015 08:25
-
-
Save repeatedly/11b785850d65bf08e398 to your computer and use it in GitHub Desktop.
Faster in_tail implementation for regexp format
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Fluent
  require 'fluent/plugin/in_tail'
  require 'fluent/parser'

  # Variant of in_tail, registered as `fast_regexp_tail`, for regexp-based
  # formats. Each matched line becomes a Struct instance (one member per named
  # capture), and a batch is buffered as msgpack bytes of (time, values) pairs
  # instead of an array of Hashes.
  #
  # Requirements: the format must resolve to a parser exposing a Regexp under
  # patterns['format'], and that Regexp must have a named capture `time`.
  class FastRegexpTailInput < TailInput
    Plugin.register_input('fast_regexp_tail', self)

    # Mixes in type conversion support; presumably this is what populates
    # @type_converters (read in parse_singleline).
    # NOTE(review): confirm against fluent/parser.
    include TextParser::TypeConverter

    # Replaces TailInput's parser setup: pulls the raw Regexp and time format
    # out of the configured format and builds the record schema from the
    # regexp's named captures.
    #
    # @param conf [Fluent::Config::Element] plugin configuration
    # @raise [ConfigError] if the format is not regexp-based or has no
    #   named capture called `time`
    def configure_parser(conf)
      parser = TextParser.lookup(conf['format'])
      patterns = parser.respond_to?(:patterns) ? parser.patterns : {}
      @regexp = patterns['format']
      unless @regexp.is_a?(Regexp)
        raise ConfigError, "fast_regexp_tail supports only regexp-based formats: #{conf['format'].inspect}"
      end
      unless @regexp.names.include?('time')
        raise ConfigError, "fast_regexp_tail requires a named capture 'time' in the format"
      end

      # The looked-up parser is never configured here, so a user-supplied
      # time_format (e.g. for a custom /regexp/) must be read from conf
      # directly; it takes precedence over the template's own time_format.
      @time_parser = TextParser::TimeParser.new(conf['time_format'] || patterns['time_format'])
      @schema = PageSchema.new(*@regexp.names.map(&:to_sym))
    end

    # Parses one batch of tailed lines into a PageEventStream.
    #
    # Lines that do not match the regexp are logged and skipped. A line whose
    # time or typed field fails to convert is also logged and skipped, so one
    # bad line no longer discards the whole batch.
    #
    # @param lines [Array<String>] raw lines; each is chomped in place
    # @param tail_watcher [Object] not used here; kept for the callback signature
    # @return [PageEventStream]
    def parse_singleline(lines, tail_watcher)
      es = @schema.new_event_stream
      lines.each do |line|
        begin
          line.chomp!
          m = @regexp.match(line)
          unless m
            log.warn "pattern not match: #{line.inspect}"
            next
          end

          # Captures are positional, in the same order as the Struct members.
          record = es.struct.new(*m.captures)
          if @type_converters
            @type_converters.each do |key, converter|
              record[key] = converter.call(record[key])
            end
          end
          # The `time` field is parsed for the event time and kept in the record.
          time = @time_parser.parse(record.time)
          es.add(time, record)
        rescue => e
          log.warn line.dump, :error => e.to_s
          log.debug_backtrace(e.backtrace)
        end
      end
      es
    end

    # Owns the Struct class for one regexp's captures and creates event
    # streams bound to it.
    class PageSchema
      # @param keys [Array<Symbol>] record member names, in capture order
      def initialize(*keys)
        @struct = Struct.new(*keys)
        @struct.class_eval do
          # Serializes the record as a msgpack map (member name => value)
          # without building an intermediate Hash. It follows msgpack-ruby's
          # to_msgpack convention:
          # - given a Packer, it writes into it and returns it;
          # - given nil, it returns the packed String;
          # - given an IO, it writes, flushes, and returns the IO.
          def to_msgpack(arg = nil)
            given_packer = arg.is_a?(MessagePack::Packer)
            packer = given_packer ? arg : MessagePack::Packer.new(arg)
            packer.write_map_header(length)
            each_pair { |k, v| packer.write(k).write(v) }
            if given_packer
              packer
            elsif arg.nil?
              packer.to_s
            else
              # A Packer wrapping an IO buffers internally; flush so the bytes
              # actually reach the IO.
              packer.flush
              arg
            end
          end
        end
      end

      # @return [PageEventStream] an empty stream whose records use this schema
      def new_event_stream
        PageEventStream.new(@struct)
      end

      # Original misspelled name, kept so existing callers keep working.
      alias new_event_steram new_event_stream
    end

    # EventStream that stores events as msgpack-encoded (time, values) pairs.
    # Member names are not stored; they come from the shared Struct class.
    # Records are rebuilt as Struct instances on iteration.
    class PageEventStream < EventStream
      # @param struct [Class] Struct class used to rebuild records
      def initialize(struct)
        @struct = struct
        @packer = MessagePack::Packer.new
      end

      attr_reader :struct

      # Appends one event. Only the record's values are encoded.
      def add(time, record)
        @packer.write(time)
        @packer.write(record.values)
      end

      def empty?
        @packer.buffer.empty?
      end

      # Yields each buffered event as (time, record).
      #
      # It unpacks from a String copy of the buffer rather than handing the
      # buffer itself to the Unpacker as an IO, which would drain it. This
      # keeps the stream re-iterable and keeps empty? accurate afterwards.
      # Only EOFErrors from reading are treated as end-of-stream; errors raised
      # by the caller's block propagate.
      def each
        return enum_for(:each) unless block_given?
        unpacker = MessagePack::Unpacker.new
        unpacker.feed(@packer.to_s)
        loop do
          begin
            time = unpacker.read
            values = unpacker.read
          rescue EOFError
            break
          end
          yield time, @struct.new(*values)
        end
        self
      end
    end
  end
end
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.