Created
April 3, 2017 08:19
-
-
Save okkez/2bde037331b28a935f3f13289910a49b to your computer and use it in GitHub Desktop.
Sample implementation using nonblocking IO
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/lib/twitter/streaming/client.rb b/lib/twitter/streaming/client.rb
index 4cdef8bd..47cd6cdd 100644
--- a/lib/twitter/streaming/client.rb
+++ b/lib/twitter/streaming/client.rb
@@ -104,6 +104,10 @@ module Twitter
end
end
+ def close
+ @connection.close
+ end
+
private
def request(method, uri, params)
diff --git a/lib/twitter/streaming/connection.rb b/lib/twitter/streaming/connection.rb
index 93b01889..ab6eca0b 100644
--- a/lib/twitter/streaming/connection.rb
+++ b/lib/twitter/streaming/connection.rb
@@ -8,20 +8,35 @@ module Twitter
def initialize(opts = {})
@tcp_socket_class = opts.fetch(:tcp_socket_class) { TCPSocket }
@ssl_socket_class = opts.fetch(:ssl_socket_class) { OpenSSL::SSL::SSLSocket }
+ @client = nil
+ @ssl_client = nil
end
attr_reader :tcp_socket_class, :ssl_socket_class
def stream(request, response)
client_context = OpenSSL::SSL::SSLContext.new
- client = @tcp_socket_class.new(Resolv.getaddress(request.socket_host), request.socket_port)
- ssl_client = @ssl_socket_class.new(client, client_context)
+ @client = @tcp_socket_class.new(Resolv.getaddress(request.socket_host), request.socket_port)
+ @ssl_client = @ssl_socket_class.new(@client, client_context)
- ssl_client.connect
- request.stream(ssl_client)
- while body = ssl_client.readpartial(1024) # rubocop:disable AssignmentInCondition
- response << body
+ @ssl_client.connect
+ request.stream(@ssl_client)
+ begin
+ return if @ssl_client.closed?
+ while body = @ssl_client.read_nonblock(1024) # rubocop:disable AssignmentInCondition
+ response << body
+ end
+ rescue IO::WaitReadable, IO::WaitWritable
+ sleep 0.1
+ retry
+ rescue EOFError
+ return
end
end
+
+ def close
+ @ssl_client.close
+ @client.close
+ end
end
end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment