Skip to content

Instantly share code, notes, and snippets.

@ivoronin
Created January 8, 2026 11:11
Show Gist options
  • Select an option

  • Save ivoronin/33bb7fa7b9d7526c7eb9b5d4049ac736 to your computer and use it in GitHub Desktop.

Select an option

Save ivoronin/33bb7fa7b9d7526c7eb9b5d4049ac736 to your computer and use it in GitHub Desktop.
Fluentd buffer file dump script
#!/usr/bin/env ruby
# frozen_string_literal: true
#
# Fluentd Buffer Dump Tool
#
# Dumps contents of Fluentd buffer files as NDJSON (newline-delimited JSON)
#
# Usage:
# fluent-buffer-dump [options] <buffer_file> [buffer_file...]
#
# Examples:
# fluent-buffer-dump /var/log/fluent/buffer.b*.log
# fluent-buffer-dump --with-metadata /var/log/fluent/buffer.q*.log
# fluent-buffer-dump --decompress gzip /var/log/fluent/buffer.b*.log
#
require 'json'
require 'optparse'
require 'stringio'
require 'time'
require 'zlib'

require 'msgpack'
# zstd support is optional: the flag records whether the zstd-ruby gem could
# be loaded, so the script still works without it.
ZSTD_AVAILABLE =
  begin
    require 'zstd-ruby'
    true
  rescue LoadError
    false
  end

# Two-byte marker compared against the start of a metadata file (see
# FluentBufferDumper#read_metadata).
BUFFER_HEADER = "\xc1\x00".b.freeze
# Fluentd EventTime - MessagePack extension type 0
#
# Value object holding a timestamp as whole seconds plus a nanosecond part.
# The extension payload is two unsigned 32-bit big-endian integers.
class EventTime
  TYPE = 0

  attr_reader :sec, :nsec

  class << self
    # Rebuilds an EventTime from its packed extension payload.
    def from_msgpack_ext(data)
      seconds, nanos = data.unpack('NN')
      new(seconds, nanos)
    end
  end

  # @param sec [Integer] seconds since the Unix epoch
  # @param nsec [Integer] nanosecond part (defaults to 0)
  def initialize(sec, nsec = 0)
    @sec = sec
    @nsec = nsec
  end

  # Seconds since the epoch as a Float (nanoseconds become the fraction).
  def to_f
    fraction = nsec / 1_000_000_000.0
    sec + fraction
  end

  # Equivalent Time instance with nanosecond resolution.
  def to_time
    Time.at(sec, nsec, :nanosecond)
  end

  # Packs the timestamp into the extension payload.
  def to_msgpack_ext
    [sec, nsec].pack('NN')
  end
end
# Builds a MessagePack factory with EventTime registered under its extension
# type id.
def create_msgpack_factory
  MessagePack::Factory.new.tap do |msgpack|
    msgpack.register_type(EventTime::TYPE, EventTime)
  end
end
# Dumps Fluentd file-buffer chunks to stdout as NDJSON.
#
# A chunk is a data file plus an optional "<data file>.meta" sidecar; either
# path may be handed to #dump_file.
class FluentBufferDumper
  # @param options [Hash] recognised keys:
  #   :decompress    - 'gzip'/:gzip or 'zstd'/:zstd; any other value leaves the data untouched
  #   :with_metadata - when truthy, print the chunk metadata as a leading JSON line
  #   :raw           - when truthy, write the payload as-is instead of decoding MessagePack
  def initialize(options = {})
    @decompress = options[:decompress]
    @with_metadata = options[:with_metadata]
    @raw_mode = options[:raw]
    @msgpack_factory = create_msgpack_factory
  end

  # Dumps one chunk to stdout.
  #
  # @param path [String] path of the data file or of its ".meta" sidecar
  # @return [Boolean] true on success; false when the path does not exist or
  #   an error was raised while processing (the error is reported on stderr)
  def dump_file(path)
    unless File.exist?(path)
      warn "Error: File not found: #{path}"
      return false
    end

    is_meta = path.end_with?('.meta')
    meta_path = is_meta ? path : "#{path}.meta"
    data_path = is_meta ? path.delete_suffix('.meta') : path
    metadata = read_metadata(meta_path) if File.exist?(meta_path)
    puts JSON.generate({ _metadata: format_metadata(metadata) }) if @with_metadata && metadata
    dump_data(data_path, metadata)
    true
  rescue => e
    warn "Error processing #{path}: #{e.message}"
    # Set DEBUG in the environment to see where the error came from.
    warn e.backtrace.first(5).join("\n") if ENV['DEBUG']
    false
  end

  private

  # Reads and unpacks a metadata file.
  #
  # Two layouts are accepted: BUFFER_HEADER, a 4-byte big-endian length and
  # then a MessagePack payload of that length; or a bare MessagePack payload.
  #
  # @return [Object, nil] the unpacked value (hash keys symbolized), or nil
  #   when the file holds 6 bytes or fewer or cannot be read/unpacked (a
  #   warning is printed in the latter case)
  def read_metadata(path)
    File.open(path, 'rb') do |f|
      chunk = f.read
      return nil if chunk.nil? || chunk.size <= 6

      if chunk.slice(0, 2) == BUFFER_HEADER
        size = chunk.slice(2, 4).unpack1('N')
        MessagePack.unpack(chunk.slice(6, size), symbolize_keys: true)
      else
        # Old format fallback
        MessagePack.unpack(chunk, symbolize_keys: true)
      end
    end
  rescue => e
    warn "Warning: Could not read metadata from #{path}: #{e.message}"
    nil
  end

  # Converts unpacked metadata into a JSON-friendly hash: the id is
  # hex-encoded, :s/:c/:m are renamed to record_count/created_at/modified_at
  # (the two times rendered as ISO 8601), and nil entries are dropped.
  def format_metadata(meta)
    result = {}
    result[:unique_id] = meta[:id]&.unpack1('H*')
    result[:tag] = meta[:tag] if meta[:tag]
    result[:timekey] = meta[:timekey] if meta[:timekey]
    result[:variables] = meta[:variables] if meta[:variables]
    result[:record_count] = meta[:s]
    result[:created_at] = meta[:c] ? Time.at(meta[:c]).iso8601 : nil
    result[:modified_at] = meta[:m] ? Time.at(meta[:m]).iso8601 : nil
    result[:seq] = meta[:seq] if meta[:seq]
    result.compact
  end

  # Reads the data file, optionally decompresses it and prints its contents.
  # The metadata argument is accepted for the caller's convenience but is not
  # used here.
  def dump_data(path, metadata)
    unless File.exist?(path)
      # Only reachable when a ".meta" path was given without its data file.
      warn "Warning: Data file not found: #{path}"
      return
    end

    data = File.binread(path)
    data = decompress_data(data) if @decompress
    if @raw_mode
      # Raw mode: output data as-is (for pre-formatted buffers like JSON lines)
      $stdout.write(data)
      $stdout.write("\n") unless data.end_with?("\n")
    else
      # MessagePack mode: unpack and convert to JSON
      dump_msgpack_events(data)
    end
  end

  # Decompresses the payload according to the :decompress option.
  #
  # @raise [RuntimeError] when zstd is requested but zstd-ruby is not installed
  def decompress_data(data)
    case @decompress
    when 'gzip', :gzip
      gunzip_all(data)
    when 'zstd', :zstd
      raise 'zstd-ruby gem not available. Install with: gem install zstd-ruby' unless ZSTD_AVAILABLE

      Zstd.decompress(data)
    else
      data
    end
  end

  # Inflates every gzip member found in +data+ and returns the concatenation.
  #
  # A single GzipReader stops at the end of the first member, which silently
  # drops the rest of a payload built by appending several compressed writes.
  # Here the stream is re-opened at each member boundary instead.
  def gunzip_all(data)
    io = StringIO.new(data)
    output = String.new(encoding: Encoding::BINARY)
    until io.eof?
      reader = Zlib::GzipReader.new(io)
      output << reader.read.force_encoding(Encoding::BINARY)
      # Bytes the reader pulled from io beyond the end of this member.
      leftover = reader.unused
      reader.finish # closes the reader but leaves io open
      io.pos -= leftover.bytesize if leftover
    end
    output
  end

  # Decodes the payload as a stream of MessagePack objects and prints each one.
  #
  # The text fallback is used only when decoding fails before anything was
  # printed. If it fails later, the objects already printed are kept and a
  # warning is issued, rather than dumping the whole payload a second time.
  def dump_msgpack_events(data)
    return if data.nil? || data.empty?

    emitted = 0
    begin
      unpacker = @msgpack_factory.unpacker
      unpacker.feed(data)
      unpacker.each do |obj|
        output_event(obj)
        emitted += 1
      end
    rescue MessagePack::MalformedFormatError, MessagePack::UnpackError => e
      if emitted.zero?
        # Data might not be MessagePack - try treating as raw text lines
        warn "Warning: Data is not valid MessagePack, trying raw text mode"
        dump_text_lines(data)
      else
        warn "Warning: MessagePack decoding failed after #{emitted} object(s): #{e.message}"
      end
    end
  end

  # Prints the payload line by line: lines that parse as JSON are re-emitted,
  # anything else is wrapped as {"_raw": line}. Blank lines are skipped.
  def dump_text_lines(data)
    # Scrub invalid byte sequences so JSON.generate cannot raise on them.
    text = data.dup.force_encoding(Encoding::UTF_8).scrub
    text.each_line do |line|
      line = line.chomp
      next if line.empty?

      begin
        # Try to parse as JSON and re-emit
        puts JSON.generate(JSON.parse(line))
      rescue JSON::ParserError
        # Output as-is wrapped in a simple object
        puts JSON.generate({ _raw: line })
      end
    end
  end

  # Prints one decoded object as one or more JSON lines.
  #
  # Recognised shapes: [time, record], [tag, time, record], a bare Hash, and
  # any other Array (each element is handled recursively). Everything else is
  # wrapped as {"_data": obj}. "_time"/"_tag" are added to Hash records only
  # when the record does not already carry those keys.
  def output_event(obj)
    case obj
    when Array
      if obj.size == 2 && time_value?(obj[0])
        # Standard Fluentd event: [time, record]
        time, record = obj
        if record.is_a?(Hash)
          record['_time'] = format_time(time) unless record.key?('_time')
          puts JSON.generate(record)
        else
          puts JSON.generate({ _time: format_time(time), _data: record })
        end
      elsif obj.size == 3 && obj[0].is_a?(String)
        # Tagged event: [tag, time, record]
        tag, time, record = obj
        if record.is_a?(Hash)
          record['_tag'] = tag unless record.key?('_tag')
          record['_time'] = format_time(time) unless record.key?('_time')
          puts JSON.generate(record)
        else
          puts JSON.generate({ _tag: tag, _time: format_time(time), _data: record })
        end
      else
        # Array of events or other structure
        obj.each { |item| output_event(item) }
      end
    when Hash
      puts JSON.generate(obj)
    else
      puts JSON.generate({ _data: obj })
    end
  end

  # True when +val+ can be the time slot of a [time, record] pair.
  def time_value?(val)
    val.is_a?(Integer) || val.is_a?(Float) || val.is_a?(EventTime)
  end

  # Renders a time value as ISO 8601: nanosecond precision for EventTime and
  # MessagePack::Timestamp, whole seconds for Integer, microseconds for Float.
  # Any other value falls back to #to_s.
  def format_time(time)
    case time
    when EventTime
      Time.at(time.sec, time.nsec, :nanosecond).iso8601(9)
    when Integer
      Time.at(time).iso8601
    when Float
      Time.at(time).iso8601(6)
    when MessagePack::Timestamp
      Time.at(time.sec, time.nsec, :nanosecond).iso8601(9)
    else
      time.to_s
    end
  end
end
# Command-line entry point.
#
# Parses options from ARGV, dumps every requested buffer file through
# FluentBufferDumper and terminates the process. Exit status is 0 when every
# file was dumped, 1 on a usage error, an unmatched path or a failed dump.
def main
  options = {}
  parser = OptionParser.new do |opts|
    opts.banner = "Usage: #{$0} [options] <buffer_file> [buffer_file...]"
    opts.separator ""
    opts.separator "Dumps Fluentd buffer file contents as NDJSON"
    opts.separator ""
    opts.separator "Options:"
    opts.on("-d", "--decompress TYPE", [:gzip, :zstd],
            "Decompress data (gzip, zstd)") do |type|
      options[:decompress] = type
    end
    opts.on("-m", "--with-metadata",
            "Include chunk metadata as first line") do
      options[:with_metadata] = true
    end
    opts.on("-r", "--raw",
            "Raw mode: output buffer data as-is (for JSON-formatted buffers)") do
      options[:raw] = true
    end
    opts.on("-h", "--help", "Show this help") do
      puts opts
      exit 0
    end
  end

  begin
    parser.parse!
  rescue OptionParser::ParseError => e
    # Report bad options as a usage error instead of an uncaught exception.
    warn "Error: #{e.message}"
    warn parser.help
    exit 1
  end

  if ARGV.empty?
    # Usage errors go to stderr so stdout only ever carries dump output.
    warn parser.help
    exit 1
  end

  dumper = FluentBufferDumper.new(options)
  success = true
  dumped = {} # data-file paths that have already been handed to the dumper
  ARGV.each do |path|
    # Expand globs if shell didn't
    files = File.exist?(path) ? [path] : Dir.glob(path)
    if files.empty?
      warn "Warning: No files match: #{path}"
      success = false
      next
    end

    files.each do |file|
      data_file = file.delete_suffix('.meta')
      if file.end_with?('.meta')
        # Skip .meta files that only matched through a glob
        next unless ARGV.include?(file)
        # Dumping a .meta path dumps its data file, so skip it when that data
        # file is processed on its own; otherwise the chunk is printed twice.
        next if dumped[data_file] || (ARGV.include?(data_file) && File.exist?(data_file))
      end
      success &= dumper.dump_file(file)
      dumped[data_file] = true
    end
  end
  exit(success ? 0 : 1)
end
# Run the CLI only when this file is the program being executed ($0), so
# loading it from another script does not start a dump.
main if __FILE__ == $0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment