@nirvdrum
Last active January 2, 2016 23:59
Various Redis access patterns for consuming 150,000 simple JSON representations of a Sidekiq job. The intent is to get a rough idea of the overhead of queuing and processing 150,000 Sidekiq jobs using both basic and reliable fetch, compared to what you'd get from a batch job that simply consumes a Redis list. No attempt is made h…
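A minimal setup sketch, assuming the benchmarks run against a local Redis on the default port (the URL below is an assumption, not taken from the gist):

require 'sidekiq'

# Point Sidekiq's connection pool at a local Redis instance (hypothetical URL).
Sidekiq.configure_client do |config|
  config.redis = { url: 'redis://localhost:6379/0' }
end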
# Builds `count` copies of a minimal Sidekiq job payload, as raw JSON strings.
def element_generator(count)
  ['{"class": "LocationUpdateWorker", "queue": "default", "args": {} }'] * count
end
# Batched fetch: pipeline a single lrange (read batch_size elements) and a
# single ltrim (drop them from the list), i.e. two commands per round trip.
def batched(element_count, batch_size)
  Sidekiq.redis do |r|
    r.del(:list_test)
    r.rpush(:list_test, element_generator(element_count))
    start = Time.now
    (element_count / batch_size).times do
      elements = r.pipelined do
        r.lrange(:list_test, 0, batch_size - 1)
        r.ltrim(:list_test, batch_size, -1)
      end.first # first reply is the lrange result
    end
    puts "Total batch time: #{Time.now - start}s"
    r.del(:list_test)
  end
end
# Serial blocking fetch: one blpop (and thus one round trip) per element.
def serial(element_count)
  Sidekiq.redis do |r|
    r.del(:list_test)
    r.rpush(:list_test, element_generator(element_count))
    start = Time.now
    element_count.times do
      r.blpop(:list_test)
    end
    puts "Total batch time: #{Time.now - start}s"
    r.del(:list_test)
  end
end
# Serial non-blocking fetch: same as serial, but with lpop so the same command
# can later be pipelined.
def serial_non_blocking(element_count)
  Sidekiq.redis do |r|
    r.del(:list_test)
    r.rpush(:list_test, element_generator(element_count))
    start = Time.now
    element_count.times do
      r.lpop(:list_test)
    end
    puts "Total batch time: #{Time.now - start}s"
    r.del(:list_test)
  end
end
# Serial blocking fetch plus the JSON decode Sidekiq performs on each job.
def serial_plus_decode(element_count)
  Sidekiq.redis do |r|
    r.del(:list_test)
    r.rpush(:list_test, element_generator(element_count))
    start = Time.now
    element_count.times do
      Sidekiq.load_json(r.blpop(:list_test).last)
    end
    puts "Total batch time: #{Time.now - start}s"
    r.del(:list_test)
  end
end
# Pipelined fetch: issue batch_size lpop commands per pipeline, so N commands
# are still sent to fetch N elements, but in N / batch_size round trips.
def pipelined_non_blocking(element_count, batch_size)
  Sidekiq.redis do |r|
    r.del(:list_test)
    r.rpush(:list_test, element_generator(element_count))
    start = Time.now
    (element_count / batch_size).times do
      elements = r.pipelined do
        batch_size.times do
          r.lpop(:list_test)
        end
      end
    end
    puts "Total batch time: #{Time.now - start}s"
    r.del(:list_test)
  end
end
# Reliable fetch simulation: rpoplpush each element onto a working list, then
# lrem it from that list once it has been "processed".
def serial_reliable(element_count)
  Sidekiq.redis do |r|
    r.del(:list_test)
    r.del(:list_test_reliable)
    r.rpush(:list_test, element_generator(element_count))
    start = Time.now
    element_count.times do
      val = r.rpoplpush(:list_test, :list_test_reliable)
      r.lrem(:list_test_reliable, -1, val)
    end
    puts "Total batch time: #{Time.now - start}s"
    r.del(:list_test)
    r.del(:list_test_reliable)
  end
end
serial_reliable(150_000) # => 153.644s <- Reliable fetching
serial(150_000) # => 85.445s <- Basic fetching
serial_plus_decode(150_000) # => 91.623s <- Basic fetching with JSON decode
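# Rough arithmetic: reliable fetch issues two commands per element (rpoplpush
# plus lrem), i.e. roughly 300,000 round trips versus 150,000 for basic blpop,
# which lines up with the ~1.8x slowdown above. The ~6s gap between serial and
# serial_plus_decode works out to roughly 40 microseconds of JSON decoding per job.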
# This set of calls uses redis pipelining, but simply repeats the lpop call.
# I.e., N commands are sent to fetch N elements and the response body will
# be an array of size N.
#
# lpop is O(1), and we call it N times.
serial_non_blocking(150_000) # => 72.298s <- Can't pipeline with blpop; new baseline established with lpop
pipelined_non_blocking(150_000, 5) # => 30.719s
pipelined_non_blocking(150_000, 10) # => 17.947s
pipelined_non_blocking(150_000, 20) # => 5.914s
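# Rough arithmetic: at a batch size of 20 the pipeline makes 150_000 / 20 = 7_500
# round trips instead of 150_000, so most of the 72.3s -> 5.9s improvement comes
# from eliminating round trips; the per-element lpop work on the server is unchanged.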
# This set of calls uses redis pipelining with only two commands: lrange and ltrim.
# The response body will be an array of size 2. The first element will be an array
# of size N containing the fetched elements. The second element will be "OK" to
# acknowledge the trim.
#
# lrange is O(N), since we start at position 0.
# ltrim is O(N).
batched(150_000, 5) # => 29.741s
batched(150_000, 10) # => 15.124s
batched(150_000, 20) # => 7.24s
batched(150_000, 50) # => 3.442s
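# Rough arithmetic: the lrange/ltrim variant sends only 2 commands per batch, so
# at a batch size of 50 that's 150_000 / 50 = 3_000 round trips and 6_000 commands
# total, versus 150_000 lpop commands serially. Larger batches keep shrinking the
# round-trip count, which is why the timings keep dropping here.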