Skip to content

Instantly share code, notes, and snippets.

@brycied00d
Created July 9, 2020 23:20
Show Gist options
  • Save brycied00d/bd3cab938b987fad5a9bd219364c0154 to your computer and use it in GitHub Desktop.
Save brycied00d/bd3cab938b987fad5a9bd219364c0154 to your computer and use it in GitHub Desktop.
fluentd-buffer-cleaner.rb - Validate each buffered record against some basic principles, dumping good and "bad"/"corrupt" records to separate buffers. (This is v2 of fluentd-buffer-check.rb)
# Quick and dirty script to split a fluentd buffer chunk into "good" and "bad" buffers.
# Uses the validation logic from fluentd-buffer-check.rb (https://gist.github.com/brycied00d/70c53104d780835d2d48610b7a783c8c)
# Compatible with fluentd 1.9.0. (Much of the buffer/metadata code taken from fluentd source.)
# Copyright 2020 Bryce Chidester <[email protected]>
require 'msgpack'
require 'time'
def flush_new_buffer(state, chunk_id, data, prefix = 'NEW_buffer', suffix = 'log')
path = "#{prefix}.#{state}#{chunk_id}.#{suffix}"
output_chunk_io = File.open(path, 'wb')
output_chunk_io.set_encoding(Encoding::ASCII_8BIT)
output_chunk_io.binmode
output_chunk_io.sync = true
output_chunk_packer = MessagePack::Packer.new(output_chunk_io)
data.each { |obj| output_chunk_packer.write(obj) }
output_chunk_packer.flush
output_chunk_io.close
path
end
def generate_unique_id
now = Time.now.utc
u1 = ((now.to_i * 1000 * 1000 + now.usec) << 12 | rand(0xfff))
[u1 >> 32, u1 & 0xffffffff, rand(0xffffffff), rand(0xffffffff)].pack('NNNN')
end
def unique_id_to_str(unique_id)
unique_id.unpack('H*').first
end
BUFFER_HEADER = "\xc1\x00".force_encoding(Encoding::ASCII_8BIT).freeze
def parse_metadata(in_file)
meta_chunk_io = File.open(in_file, 'rb')
meta_chunk_io.set_encoding(Encoding::ASCII_8BIT)
meta_chunk_io.sync = true
meta_chunk_io.binmode
metadata = meta_chunk_io.read
puts "Metadata file size: #{metadata.size}"
if metadata.size <= 6 # size of BUFFER_HEADER (2) + size of data size(4)
puts "Failed to parse metadata file: #{in_file}"
return nil
end
if metadata.slice(0, 2) == BUFFER_HEADER
size = metadata.slice(2, 4).unpack('N').first
puts "Metadata chunk size: #{size}"
if size
meta_chunk_unpacker = MessagePack::Unpacker.new(meta_chunk_io, symbolize_keys: true)
return meta_chunk_unpacker.feed(metadata.slice(6, size)).read
end
end
nil
end
def write_metadata(out_file, metadata, unique_id, size)
new_metadata = metadata.merge(id: unique_id, s: size)
# puts "Input metadata: #{metadata}"
# puts "New metadata: #{new_metadata}"
meta_chunk_packer = MessagePack::Packer.new()
bin = meta_chunk_packer.pack(new_metadata).full_pack
meta_chunk_io = File.open(out_file, 'wb')
meta_chunk_io.set_encoding(Encoding::ASCII_8BIT)
meta_chunk_io.binmode
meta_chunk_io.sync = true
meta_chunk_io.write(BUFFER_HEADER + ([bin.bytesize].pack('N')) + bin)
meta_chunk_io.close
end
# Simple validation of a given record object. Returns a string upon error, or
# nil if no error.
def validate_record(obj)
return "Record is not an Array" unless obj.is_a? Array
return "Record is wrong size" unless obj.count == 2
unless obj.first.is_a? Numeric
return "Record timestamp is not numeric"
end
unless obj.first > 30 * 365.25 * 86400
return "Record timestamp is unusually old"
end
unless obj.first < Time.now.to_i
return "Record timestamp is in the future"
end
return "Record data is not a Hash" unless obj.last.is_a? Hash
unless obj.last.keys.all? { |k| k.is_a? String }
return "Record data indexes are not all Strings"
end
unless obj.last.values.all? { |v| v.is_a?(String) || v.is_a?(Numeric) || v.nil? }
# TODO: Detect if it looks like another record is buried inside and maybe write
# that to a third file for re-analysis and reinclusion?
return "Record data values are not all valid types"
end
end
in_file = ARGV.first
input_chunk_io = File.open(in_file, 'rb')
input_chunk_io.set_encoding(Encoding::ASCII_8BIT)
input_chunk_io.binmode
input_chunk_io.sync = true
input_chunk_unpacker = MessagePack::Unpacker.new(input_chunk_io)
input_chunk_metadata = parse_metadata("#{in_file}.meta")
puts "Metadata timekey: [#{input_chunk_metadata[:timekey]}]"
puts "Metadata tag: [#{input_chunk_metadata[:tag]}]"
puts "Metadata variables: [#{input_chunk_metadata[:variables]}]"
puts "Metadata sequence: [#{input_chunk_metadata[:seq]}]"
puts "Metadata size: [#{input_chunk_metadata[:s]}] (records)"
puts "Metadata created: [#{Time.at(input_chunk_metadata[:c])}]"
puts "Metadata modified: [#{Time.at(input_chunk_metadata[:m])}]"
# //n switch means explicit 'ASCII-8BIT' pattern
_, state, chunk_id = /\.(b|q)([0-9a-f]+)\.[^\/]*\Z/n.match(in_file).to_a
puts "Extracted state: #{state}"
puts "Extracted chunk_id: #{chunk_id}"
good_buffer = []
bad_buffer = []
total_count = 0
flushed_count = 0
input_chunk_unpacker.each do |obj|
total_count += 1
err = validate_record(obj)
if err
puts "#{total_count} #{err}"
pp obj
bad_buffer << obj
next
end
# If we got this far, it's a "good" object.
good_buffer << obj
end
unless good_buffer.size.zero?
new_unique_id = generate_unique_id
path = flush_new_buffer(state, unique_id_to_str(new_unique_id), good_buffer, 'good_buffer')
write_metadata("#{path}.meta", input_chunk_metadata, new_unique_id, good_buffer.size)
puts "Writing good_buffer: #{path}"
flushed_count += good_buffer.size
end
unless bad_buffer.size.zero?
new_unique_id = generate_unique_id
path = flush_new_buffer(state, unique_id_to_str(new_unique_id), bad_buffer, 'bad_buffer')
write_metadata("#{path}.meta", input_chunk_metadata, new_unique_id, bad_buffer.size)
puts "Writing bad_buffer: #{path}"
flushed_count += bad_buffer.size
end
puts "Total records: #{total_count}"
puts "Flushed records: #{flushed_count}"
input_chunk_io.close
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment