Last active
December 10, 2015 12:28
-
-
Save PetrKaleta/4434069 to your computer and use it in GitHub Desktop.
Custom Typhoeus based QC Worker Note: I've never tested this code, its just an example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'queue_classic' | |
require 'typhoeus' | |
trap('INT') { exit } | |
trap('TERM') { worker.stop } | |
# Generic consumer | |
class TyphoeusConcurrentJob | |
def initialize(opts={}) | |
@opts = opts | |
end | |
def perform(response) | |
# make your own | |
end | |
def request | |
req = Typhoeus::Request.new @opts['req_url'], @opts['req_opts'] | |
# once request has completed, call consumer | |
req.on_complete do |response| | |
perform(response) | |
end | |
req | |
end | |
end | |
# Simple consumer class which is doing actual job with response | |
class Consumer < TyphoeusConcurrentJob | |
def perform(response) | |
puts response.body | |
puts @opts.inspect | |
end | |
end | |
# Enqueing new job | |
job_args = { | |
'req_url' => 'http://foo.bar', # request url | |
'req_opts' => {}, # aditional request options | |
'foo' => 'bar' # another options... | |
} | |
QC.enqueue('Consumer', job_args) | |
# Custom QC worker | |
class TyphoeusConcurrentWorker < QC::Worker | |
def initialize(args={}) | |
super args | |
@max_concurrency = args[:max_concurrency] | |
@hydra = Typhoeus::Hydra.new(:max_concurrency => @max_concurrency) | |
end | |
def start | |
while @running | |
@force_execution = false | |
# non-blocking operation, so fill the request queue | |
@max_concurrency.times do | |
work | |
# break the loop and execute hydra queue | |
break if @force_execution | |
end | |
# this is a blocking call that returns once all requests are complete | |
@hydra.run | |
end | |
end | |
def call(job) | |
obj = eval(job[:method]).new(job[:args]) | |
return unless obj.kind_of?(TyphoeusConcurrentJob) | |
# enqueue new request | |
@hydra.queue(obj.request) | |
end | |
def lock_job | |
attempts = 0 | |
job = nil | |
# idea is to disable waiting for next job if there are some | |
# requests already waiting in hydra queue | |
# so in other words, hydra queue must be executed immediately | |
should_wait = @hydra.queued_requests.size == 0 | |
until !@running || job | |
job = @queue.lock(@top_bound) | |
if job.nil? && should_wait && attempts < @max_attempts | |
wait(2**attempts) | |
attempts += 1 | |
next | |
else | |
break | |
end | |
end | |
# there are some requests in hydra queue but no other job | |
# is waiting in QC queue | |
@force_execution = !should_wait && job.nil? | |
job | |
end | |
end | |
# Init worker | |
worker = TyphoeusConcurrentWorker.new(:max_attempts => 10, | |
:max_concurrency => 10, | |
:q_name => 'typhoeus', | |
:listening_worker => true) | |
# Start worker | |
worker.start |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The only question is, if job should be removed from QC queue once its request is queued in hydra queue or once Consumer is performed.