Skip to content

Instantly share code, notes, and snippets.

@ixti
Last active May 19, 2022 16:31
Show Gist options
  • Save ixti/d19b238ec8f288fce9a885a1ecea56fb to your computer and use it in GitHub Desktop.
Save ixti/d19b238ec8f288fce9a885a1ecea56fb to your computer and use it in GitHub Desktop.
Sidekiq: queue filtering
module SidekiqQueueFilterV3
class Observer
def initialize(frequency: 1000)
@frequency = frequency.to_i
@buffer = 0
@stats = Hash.new(0)
end
def call(event)
@buffer += 1
@stats[event] += 1
flush
end
def flush(force: false)
return if @buffer.zero?
return unless force || @frequency <= @buffer
puts @stats
@stats.clear
@buffer = 0
end
end
class Drainer
SCRIPT = <<~LUA
local src = KEYS[1]
local dst = KEYS[2]
local tmp = src .. ".rejected"
local payload = redis.call("LPOP", src)
if not payload then
if redis.call("EXISTS", tmp) then
redis.call("RENAME", tmp, src)
end
return 0
end
local job_class = cjson.decode(payload)["class"]
local reject_job = false
for i = 1, #ARGV do
if job_class == ARGV[i] then
reject_job = true
end
end
if reject_job then
redis.call("RPUSH", tmp, payload)
return 1
end
redis.call("RPUSH", dst, payload)
return 2
LUA
def initialize(src:, dst:, reject:, observer: Observer.new)
@keys = [-src.to_s, -dst.to_s].freeze
@argv = reject.map { |it| -it.to_s }.freeze
@observer = observer
end
def call
Sidekiq.redis do |redis|
script = redis.script("LOAD", SCRIPT)
loop do
result = redis.evalsha(script, @keys, @argv)
case result
when 2 then @observer.call(:requeued)
when 1 then @observer.call(:rejected)
when 0
@observer.flush(force: true)
break
else
raise "unexpected script result: #{result.inspect}"
end
end
end
end
end
class << self
def self.call(queue, reject:)
orig_key = "queue:#{queue}"
temp_key = "#{orig_key}.#{SecureRandom.hex}"
# Move queue out of sidekiq polling
Sidekiq.redis { |redis| redis.rename(orig_key, temp_key) }
Drainer.new(src: temp_key, dst: orig_key, reject: reject).call
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment