Skip to content

Instantly share code, notes, and snippets.

@romiras
Last active January 25, 2022 07:40
Show Gist options
  • Save romiras/4a6f0c285649846960d164873919b74a to your computer and use it in GitHub Desktop.
Save romiras/4a6f0c285649846960d164873919b74a to your computer and use it in GitHub Desktop.
Concurrently classify items by type and batch them with multiple workers
# Concurrently classify items by type and batch them with multiple workers
require 'logger'
# Number of consumer threads started by `run`.
NUM_WORKERS = 10
# Number of items requested from `gen_items` for each job.
NUM_ITEMS = 12
# Number of jobs the producer thread in `run` enqueues.
NUM_ITERATIONS = 5000
# A unit of work: an id plus the type that BatchWorker uses to group ids.
Item = Struct.new(:id, :type)
# Logger writing to standard output; handed to `run` at the bottom of the script.
logger = Logger.new($stdout)
# Groups item ids by item type and emits them in bulks of at most MAX_ITEMS ids.
#
# Every instance keeps its own per-type queues and does no locking of its own;
# in this script each worker thread creates and uses a private instance.
class BatchWorker
  # Number of ids collected for one type before that type is flushed.
  MAX_ITEMS = 2

  # Exact message of the RuntimeError that triggers the thread dump in #process.
  HASH_ITERATION_ERROR = "can't add a new key into hash during iteration".freeze

  def initialize
    @queues = {} # item type => ids waiting to be flushed
  end

  # Queues every item under its type, flushing a type as soon as it holds
  # MAX_ITEMS ids, then flushes whatever is still pending in every queue.
  #
  # @param items [#each] objects responding to #id and #type
  # @raise [RuntimeError] any RuntimeError is re-raised; when it is the
  #   "hash modified during iteration" error, a dump of all thread backtraces
  #   is printed first
  def process(items)
    items.each { |item| push(item) }
    flush_queues
  rescue RuntimeError => e # "can't add a new key into hash during iteration" is a RuntimeError
    handle_error if e.message == HASH_ITERATION_ERROR
    raise # always reraise
  end

  # Prints the backtrace of every thread returned by Thread.list to stdout,
  # to help pin down which thread touched the hash concurrently.
  #
  # Credits to @thedarkone for original implementation
  def handle_error
    # Snapshot the list once so the reported total matches the threads dumped.
    threads = Thread.list
    total = threads.size

    report = threads.each_with_index.map do |thread, index|
      marker = "--- EEE thread #{index + 1} of total #{total} #{thread.object_id} backtrace"
      # Thread#backtrace is nil for a thread that has died in the meantime;
      # treat that as "no frames" instead of failing inside the error handler.
      frames = thread.backtrace || []
      "#{marker} begin\n#{frames.join("\n")}\n#{marker} end\n"
    end.join

    puts(report)
  end

  private

  # Appends the item's id to the queue of its type (creating the queue on first
  # use) and flushes that queue once it holds MAX_ITEMS ids.
  def push(item)
    ids = (@queues[item.type] ||= [])
    ids << item.id
    return if ids.length < MAX_ITEMS

    flush_bulk(item.type, ids)
    ids.clear
  end

  # Flushes and empties every queue that still has pending ids.
  def flush_queues
    @queues.each_pair do |type, ids|
      next if ids.empty?

      flush_bulk(type, ids)
      ids.clear
    end
  end

  # Emits one bulk; here it simply prints the [type, ids] pair to stdout.
  def flush_bulk(type, ids)
    p([type, ids])
  end
end
# Drives the experiment: one producer thread fills a queue with generated
# jobs while NUM_WORKERS consumer threads drain it, each through its own
# BatchWorker. Blocks until every thread has finished.
#
# @param logger [Logger] receives progress messages from the workers
def run(logger)
  jobs = Queue.new

  # Producer: enqueue NUM_ITERATIONS jobs, then a nil sentinel, then close.
  producer = Thread.new do
    NUM_ITERATIONS.times { jobs.enq(gen_items(NUM_ITEMS)) }
    jobs.enq(nil)
    jobs.close
  end

  consumers = Array.new(NUM_WORKERS) do |index|
    Thread.new do
      label = "Worker #{index}"
      worker = BatchWorker.new
      logger.info(label)

      # A nil job ends the loop: either the sentinel itself or the nil that
      # Queue#deq yields once the queue is closed and drained.
      job = jobs.deq
      while has_jobs?(job)
        logger.info("#{label} picked job")
        worker.process(job)
        logger.info("#{label} finished job")
        job = jobs.deq
      end

      logger.info("#{label} exited")
    end
  end

  [producer, *consumers].each(&:join)
end
# Tells whether a value taken from the job queue is a real job.
#
# @param x [Object, nil] value returned by the queue
# @return [Boolean] false only for nil; true for anything else (even false)
def has_jobs?(x)
  return false if x.nil?

  true
end
# Builds a shuffled batch of `num` Items with random ids in 0...20.
#
# Roughly two thirds of the items get type 'A', about two ninths plus one get
# type 'B', and the remainder gets type 'C'.
#
# @param num [Integer] number of items to generate
# @return [Array<Item>] exactly `num` items in random order (empty when
#   `num` is zero or negative)
def gen_items(num)
  total = [num, 0].max
  a_count = total * 2 / 3
  # The "+ 1" must never push the batch past the requested size
  # (it used to yield one item for num == 0).
  b_count = [(total * 2 / 9) + 1, total - a_count].min
  c_count = total - a_count - b_count

  items = { 'A' => a_count, 'B' => b_count, 'C' => c_count }.flat_map do |type, count|
    Array.new(count) { Item.new(rand(20), type) }
  end
  items.shuffle
end
# Debugging aid (disabled): inspect one generated batch ordered by type.
# gen_items(10).sort{|a, b| a.type <=> b.type}
# Script entry point: start the producer and worker threads and wait for them.
run(logger)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment