Skip to content

Instantly share code, notes, and snippets.

@cronin101
Last active August 29, 2015 14:20
Show Gist options
  • Save cronin101/7579a89ff88e9ccf3f42 to your computer and use it in GitHub Desktop.
Save cronin101/7579a89ff88e9ccf3f42 to your computer and use it in GitHub Desktop.
Playing with transfer_encoding: chunked
require './data_source'
require './stream_observer'
require './streaming_app'
source = DataSource.new
app = StreamingApp.new
app.settings.set :stream_observer_factory,
-> type { StreamObserver.new(source, type) }
run app
require 'observer'
StreamItem = Struct.new(:type, :message)
class DataSource
include Observable
def initialize
@counter = 0
Thread.new do
loop do
increment_counter
sleep 1
end
end
end
private
def increment_counter
@counter += 1
changed
notify_observers(StreamItem.new(:counter, @counter))
end
end
require 'json'
class StreamObserver
include Enumerable
def initialize(source, *types_filter)
@source = source
@message_queue = Queue.new
@types_filter = types_filter
@source.add_observer(self, :receive_stream_item)
end
def receive_stream_item(item)
@message_queue.push item.to_h.to_json if @types_filter.include? item.type
end
def each
begin
loop { yield @message_queue.pop }
ensure
@source.delete_observer self
end
end
e
require 'sinatra/base'
class StreamingApp < Sinatra::Base
get '/counter' do
settings.stream_observer_factory(:counter).lazy.map { |count| "#{count} <br/>" }
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment