fluentd-buffer-cleaner.rb - Validates each buffered record against a set of basic sanity checks, writing good and "bad"/corrupt records out to separate buffer chunks. (This is v2 of fluentd-buffer-check.rb.)
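Usage: ruby fluentd-buffer-cleaner.rb <path-to-buffer-chunk-file>. The script locates the chunk's companion .meta file automatically and writes its good_buffer.*/bad_buffer.* chunk and metadata pairs to the current working directory.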
# Quick and dirty script to split a fluentd buffer chunk into "good" and "bad" buffers.
# Uses the validation logic from fluentd-buffer-check.rb (https://gist.github.com/brycied00d/70c53104d780835d2d48610b7a783c8c)
# Compatible with fluentd 1.9.0. (Much of the buffer/metadata code taken from fluentd source.)
# Copyright 2020 Bryce Chidester <[email protected]>

require 'msgpack'
require 'pp'
require 'time'
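
# Re-pack an array of decoded records into a fresh buffer chunk file named
# "<prefix>.<state><chunk_id>.<suffix>" and return the path written.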
def flush_new_buffer(state, chunk_id, data, prefix = 'NEW_buffer', suffix = 'log')
  path = "#{prefix}.#{state}#{chunk_id}.#{suffix}"
  output_chunk_io = File.open(path, 'wb')
  output_chunk_io.set_encoding(Encoding::ASCII_8BIT)
  output_chunk_io.binmode
  output_chunk_io.sync = true
  output_chunk_packer = MessagePack::Packer.new(output_chunk_io)
  data.each { |obj| output_chunk_packer.write(obj) }
  output_chunk_packer.flush
  output_chunk_io.close
  path
end
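
# Generate a 16-byte chunk unique_id. This mirrors Fluent::UniqueId.generate in
# fluentd: the upper 64 bits mix a microsecond timestamp (shifted left 12 bits)
# with 12 random bits, and the lower 64 bits are random.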
def generate_unique_id
  now = Time.now.utc
  u1 = ((now.to_i * 1000 * 1000 + now.usec) << 12 | rand(0xfff))
  [u1 >> 32, u1 & 0xffffffff, rand(0xffffffff), rand(0xffffffff)].pack('NNNN')
end
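
# Hex-encode a binary unique_id, as used in buffer chunk file names
# (cf. Fluent::UniqueId.hex in fluentd).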
def unique_id_to_str(unique_id)
  unique_id.unpack('H*').first
end

BUFFER_HEADER = "\xc1\x00".force_encoding(Encoding::ASCII_8BIT).freeze
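
# A chunk's .meta companion file is laid out as: the 2-byte BUFFER_HEADER, a
# 4-byte big-endian length, then that many bytes of msgpack-encoded metadata.
# Returns the metadata Hash, or nil if the file is too short or malformed.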
def parse_metadata(in_file)
  meta_chunk_io = File.open(in_file, 'rb')
  meta_chunk_io.set_encoding(Encoding::ASCII_8BIT)
  meta_chunk_io.sync = true
  meta_chunk_io.binmode
  metadata = meta_chunk_io.read
  puts "Metadata file size: #{metadata.size}"
  if metadata.size <= 6 # size of BUFFER_HEADER (2) + size of data size (4)
    puts "Failed to parse metadata file: #{in_file}"
    return nil
  end
  if metadata.slice(0, 2) == BUFFER_HEADER
    size = metadata.slice(2, 4).unpack('N').first
    puts "Metadata chunk size: #{size}"
    if size
      meta_chunk_unpacker = MessagePack::Unpacker.new(meta_chunk_io, symbolize_keys: true)
      return meta_chunk_unpacker.feed(metadata.slice(6, size)).read
    end
  end
  nil
end
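
# Write the .meta companion for a newly flushed chunk: reuse the input chunk's
# metadata, swapping in the new unique_id (:id) and record count (:s).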
def write_metadata(out_file, metadata, unique_id, size)
  new_metadata = metadata.merge(id: unique_id, s: size)
  # puts "Input metadata: #{metadata}"
  # puts "New metadata: #{new_metadata}"
  meta_chunk_packer = MessagePack::Packer.new
  bin = meta_chunk_packer.pack(new_metadata).full_pack
  meta_chunk_io = File.open(out_file, 'wb')
  meta_chunk_io.set_encoding(Encoding::ASCII_8BIT)
  meta_chunk_io.binmode
  meta_chunk_io.sync = true
  meta_chunk_io.write(BUFFER_HEADER + ([bin.bytesize].pack('N')) + bin)
  meta_chunk_io.close
end

# Simple validation of a given record object. Returns a string describing the
# error, or nil if the record looks sane.
def validate_record(obj)
  return "Record is not an Array" unless obj.is_a? Array
  return "Record is wrong size" unless obj.count == 2
  unless obj.first.is_a? Numeric
    return "Record timestamp is not numeric"
  end
  unless obj.first > 30 * 365.25 * 86400 # ~30 years past the epoch, i.e. before ~2000
    return "Record timestamp is unusually old"
  end
  unless obj.first < Time.now.to_i
    return "Record timestamp is in the future"
  end
  return "Record data is not a Hash" unless obj.last.is_a? Hash
  unless obj.last.keys.all? { |k| k.is_a? String }
    return "Record data indexes are not all Strings"
  end
  unless obj.last.values.all? { |v| v.is_a?(String) || v.is_a?(Numeric) || v.nil? }
    # TODO: Detect if it looks like another record is buried inside and maybe write
    # that to a third file for re-analysis and reinclusion?
    return "Record data values are not all valid types"
  end
  nil
end
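
# Main: stream-unpack each record from the chunk named on the command line,
# validate it, and partition the records into "good" and "bad" buffers.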
in_file = ARGV.first
abort "Usage: #{$PROGRAM_NAME} <buffer chunk file>" if in_file.nil?

input_chunk_io = File.open(in_file, 'rb')
input_chunk_io.set_encoding(Encoding::ASCII_8BIT)
input_chunk_io.binmode
input_chunk_io.sync = true
input_chunk_unpacker = MessagePack::Unpacker.new(input_chunk_io)

input_chunk_metadata = parse_metadata("#{in_file}.meta")
abort "Unable to parse metadata from #{in_file}.meta" if input_chunk_metadata.nil?

puts "Metadata timekey: [#{input_chunk_metadata[:timekey]}]"
puts "Metadata tag: [#{input_chunk_metadata[:tag]}]"
puts "Metadata variables: [#{input_chunk_metadata[:variables]}]"
puts "Metadata sequence: [#{input_chunk_metadata[:seq]}]"
puts "Metadata size: [#{input_chunk_metadata[:s]}] (records)"
puts "Metadata created: [#{Time.at(input_chunk_metadata[:c])}]"
puts "Metadata modified: [#{Time.at(input_chunk_metadata[:m])}]"

# //n switch means explicit 'ASCII-8BIT' pattern
_, state, chunk_id = /\.(b|q)([0-9a-f]+)\.[^\/]*\Z/n.match(in_file).to_a
abort "Unable to extract buffer state and chunk id from #{in_file}" if state.nil? || chunk_id.nil?
puts "Extracted state: #{state}"
puts "Extracted chunk_id: #{chunk_id}"

good_buffer = []
bad_buffer = []
total_count = 0
flushed_count = 0

input_chunk_unpacker.each do |obj|
  total_count += 1
  err = validate_record(obj)
  if err
    puts "#{total_count} #{err}"
    pp obj
    bad_buffer << obj
    next
  end
  # If we got this far, it's a "good" object.
  good_buffer << obj
end

unless good_buffer.size.zero?
  new_unique_id = generate_unique_id
  path = flush_new_buffer(state, unique_id_to_str(new_unique_id), good_buffer, 'good_buffer')
  write_metadata("#{path}.meta", input_chunk_metadata, new_unique_id, good_buffer.size)
  puts "Writing good_buffer: #{path}"
  flushed_count += good_buffer.size
end

unless bad_buffer.size.zero?
  new_unique_id = generate_unique_id
  path = flush_new_buffer(state, unique_id_to_str(new_unique_id), bad_buffer, 'bad_buffer')
  write_metadata("#{path}.meta", input_chunk_metadata, new_unique_id, bad_buffer.size)
  puts "Writing bad_buffer: #{path}"
  flushed_count += bad_buffer.size
end

puts "Total records: #{total_count}"
puts "Flushed records: #{flushed_count}"
input_chunk_io.close