Fluentd buffer file dump script
#!/usr/bin/env ruby
# frozen_string_literal: true
#
# Fluentd Buffer Dump Tool
#
# Dumps contents of Fluentd buffer files as NDJSON (newline-delimited JSON)
#
# Usage:
#   fluent-buffer-dump [options] <buffer_file> [buffer_file...]
#
# Examples:
#   fluent-buffer-dump /var/log/fluent/buffer.b*.log
#   fluent-buffer-dump --with-metadata /var/log/fluent/buffer.q*.log
#   fluent-buffer-dump --decompress gzip /var/log/fluent/buffer.b*.log
#
require 'optparse'
require 'json'
require 'msgpack'
require 'zlib'
require 'stringio' # needed for StringIO in decompress_data
require 'time'     # needed for Time#iso8601

begin
  require 'zstd-ruby'
  ZSTD_AVAILABLE = true
rescue LoadError
  ZSTD_AVAILABLE = false
end
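
# 2-byte magic that read_metadata expects at the start of a chunk .meta file,
# followed by a 4-byte big-endian length and the MessagePack-encoded metadata.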
BUFFER_HEADER = "\xc1\x00".b.freeze
# Fluentd EventTime - MessagePack extension type 0
class EventTime
  TYPE = 0

  attr_reader :sec, :nsec

  def initialize(sec, nsec = 0)
    @sec = sec
    @nsec = nsec
  end

  def to_f
    @sec + @nsec / 1_000_000_000.0
  end

  def to_time
    Time.at(@sec, @nsec, :nanosecond)
  end

  def to_msgpack_ext
    [@sec, @nsec].pack('NN')
  end

  def self.from_msgpack_ext(data)
    new(*data.unpack('NN'))
  end
end
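
# MessagePack factory with EventTime registered, so ext type 0 values in
# buffer data unpack into EventTime instances via EventTime.from_msgpack_ext.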
def create_msgpack_factory
  factory = MessagePack::Factory.new
  factory.register_type(EventTime::TYPE, EventTime)
  factory
end
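
# Dumps one or more buffer chunk files: reads the sibling .meta file when
# present and emits each buffered event as one line of JSON on stdout.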
class FluentBufferDumper
  def initialize(options = {})
    @decompress = options[:decompress]
    @with_metadata = options[:with_metadata]
    @raw_mode = options[:raw]
    @msgpack_factory = create_msgpack_factory
  end

  def dump_file(path)
    unless File.exist?(path)
      warn "Error: File not found: #{path}"
      return false
    end

    meta_path = path.end_with?('.meta') ? path : "#{path}.meta"
    data_path = path.end_with?('.meta') ? path.sub(/\.meta$/, '') : path

    metadata = read_metadata(meta_path) if File.exist?(meta_path)

    if @with_metadata && metadata
      puts JSON.generate({ _metadata: format_metadata(metadata) })
    end

    dump_data(data_path, metadata)
    true
  rescue => e
    warn "Error processing #{path}: #{e.message}"
    warn e.backtrace.first(5).join("\n") if ENV['DEBUG']
    false
  end

  private
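
  # .meta files may be either the current format (BUFFER_HEADER, a 4-byte
  # big-endian payload size, then MessagePack) or an older bare-MessagePack
  # format; both are handled below.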
  def read_metadata(path)
    File.open(path, 'rb') do |f|
      chunk = f.read
      return nil if chunk.nil? || chunk.size <= 6

      if chunk.slice(0, 2) == BUFFER_HEADER
        size = chunk.slice(2, 4).unpack1('N')
        MessagePack.unpack(chunk.slice(6, size), symbolize_keys: true)
      else
        # Old format fallback
        MessagePack.unpack(chunk, symbolize_keys: true)
      end
    end
  rescue => e
    warn "Warning: Could not read metadata from #{path}: #{e.message}"
    nil
  end

  def format_metadata(meta)
    result = {}
    result[:unique_id] = meta[:id]&.unpack1('H*')
    result[:tag] = meta[:tag] if meta[:tag]
    result[:timekey] = meta[:timekey] if meta[:timekey]
    result[:variables] = meta[:variables] if meta[:variables]
    result[:record_count] = meta[:s]
    result[:created_at] = meta[:c] ? Time.at(meta[:c]).iso8601 : nil
    result[:modified_at] = meta[:m] ? Time.at(meta[:m]).iso8601 : nil
    result[:seq] = meta[:seq] if meta[:seq]
    result.compact
  end
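
  # Reads the chunk payload, optionally decompresses it, and emits NDJSON
  # (or writes the bytes through unchanged in --raw mode).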
  def dump_data(path, metadata)
    return unless File.exist?(path)

    data = File.binread(path)
    data = decompress_data(data) if @decompress

    if @raw_mode
      # Raw mode: output data as-is (for pre-formatted buffers like JSON lines)
      $stdout.write(data)
      $stdout.write("\n") unless data.end_with?("\n")
    else
      # MessagePack mode: unpack and convert to JSON
      dump_msgpack_events(data)
    end
  end

  def decompress_data(data)
    case @decompress
    when 'gzip', :gzip
      Zlib::GzipReader.new(StringIO.new(data)).read
    when 'zstd', :zstd
      unless ZSTD_AVAILABLE
        raise "zstd-ruby gem not available. Install with: gem install zstd-ruby"
      end
      Zstd.decompress(data)
    else
      data
    end
  end
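
  # Buffer chunk data is a stream of concatenated MessagePack objects: feed the
  # whole payload to the unpacker and emit each object as it is decoded. If the
  # data is not MessagePack at all, fall back to treating it as text lines.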
  def dump_msgpack_events(data)
    return if data.nil? || data.empty?

    unpacker = @msgpack_factory.unpacker
    unpacker.feed(data)
    unpacker.each do |obj|
      output_event(obj)
    end
  rescue MessagePack::MalformedFormatError, MessagePack::UnpackError => e
    # Data might not be MessagePack - try treating it as raw text lines
    warn "Warning: Data is not valid MessagePack (#{e.message}), trying raw text mode"
    data.force_encoding('UTF-8')
    data.each_line do |line|
      line = line.chomp
      next if line.empty?

      begin
        # Try to parse as JSON and re-emit
        parsed = JSON.parse(line)
        puts JSON.generate(parsed)
      rescue JSON::ParserError
        # Output as-is wrapped in a simple object
        puts JSON.generate({ _raw: line })
      end
    end
  end
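
  # Buffered entries are usually [time, record] pairs; some configurations
  # store [tag, time, record] triples or nested arrays, so each shape is handled.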
  def output_event(obj)
    case obj
    when Array
      if obj.size == 2 && time_value?(obj[0])
        # Standard Fluentd event: [time, record]
        time, record = obj
        if record.is_a?(Hash)
          record['_time'] = format_time(time) unless record.key?('_time')
          puts JSON.generate(record)
        else
          puts JSON.generate({ _time: format_time(time), _data: record })
        end
      elsif obj.size == 3 && obj[0].is_a?(String)
        # Tagged event: [tag, time, record]
        tag, time, record = obj
        if record.is_a?(Hash)
          record['_tag'] = tag unless record.key?('_tag')
          record['_time'] = format_time(time) unless record.key?('_time')
          puts JSON.generate(record)
        else
          puts JSON.generate({ _tag: tag, _time: format_time(time), _data: record })
        end
      else
        # Array of events or other structure
        obj.each { |item| output_event(item) }
      end
    when Hash
      puts JSON.generate(obj)
    else
      puts JSON.generate({ _data: obj })
    end
  end

  def time_value?(val)
    val.is_a?(Integer) || val.is_a?(Float) || val.is_a?(EventTime)
  end

  def format_time(time)
    case time
    when EventTime
      Time.at(time.sec, time.nsec, :nanosecond).iso8601(9)
    when Integer
      Time.at(time).iso8601
    when Float
      Time.at(time).iso8601(6)
    when MessagePack::Timestamp
      Time.at(time.sec, time.nsec, :nanosecond).iso8601(9)
    else
      time.to_s
    end
  end
end

def main
  options = {}
  parser = OptionParser.new do |opts|
    opts.banner = "Usage: #{$0} [options] <buffer_file> [buffer_file...]"
    opts.separator ""
    opts.separator "Dumps Fluentd buffer file contents as NDJSON"
    opts.separator ""
    opts.separator "Options:"

    opts.on("-d", "--decompress TYPE", [:gzip, :zstd],
            "Decompress data (gzip, zstd)") do |type|
      options[:decompress] = type
    end

    opts.on("-m", "--with-metadata",
            "Include chunk metadata as first line") do
      options[:with_metadata] = true
    end

    opts.on("-r", "--raw",
            "Raw mode: output buffer data as-is (for JSON-formatted buffers)") do
      options[:raw] = true
    end

    opts.on("-h", "--help", "Show this help") do
      puts opts
      exit 0
    end
  end

  parser.parse!

  if ARGV.empty?
    puts parser
    exit 1
  end

  dumper = FluentBufferDumper.new(options)
  success = true

  ARGV.each do |path|
    # Expand globs if the shell didn't
    files = File.exist?(path) ? [path] : Dir.glob(path)
    if files.empty?
      warn "Warning: No files match: #{path}"
      success = false
      next
    end

    files.each do |file|
      # Skip .meta files unless they were requested explicitly
      next if file.end_with?('.meta') && !ARGV.include?(file)

      success &= dumper.dump_file(file)
    end
  end

  exit(success ? 0 : 1)
end

main if __FILE__ == $0