# From a gist by @adammw (last active July 25, 2018).

require 'aws-sdk-kinesis'
require 'fluent/output'
require 'fluent/test'
require 'fluent/test/driver/output'
require 'fluent/plugin/kinesis'
require 'fluent/plugin/kinesis_helper/aggregator'
require 'fluent/plugin/kinesis_helper/api'
require 'fluent/plugin/kinesis_helper/client'
require 'ostruct'       # OpenStruct is used in the stubbed put_records response
require 'securerandom'  # SecureRandom is used for random partition keys
require 'zlib'          # Zlib is used when compression is set to "zlib"
require 'pry'
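
# Minimal reproduction script: registers a cut-down copy of the
# fluent-plugin-kinesis aggregated-streams output ('custom_kinesis') and
# exercises it with Fluentd's test driver against a stubbed Kinesis client.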
module Fluent
  class CustomKinesisOutput < Fluent::Plugin::Output
    Fluent::Plugin.register_output('custom_kinesis', self)

    # Don't include these mixins as they cause the plugin to break:
    # include Fluent::SetTimeKeyMixin
    # include Fluent::SetTagKeyMixin
    include Fluent::MessagePackFactory::Mixin
    include Fluent::KinesisHelper::Client
    include Fluent::KinesisHelper::API
    include Fluent::KinesisHelper::API::BatchRequest
    include Fluent::KinesisHelper::Aggregator::Mixin

    RequestType = :streams_aggregated
    BatchRequestLimitCount = 100_000
    BatchRequestLimitSize = 1024 * 1024
    SkipRecordError = Fluent::KinesisOutput::SkipRecordError
    KeyNotFoundError = Fluent::KinesisOutput::KeyNotFoundError
    ExceedMaxRecordSizeError = Fluent::KinesisOutput::ExceedMaxRecordSizeError
    InvalidRecordError = Fluent::KinesisOutput::InvalidRecordError

    config_param :data_key, :string, default: nil
    config_param :log_truncate_max_size, :integer, default: 1024
    config_param :compression, :string, default: nil
    config_param :stream_name, :string
    config_param :fixed_partition_key, :string, default: nil
    config_section :format do
      config_set_default :@type, 'json'
    end
    config_section :inject do
      config_set_default :time_type, 'string'
      config_set_default :time_format, '%Y-%m-%dT%H:%M:%S.%N%z'
    end
    config_param :debug, :bool, default: false

    helpers :formatter, :inject
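
    # Resolve the partition key generator and data formatter once at configure
    # time, and shrink the request/record size limits by the aggregation
    # overhead computed in #offset.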
    def configure(conf)
      super
      @partition_key_generator = create_partition_key_generator
      @batch_request_max_size -= offset
      @max_record_size -= offset
      @data_formatter = data_formatter_create(conf)
    end
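
    # Buffer stage: each event is formatted, size-checked and msgpack-encoded
    # by #format_for_api before being appended to the chunk.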
    def format(tag, time, record)
      format_for_api do
        [@data_formatter.call(tag, time, record)]
      end
    end
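
    # Flush stage: drain the chunk in batches and send each batch to Kinesis
    # as a single aggregated record under one partition key. The rescue /
    # binding.pry below is a debugging aid for this repro script.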
    def write(chunk)
      write_records_batch(chunk) do |batch|
        key = @partition_key_generator.call
        records = batch.map { |(data)| data }
        client.put_records(
          stream_name: @stream_name,
          records: [{
            partition_key: key,
            data: aggregator.aggregate(records, key),
          }],
        )
      end
    rescue StandardError => e
      puts $driver.logs
      puts e
      puts e.backtrace
      binding.pry
    end
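
    # Space reserved for aggregation: the aggregator's fixed AggregateOffset
    # plus twice the partition key length, subtracted from the size limits in
    # #configure.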
    def offset
      @offset ||= AggregateOffset + @partition_key_generator.call.size * 2
    end

    def multi_workers_ready?
      true
    end

    private
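
    # Adds RecordOffset on top of the raw value size so batch splitting
    # accounts for per-record overhead (RecordOffset is assumed to come from
    # the included kinesis helper mixins).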
    def size_of_values(record)
      super(record) + RecordOffset
    end

    def create_partition_key_generator
      if @fixed_partition_key.nil?
        ->() { SecureRandom.hex(16) }
      else
        ->() { @fixed_partition_key }
      end
    end
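
    # Build the per-event formatter: either format the whole record with the
    # configured <format> section (JSON by default), or extract a single
    # data_key field, then optionally compress.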
    def data_formatter_create(conf)
      formatter = formatter_create
      compressor = compressor_create
      if @data_key.nil?
        ->(tag, time, record) {
          record = inject_values_to_record(tag, time, record)
          compressor.call(formatter.format(tag, time, record).chomp.b)
        }
      else
        ->(tag, time, record) {
          raise InvalidRecordError, record unless record.is_a? Hash
          raise KeyNotFoundError.new(@data_key, record) if record[@data_key].nil?
          compressor.call(record[@data_key].to_s.b)
        }
      end
    end
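
    # Only zlib compression is supported; anything else is a pass-through.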
    def compressor_create
      case @compression
      when "zlib"
        ->(data) { Zlib::Deflate.deflate(data) }
      else
        ->(data) { data }
      end
    end
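
    # Reject records larger than max_record_size; on SkipRecordError the error
    # is logged and an empty string is returned, so nothing is appended to the
    # chunk for that event.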
    def format_for_api(&block)
      converted = block.call
      size = size_of_values(converted)
      if size > @max_record_size
        raise ExceedMaxRecordSizeError.new(size, converted)
      end
      converted.to_msgpack
    rescue SkipRecordError => e
      log.error(truncate(e))
      ''
    end
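
    # Re-read the chunk's msgpack stream, split it into batches within the API
    # limits and hand each batch to the supplied block (the put_records call
    # in #write) with the helper's retry logic.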
    def write_records_batch(chunk, &block)
      unique_id = chunk.dump_unique_id_hex(chunk.unique_id)
      chunk.open do |io|
        begin
          records = msgpack_unpacker(io).to_enum
          split_to_batches(records) do |batch, size|
            log.debug(sprintf("Write chunk %s / %3d records / %4d KB", unique_id, batch.size, size / 1024))
            batch_request_with_retry(batch, &block)
            log.debug("Finish writing chunk")
          end
        rescue StandardError => e
          puts $driver.logs
          puts e
          puts e.backtrace
          binding.pry
        end
      end
    end

    def request_type
      self.class::RequestType
    end

    def truncate(msg)
      if @log_truncate_max_size == 0 or (msg.to_s.size <= @log_truncate_max_size)
        msg.to_s
      else
        msg.to_s[0...@log_truncate_max_size]
      end
    end
  end
end
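
# Configure the output through the Fluentd test driver with dummy credentials
# and a 1 MB chunk limit.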
$driver = Fluent::Test::Driver::Output.new(Fluent::CustomKinesisOutput).configure <<~CONF
  log_level debug
  region us-west-1
  stream_name dummy
  aws_key_id abcdef123
  aws_sec_key abcdef123
  <buffer>
    chunk_limit_size "1m"
  </buffer>
CONF
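
# Stub out the AWS API call so no real Kinesis stream is needed: every
# put_records returns a canned successful response.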
Aws::Kinesis::Client.prepend(Module.new do
  def put_records(*args)
    OpenStruct.new(
      encryption_type: "KMS",
      failed_record_count: 0,
      records: [
        OpenStruct.new(
          sequence_number: "12345",
          shard_id: "12345"
        )
      ]
    )
  end
end)
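
# Feed ten batches of 3,000-5,000 synthetic events (256-character payloads)
# through the driver, then print the collected logs.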
$driver.run(force_flush_retry: true) do
  10.times do
    time = Fluent::EventTime.now
    events = Array.new(Kernel.rand(3000..5000)).enum_for(:each_with_index).map do |_, i|
      [time, { msg: ('a'.ord + i % 26).chr * 256 }]
    end
    $driver.feed("my.tag", events)
  end
end
puts $driver.logs