Created
March 10, 2020 15:43
-
-
Save brycied00d/f3ce36a16c9d8d203bedf82dd5dd6fd2 to your computer and use it in GitHub Desktop.
fluentd-buffer-dump.rb - Dump the contents of a fluentd buffer to stdout for inspection.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby
# Quick and dirty script to print the contents of a fluentd buffer chunk.
# Useful when investigating a "corrupt" chunk, typically containing malformed
# record data. Record data is printed to stdout; progress/information is printed
# to stderr.
# Compatible with fluentd 1.9.0. (Much of the buffer/metadata code taken from fluentd source.)
# Copyright 2020 Bryce Chidester <[email protected]>
require 'msgpack'
require 'pp'
require 'time'
# Two-byte marker that parse_metadata expects at the start of a metadata file.
# String#b returns a binary (ASCII-8BIT) copy, so the literal is never mutated.
BUFFER_HEADER = "\xc1\x00".b.freeze
# Read a buffer metadata file and decode the msgpack record stored in it.
#
# Layout expected by this parser: the 2-byte BUFFER_HEADER, a 4-byte
# big-endian payload length, then a msgpack payload of that length.
# Progress and failure messages go to stderr via warn.
#
# @param in_file [String] path to the metadata file
# @return [Object, nil] the unpacked metadata with symbolized keys, or nil
#   when the file is too short, lacks the header, or has a truncated payload
# @raise [SystemCallError] if the file cannot be read (e.g. Errno::ENOENT)
def parse_metadata(in_file)
  # binread opens in binary mode, returns an ASCII-8BIT string and closes the
  # handle, so nothing is leaked.
  metadata = File.binread(in_file)
  warn "Metadata file size: #{metadata.size}"

  prefix_size = 6 # size of BUFFER_HEADER (2) + size of data size (4)
  if metadata.size <= prefix_size
    warn "Failed to parse metadata file: #{in_file}"
    return nil
  end

  unless metadata.slice(0, 2) == BUFFER_HEADER
    warn "Failed to parse metadata file: #{in_file} (missing buffer header)"
    return nil
  end

  size = metadata.slice(2, 4).unpack('N').first
  warn "Metadata chunk size: #{size}"

  payload = metadata.slice(prefix_size, size)
  # slice silently returns fewer bytes when the file ends early; report that
  # instead of handing an incomplete record to the unpacker.
  if payload.bytesize < size
    warn "Failed to parse metadata file: #{in_file} (payload truncated: #{payload.bytesize} of #{size} bytes)"
    return nil
  end

  MessagePack::Unpacker.new(symbolize_keys: true).feed(payload).read
end
# Entry point: print the metadata (stderr) and every msgpack record (stdout)
# of the buffer chunk file named by the first command-line argument.
in_file = ARGV.first
abort "Usage: #{$0} <buffer chunk file>" unless in_file

# The metadata is informational only; a missing or unparseable .meta file
# must not prevent the records themselves from being dumped.
meta_file = "#{in_file}.meta"
input_chunk_metadata = File.exist?(meta_file) ? parse_metadata(meta_file) : nil
if input_chunk_metadata.is_a?(Hash)
  warn "Metadata timekey: [#{input_chunk_metadata[:timekey]}]"
  warn "Metadata tag: [#{input_chunk_metadata[:tag]}]"
  warn "Metadata variables: [#{input_chunk_metadata[:variables]}]"
  warn "Metadata sequence: [#{input_chunk_metadata[:seq]}]"
  warn "Metadata size: [#{input_chunk_metadata[:s]}] (records)"
  created, modified = input_chunk_metadata.values_at(:c, :m)
  # Time.at(nil) raises, so only convert timestamps that are present.
  warn "Metadata created: [#{created && Time.at(created)}]"
  warn "Metadata modified: [#{modified && Time.at(modified)}]"
else
  warn "No usable metadata in #{meta_file}; dumping records without it"
end

# //n switch means explicit 'ASCII-8BIT' pattern
_, state, chunk_id = /\.(b|q)([0-9a-f]+)\.[^\/]*\Z/n.match(in_file).to_a
warn "Extracted state: #{state}"
warn "Extracted chunk_id: #{chunk_id}"

total_count = 0
begin
  # Block form closes the chunk file even when unpacking raises.
  File.open(in_file, 'rb') do |input_chunk_io|
    MessagePack::Unpacker.new(input_chunk_io).each do |obj|
      total_count += 1
      pp obj
    end
  end
rescue StandardError => e
  # Report how far the dump got before re-raising the original error.
  warn "Aborted after #{total_count} records: #{e.class}: #{e.message}"
  raise
end
warn "Total records: #{total_count}"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment