Created
October 9, 2009 08:46
-
-
Save voloko/205871 to your computer and use it in GitHub Desktop.
Twitter realtime API client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'eventmachine' | |
require 'em/buftok' | |
require 'base64' | |
module Twitter
  # EventMachine connection that issues a single HTTP/1.1 request and then
  # treats the response body as a stream of lines, handing every line that
  # starts with "{" to the block registered with #each_item.
  #
  # When the connection is lost without #stop having been called, a reconnect
  # is scheduled with a delay that depends on whether an HTTP status line had
  # been received (see #reconnect_timeout).
  class JSONStream < EventMachine::Connection
    # Maximum line size handed to the tokenizer.
    MAX_LINE_LENGTH = 16 * 1024

    # network failure reconnections
    NF_RECONNECT_START = 0.25
    NF_RECONNECT_ADD   = 0.25
    NF_RECONNECT_MAX   = 16

    # app failure reconnections
    AF_RECONNECT_START = 10
    AF_RECONNECT_MUL   = 2

    # A reconnect is only scheduled while the computed delay is at most
    # RECONNECT_MAX and no more than RETRIES_MAX attempts were made in a row.
    RECONNECT_MAX = 320
    RETRIES_MAX   = 10

    # Defaults merged under the options given to .connect / #initialize.
    # The original literal listed :path twice; only the later value
    # ('/1/statuses/filter.json') ever took effect, so it is the one kept.
    DEFAULT_OPTIONS = {
      :method       => 'GET',
      :path         => '/1/statuses/filter.json',
      :content_type => "application/x-www-form-urlencoded",
      :content      => '',
      :host         => 'stream.twitter.com',
      :port         => 80,
      :auth         => 'test:test'
    }

    # HTTP status code of the current response; 0 until a status line is parsed.
    attr_accessor :code
    # Raw (stripped) response header lines collected for the current response.
    attr_accessor :headers
    # Last delay used after a network failure, or nil if none since the last
    # successful response.
    attr_accessor :nf_last_reconnect
    # Last delay used after an application failure, or nil likewise.
    attr_accessor :af_last_reconnect
    # Number of consecutive reconnect attempts.
    attr_accessor :reconnect_retries

    # Opens a connection using DEFAULT_OPTIONS overridden by +options+.
    #
    # @param options [Hash] any of the DEFAULT_OPTIONS keys
    # @return whatever EventMachine.connect returns (the connection object)
    def self.connect(options = {})
      options = DEFAULT_OPTIONS.merge(options)
      EventMachine.connect options[:host], options[:port], self, options
    end

    # @param options [Hash] any of the DEFAULT_OPTIONS keys
    def initialize(options = {})
      @options = DEFAULT_OPTIONS.merge(options) # merge in case initialize called directly
      @gracefully_closed = false
      @nf_last_reconnect = nil
      @af_last_reconnect = nil
      @reconnect_retries = 0
    end

    # Registers the block called with each stream line that starts with "{".
    # The block receives the stripped line as a String.
    def each_item(&block)
      @each_item_callback = block
    end

    # Registers the block called with an error message String.
    def on_error(&block)
      @error_callback = block
    end

    # Registers the block called with the delay (in seconds) before each
    # scheduled reconnect.
    def on_reconnect(&block)
      @reconnect_callback = block
    end

    # Closes the connection and suppresses any further reconnects.
    def stop
      @gracefully_closed = true
      close_connection
    end

    # Connection-closed callback: processes whatever is still buffered, then
    # schedules a reconnect unless #stop was called.
    def unbind
      receive_line(@buffer.flush) unless @buffer.empty?
      schedule_reconnect unless @gracefully_closed
    end

    # Incoming-data callback: splits +data+ into lines and dispatches each one.
    # Any StandardError raised while doing so is reported through the error
    # callback and the connection is closed. Only StandardError is rescued so
    # that signals, SystemExit and out-of-memory conditions are not swallowed.
    def receive_data(data)
      @buffer.extract(data).each { |line| receive_line(line) }
    rescue StandardError => e
      receive_error(e.message)
      close_connection
    end

    # Resets the parser state and sends the HTTP request.
    def post_init
      reset_state
      send_request
    end

    protected

    # Computes the next delay, counts the attempt, and schedules a reconnect
    # if both the delay and the attempt count are within their limits.
    # Otherwise nothing happens and the stream stays closed.
    def schedule_reconnect
      timeout = reconnect_timeout
      @reconnect_retries += 1
      reconnect_after(timeout) if timeout <= RECONNECT_MAX && @reconnect_retries <= RETRIES_MAX
    end

    # Notifies the reconnect callback and reconnects after +timeout+ seconds.
    def reconnect_after(timeout)
      @reconnect_callback.call(timeout) if @reconnect_callback
      EventMachine.add_timer(timeout) do
        # #stop may have been called while the timer was pending; do not
        # reopen a stream the caller has asked to shut down.
        reconnect(@options[:host], @options[:port]) unless @gracefully_closed
      end
    end

    # Returns the delay in seconds before the next reconnect.
    #
    # With @code == 0 (no status line parsed since the last reset) the failure
    # is treated as a network failure: the delay starts at NF_RECONNECT_START,
    # grows by NF_RECONNECT_ADD per attempt and is capped at NF_RECONNECT_MAX.
    # Otherwise it is treated as an application failure: the delay starts at
    # AF_RECONNECT_START and is multiplied by AF_RECONNECT_MUL per attempt.
    def reconnect_timeout
      if @code == 0 # network failure
        @nf_last_reconnect =
          @nf_last_reconnect ? @nf_last_reconnect + NF_RECONNECT_ADD : NF_RECONNECT_START
        [@nf_last_reconnect, NF_RECONNECT_MAX].min
      else
        @af_last_reconnect =
          @af_last_reconnect ? @af_last_reconnect * AF_RECONNECT_MUL : AF_RECONNECT_START
      end
    end

    # Clears the per-response state and starts a fresh line buffer.
    def reset_state
      @code    = 0
      @headers = []
      @state   = :init
      @buffer  = BufferedTokenizer.new("\r", MAX_LINE_LENGTH)
    end

    # Builds and sends the HTTP request described by @options.
    def send_request
      content = @options[:content]
      data = []
      data << "#{@options[:method]} #{@options[:path]} HTTP/1.1"
      data << "Host: #{@options[:host]}"
      data << "User-agent: ruby image client"
      data << "Authorization: Basic " + [@options[:auth]].pack('m').delete("\r\n")
      if @options[:method] == 'POST'
        data << "Content-type: #{@options[:content_type]}"
        # Content-length is a byte count; String#length counts characters and
        # would under-report bodies containing multibyte characters.
        data << "Content-length: #{content.bytesize}"
      end
      data << "\r\n" # joined below, this yields the blank line ending the headers
      send_data data.join("\r\n") + content
    end

    # Routes a line to the parser for the current state.
    def receive_line(ln)
      case @state
      when :init    then parse_response_line(ln)
      when :headers then parse_header_line(ln)
      when :stream  then parse_stream_line(ln)
      end
    end

    # Passes +e+ (a message String) to the error callback, if one is set.
    def receive_error(e)
      @error_callback.call(e) if @error_callback
    end

    # Strips the line in place and yields it to the item callback when it
    # starts with "{"; blank and other lines are ignored.
    def parse_stream_line(ln)
      ln.strip!
      @each_item_callback.call(ln) if ln[0, 1] == '{' && @each_item_callback
    end

    # Collects a header line; a blank line ends the headers, clears the
    # reconnect back-off state and switches to stream parsing.
    def parse_header_line(ln)
      ln.strip!
      if ln.empty?
        reset_timeouts
        @state = :stream
      else
        headers << ln
      end
    end

    # Parses the HTTP status line, storing the status code. Anything else is
    # reported as 'invalid response' and the connection is closed.
    def parse_response_line(ln)
      if ln =~ %r{\AHTTP/1\.[01] (\d{3})}
        @code  = Regexp.last_match(1).to_i
        @state = :headers
      else
        receive_error('invalid response')
        close_connection
      end
    end

    # Forgets previous reconnect delays and the retry count.
    def reset_timeouts
      @nf_last_reconnect = @af_last_reconnect = nil
      @reconnect_retries = 0
    end
  end
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Moved JSONStream to a separate project.
# See: http://github.com/voloko/twitter-stream
# or simply: sudo gem install twitter-stream -s http://gemcutter.org
require 'rubygems'
require 'twitter/json_stream'

# Example: track a keyword on the streaming API and print every item received.
EventMachine.run do
  stream = Twitter::JSONStream.connect(
    :path    => '/1/statuses/filter.json',
    :auth    => 'LOGIN:PASSWORD',
    :method  => 'POST',
    :content => 'track=something'
  )

  stream.each_item do |item|
    # do something useful here (like posting to a message queue or saving to a db)
    # puts (not print) so that consecutive items end up on separate lines
    puts "found: #{item}"
  end

  stream.on_error do |message|
    # for logging and debugging purposes
    # you might want to examine stream.headers here
    puts "error: #{message}"
  end

  stream.on_reconnect do |timeout|
    # timeout adheres to http://apiwiki.twitter.com/Streaming-API-Documentation#Connecting
    puts "reconnecting in: #{timeout} seconds"
  end

  # On SIGTERM, close the stream without reconnecting and leave the event loop.
  trap('TERM') do
    stream.stop
    EventMachine.stop if EventMachine.reactor_running?
  end
end

puts "The event loop has ended"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment