Last active
December 10, 2015 15:08
-
-
Save PetrKaleta/4452196 to your computer and use it in GitHub Desktop.
This code demonstrates how to create a queue_classic worker that makes concurrent requests to third-party APIs.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This code demonstrates the default queue_classic worker.
# Its biggest issue is that the HTTP requests made by its jobs are blocking.
require 'rubygems' | |
require 'queue_classic' | |
require 'typhoeus' | |
# Ctrl-C: exit immediately.
trap('INT') { exit }
# TERM: exit as well. The handler cannot call `worker.stop` here: `worker` is
# only assigned further down the script, so inside this block Ruby would parse
# it as a method call and the handler would die with NameError when TERM
# arrives.
trap('TERM') { exit }
# Default-style queue_classic job: it performs its HTTP request synchronously,
# so the worker is blocked until the response arrives.
class Consumer
  # Jobs completed so far; used only by the timing code in .perform.
  # Kept as a class-level instance variable rather than a @@class variable,
  # which would be shared with (and clobbered by) every subclass.
  @finished_jobs_count = 0

  # Runs one job.
  #
  # opts - the enqueued argument hash; opts['request'] must hold 'url' and may
  #        hold 'headers', 'params' and 'body'. The request is always a GET
  #        (any 'method' entry is ignored).
  #
  # Raises KeyError when opts has no 'request' entry.
  def self.perform(opts = {})
    req_opts = opts.fetch('request')
    response = Typhoeus::Request.get req_opts['url'],
                                     :headers => req_opts['headers'],
                                     :params => req_opts['params'],
                                     :body => req_opts['body']
    # puts "HTTP#{response.code}"

    # just for metrics purposes
    # remove this code once you don't need it
    # (`|| 0` lets subclasses, which get their own ivar, count from zero)
    @finished_jobs_count = (@finished_jobs_count || 0) + 1
    if @finished_jobs_count == JOBS_COUNT
      total_time = ((Time.now.to_f - START_TIME) * 1000.0).to_i
      puts "#{JOBS_COUNT} jobs finished in #{total_time}ms"
    end
    # end of metrics code
  end
end
# Test case: enqueue JOBS_COUNT search requests, then process them with a
# stock QC::Worker; Consumer.perform prints the total time after the last job.
JOBS_COUNT = 100
JOBS_COUNT.times do |i|
  job_args = {
    'request' => {
      # NOTE(review): this Twitter v1 search endpoint appears to have been
      # retired — confirm, and point it at a live API before re-running.
      'url' => 'http://search.twitter.com/search.json',
      'method' => 'get',
      'params' => {
        'q' => i,
        'rpp' => 15
      }
    }
  }
  QC.enqueue('Consumer.perform', job_args)
end

# Init worker
worker = QC::Worker.new(:max_attempts => 10,
                        :listening_worker => true)

# `worker` is now an assigned local, so this TERM handler can close over it
# and ask the worker to stop (a handler installed before this point cannot
# see it).
trap('TERM') { worker.stop }

# Start worker. START_TIME is taken here so that the enqueueing above is not
# part of the measurement.
START_TIME = Time.now.to_f
worker.start
# I ran 3 tests:
# 100 jobs finished in 56044ms | |
# 100 jobs finished in 53604ms | |
# 100 jobs finished in 52626ms |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This is my custom worker, which can make concurrent requests.
require 'rubygems' | |
require 'queue_classic' | |
require 'typhoeus' | |
# Ctrl-C: exit immediately.
trap('INT') { exit }
# TERM: exit as well. The handler cannot call `worker.stop` here: `worker` is
# only assigned further down the script, so inside this block Ruby would parse
# it as a method call and the handler would die with NameError when TERM
# arrives.
trap('TERM') { exit }
# Generic base class for jobs run by TyphoeusConcurrentWorker.
# Subclass it and override `perform` to handle the HTTP response.
class TyphoeusConcurrentJob
  # The worker builds an HTTP request from the 'request' entry of the job's
  # first argument, runs it, and then calls the job class with the response.
  # To enqueue a job for a subclass:
  #
  #   QC.enqueue('MySubclass.perform',
  #              'request' => { 'url' => 'http://foo.bar', 'method' => 'get',
  #                             'headers' => {}, 'params' => {}, 'body' => nil })
  #
  #   # or leave out the method name, in which case `perform` is called
  #   QC.enqueue('MySubclass',
  #              'request' => { 'url' => 'http://foo.bar', 'method' => 'get' })
  #
  # response - the completed Typhoeus response for the job's request
  # opts     - the job's first enqueued argument (the hash shown above)
  #
  # The base implementation does nothing and returns nil.
  def self.perform(response, opts = {})
    # subclass it and define your own
  end
end
# Example job: TyphoeusConcurrentWorker hands it each completed response.
class Consumer < TyphoeusConcurrentJob
  class << self
    # Handles one finished request. Intentionally a no-op in this demo;
    # uncomment the line below to print each status code.
    def perform(response, opts = {})
      # puts "HTTP#{response.code}"
      nil
    end
  end
end
# Custom QC worker that runs the HTTP requests of several jobs concurrently.
#
# Instead of executing each job synchronously, it locks up to
# @max_concurrency jobs, turns each into a Typhoeus request queued on one
# Hydra, and then runs the Hydra so those requests are in flight together.
# A job is deleted from the QC queue once its request has completed and its
# handler has run (see #call), or as soon as it fails before a request could
# be queued (see #work).
class TyphoeusConcurrentWorker < QC::Worker
  # Used when no :max_concurrency option is given.
  # NOTE(review): 200 is believed to be Typhoeus::Hydra's own default —
  # confirm for the Typhoeus version in use.
  DEFAULT_MAX_CONCURRENCY = 200

  # args - QC::Worker options, plus
  #        :max_concurrency - maximum number of requests per Hydra run
  #                           (DEFAULT_MAX_CONCURRENCY when absent)
  def initialize(args = {})
    super args
    @max_concurrency = args[:max_concurrency] || DEFAULT_MAX_CONCURRENCY
    @hydra = Typhoeus::Hydra.new(:max_concurrency => @max_concurrency)
  end

  # Main loop: fill the Hydra with up to @max_concurrency requests, run them,
  # and repeat while @running is set (presumably cleared by QC::Worker#stop).
  def start
    # just for metrics
    # remove this code once you don't need it
    @start_time = Time.now.to_f
    @finished_jobs_count = 0
    # end of metrics code

    while @running
      @force_execution = false
      # Queuing a request is non-blocking, so lock and queue as many jobs as
      # allowed before running the batch.
      @max_concurrency.times do
        work
        # lock_job found no further job while requests are pending:
        # stop filling and run what we already have.
        break if @force_execution
      end
      # Blocking call that returns once all queued requests are complete.
      @hydra.run
    end
  end

  # Locks one job and queues its request on the Hydra.
  # If that fails, no completion callback will ever run for the job, so it is
  # reported and deleted right here; otherwise nothing would ever delete it.
  def work
    job = lock_job
    return unless job

    begin
      call(job)
    rescue => e
      handle_failure(job, e)
      finish_job(job)
    end
  end

  # Turns a locked job into a Typhoeus request queued on the Hydra.
  #
  # job[:method] is "ClassName" or "ClassName.message" (message defaults to
  # 'perform'); the class must be a TyphoeusConcurrentJob subclass.
  # job[:args][0] must be a hash whose 'request' entry holds 'url' and may
  # hold 'method' (defaults to 'get'), 'headers', 'params' and 'body'.
  #
  # Raises RuntimeError when the job does not name a TyphoeusConcurrentJob
  # subclass, and NameError when the class name cannot be resolved.
  def call(job)
    spec = job[:method].split('.')
    # Resolve the class by constant lookup instead of eval: job[:method] is
    # read back from the database and must never be executed as Ruby code.
    klass = spec.first.split('::').reject(&:empty?).inject(Object) do |scope, name|
      scope.const_get(name)
    end
    # use specified message or use default one if no specified
    message = spec.size > 1 ? spec.last : 'perform'
    args = job[:args][0]
    unless klass.is_a?(Class) && klass < TyphoeusConcurrentJob
      raise('Invalid job! TyphoeusConcurrentJob subclass expected!')
    end

    req_opts = args['request']
    request = Typhoeus::Request.new req_opts['url'],
                                    :method => (req_opts['method'] || 'get').to_sym,
                                    :headers => req_opts['headers'],
                                    :params => req_opts['params'],
                                    :body => req_opts['body']

    # This callback runs later inside @hydra.run, outside #work's rescue, so
    # failures are handled here; the job is deleted once done either way.
    request.on_complete do |response|
      begin
        # public_send: the message name comes from the queue, so private
        # methods must not be reachable through it.
        klass.public_send(message, response, args)
      rescue => e
        handle_failure(job, e)
      ensure
        finish_job(job)
      end
    end

    # enqueue new request
    @hydra.queue(request)
  end

  # Locks the next job from the QC queue, or returns nil.
  #
  # When the Hydra is empty, it behaves like a normal worker: it retries,
  # calling wait(2**attempts) between tries, at most @max_attempts times.
  # When requests are already queued, it does not wait at all. If no job is
  # available, it sets @force_execution so that #start runs the pending batch
  # immediately.
  def lock_job
    attempts = 0
    job = nil
    should_wait = @hydra.queued_requests.empty?
    while @running && job.nil?
      job = @queue.lock(@top_bound)
      break unless job.nil? && should_wait && attempts < @max_attempts

      wait(2**attempts)
      attempts += 1
    end
    # there are some requests in the Hydra queue but no job waiting in QC
    @force_execution = !should_wait && job.nil?
    job
  end

  # Reports a failed job. Deliberately minimal for this demo: it only prints
  # the error; the caller deletes the job afterwards.
  def handle_failure(job, e)
    puts e
  end

  private

  # Deletes a finished (or failed) job from the QC queue and updates the demo
  # timing metrics.
  def finish_job(job)
    @queue.delete(job[:id])
    # just for metrics purposes
    # remove this code once you don't need it
    @finished_jobs_count += 1
    if defined?(JOBS_COUNT) && @finished_jobs_count == JOBS_COUNT
      total_time = ((Time.now.to_f - @start_time) * 1000.0).to_i
      puts "#{JOBS_COUNT} jobs finished in #{total_time}ms with concurrency set to #{@max_concurrency}"
    end
    # end of metrics code
  end
end
# Test case: enqueue JOBS_COUNT search requests and process them with
# TyphoeusConcurrentWorker, which prints the total time once the last job
# has finished.
MAX_CONCURRENCY = 100
JOBS_COUNT = 100
JOBS_COUNT.times do |i|
  job_args = {
    'request' => {
      # NOTE(review): this Twitter v1 search endpoint appears to have been
      # retired — confirm, and point it at a live API before re-running.
      'url' => 'http://search.twitter.com/search.json',
      'method' => 'get',
      'params' => {
        'q' => i,
        'rpp' => 15
      }
    }
  }
  QC.enqueue('Consumer.perform', job_args)
end

# Init worker
worker = TyphoeusConcurrentWorker.new(:max_attempts => 10,
                                      :max_concurrency => MAX_CONCURRENCY,
                                      :listening_worker => true)

# `worker` is now an assigned local, so this TERM handler can close over it
# and ask the worker to stop (a handler installed before this point cannot
# see it).
trap('TERM') { worker.stop }

# Start worker; this call blocks in the worker's run loop.
worker.start
# I ran 3 tests each with concurrency set to 10, 50 and 100:
# 100 jobs finished in 10304ms with concurrency set to 10 | |
# 100 jobs finished in 13701ms with concurrency set to 10 | |
# 100 jobs finished in 11708ms with concurrency set to 10 | |
# 100 jobs finished in 5408ms with concurrency set to 50 | |
# 100 jobs finished in 5370ms with concurrency set to 50 | |
# 100 jobs finished in 5155ms with concurrency set to 50 | |
# 100 jobs finished in 3767ms with concurrency set to 100 | |
# 100 jobs finished in 3940ms with concurrency set to 100 | |
# 100 jobs finished in 3708ms with concurrency set to 100 | |
# as you can see, the difference is HUGE! Enjoy! |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment