Last active
May 11, 2021 07:13
-
-
Save noteflakes/2beecc22eb34d2be6163567428e4aac8 to your computer and use it in GitHub Desktop.
A script showing how to use Polyphony to perform HTTP requests concurrently from a queue of jobs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# "I'm losing my mind on concurrency"
# https://www.reddit.com/r/ruby/comments/lmg8xo/im_losing_my_mind_on_concurrency/
require 'polyphony'
require 'httparty'
require 'json'
# Channel names used to seed the job queue with :channel jobs.
CHANNELS = %w[list of channels].freeze
# Shared queue of pending jobs; handlers may push follow-up jobs onto it.
JOB_QUEUE = Queue.new
# Per-request timeout, in seconds.
TIMEOUT = 60
# Number of worker fibers spun up to drain the queue.
CONCURRENCY = 5
# Enqueues one :video job for every entry in a channel response.
#
# The response body is parsed as JSON and its "videos" array is walked;
# each entry's "url" field becomes the :url of a new job on JOB_QUEUE.
#
# @param response [#body] HTTP response whose body is a JSON document
# @return [Array] the parsed "videos" array
def handle_channel_response(response)
  JSON.parse(response.body)['videos'].each do |video|
    # JSON.parse produces String keys by default, so read 'url' (not :url).
    JOB_QUEUE << { kind: :video, url: video['url'] }
  end
end
# Handles the HTTP response of a :video job.
#
# Currently an unimplemented stub: it ignores the response and returns nil.
#
# @param response [Object] the HTTP response fetched for the video URL
# @return [nil]
def handle_video_response(response)
  # TODO: implement
end
# Returns the URL to request for a job.
#
# :channel jobs are mapped to the channel page built from job[:name];
# :video jobs already carry their URL in job[:url].
#
# @param job [Hash] job descriptor with a :kind of :channel or :video
# @return [String, nil] the URL to fetch (whatever job[:url] holds for :video jobs)
# @raise [ArgumentError] if job[:kind] is neither :channel nor :video
def job_url(job)
  case job[:kind]
  when :channel
    "https://mytube.com/channel/#{job[:name]}"
  when :video
    job[:url]
  else
    # Fail loudly instead of handing a nil URL to the HTTP client.
    raise ArgumentError, "unknown job kind: #{job[:kind].inspect}"
  end
end
# Maximum number of times a single job is attempted before it is dropped.
MAX_ATTEMPTS = 3

# Puts a failed job back on JOB_QUEUE unless it has used up MAX_ATTEMPTS.
#
# The attempt count travels with the job under :attempts; the job hash passed
# in is not mutated (a merged copy is enqueued).
#
# @param job [Hash] the job that just failed
# @return [Boolean] true when the job was requeued, false when it was dropped
def requeue_job(job)
  attempts = job.fetch(:attempts, 0) + 1
  return false if attempts >= MAX_ATTEMPTS

  JOB_QUEUE << job.merge(attempts: attempts)
  true
end

# Fetches the URL for a job and dispatches the response to the matching handler.
#
# Failures never propagate to the caller: timeouts and other StandardErrors
# are logged and the job is requeued, up to MAX_ATTEMPTS attempts in total.
#
# @param job [Hash] job descriptor with a :kind of :channel or :video
# @return [Object] the handler's result, or nil after a handled failure
def perform(job)
  # A timeout will raise a Polyphony::Cancel error
  url = job_url(job)
  response = cancel_after(TIMEOUT) { HTTParty.get(url) }

  case job[:kind]
  when :channel
    handle_channel_response(response)
  when :video
    handle_video_response(response)
  end
rescue Polyphony::Cancel
  if requeue_job(job)
    puts "Timeout while handling #{job.inspect}, requeuing..."
  else
    puts "Timeout while handling #{job.inspect}, giving up after #{MAX_ATTEMPTS} attempts"
  end
rescue => e
  puts "Error while handling #{job.inspect}: #{e.inspect}"
  # Bounded retry: a persistently failing job is eventually dropped rather
  # than cycling through the queue forever.
  puts "Requeuing #{job.inspect}..." if requeue_job(job)
  nil
end
# Prepopulate job queue with one :channel job per configured channel
CHANNELS.each do |channel|
  JOB_QUEUE << { kind: :channel, name: channel }
end

# Setup workers: each worker keeps taking jobs until it finds the queue empty.
# NOTE(review): a worker that sees the queue momentarily empty exits even
# though jobs still in flight may enqueue more work later; the remaining
# workers pick that work up, but with reduced concurrency.
CONCURRENCY.times do
  spin_loop do
    # The queue is the JOB_QUEUE constant; a lowercase `job_queue` is undefined.
    break if JOB_QUEUE.empty?

    perform(JOB_QUEUE.shift)
  end
end

# wait for all workers to be done
# NOTE(review): confirm this method name against the installed Polyphony
# version -- the Fiber API may expose it as `await_all_children` instead.
Fiber.current.wait_for_all_children
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment