Skip to content

Instantly share code, notes, and snippets.

@suhanlee
Created August 31, 2017 07:06
Show Gist options
  • Save suhanlee/a504f0ffed21822e3f36124aad5bc4d0 to your computer and use it in GitHub Desktop.
Save suhanlee/a504f0ffed21822e3f36124aad5bc4d0 to your computer and use it in GitHub Desktop.
require 'thread'
require 'json'
require 'fiber'
require 'eventmachine'
require 'em-http-request'
module Async
# One JSON HTTP call, described by verb, target URL, payload and query
# parameters, that can be executed on an EventMachine reactor from inside a
# Fiber.
class Request
  attr_accessor :url, :body

  # @param method [Symbol] name of the em-http verb method to invoke
  #   (it is dispatched with +send+, e.g. +:patch+)
  # @param url [String] target URL
  # @param body [Object] payload; serialized with +to_json+ when sent
  # @param query [Hash] query-string parameters
  def initialize(method, url, body, query)
    @method, @url, @body, @query = method, url, body, query
  end

  # Issues the request and suspends the calling Fiber until em-http reports
  # completion. Both the success and the failure callback resume the Fiber
  # with the same client object, so the caller has to inspect the response
  # to tell the two apart.
  #
  # Must be called from inside a Fiber that some other code resumed, because
  # it hands control back with +Fiber.yield+.
  #
  # @return [Object] the em-http client object the Fiber was resumed with
  def async_fetch
    waiting_fiber = Fiber.current
    connection = EventMachine::HttpRequest.new(@url)
    options = {
      :head => { 'Content-Type' => 'application/json' },
      :query => @query,
      :redirects => 1,
      :body => @body.to_json
    }
    http = connection.send(@method, **options)

    # Success and failure wake the suspended Fiber in exactly the same way.
    wake_up = proc { waiting_fiber.resume(http) }
    http.callback(&wake_up)
    http.errback(&wake_up)

    Fiber.yield
  end
end
# Thin Firebase REST helper: turns an update into a PATCH Request and hands
# it to the supplied queue-like object instead of sending it directly.
class Firebase
  # @param fb_host [String] base URL that every path is appended to
  # @param fb_key [String] secret sent as the +auth+ query parameter
  # @param aio [#push] object that receives the built requests
  def initialize(fb_host, fb_key, aio)
    @fb_host, @fb_key, @aio = fb_host, fb_key, aio
  end

  # Enqueues a PATCH of +data+ against +url+ (relative to the host).
  #
  # The +auth+ parameter is merged in last, so it overrides any +:auth+ key
  # the caller supplied; the caller's hash itself is not modified.
  #
  # @param url [String] path appended verbatim to the host
  # @param data [Object] payload for the request body
  # @param query [Hash] extra query-string parameters
  # @return [Object] whatever +aio.push+ returns
  def update(url, data, query = {})
    endpoint = "#{@fb_host}#{url}"
    signed_query = query.merge(:auth => @fb_key)
    patch_request = Request.new(:patch, endpoint, data, signed_query)
    @aio.push(patch_request)
  end
end
# Singleton worker that owns a background thread running an EventMachine
# reactor and pushes queued requests through it.
#
# Requests are taken from +queue+ one per reactor tick. A request whose
# response status is not 2xx is parked in +dead_queue+ and retried once,
# at a rate of one retry per RETRY_INTERVAL seconds.
#
# Queued objects only need to respond to +async_fetch+ and return something
# that answers +response_header.status+.
class IO
  require 'singleton' # kept local so the class is self-contained
  include Singleton

  # Seconds between two retries of requests parked in the dead queue.
  RETRY_INTERVAL = 1

  attr_accessor :queue, :dead_queue

  # Builds both queues and immediately starts the reactor thread.
  def initialize
    @queue = Queue.new
    @dead_queue = Queue.new
    @stop_flag = false
    @in_flight = 0
    run
  end

  # Starts the reactor thread. The thread ends once #stop has been called
  # and all pending work has finished.
  #
  # @return [Thread] the reactor thread
  def run
    @th = Thread.new do
      EM.run do
        puts "EM.run..."
        @normal_queue_count = 0
        @dead_queue_count = 0

        EM.add_periodic_timer(0) do
          if shutdown_ready?
            EM.stop
          else
            process_next
          end
        end

        # Failed calls are retried sequentially, one per interval.
        EM.add_periodic_timer(RETRY_INTERVAL) { retry_next }
      end
      # EM.run only returns after the reactor has stopped, so nothing is
      # left to stop here.
      puts "eventmachin is...end"
    end
  end

  # Requests a graceful shutdown: the reactor stops once both queues are
  # empty and no request is still waiting for its response.
  def stop
    @stop_flag = true
  end

  # @param request [#async_fetch] request to enqueue
  # @return [Queue] the main queue
  def push(request)
    @queue.push(request)
  end

  # Removes and returns the next queued request, blocking while the queue
  # is empty.
  def pop
    @queue.pop
  end

  # Waits for the reactor thread to finish.
  def join
    @th.join
  end

  # @param status [#to_s] HTTP status code
  # @return [Boolean] true when the code starts with "2" (HTTP 2xx)
  def is_success?(status)
    status.to_s.start_with?("2")
  end

  private

  # True once #stop was requested and no queued or running work remains.
  def shutdown_ready?
    @stop_flag && @in_flight.zero? && queue.empty? && dead_queue.empty?
  end

  # Sends the next request from the main queue; a non-2xx response parks
  # the request in the dead queue for a later retry.
  def process_next
    dispatch_from(queue) do |request, result|
      dead_queue.push(request) unless is_success?(result.response_header.status)
      @normal_queue_count += 1
      puts "[processing] normal_queue_count: #{@normal_queue_count}"
    end
  end

  # Re-sends the next parked request. The outcome is not inspected, so a
  # request is retried at most once.
  def retry_next
    dispatch_from(dead_queue) do |_request, _result|
      @dead_queue_count += 1
      puts "[processing] dead_queue_count: #{@dead_queue_count}"
    end
  end

  # Takes one request from +source+ (if any) and runs it inside a new Fiber,
  # because Request#async_fetch suspends the Fiber it is called from.
  def dispatch_from(source, &handler)
    return if source.empty?

    Fiber.new do
      begin
        request = source.pop(true)
      rescue ThreadError
        # Another consumer emptied the queue between the check and the pop.
        request = nil
      end
      perform(request, handler) if request
    end.resume
  end

  # Executes one request and passes it, with its result, to +handler+.
  # Errors are reported and contained here: this code continues running
  # from an HTTP callback, where an escaping exception would hit the reactor.
  def perform(request, handler)
    @in_flight += 1
    handler.call(request, request.async_fetch)
  rescue StandardError => e
    warn "[Async::IO] request failed: #{e.class}: #{e.message}"
  ensure
    @in_flight -= 1
  end
end
end
# Fetch the shared Async::IO singleton. Its constructor calls #run, so the
# first access also starts the background reactor thread.
aio = Async::IO.instance
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment