Created
October 2, 2008 09:46
-
-
Save astro/14329 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
# An attempt to create an efficient HTTP client using eventmachine, utilizing HTTP pipelining. I reconsidered and decided it would be more natural to hack this in Erlang. Please finish. | |
require 'singleton'
require 'uri'
require 'yaml'
require 'zlib'
require 'eventmachine'
require 'dnsruby'
# Route Dnsruby's asynchronous queries through EventMachine, but leave the
# reactor loop itself to the EM.run call at the bottom of this script.
Dnsruby::Resolver.use_eventmachine(true)
Dnsruby::Resolver.start_eventmachine_loop(false)
class GodObject | |
include Singleton | |
def self.method_missing m, *a | |
puts "Sending #{m}(#{a.inspect}) to #{inspect}" | |
if block_given? | |
b = lambda { |*c| yield *c } | |
instance.send m, *a, &b | |
else | |
instance.send m, *a | |
end | |
end | |
end | |
# One (possibly still running) DNS lookup for a single host name.
#
# IP literals are answered immediately. Any other name is sent to the
# resolver once, and every deferrable handed out by #defer is settled
# exactly once with the same outcome when the answer (or error) arrives.
class DNSName
  # Returns a Dnsruby IPv4/IPv6 address object for +ip+, or nil when +ip+
  # is not an IP literal (Dnsruby raises ArgumentError in that case).
  def self.parse_ip_address(ip)
    Dnsruby::IPv4.create(ip)
  rescue ArgumentError
    begin
      Dnsruby::IPv6.create(ip)
    rescue ArgumentError
      nil
    end
  end

  # +dns+ is the resolver (must respond to send_async); +name+ is a host
  # name or an IP literal.
  def initialize(dns, name)
    # State comes first: the resolver callbacks registered below read and
    # write it, and must not be clobbered afterwards.
    @deferrables = []
    @name = name
    @result = nil
    if (address = DNSName.parse_ip_address(name))
      @result = [:succeed, [address.to_s]]
    else
      df = dns.send_async(Dnsruby::Message.new(name))
      df.callback(&method(:on_success))
      df.errback(&method(:on_fail))
    end
  end

  # Returns a new deferrable for this lookup. It is settled immediately if
  # the outcome is already known, otherwise when the lookup finishes.
  def defer
    d = EM::DefaultDeferrable.new
    if @result
      apply_result_to d
    else
      @deferrables << d
    end
    d
  end

  private

  # Settles +d+ exactly once with the stored outcome:
  # d.succeed(addresses) or d.fail(error).
  def apply_result_to(d)
    status, value = @result
    d.public_send(status, value)
  end

  # Settles every waiting deferrable and forgets them.
  def apply_result_to_all
    waiting, @deferrables = @deferrables, []
    waiting.each { |d| apply_result_to d }
  end

  # Resolver callback: collects A/AAAA addresses from the answer section.
  # An answer without address records counts as a failure, so callers
  # never receive an empty address list.
  def on_success(msg)
    addresses = msg.answer.select { |rr|
      rr.kind_of?(Dnsruby::RR::IN::A) || rr.kind_of?(Dnsruby::RR::IN::AAAA)
    }.map { |rr| rr.address.to_s }
    @result = if addresses.empty?
                [:fail, "no A/AAAA records for #{@name}"]
              else
                [:succeed, addresses]
              end
    apply_result_to_all
  end

  # Resolver errback: receives the query message and the error.
  def on_fail(_msg, err)
    @result = [:fail, err]
    apply_result_to_all
  end
end
# Process-wide memo of DNS lookups: one DNSName per host name, shared by
# everyone who asks for that name.
class DNSCache < GodObject
  def initialize
    @dns = Dnsruby::Resolver.new
    @queries = {}
  end

  # Returns a deferrable that succeeds with the addresses of +name+ or
  # fails with the lookup error. Repeated calls reuse the first lookup.
  def resolve(name)
    lookup = @queries.fetch(name) { @queries[name] = DNSName.new(@dns, name) }
    lookup.defer
  end
end
# TODO: SSL, connection reviving when was idle
#
# One persistent HTTP/1.1 connection to host:port, over which requests are
# pipelined. Each request's block receives its response's events in order:
#   (:response, code, status_text, headers), (:body, data)*, (:end)
# Responses are matched to requests strictly in FIFO order.
class HTTPConnection
  # EventMachine connection module. Splits the incoming byte stream into
  # LF-terminated lines (mode :line) or raw body bytes (mode :packet) and
  # hands them to +handler+ (an HTTPConnection).
  #
  # In :packet mode, +packet_length+ is the number of body bytes still
  # expected, or nil for a body that runs until the server closes.
  module LineConnection
    attr_accessor :handler
    attr_accessor :mode
    attr_accessor :packet_length

    def connection_completed
      @mode = :line
      @line = ''
      @packet_length = nil
      @handler.opened!
    end

    # Writes the given raw request texts back to back (pipelining).
    def send_requests(requests)
      payload = requests.join
      send_data payload
      puts "sent requests: #{payload.inspect}"
    end

    # Feeds +data+ through the current mode's parser until all of it is
    # consumed. The handler may switch modes between pieces.
    def receive_data(data)
      data = send("receive_#{@mode}", data) until data.empty?
    end

    def unbind
      @handler.handle_disconnected!
    end

    private

    # Accumulates one line. Returns the unconsumed remainder, or '' while
    # the line is still incomplete.
    def receive_line(data)
      l, rest = data.split("\n", 2)
      @line += l
      return '' unless rest
      line, @line = @line, ''
      @handler.handle_line line
      rest
    end

    # Consumes body bytes. Returns whatever follows the end of the body
    # (to be parsed in the next mode), or '' if the body continues.
    def receive_packet(data)
      if @packet_length.nil?
        # Unbounded body: everything until the connection closes.
        @handler.handle_packet_chunk data
        return ''
      end
      chunk = data[0, @packet_length]
      rest = data[@packet_length..-1].to_s
      @packet_length -= chunk.size
      @handler.handle_packet_chunk chunk unless chunk.empty?
      return '' if @packet_length > 0
      @handler.handle_packet_end
      rest
    end
  end

  def initialize(host, port)
    @requests = []  # [text, block] pairs, oldest first, until their response ends
    @sent = 0       # how many leading entries of @requests are on the current wire
    @host = host
    @port = port
    @content_decoder = nil
    open_connection
  end

  # (Re)connects. Any partially received response on an old connection is
  # discarded, and every outstanding request will be sent again.
  def open_connection
    @opened = false
    @sent = 0
    reset_response_state
    @c = EM.connect @host, @port, LineConnection
    @c.handler = self
  end

  def opened!
    @opened = true
    may_send
  end

  # Queues raw request +text+. +block+ receives the response events.
  def request(text, &block)
    @requests << [text, block]
    if @c
      may_send
    else
      open_connection
    end
  end

  # Delivers one event to the oldest outstanding request. On :end that
  # request is retired, and the connection is closed once none remain.
  def tell_requester(what, *msg)
    if @requests.empty?
      # Data nobody asked for: the stream is out of sync, so drop it.
      @c.close_connection if @c
      return
    end
    block = @requests.first[1]
    block.call(what, *msg) if block
    return unless what == :end
    @requests.shift
    @sent -= 1 if @sent > 0
    @c.close_connection if @requests.empty? && @c
  end

  # Called by LineConnection with each complete line (CR/LF still attached).
  def handle_line(line)
    line = line.strip
    case @state
    when :status
      # Tolerate stray empty lines between responses.
      return if line.empty?
      _version, code, @status = line.split(' ', 3)
      @code = code.to_i
      @headers = {}  # fresh hash per response; the old one belongs to its requester
      @state = :headers
    when :headers
      if line.empty?
        headers_finished
      else
        name, value = line.split(':', 2)
        @headers[name] = value.to_s.strip
      end
    when :chunk_length
      # An empty line here is the CRLF that ends the previous chunk's data.
      return if line.empty?
      size = line.to_i(16)  # stops at any ";extension" suffix
      if size.zero?
        finish_decoder
        tell_requester :end
        @state = :chunk_trailer
      else
        @c.packet_length = size
        @c.mode = :packet
        @state = :body
      end
    when :chunk_trailer
      # Trailer fields are ignored; the empty line ends the response.
      @state = :status if line.empty?
    end
  end

  # Body bytes from LineConnection, decompressed if the response is encoded.
  def handle_packet_chunk(data)
    if @content_decoder
      puts "*** DECOMPRESSING ***"
      data = @content_decoder.inflate(data)
    end
    tell_requester :body, data unless data.empty?
  end

  # End of a Content-Length body, or of one chunk's data.
  def handle_packet_end
    @c.mode = :line
    if @chunked
      @state = :chunk_length
    else
      finish_decoder
      @state = :status
      tell_requester :end
    end
  end

  # LineConnection#unbind: the socket is gone. A read-until-close body ends
  # here, and any other outstanding requests are re-sent on a new connection.
  # TODO: a response cut off mid-body is re-requested without telling its
  # requester, which will then see a second :response.
  def handle_disconnected!
    @opened = false
    @c = nil
    if @dumb && @state == :body
      @dumb = false
      @state = :status
      finish_decoder
      tell_requester :end
    end
    open_connection unless @requests.empty?
  end

  private

  # Writes every queued request not yet sent on the current connection.
  def may_send
    return unless @opened
    pending = @requests[@sent..-1]
    return if pending.empty?
    @c.send_requests(pending.map { |text, _block| text })
    @sent = @requests.size
  end

  # Decides how the current response's body is delimited (RFC 7230 3.3.3)
  # and switches the parser accordingly.
  def headers_finished
    tell_requester :response, @code, @status, @headers
    @chunked = false
    @dumb = false
    length = header('Content-Length')
    if bodyless_response? || (!chunked_response? && length && length.to_i <= 0)
      # No body follows (1xx/204/304, or Content-Length: 0). No body bytes
      # will ever arrive to trigger handle_packet_end, so finish here.
      @state = :status
      tell_requester :end
      return
    end
    @content_decoder = content_decoder_for(header('Content-Encoding'))
    if chunked_response?
      @chunked = true
      @state = :chunk_length
    else
      # Without Content-Length the body lasts until the server closes.
      @dumb = length.nil?
      @c.mode = :packet
      @c.packet_length = length && length.to_i
      @state = :body
    end
  end

  # Case-insensitive header lookup (field names are case-insensitive).
  def header(name)
    wanted = name.downcase
    @headers.each { |k, v| return v if k.downcase == wanted }
    nil
  end

  def bodyless_response?
    (100..199).include?(@code) || @code == 204 || @code == 304
  end

  def chunked_response?
    header('Transfer-Encoding').to_s.downcase.include?('chunked')
  end

  # Returns a Zlib::Inflate for 'deflate', or nil for no/identity encoding.
  # Raises for anything else.
  def content_decoder_for(encoding)
    case encoding.to_s.strip.downcase
    when '', 'identity' then nil
    when 'deflate' then Zlib::Inflate.new
    else raise "Unknown Content-Encoding: #{encoding}"
    end
  end

  # Flushes and releases the content decoder, forwarding any remaining output.
  def finish_decoder
    return unless @content_decoder
    decoder, @content_decoder = @content_decoder, nil
    begin
      rest = decoder.finish
    ensure
      decoder.close
    end
    tell_requester :body, rest unless rest.empty?
  end

  # Forgets any partially parsed response.
  def reset_response_state
    @content_decoder.close if @content_decoder
    @content_decoder = nil
    @state = :status
    @code, @status = nil, nil
    @headers = {}
    @chunked = false
    @dumb = false
  end
end
# Process-wide pool holding one connection per (scheme, host, port).
class ConnectionPool < GodObject
  def initialize
    @connections = {}
  end

  # Sends raw request +text+ over the pooled connection for the target,
  # opening it on first use. +block+ receives the response events.
  def request(scheme, host, port, text, &block)
    key = [scheme, host, port]
    connection = @connections[key] ||= new_connection(*key)
    connection.request(text, &block)
  end

  private

  # Picks the connection class for +scheme+; only plain HTTP is supported.
  def new_connection(scheme, host, port)
    connection_class =
      case scheme
      when 'http' then HTTPConnection
      else raise "Unsupported URL scheme: #{scheme}"
      end
    connection_class.new(host, port)
  end
end
# A single URL download whose events are broadcast to any number of
# receivers (objects responding to #notify, e.g. EM.spawn processes).
#
# The request is sent exactly once, after both the host name has been
# resolved and #go! has been called.
class Transfer
  def initialize(url)
    # Set up before the DNS lookup: its deferrable may run our callbacks
    # immediately (e.g. for an already-resolved name).
    @receivers = []
    @can_go = false
    @started = false
    @has_addresses = false
    @addresses = nil
    @error = nil
    @uri = URI.parse(url)
    d = DNSCache.resolve(@uri.host)
    d.callback { |addresses|
      puts "dns for #{@uri.host}: #{addresses.inspect}"
      @addresses = addresses
      @has_addresses = true
      may_go
    }
    d.errback { |err|
      puts "dns for #{@uri.host}: #{err}"
      @error = err.to_s
      @has_addresses = true
      may_go
    }
  end

  # Registers +spawnable+ to receive (what, *args) notifications:
  # :error, :response, :body and :end.
  def get(spawnable)
    @receivers << spawnable
  end

  # Allows the request to be sent, now or as soon as DNS has answered.
  def go!
    @can_go = true
    may_go
  end

  private

  def notify_receivers(*msg)
    @receivers.each { |r| r.notify(*msg) }
  end

  # Sends the request once, after both #go! and DNS resolution. Later
  # calls (e.g. a repeated go!) do nothing.
  def may_go
    return unless @can_go && @has_addresses
    return if @started
    @started = true
    if @error
      notify_receivers :error, @error
    elsif @addresses.nil? || @addresses.empty?
      notify_receivers :error, "no addresses for #{@uri.host}"
    else
      # TODO: RR-addresses (only the first address is tried)
      ConnectionPool.request(@uri.scheme, @addresses[0], @uri.port, request_text) { |*msg|
        notify_receivers(*msg)
      }
    end
  end

  # Builds the raw HTTP/1.1 GET request for @uri.
  def request_text
    headers = {
      'Host' => host_header,
      'Connection' => 'Keep-Alive',
      # Only content-codings belong here; "chunked" is a transfer-coding.
      'Accept-Encoding' => 'deflate, identity'
    }
    header_lines = headers.map { |k, v| "#{k}: #{v}\r\n" }.join
    "GET #{@uri.request_uri} HTTP/1.1\r\n#{header_lines}\r\n"
  end

  # Host header value: the host, plus the port when it is not the
  # scheme's default.
  def host_header
    @uri.port == @uri.default_port ? @uri.host : "#{@uri.host}:#{@uri.port}"
  end
end
# Process-wide registry of Transfers keyed by URL, so that several
# receivers asking for the same URL share one download.
class TransferManager < GodObject
  def initialize
    @transfers = {}
  end

  # Subscribes +spawnable+ to the transfer for +url+, creating the
  # transfer on first use.
  def get(url, spawnable)
    transfer = @transfers.fetch(url) { @transfers[url] = Transfer.new(url) }
    transfer.get(spawnable)
  end

  ##
  # Starts every registered transfer. Call this only after every receiver
  # has made its #get call. Otherwise a fast network could deliver the
  # start of a stream before a late receiver subscribed, and that receiver
  # would only see data from the middle onwards.
  #
  # TODO: this can be solved more elegantly
  def go!
    @transfers.each_value(&:go!)
  end
end
EM.run do
  # Debug receiver: prints the first bytes of every body chunk, and every
  # other event verbatim.
  reader = EM.spawn { |event, *args|
    if event == :body
      body = args.first
      puts "reader: #{body[0..10].inspect} (#{body.size})"
    else
      puts "reader: #{event} #{args.inspect}"
    end
  }

  # Feeds listed in config.yaml, grouped by (unused) category name.
  YAML.load_file('config.yaml')['collections'].each { |_category, urls|
    urls.each { |url| TransferManager.get(url, reader) }
  }

  # Two fixed Flickr feeds, fetched in addition to the configured ones.
  TransferManager.get("http://api.flickr.com/services/feeds/photos_public.gne?id=73915810@N00&format=atom", reader)
  TransferManager.get("http://api.flickr.com/services/feeds/photos_public.gne?id=58728439@N00&format=atom", reader)

  # Everyone has subscribed; start all downloads.
  TransferManager.go!
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment