Skip to content

Instantly share code, notes, and snippets.

@bbozo
Last active April 12, 2017 17:58
Show Gist options
  • Save bbozo/d75f505247341081afd753f01cd53270 to your computer and use it in GitHub Desktop.
Save bbozo/d75f505247341081afd753f01cd53270 to your computer and use it in GitHub Desktop.
sidekiq-scheduler race condition proof-of-concept
# gem install concurrent-ruby redis pry connection_pool
require "redis"
require "pry"
require "connection_pool"
require "concurrent"
require "securerandom"
$cp = ConnectionPool.new { Redis.new }
REGISTERED_JOBS_THRESHOLD_IN_SECONDS = 60 * 60
$salt = SecureRandom.base64
def run_a_setnx job_key
Concurrent::Future.execute do
time = Time.now
pushed_job_key = "sidekiq-scheduler/#{$salt}/setnx_test_#{job_key}"
$cp.with do |r|
r.pipelined do
r.setnx(pushed_job_key, time.to_i)
r.expire(pushed_job_key, REGISTERED_JOBS_THRESHOLD_IN_SECONDS)
end
end
end
end
def run_a_setnx_race_condition job_key
futures = 100.times.map do
run_a_setnx job_key
end
rv = futures.map{|f| f.value!.first}.select{|v| v == true }
raise "got #{rv.count} results for job_key #{job_key}" if rv.count != 1
rv
end
1000.times do |idx|
run_a_setnx_race_condition idx
end
$cp.shutdown { |conn| conn.quit }
# gem install concurrent-ruby redis pry connection_pool
require "redis"
require "pry"
require "connection_pool"
require "concurrent"
$cp = ConnectionPool.new { Redis.new }
REGISTERED_JOBS_THRESHOLD_IN_SECONDS = 24 * 60 * 60
def run_a_zadd job_key
rv = Concurrent::Future.execute do
time = Time.now
pushed_job_key = "sidekiq-scheduler/zadd_test_#{job_key}"
$cp.with do |r|
r.pipelined do
r.zadd(pushed_job_key, time.to_i, time.to_i)
r.expire(pushed_job_key, REGISTERED_JOBS_THRESHOLD_IN_SECONDS)
end
end
end
rv
end
def run_a_zadd_race_condition job_key
futures = 100.times.map do
run_a_zadd job_key
end
rv = futures.map{|f| f.value!.first}.select{|v| v == true }
raise "got #{rv.count} results for job_key #{job_key}" if rv.count != 1
rv
end
1000.times do |idx|
run_a_zadd_race_condition idx
end
$cp.shutdown { |conn| conn.quit }
# gem install concurrent-ruby redis pry connection_pool rufus-scheduler
# ruby zadd_sidekiq_scheduler.rb
require 'concurrent'
require 'connection_pool'
require 'pry'
require 'redis'
require 'rufus-scheduler'
$cp = ConnectionPool.new { Redis.new }
$t0 = nil
$t1 = nil
REGISTERED_JOBS_THRESHOLD_IN_SECONDS = 24 * 60 * 60
PUSHED_JOBS_SET_KEY = 'sidekiq-scheduler:pushed:some-job'
PUSHED_JOBS_COUNTER_KEY = 'sidekiq-scheduler:pushed:counter'
CRON_STRING = '*/6 * * * * *'
def cleanup
$cp.with do |r|
r.del(PUSHED_JOBS_SET_KEY, PUSHED_JOBS_COUNTER_KEY)
end
end
def idempotent_job_enqueue(time)
$cp.with do |r|
registered, _ = r.pipelined do
r.zadd(PUSHED_JOBS_SET_KEY, time.to_i, time.to_i)
r.expire(PUSHED_JOBS_SET_KEY, REGISTERED_JOBS_THRESHOLD_IN_SECONDS)
end
if registered
r.incr(PUSHED_JOBS_COUNTER_KEY)
printf "_"
else
printf "."
end
end
end
def report_start
puts "version: #{RUBY_VERSION}"
puts "engine: #{RUBY_ENGINE}"
puts "platform: #{RUBY_PLATFORM}"
puts ""
puts "cron string: #{CRON_STRING}"
puts 'runs when sec is 00, 06, 12, 18, 24, 30, 36, 42, 48, 54'
puts ''
puts "Started at: #{$t0}"
end
def report_end
pushed_jobs_in_set, counter = $cp.with do |r|
r.pipelined do
r.zcard(PUSHED_JOBS_SET_KEY)
r.get(PUSHED_JOBS_COUNTER_KEY)
end
end
puts "Finished at: #{$t1}"
puts ""
puts "Timestamps count in #{PUSHED_JOBS_SET_KEY} = #{pushed_jobs_in_set}"
puts "Counter (#{PUSHED_JOBS_COUNTER_KEY}) = #{counter}"
end
cleanup
$t0 = Time.now
$run_time = $t0 + 30
report_start
futures = 100.times.map do
Process.fork do
$cp = ConnectionPool.new { Redis.new }
scheduler = Rufus::Scheduler.new(frequency: 0.1)
sleep($run_time-Time.now)
scheduler.every '0.1s' do |job, time|
idempotent_job_enqueue(time)
end
sleep(63)
scheduler.shutdown(:wait)
printf "!"
end
end
Process.wait
at_exit do
$t1 = Time.now
report_end
$cp.shutdown { |conn| conn.quit }
puts 'bye!'
end
@bbozo
Copy link
Author

bbozo commented Apr 12, 2017

And the output:

bbozo@eva:~/dev/redis_zadd_test$ ruby zadd_test.rb 
zadd_test.rb:30:in `run_a_zadd_race_condition': got 2 results for job_key 44 (RuntimeError)
        from zadd_test.rb:35:in `block in <main>'
        from zadd_test.rb:34:in `times'
        from zadd_test.rb:34:in `<main>'
bbozo@eva:~/dev/redis_zadd_test$ ruby zadd_test.rb 
zadd_test.rb:30:in `run_a_zadd_race_condition': got 2 results for job_key 158 (RuntimeError)
        from zadd_test.rb:35:in `block in <main>'
        from zadd_test.rb:34:in `times'
        from zadd_test.rb:34:in `<main>'
bbozo@eva:~/dev/redis_zadd_test$ ruby zadd_test.rb 
zadd_test.rb:30:in `run_a_zadd_race_condition': got 2 results for job_key 68 (RuntimeError)
        from zadd_test.rb:35:in `block in <main>'
        from zadd_test.rb:34:in `times'
        from zadd_test.rb:34:in `<main>'
bbozo@eva:~/dev/redis_zadd_test$ ruby zadd_test.rb 
zadd_test.rb:30:in `run_a_zadd_race_condition': got 2 results for job_key 15 (RuntimeError)
        from zadd_test.rb:35:in `block in <main>'
        from zadd_test.rb:34:in `times'
        from zadd_test.rb:34:in `<main>'
bbozo@eva:~/dev/redis_zadd_test$ ruby zadd_test.rb 
zadd_test.rb:30:in `run_a_zadd_race_condition': got 2 results for job_key 442 (RuntimeError)
        from zadd_test.rb:35:in `block in <main>'
        from zadd_test.rb:34:in `times'
        from zadd_test.rb:34:in `<main>'

Notice also that the results are much more severe on a Ruby that doesn't have a GIL and supports proper threading:

bbozo@eva:~/dev/redis_zadd_test$ ruby zadd_test.rb 
RuntimeError: got 2 results for job_key 3
  run_a_zadd_race_condition at zadd_test.rb:30
      block in zadd_test.rb at zadd_test.rb:35
                      times at org/jruby/RubyFixnum.java:302
                      <top> at zadd_test.rb:34
bbozo@eva:~/dev/redis_zadd_test$ ruby zadd_test.rb 
RuntimeError: got 2 results for job_key 2
  run_a_zadd_race_condition at zadd_test.rb:30
      block in zadd_test.rb at zadd_test.rb:35
                      times at org/jruby/RubyFixnum.java:302
                      <top> at zadd_test.rb:34
bbozo@eva:~/dev/redis_zadd_test$ ruby zadd_test.rb 
RuntimeError: got 2 results for job_key 3
  run_a_zadd_race_condition at zadd_test.rb:30
      block in zadd_test.rb at zadd_test.rb:35
                      times at org/jruby/RubyFixnum.java:302
                      <top> at zadd_test.rb:34
bbozo@eva:~/dev/redis_zadd_test$ ruby zadd_test.rb 
RuntimeError: got 2 results for job_key 18
  run_a_zadd_race_condition at zadd_test.rb:30
      block in zadd_test.rb at zadd_test.rb:35
                      times at org/jruby/RubyFixnum.java:302
                      <top> at zadd_test.rb:34

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment