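# Standalone debugging/repro script: registers a custom Fluentd output that
# reuses the fluent-plugin-kinesis helper mixins to write aggregated records
# to Kinesis, then exercises it with Fluent::Test::Driver against a stubbed
# AWS client (no real network calls are made).
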
require 'aws-sdk-kinesis'
require 'fluent/output'
require 'fluent/test'
require 'fluent/test/driver/output'
require 'fluent/plugin/kinesis'
require 'fluent/plugin/kinesis_helper/aggregator'
require 'fluent/plugin/kinesis_helper/api'
require 'fluent/plugin/kinesis_helper/client'
require 'ostruct'
require 'securerandom'
require 'zlib'
require 'pry'
module Fluent
  class CustomKinesisOutput < Fluent::Plugin::Output
    Fluent::Plugin.register_output('custom_kinesis', self)

    # Don't include these mixins; they cause the plugin to break.
    # include Fluent::SetTimeKeyMixin
    # include Fluent::SetTagKeyMixin
    include Fluent::MessagePackFactory::Mixin
    include Fluent::KinesisHelper::Client
    include Fluent::KinesisHelper::API
    include Fluent::KinesisHelper::API::BatchRequest
    include Fluent::KinesisHelper::Aggregator::Mixin
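
    # Use the aggregated streams request type; each batch request carries at
    # most 100,000 records or 1 MiB of data.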
    RequestType = :streams_aggregated
    BatchRequestLimitCount = 100_000
    BatchRequestLimitSize = 1024 * 1024

    SkipRecordError = Fluent::KinesisOutput::SkipRecordError
    KeyNotFoundError = Fluent::KinesisOutput::KeyNotFoundError
    ExceedMaxRecordSizeError = Fluent::KinesisOutput::ExceedMaxRecordSizeError
    InvalidRecordError = Fluent::KinesisOutput::InvalidRecordError

    config_param :data_key, :string, default: nil
    config_param :log_truncate_max_size, :integer, default: 1024
    config_param :compression, :string, default: nil
    config_param :stream_name, :string
    config_param :fixed_partition_key, :string, default: nil

    config_section :format do
      config_set_default :@type, 'json'
    end
    config_section :inject do
      config_set_default :time_type, 'string'
      config_set_default :time_format, '%Y-%m-%dT%H:%M:%S.%N%z'
    end

    config_param :debug, :bool, default: false

    helpers :formatter, :inject
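
    # Reserve room for the aggregation overhead computed in #offset, so that
    # aggregated payloads still fit within the size limits above.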
    def configure(conf)
      super
      @partition_key_generator = create_partition_key_generator
      @batch_request_max_size -= offset
      @max_record_size -= offset
      @data_formatter = data_formatter_create(conf)
    end
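
    # Serialize each event with the data formatter and buffer it as a
    # one-element msgpack array; format_for_api logs and drops records
    # that raise SkipRecordError.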
    def format(tag, time, record)
      format_for_api do
        [@data_formatter.call(tag, time, record)]
      end
    end
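
    # Aggregate each batch into a single Kinesis record sent under one
    # partition key. The rescue is a debugging aid for this script: it dumps
    # the driver logs and drops into pry on any failure.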
    def write(chunk)
      write_records_batch(chunk) do |batch|
        key = @partition_key_generator.call
        records = batch.map { |(data)| data }
        client.put_records(
          stream_name: @stream_name,
          records: [{
            partition_key: key,
            data: aggregator.aggregate(records, key),
          }],
        )
      end
    rescue StandardError => e
      puts $driver.logs
      puts e
      puts e.backtrace
      binding.pry
    end
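
    # Aggregation overhead per request: the fixed AggregateOffset plus twice
    # the partition key length.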
    def offset
      @offset ||= AggregateOffset + @partition_key_generator.call.size * 2
    end

    def multi_workers_ready?
      true
    end

    private

    def size_of_values(record)
      super(record) + RecordOffset
    end

    def create_partition_key_generator
      if @fixed_partition_key.nil?
        -> { SecureRandom.hex(16) }
      else
        -> { @fixed_partition_key }
      end
    end
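
    # Build the per-event serializer: either format the whole record with the
    # configured formatter (after injecting tag/time fields), or take a single
    # data_key value verbatim; both paths go through the compressor.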
    def data_formatter_create(conf)
      formatter = formatter_create
      compressor = compressor_create
      if @data_key.nil?
        ->(tag, time, record) {
          record = inject_values_to_record(tag, time, record)
          compressor.call(formatter.format(tag, time, record).chomp.b)
        }
      else
        ->(tag, time, record) {
          raise InvalidRecordError, record unless record.is_a? Hash
          raise KeyNotFoundError.new(@data_key, record) if record[@data_key].nil?
          compressor.call(record[@data_key].to_s.b)
        }
      end
    end

    def compressor_create
      case @compression
      when "zlib"
        ->(data) { Zlib::Deflate.deflate(data) }
      else
        ->(data) { data }
      end
    end

    def format_for_api(&block)
      converted = block.call
      size = size_of_values(converted)
      if size > @max_record_size
        raise ExceedMaxRecordSizeError.new(size, converted)
      end
      converted.to_msgpack
    rescue SkipRecordError => e
      log.error(truncate(e))
      ''
    end
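
    # Unpack the buffered msgpack chunk, split it into batches under the
    # count/size limits, and hand each batch to the block with retries.
    # As in #write, the rescue is a debugging aid for this script.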
    def write_records_batch(chunk, &block)
      unique_id = chunk.dump_unique_id_hex(chunk.unique_id)
      chunk.open do |io|
        begin
          records = msgpack_unpacker(io).to_enum
          split_to_batches(records) do |batch, size|
            log.debug(sprintf "Write chunk %s / %3d records / %4d KB", unique_id, batch.size, size / 1024)
            batch_request_with_retry(batch, &block)
            log.debug("Finish writing chunk")
          end
        rescue StandardError => e
          puts $driver.logs
          puts e
          puts e.backtrace
          binding.pry
        end
      end
    end

    def request_type
      self.class::RequestType
    end

    def truncate(msg)
      if @log_truncate_max_size == 0 || msg.to_s.size <= @log_truncate_max_size
        msg.to_s
      else
        msg.to_s[0...@log_truncate_max_size]
      end
    end
  end
end
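
# Build a test driver around the plugin. Region and credentials are dummy
# values; no real AWS calls are made because put_records is stubbed below.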
$driver = Fluent::Test::Driver::Output.new(Fluent::CustomKinesisOutput).configure <<~CONF
  log_level debug
  region us-west-1
  stream_name dummy
  aws_key_id abcdef123
  aws_sec_key abcdef123
  <buffer>
    chunk_limit_size "1m"
  </buffer>
CONF
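
# Stub the Kinesis API: every put_records call returns a canned successful
# response, so the script runs without AWS access.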
Aws::Kinesis::Client.prepend(Module.new do
  def put_records(*args)
    OpenStruct.new(
      encryption_type: "KMS",
      failed_record_count: 0,
      records: [
        OpenStruct.new(
          sequence_number: "12345",
          shard_id: "12345"
        )
      ]
    )
  end
end)
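
# Feed ten batches of 3,000-5,000 events (256-byte messages each) through the
# driver to exercise chunking, batch splitting and the aggregated write path.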
$driver.run(force_flush_retry: true) do
  10.times do
    time = Fluent::EventTime.now
    events = Array.new(Kernel.rand(3000..5000)) { |i| [time, { msg: ('a'.ord + i % 26).chr * 256 }] }
    $driver.feed("my.tag", events)
  end
end

puts $driver.logs