Created
April 12, 2017 21:16
-
-
Save colinsurprenant/b3ca535857a0e5d4ab48580fdfd653fe to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "logstash/util/wrapped_acked_queue" | |
require "logstash/event" | |
require "logstash/instrument/namespaced_metric" | |
RSpec.configure do |config| | |
config.backtrace_exclusion_patterns << /rakelib\/test/ | |
config.backtrace_exclusion_patterns << /vendor\/bundle/ | |
# config.order = :random | |
# config.seed = 24307 | |
end | |
describe LogStash::Util::WrappedAckedQueue do | |
let(:path) { Stud::Temporary.directory } | |
context "with multiple writers" do | |
let(:items) { expected_count / writers } | |
let(:page_capacity) { 1 << page_capacity_multiplier } | |
let(:queue_capacity) { page_capacity * queue_capacity_multiplier } | |
let(:output_strings) { [] } | |
let(:reject_memo_keys) { [:reject_memo_keys, :path, :queue, :writer_threads, :collector, :metric, :reader_threads, :output_strings] } | |
let(:queue) do | |
described_class.create_file_based(path, page_capacity, 0, queue_checkpoint_acks, queue_checkpoint_writes, queue_checkpoint_interval, queue_capacity) | |
end | |
let(:writer_threads) do | |
writer = queue.write_client | |
writers.times.map do |i| | |
Thread.new(i, items, writer) do |i, items, writer| | |
publisher(items, writer) | |
end | |
end | |
end | |
let(:writers_finished) { Concurrent::AtomicBoolean.new(false) } | |
let(:reader_threads) do | |
reader = queue.read_client | |
reader.set_batch_dimensions(batch_size, batch_wait) | |
reader.set_events_metric(metric.namespace([:stats, :events])) | |
reader.set_pipeline_metric(metric.namespace([:stats, :pipelines, :main, :events])) | |
readers.times.map do |i| | |
Thread.new(i, reader, counts) do |ii, readrrr, countss| | |
tally = 0 | |
while true | |
batch = readrrr.read_batch | |
break if batch.size.zero? && writers_finished.value == true && queue.queue.is_fully_acked? | |
if simulate_work | |
sleep rand * 0.01 | |
end | |
tally += batch.size | |
batch.close | |
end | |
countss[ii] = tally | |
# puts("reader #{ii}, tally=#{tally}, countss=#{countss.inspect}") | |
end | |
end | |
end | |
def publisher(items, writer) | |
items.times.each do |i| | |
event = LogStash::Event.new("sequence" => "#{i}".ljust(string_size)) | |
writer.push(event) | |
end | |
rescue => e | |
p :publisher_error => e | |
end | |
let(:collector) { LogStash::Instrument::Collector.new } | |
let(:metric) { LogStash::Instrument::Metric.new(collector) } | |
shared_examples "a well behaved queue" do | |
it "writes, reads, closes and reopens" do | |
Thread.abort_on_exception = true | |
# force lazy initialization to avoid concurency issues within threads | |
counts | |
# Start the threads | |
writer_threads | |
reader_threads | |
writer_threads.each(&:join) | |
writers_finished.make_true | |
reader_threads.each(&:join) | |
enqueued = queue.queue.unread_count | |
if enqueued != 0 | |
output_strings << "unread events in queue: #{enqueued}" | |
end | |
got = counts.reduce(&:+) | |
if got != expected_count | |
# puts("count=#{counts.inspect}") | |
output_strings << "events read: #{got}" | |
end | |
sleep 0.1 | |
expect { queue.close }.not_to raise_error | |
sleep 0.1 | |
files = Dir.glob(path + '/*').map{|f| f.sub("#{path}/", '')} | |
if files.count != 2 | |
output_strings << "File count after close mismatch expected: 2 got: #{files.count}" | |
output_strings.concat files | |
end | |
begin | |
queue.queue.open | |
rescue Exception => e | |
output_strings << e.message | |
end | |
queue.close | |
if output_strings.any? | |
output_strings << __memoized.reject{|k,v| reject_memo_keys.include?(k)}.inspect | |
end | |
expect(output_strings).to eq([]) | |
end | |
end | |
let(:writers) { 3 } | |
let(:readers) { 3 } | |
let(:simulate_work) { true } | |
let(:counts) { Concurrent::Array.new([0, 0, 0, 0, 0, 0, 0, 0]) } | |
let(:page_capacity_multiplier) { 20 } | |
let(:queue_capacity_multiplier) { 128 } | |
let(:queue_checkpoint_acks) { 1024 } | |
let(:queue_checkpoint_writes) { 1024 } | |
let(:queue_checkpoint_interval) { 1000 } | |
let(:batch_size) { 500 } | |
let(:batch_wait) { 1000 } | |
let(:expected_count) { 60000 } | |
let(:string_size) { 256 } | |
describe "with simulate_work ON" do | |
let(:simulate_work) { true } | |
context "> more writers than readers <" do | |
let(:writers) { 4 } | |
let(:readers) { 2 } | |
it_behaves_like "a well behaved queue" | |
end | |
context "> less writers than readers <" do | |
let(:writers) { 2 } | |
let(:readers) { 4 } | |
it_behaves_like "a well behaved queue" | |
end | |
context "> larger checkpoint acks <" do | |
let(:queue_checkpoint_acks) { 3000 } | |
it_behaves_like "a well behaved queue" | |
end | |
context "> smaller checkpoint acks <" do | |
let(:queue_checkpoint_acks) { 500 } | |
it_behaves_like "a well behaved queue" | |
end | |
context "> larger checkpoint writes <" do | |
let(:queue_checkpoint_writes) { 3000 } | |
it_behaves_like "a well behaved queue" | |
end | |
context "> smaller checkpoint writes <" do | |
let(:queue_checkpoint_writes) { 500 } | |
it_behaves_like "a well behaved queue" | |
end | |
context "> larger checkpoint interval <" do | |
let(:queue_checkpoint_interval) { 3000 } | |
it_behaves_like "a well behaved queue" | |
end | |
context "> smaller checkpoint interval <" do | |
let(:queue_checkpoint_interval) { 500 } | |
it_behaves_like "a well behaved queue" | |
end | |
context "> smaller batch wait <" do | |
let(:batch_wait) { 125 } | |
it_behaves_like "a well behaved queue" | |
end | |
context "> larger batch wait <" do | |
let(:batch_wait) { 5000 } | |
it_behaves_like "a well behaved queue" | |
end | |
context "> smaller event size <" do | |
let(:string_size) { 8 } | |
it_behaves_like "a well behaved queue" | |
end | |
context "> larger event size <" do | |
let(:string_size) { 8192 } | |
it_behaves_like "a well behaved queue" | |
end | |
context "> small queue size limit <" do | |
let(:queue_capacity_multiplier) { 10 } | |
it_behaves_like "a well behaved queue" | |
end | |
context "> very large queue size limit <" do | |
let(:queue_capacity_multiplier) { 512 } | |
it_behaves_like "a well behaved queue" | |
end | |
end | |
describe "with simulate_work OFF" do | |
let(:simulate_work) { false } | |
context "> more writers than readers <" do | |
let(:writers) { 4 } | |
let(:readers) { 2 } | |
it_behaves_like "a well behaved queue" | |
end | |
context "> less writers than readers <" do | |
let(:writers) { 2 } | |
let(:readers) { 4 } | |
it_behaves_like "a well behaved queue" | |
end | |
context "> larger checkpoint acks <" do | |
let(:queue_checkpoint_acks) { 3000 } | |
it_behaves_like "a well behaved queue" | |
end | |
context "> smaller checkpoint acks <" do | |
let(:queue_checkpoint_acks) { 500 } | |
it_behaves_like "a well behaved queue" | |
end | |
context "> larger checkpoint writes <" do | |
let(:queue_checkpoint_writes) { 3000 } | |
it_behaves_like "a well behaved queue" | |
end | |
context "> smaller checkpoint writes <" do | |
let(:queue_checkpoint_writes) { 500 } | |
it_behaves_like "a well behaved queue" | |
end | |
context "> larger checkpoint interval <" do | |
let(:queue_checkpoint_interval) { 3000 } | |
it_behaves_like "a well behaved queue" | |
end | |
context "> smaller checkpoint interval <" do | |
let(:queue_checkpoint_interval) { 500 } | |
it_behaves_like "a well behaved queue" | |
end | |
context "> smaller batch wait <" do | |
let(:batch_wait) { 125 } | |
it_behaves_like "a well behaved queue" | |
end | |
context "> larger batch wait <" do | |
let(:batch_wait) { 5000 } | |
it_behaves_like "a well behaved queue" | |
end | |
context "> smaller event size <" do | |
let(:string_size) { 8 } | |
it_behaves_like "a well behaved queue" | |
end | |
context "> larger event size <" do | |
let(:string_size) { 8192 } | |
it_behaves_like "a well behaved queue" | |
end | |
context "> small queue size limit <" do | |
let(:queue_capacity_multiplier) { 10 } | |
it_behaves_like "a well behaved queue" | |
end | |
context "> very large queue size limit <" do | |
let(:queue_capacity_multiplier) { 512 } | |
it_behaves_like "a well behaved queue" | |
end | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment