-
-
Save godfat/4266074 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'rest-core' | |
class BlockingClient | |
def initialize *args | |
@client = RC::Universal.new(*args) | |
end | |
def get path, params={}, opts={} | |
@client.get(path, params, opts).tap{} | |
end | |
end | |
class CallbackClient | |
def initialize *args | |
@client = RC::Universal.new(*args) | |
end | |
def get path, params={}, opts={} | |
@client.get(path, params, opts){ |res| | |
yield res | |
} | |
end | |
end | |
class CallbackFuture | |
def initialize | |
@fiber = Fiber.new{ | |
# The block of `yield` here should | |
# always resume this current fiber | |
# Ex: CallbackFuture.new{ |future_fiber| | |
# future_fiber.resume :abcd | |
# } | |
yield Fiber.current | |
# Block here and wait until reentrance... | |
res = Fiber.yield | |
# There are two possible scenarios and outcomes: | |
# 1. `tap_me` is called first, and we'll wait for | |
# ourselves being resumed. We then pass the result | |
# to `tap_me` | |
# 2. The block passed in is called first, and thus | |
# `res` is already the result. In this case, we'll | |
# wait for `tap_me` to be called later. | |
if res.is_a?(Fiber) | |
res.transfer Fiber.yield | |
else | |
Fiber.yield.transfer res | |
end | |
} | |
@fiber.resume | |
end | |
def tap_me | |
# As discussed in `initialize`, depending on `res`: | |
# 1. If `tap_me` is called later and we get `res` immediately, return it. | |
# 2. If `tap_me` is called first, we try again to get the result of the | |
# callback of `initialize` | |
res = @fiber.resume Fiber.current | |
if res | |
return res | |
else | |
Fiber.yield | |
end | |
end | |
end | |
class Future | |
def initialize | |
@thread = Thread.new{yield} | |
end | |
def tap_me | |
@thread.value | |
end | |
end | |
class FutureClient | |
def initialize *args | |
@client = if fiber_possible? | |
CallbackClient.new *args | |
else | |
BlockingClient.new *args | |
end | |
end | |
def get path, params={}, opts={} | |
if fiber_possible? | |
CallbackFuture.new{ |future_fiber| | |
# According to the design of CallbackFuture, | |
# the final action of this block should always be | |
# `future_fiber.transfer`. | |
# Also, since the callback {|resp| ...} is in the root fiber, | |
# we need to wrap everything in a new Fiber | |
# in order to `future_fiber.transfer` | |
@client.get(path, params, opts){ |resp| | |
Fiber.new{ | |
final_resp = if block_given? | |
yield resp | |
else | |
resp | |
end | |
future_fiber.transfer final_resp | |
}.resume | |
} | |
} | |
else | |
job = lambda{ @client.get(path, params, opts) } | |
if block_given? | |
Future.new{ yield(job.call) } | |
else | |
Future.new(&job) | |
end | |
end | |
end | |
def fiber_possible? | |
# We should also check that we aren't in the RootFiber here. | |
# However, since this is impossible now, comment it | |
EM.reactor_running? # and RootFiber != Fiber.current | |
end | |
end | |
def q str, m=nil | |
p = lambda{puts "\e[33m=> #{str.inspect}\e[0m"} | |
if m | |
m.synchronize(&p) | |
else | |
p.call | |
end | |
end | |
def github_use_case | |
c = FutureClient.new(json_response: true, | |
site: 'https://api.github.com', | |
timeout: 120, | |
log_method: lambda{|s| puts s}) | |
futures = [] | |
%w[rubytaiwan godfat fumin].each{ |user| | |
futures << c.get("/users/#{user}/repos", per_page: 100){ |repos| | |
rs = repos.reject{|r| r['fork']} | |
most_watched = rs.max_by{|r| r['watchers']}['name'] | |
most_size = rs.max_by{|r| r['size']}['name'] | |
watch_contri = c.get("/repos/#{user}/#{most_watched}/contributors") | |
size_contri = c.get("/repos/#{user}/#{most_size}/contributors") | |
most_watched_most_contri = watch_contri.tap_me.max_by{|c| c['contributions']} | |
most_size_most_contri = size_contri.tap_me.max_by{|c| c['contributions']} | |
q "Most contributed user for most watched: #{user}/#{most_watched}:" | |
q most_watched_most_contri['login'] | |
q "Most contributed user for most size: #{user}/#{most_size}:" | |
q most_size_most_contri['login'] | |
repos | |
} | |
} | |
futures.map{|f| f.tap_me} | |
end | |
def google_use_case | |
c = FutureClient.new(site: 'https://www.google.com', | |
log_method: lambda{|s| puts s}) | |
futures = [] | |
%w[rubytaiwan godfat].each{ |user| | |
futures << c.get("/search", q: user){ |repos| | |
watch_contri = c.get("/search", q: "#{user}r") | |
size_contri = c.get("/search", q: "#{user}t") | |
most_watched_most_contri = watch_contri.tap_me.to_s[0..10] | |
most_size_most_contri = size_contri.tap_me.to_s[0..10] | |
q "Most contributed user for most watched: #{user}:" | |
q most_watched_most_contri | |
q "Most contributed user for most size: #{user}:" | |
q most_size_most_contri | |
repos | |
} | |
} | |
futures.map{|f| f.tap_me} | |
end | |
class EM_USE; end | |
%w[github google].each do |g| | |
EM_USE.module_eval <<-RUBY | |
def em_#{g}_use_case | |
EM.run do | |
Fiber.new{ | |
resp = #{g}_use_case | |
puts resp.map(&:size) | |
EM.stop | |
}.resume | |
end | |
end | |
RUBY | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment