Created
August 31, 2017 07:06
-
-
Save suhanlee/a504f0ffed21822e3f36124aad5bc4d0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'thread' | |
require 'json' | |
require 'fiber' | |
require 'eventmachine' | |
require 'em-http-request' | |
module Async | |
class Request | |
attr_accessor :url, :body | |
def initialize(method, url, body, query) | |
@method = method | |
@url = url | |
@body = body | |
@query = query | |
end | |
def async_fetch | |
f = Fiber.current | |
http = EventMachine::HttpRequest.new(@url).send(@method, :head => {'Content-Type' => 'application/json'}, | |
:query => @query, | |
:redirects => 1, | |
:body => @body.to_json) | |
http.callback { | |
f.resume(http) | |
} | |
http.errback { | |
f.resume(http) | |
} | |
return Fiber.yield | |
end | |
end | |
class Firebase | |
def initialize(fb_host, fb_key, aio) | |
@aio = aio | |
@fb_host = fb_host | |
@fb_key = fb_key | |
end | |
def update(url, data, query={}) | |
url = "#{@fb_host}#{url}" | |
auth = { | |
:auth => @fb_key | |
} | |
@aio.push(Request.new(:patch, url, data, query.merge(auth))) | |
end | |
end | |
class IO | |
require 'singleton' | |
include Singleton | |
attr_accessor :queue, :dead_queue | |
def initialize | |
@queue = Queue.new | |
@dead_queue = Queue.new | |
@stop_flag = false | |
run | |
end | |
def run | |
@th = Thread.new do | |
EM.run do | |
puts "EM.run..." | |
@normal_queue_count = 0 | |
@dead_queue_count = 0 | |
EM.add_periodic_timer(0) do | |
Fiber.new do | |
begin | |
request = queue.pop(true) | |
_request = request.dup | |
result = request.async_fetch | |
status = result.response_header.status.to_s | |
# 응답이 정상이 아닌 경우 dead_queue 에 추가 | |
if !status.start_with?("2") | |
dead_queue.push(_request) | |
end | |
@normal_queue_count += 1 | |
puts "[processing] normal_queue_count: #{@normal_queue_count}" | |
rescue Exception => e | |
# puts e | |
end | |
end.resume | |
begin | |
Fiber.new do | |
# 콜이 실패한 경우 순차적으로 1초당, 1번씩 재 실행 | |
if dead_queue.size > 0 | |
request = dead_queue.pop(true) | |
result = request.async_fetch | |
@dead_queue_count += 1 | |
puts "[processing] dead_queue_count: #{@dead_queue_count}" | |
end | |
end.resume | |
rescue Exception => e | |
end | |
end | |
end | |
puts "eventmachin is...end" | |
EventMachine.stop | |
end | |
end | |
def stop | |
@stop_flag = true | |
end | |
def push(request) | |
@queue.push(request) | |
end | |
def pop | |
@queue.pop | |
end | |
def join | |
@th.join | |
end | |
def is_success?(status) | |
# HTTP STATUS CODE == 2XX | |
status.to_s.start_With?("2") | |
end | |
end | |
end | |
aio = Async::IO.instance |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment