Last active: March 15, 2020 02:30
fluentd-buffer-check.rb - Validate each buffered record against some basic principles, identifying "bad"/"corrupt" records within buffer files.
#!/usr/bin/env ruby
# Simple script to "validate" all records in a fluentd buffer chunk.
# Sometimes something goes wonky in my environment, weird data ends up in the
# buffer file, and certain output plugins then choke on it. Ideally I'd catch
# this earlier with a fluentd filter, but in the meantime I'm left with buffer
# files containing a mix of good and bad data, and I need a tool to confirm
# what a given buffer actually contains.
# Version 2 of this script will write the good records to a "good file" for
# reinclusion back into fluentd, and a "bad file" recording the unprocessable
# entities. Additionally, it may attempt to fix common malformations I've seen,
# notably when a record inexplicably takes on another record as its child.
# (I still have no idea how that happens.)
# Record errors and their associated records are printed to stdout; buffer
# metadata is printed to stderr.
# Compatible with fluentd 1.9.0. (Much of the buffer/metadata code is taken
# from the fluentd source.)
# Copyright 2020 Bryce Chidester <[email protected]>
# What makes a good record?
# 0. Each record should be an Array
# 1. Each record should have exactly 2 elements
# 2. The first element should be numeric and look like a valid timestamp
# 3. The second element should be a Hash
# 3a. No value in the Hash should itself be a Hash (no nested records)
# 3b. Each Hash key should be a String
# 3c. Each Hash value should be a String, Numeric, or nil
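# As a (hypothetical) illustration of the rules above, a record like this
# passes every check:
#   [1584238200, { "host" => "web01", "status" => 200, "note" => nil }]
# while either of these would be flagged:
#   ["not-a-timestamp", { "host" => "web01" }]         # fails rule 2
#   [1584238200, { "host" => { "name" => "web01" } }]  # fails rule 3a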
require 'msgpack'
require 'pp'
require 'time'

# A buffer metadata file starts with a 2-byte header, followed by a 4-byte
# big-endian payload size, followed by a msgpack-encoded map of that size.
BUFFER_HEADER = "\xc1\x00".force_encoding(Encoding::ASCII_8BIT).freeze
def parse_metadata(in_file)
  meta_chunk_io = File.open(in_file, 'rb')
  meta_chunk_io.set_encoding(Encoding::ASCII_8BIT)
  meta_chunk_io.sync = true
  meta_chunk_io.binmode
  metadata = meta_chunk_io.read
  warn "Metadata file size: #{metadata.size}"
  if metadata.size <= 6 # size of BUFFER_HEADER (2) + size of data size (4)
    warn "Failed to parse metadata file: #{in_file}"
    return nil
  end
  if metadata.slice(0, 2) == BUFFER_HEADER
    size = metadata.slice(2, 4).unpack('N').first
    warn "Metadata chunk size: #{size}"
    if size
      meta_chunk_unpacker = MessagePack::Unpacker.new(meta_chunk_io, symbolize_keys: true)
      return meta_chunk_unpacker.feed(metadata.slice(6, size)).read
    end
  end
  nil
end
def bad_record(msg, record)
  puts msg
  pp record
  # TODO: If "rewriting", this would write the invalid record to the rejects file.
end
in_file = ARGV.first
abort "Usage: #{$PROGRAM_NAME} <buffer file>" if in_file.nil?

input_chunk_io = File.open(in_file, 'rb')
input_chunk_io.set_encoding(Encoding::ASCII_8BIT)
input_chunk_io.binmode
input_chunk_io.sync = true
input_chunk_unpacker = MessagePack::Unpacker.new(input_chunk_io)

input_chunk_metadata = parse_metadata("#{in_file}.meta")
abort "Unable to parse metadata file: #{in_file}.meta" if input_chunk_metadata.nil?

warn "Metadata timekey: [#{input_chunk_metadata[:timekey]}]"
warn "Metadata tag: [#{input_chunk_metadata[:tag]}]"
warn "Metadata variables: [#{input_chunk_metadata[:variables]}]"
warn "Metadata sequence: [#{input_chunk_metadata[:seq]}]"
warn "Metadata size: [#{input_chunk_metadata[:s]}] (records)"
warn "Metadata created: [#{Time.at(input_chunk_metadata[:c])}]"
warn "Metadata modified: [#{Time.at(input_chunk_metadata[:m])}]"

# //n switch means explicit 'ASCII-8BIT' pattern.
# The buffer filename encodes the chunk state ("b" = staged, "q" = queued)
# and a hex chunk id.
_, state, chunk_id = /\.(b|q)([0-9a-f]+)\.[^\/]*\Z/n.match(in_file).to_a
warn "Extracted state: #{state}"
warn "Extracted chunk_id: #{chunk_id}"

total_count = 0
input_chunk_unpacker.each do |obj|
  total_count += 1
  unless obj.is_a? Array
    bad_record("#{total_count} Record is not an Array", obj)
    next
  end
  unless obj.count == 2
    bad_record("#{total_count} Record is wrong size", obj)
    next
  end
  unless obj.first.is_a? Numeric
    bad_record("#{total_count} Record timestamp is not numeric", obj)
    next
  end
  unless obj.first > 30 * 365.25 * 86400 # anything before ~2000 is suspect
    bad_record("#{total_count} Record timestamp is unusually old", obj)
    next
  end
  unless obj.first < Time.now.to_i
    bad_record("#{total_count} Record timestamp is in the future", obj)
    next
  end
  unless obj.last.is_a? Hash
    bad_record("#{total_count} Record data is not a Hash", obj)
    next
  end
  unless obj.last.keys.all? { |k| k.is_a? String }
    bad_record("#{total_count} Record data keys are not all Strings", obj)
    next
  end
  unless obj.last.values.all? { |v| v.is_a?(String) || v.is_a?(Numeric) || v.nil? }
    # TODO: Detect if it looks like another record is buried inside and maybe write
    # that to a third file for re-analysis and reinclusion?
    bad_record("#{total_count} Record data values are not all valid types", obj)
    next
  end
  # TODO: If "rewriting", the valid record would be written to the "good file" here.
end
warn "Total records: #{total_count}"
input_chunk_io.close