fluentd-buffer-split.rb - Split a single large fluentd buffer chunk (of type "file") into smaller pieces that can be manually reintroduced gradually in order to reduce load on processing pipelines.
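Usage sketch (the chunk path below is only illustrative): ruby fluentd-buffer-split.rb /var/lib/fluentd/buffer/buffer.q58cb2a01b2b3c4d5e6f7a8b9c0d1e2f3.log. The script expects the companion metadata file at <chunk>.meta next to the chunk, leaves the original chunk untouched, and writes the smaller NEW_buffer.* chunk/metadata pairs into the current working directory.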
# Quick and dirty script to split a fluentd buffer chunk into smaller chunks.
# Compatible with fluentd 1.9.0. (Much of the buffer/metadata code taken from fluentd source.)
# Copyright 2020 Bryce Chidester <[email protected]>
require 'msgpack'
require 'time'
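# Number of records to write into each new chunk before starting the next one.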
NEW_CHUNK_SIZE = 100_000
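# Write the accumulated records out as a new chunk file, named to mirror
# fluentd's own buffer files but with a 'NEW_buffer' prefix, and return the path.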
def flush_new_buffer(state, chunk_id, data)
  prefix = 'NEW_buffer'
  suffix = 'log'
  path = "#{prefix}.#{state}#{chunk_id}.#{suffix}"
  output_chunk_io = File.open(path, 'wb')
  output_chunk_io.set_encoding(Encoding::ASCII_8BIT)
  output_chunk_io.binmode
  output_chunk_io.sync = true
  output_chunk_packer = MessagePack::Packer.new(output_chunk_io)
  data.each { |obj| output_chunk_packer.write(obj) }
  output_chunk_packer.flush
  output_chunk_io.close
  path
end
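# Generate a 16-byte chunk id the same way fluentd does (Fluent::UniqueId.generate):
# a microsecond timestamp shifted left 12 bits, padded out with random bits.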
def generate_unique_id
  now = Time.now.utc
  u1 = ((now.to_i * 1000 * 1000 + now.usec) << 12 | rand(0xfff))
  [u1 >> 32, u1 & 0xffffffff, rand(0xffffffff), rand(0xffffffff)].pack('NNNN')
end

def unique_id_to_str(unique_id)
  unique_id.unpack('H*').first
end
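# Fluentd's binary .meta files begin with this two-byte magic header.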
BUFFER_HEADER = "\xc1\x00".force_encoding(Encoding::ASCII_8BIT).freeze
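# Read a .meta file: verify the magic header, read the 4-byte big-endian payload
# size, then unpack the msgpack-encoded metadata hash. Returns nil on failure.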
def parse_metadata(in_file)
  meta_chunk_io = File.open(in_file, 'rb')
  meta_chunk_io.set_encoding(Encoding::ASCII_8BIT)
  meta_chunk_io.sync = true
  meta_chunk_io.binmode
  metadata = meta_chunk_io.read
  puts "Metadata file size: #{metadata.size}"
  if metadata.size <= 6 # size of BUFFER_HEADER (2) + size of data size (4)
    puts "Failed to parse metadata file: #{in_file}"
    return nil
  end
  if metadata.slice(0, 2) == BUFFER_HEADER
    size = metadata.slice(2, 4).unpack('N').first
    puts "Metadata chunk size: #{size}"
    if size
      meta_chunk_unpacker = MessagePack::Unpacker.new(meta_chunk_io, symbolize_keys: true)
      return meta_chunk_unpacker.feed(metadata.slice(6, size)).read
    end
  end
  nil
end
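# Write a fluentd-compatible .meta file for a new chunk: the magic header, the
# 4-byte big-endian payload length, then the msgpack-encoded metadata updated
# with the new chunk's unique id and record count.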
def write_metadata(out_file, metadata, unique_id, size)
  new_metadata = metadata.merge(id: unique_id, s: size)
  # puts "Input metadata: #{metadata}"
  # puts "New metadata: #{new_metadata}"
  meta_chunk_packer = MessagePack::Packer.new
  bin = meta_chunk_packer.pack(new_metadata).full_pack
  meta_chunk_io = File.open(out_file, 'wb')
  meta_chunk_io.set_encoding(Encoding::ASCII_8BIT)
  meta_chunk_io.binmode
  meta_chunk_io.sync = true
  meta_chunk_io.write(BUFFER_HEADER + ([bin.bytesize].pack('N')) + bin)
  meta_chunk_io.close
end
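# Main: open the source chunk, dump its metadata, then stream records out into
# smaller chunks.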
in_file = ARGV.first
input_chunk_io = File.open(in_file, 'rb')
input_chunk_io.set_encoding(Encoding::ASCII_8BIT)
input_chunk_io.binmode
input_chunk_io.sync = true
input_chunk_unpacker = MessagePack::Unpacker.new(input_chunk_io)
input_chunk_metadata = parse_metadata("#{in_file}.meta")
puts "Metadata timekey: [#{input_chunk_metadata[:timekey]}]"
puts "Metadata tag: [#{input_chunk_metadata[:tag]}]"
puts "Metadata variables: [#{input_chunk_metadata[:variables]}]"
puts "Metadata sequence: [#{input_chunk_metadata[:seq]}]"
puts "Metadata size: [#{input_chunk_metadata[:s]}] (records)"
puts "Metadata created: [#{Time.at(input_chunk_metadata[:c])}]"
puts "Metadata modified: [#{Time.at(input_chunk_metadata[:m])}]"
# //n switch means explicit 'ASCII-8BIT' pattern
_, state, chunk_id = /\.(b|q)([0-9a-f]+)\.[^\/]*\Z/n.match(in_file).to_a
puts "Extracted state: #{state}"
puts "Extracted chunk_id: #{chunk_id}"
new_buffer = []
total_count = 0
flushed_count = 0
input_chunk_unpacker.each do |obj|
  total_count += 1
  new_buffer << obj
  next unless new_buffer.size >= NEW_CHUNK_SIZE
  new_unique_id = generate_unique_id
  path = flush_new_buffer(state, unique_id_to_str(new_unique_id), new_buffer)
  write_metadata("#{path}.meta", input_chunk_metadata, new_unique_id, new_buffer.size)
  puts "Writing new chunk: #{path}"
  flushed_count += new_buffer.size
  new_buffer = []
end
# Perform one last flush for any remaining records
unless new_buffer.size.zero?
  new_unique_id = generate_unique_id
  path = flush_new_buffer(state, unique_id_to_str(new_unique_id), new_buffer)
  write_metadata("#{path}.meta", input_chunk_metadata, new_unique_id, new_buffer.size)
  puts "Writing final chunk: #{path}"
  flushed_count += new_buffer.size
end
puts "Total records: #{total_count}"
puts "Flushed records: #{flushed_count}"
input_chunk_io.close
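Note: the script never modifies the original chunk; it only writes NEW_buffer.* data/.meta pairs into the current working directory. Reintroducing them is the manual part: presumably you rename each pair back to fluentd's buffer.<state><chunk_id>.log / .meta naming and move them into the buffer directory in small batches (fluentd's file buffer resumes existing chunk files when it starts), so the pipeline is not hit with the whole backlog at once.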