Skip to content

Instantly share code, notes, and snippets.

@DAddYE
Created January 3, 2013 12:49
Show Gist options
  • Save DAddYE/4443231 to your computer and use it in GitHub Desktop.
Save DAddYE/4443231 to your computer and use it in GitHub Desktop.
module Async
class Queue
attr_reader :worker, :concurrency
attr_accessor :saturated,
:drain
def initialize(concurrency, &block)
@worker = block
@workers = 0
@concurrency = concurrency
end
def tasks
@_tasks ||= []
end
def workers
@_workers ||= []
end
def push(data, &block)
tasks.push({
data: data,
callback: block
})
# A callback that is called when the queue length hits the concurrency
# and further tasks will be queued
if saturated && tasks.size == concurrency
saturated.call
end
# Process worker
process!
end
def process!
return if workers.size >= concurrency || tasks.size == 0
task = tasks.shift
data = task[:data]
Thread.abort_on_exception=true
workers << Thread.new do
begin
worker[data]
ensure
task[:callback][data] if task[:callback]
workers.delete(Thread.current)
drain.call if drain && tasks.size + workers.size == 0
process!
end
end
end
def on(event, &block)
raise ArgumentError, "event #{event} is not valid!" unless %w[saturated drain].include?(event.to_s)
send("#{event}=", block)
end
def wait
workers.each(&:join) while running?
end
alias :end :wait
def running?
tasks.size > 0 or workers.any?(&:alive?)
end
end
end
if $0 == __FILE__
@_todo = 100
@_done = 0
def upload_this_long_shit(item)
sleep 1
@_done += 1
end
queue = Async::Queue.new(10) { |item| upload_this_long_shit(item) }
queue.on(:saturated) { puts "Concurrency exceeded, next job will be queued" }
queue.on(:drain) { puts "\rProcessed #@_done/#@_todo (gooooooooood!!!)" }
# Simulate a long job ... 100 :D
@_todo.times do |item_to_process|
queue.push(item_to_process){ |d| "Here I can register a 'Completed' callback with #{d}" }
end
while queue.running?
print "\rProcessing #@_done/#@_todo"
sleep 1
end
# or instead I can simply do
# queue.end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment