@brycied00d
Created March 10, 2020 15:43
fluentd-buffer-dump.rb - Dump the contents of a fluentd buffer to stdout for inspection.
#!/usr/bin/env ruby
# Quick and dirty script to print the contents of a fluentd buffer chunk
# Useful when investigating a "corrupt" chunk, typically containing malformed
# record data. Record data is printed to stdout; progress/information is printed
# to stderr.
# Compatible with fluentd 1.9.0. (Much of the buffer/metadata code taken from fluentd source.)
# Copyright 2020 Bryce Chidester <[email protected]>
require 'msgpack'
require 'pp'
require 'time'
BUFFER_HEADER = "\xc1\x00".force_encoding(Encoding::ASCII_8BIT).freeze
# Parse a chunk's .meta file and return its metadata as a Hash with symbol
# keys, or nil if the file is missing, too short, or lacks the buffer header.
def parse_metadata(in_file)
  unless File.file?(in_file)
    warn "Metadata file not found: #{in_file}"
    return nil
  end
  # File.binread returns the whole file as an ASCII-8BIT string and closes it.
  metadata = File.binread(in_file)
  warn "Metadata file size: #{metadata.bytesize}"
  if metadata.bytesize <= 6 # size of BUFFER_HEADER (2) + size of data size (4)
    warn "Failed to parse metadata file: #{in_file}"
    return nil
  end
  return nil unless metadata.byteslice(0, 2) == BUFFER_HEADER

  # Payload length: 32-bit big-endian unsigned integer right after the header.
  size = metadata.byteslice(2, 4).unpack1('N')
  warn "Metadata chunk size: #{size}"
  MessagePack::Unpacker.new(symbolize_keys: true).feed(metadata.byteslice(6, size)).read
end
in_file = ARGV.first
abort "Usage: #{File.basename($PROGRAM_NAME)} <buffer-chunk-file>" if in_file.nil?
# Mode 'rb' already opens the file in binary mode with ASCII-8BIT encoding.
input_chunk_io = File.open(in_file, 'rb')
input_chunk_unpacker = MessagePack::Unpacker.new(input_chunk_io)
input_chunk_metadata = parse_metadata("#{in_file}.meta")
# parse_metadata returns nil on failure, so guard before indexing into it.
if input_chunk_metadata
  warn "Metadata timekey: [#{input_chunk_metadata[:timekey]}]"
  warn "Metadata tag: [#{input_chunk_metadata[:tag]}]"
  warn "Metadata variables: [#{input_chunk_metadata[:variables]}]"
  warn "Metadata sequence: [#{input_chunk_metadata[:seq]}]"
  warn "Metadata size: [#{input_chunk_metadata[:s]}] (records)"
  warn "Metadata created: [#{Time.at(input_chunk_metadata[:c])}]"
  warn "Metadata modified: [#{Time.at(input_chunk_metadata[:m])}]"
else
  warn 'No usable metadata found; dumping records only.'
end
# //n switch means explicit 'ASCII-8BIT' pattern
_, state, chunk_id = /\.(b|q)([0-9a-f]+)\.[^\/]*\Z/n.match(in_file).to_a
warn "Extracted state: #{state}"
warn "Extracted chunk_id: #{chunk_id}"
total_count = 0
input_chunk_unpacker.each do |obj|
  total_count += 1
  pp obj
end
warn "Total records: #{total_count}"
input_chunk_io.close
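
The script relies on two conventions: the framing of the `.meta` file (a 2-byte header, a 4-byte big-endian length, then the payload) and the `.b<hex>.` / `.q<hex>.` pattern in the chunk file name. The sketch below illustrates both using only the Ruby standard library, so it runs without the msgpack gem. The constant `FRAME_HEADER`, the helpers `frame_payload`, `unframe_payload`, and `chunk_name_parts`, and the example path are hypothetical names made up for illustration; they are not part of fluentd. The payload here is an arbitrary string, whereas a real `.meta` payload is MessagePack data.

```ruby
# Framing as read by the script above:
#   bytes 0..1  header "\xc1\x00"
#   bytes 2..5  payload length, 32-bit big-endian unsigned
#   bytes 6..   payload
FRAME_HEADER = "\xc1\x00".b.freeze

# Hypothetical helper: wrap a payload in the header and length prefix.
def frame_payload(payload)
  FRAME_HEADER + [payload.bytesize].pack('N') + payload.b
end

# Hypothetical helper: return the payload bytes, or nil if the data is
# too short or does not start with the header.
def unframe_payload(data)
  data = data.b
  return nil if data.bytesize <= 6
  return nil unless data.byteslice(0, 2) == FRAME_HEADER

  size = data.byteslice(2, 4).unpack1('N')
  data.byteslice(6, size)
end

# Hypothetical helper: split a chunk file name into its state letter
# ('b' or 'q') and hex chunk id, or return nil if the name does not match.
def chunk_name_parts(path)
  match = /\.(b|q)([0-9a-f]+)\.[^\/]*\z/.match(path)
  match && { state: match[1], chunk_id: match[2] }
end

framed = frame_payload('hello')
p framed.bytes.first(6)    # => [193, 0, 0, 0, 0, 5]
p unframe_payload(framed)  # => "hello"
# Made-up path, for illustration only.
p chunk_name_parts('/tmp/buffer.q0123abcd.log')
```

Returning nil for unrecognized input, rather than raising, mirrors how `parse_metadata` treats a metadata file it cannot interpret.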