-
-
Save Paxa/3921812 to your computer and use it in GitHub Desktop.
Async request handling with Sinatra and EventMachine; the server freezes when a requested URL does not exist
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env rackup -Ilib:../lib -s thin | |
# async message handling | |
# using gem https://github.com/raggi/async_sinatra | |
require 'sinatra/async' | |
require "em-http-request" | |
require "em-synchrony" | |
require "em-synchrony/em-http" | |
require "em-synchrony/fiber_iterator" | |
require 'resolv' | |
require 'resolv-replace' #https://github.com/mperham/em-resolv-replace | |
# In-process delivery queue. URLs pushed with `Handler << url` are requested
# with HTTP GET from a periodic EventMachine timer; requests that fail are put
# back on the queue a bounded number of times.
module Handler
  # Maximum number of times a failed job is put back on the queue.
  MAX_RETRIES = 5
  # Maximum number of jobs taken from the queue per timer tick, and the
  # concurrency handed to the fiber iterator.
  CONCURRENCY = 5

  # One URL to request, together with its retry bookkeeping.
  class Job
    attr_accessor :url, :retries, :run_after

    # @param url [String] address to request
    def initialize(url)
      @retries = 0
      @url = url
    end

    # @return [Boolean] true when the job was never delayed or its delay has passed
    def runable?
      !run_after || Time.now > run_after
    end

    # Counts one more failed attempt and postpones the next attempt by
    # `retries` seconds.
    def run_again!
      @retries += 1
      @run_after = Time.now + retries # * 60
    end
  end

  extend self

  @queue = []

  # Wraps +value+ in a Job and appends it to the queue.
  #
  # @param value [String] URL to request later
  # @return [Array<Job>] the queue
  def <<(value)
    puts "add to queue #{value}"
    @queue << Job.new(value)
  end

  # Removes and returns up to CONCURRENCY runnable jobs, oldest first.
  #
  # @return [Array<Job>]
  def get_jobs
    ready = @queue.select(&:runable?).first(CONCURRENCY)
    ready.each { |job| @queue.delete(job) }
    ready
  end

  # Prints, once per second, how much later than one second the timer fired
  # (like in goliath).
  def latency_timer
    @last_latency = Time.now.to_f
    EM.add_periodic_timer(1) do
      @recent_latency = ((Time.now.to_f - @last_latency) - 1)
      puts "LATENCY: #{(@recent_latency * 1000)} ms"
      @last_latency = Time.now.to_f
    end
  end

  # Starts the latency timer and the once-per-second queue poller. Calls after
  # the first one return immediately.
  #
  # @return [nil]
  def run
    return if @running

    @running = true
    latency_timer
    EM.add_periodic_timer(1) do
      jobs = get_jobs
      process_batch(jobs) if jobs.any?
    end
    puts "Handler started"
  end

  private

  # Requests every job of the batch through a fiber iterator.
  def process_batch(jobs)
    puts "remain #{@queue.size}"
    EM.synchrony do
      EM::Synchrony::FiberIterator.new(jobs, CONCURRENCY).each { |job| deliver(job) }
    end
  end

  # Performs the GET for one job and schedules a retry unless the status is 200.
  def deliver(job)
    resp = EventMachine::HttpRequest.new(job.url).get(:redirects => 10)
    status = resp.response_header.status
    if status == 200
      puts "Message delivered"
    else
      puts "Error when get #{job.url} status #{status}"
      retry_later(job)
    end
    puts "request complete #{status}"
  rescue StandardError => e
    # Building or issuing the request can raise (for example on a nil or
    # malformed URL). Treat that like any other failed attempt instead of
    # letting the exception escape the block.
    puts "Error when get #{job.url}: #{e.class}: #{e.message}"
    retry_later(job)
  end

  # Puts the job back on the queue for a later attempt, or drops it (with a
  # log line) once MAX_RETRIES is reached.
  def retry_later(job)
    if job.retries < MAX_RETRIES
      job.run_again!
      puts "Requesting #{job.url} will run again at #{job.run_after}"
      @queue.push(job) # will be requested again later
    else
      puts "Giving up on #{job.url} after #{job.retries} retries"
    end
  end
end
# Sinatra application that exposes the delivery queue over HTTP.
class AsyncTest < Sinatra::Base
  register Sinatra::Async

  enable :show_exceptions

  # Make sure the background handler is started; Handler.run returns
  # immediately once it is already running.
  before do
    Handler.run
  end

  # GET /send?http=<url> — queues <url> for asynchronous delivery.
  # Responds with 400 when the `http` parameter is missing or blank, so that
  # no job without a URL is ever queued.
  aget '/send' do
    msg = params[:http].to_s.strip
    if msg.empty?
      status 400
      body "Missing required parameter: http"
    else
      body "Added to queue: #{msg}"
      Handler << msg
    end
  end
end
# Rack entry point: serve the application.
run AsyncTest.new
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment