Skip to content

Instantly share code, notes, and snippets.

@MichaelXavier
Created November 15, 2012 03:41
Show Gist options
  • Select an option

  • Save MichaelXavier/4076507 to your computer and use it in GitHub Desktop.

Select an option

Save MichaelXavier/4076507 to your computer and use it in GitHub Desktop.
require 'celluloid'
require 'stringio'
# First stage of the pipeline: reads lines from an IO-like object and hands
# each one (newline stripped) to the next stage asynchronously.
class Producer
  include Celluloid
  include Celluloid::Notifications

  attr_reader :io, :next_pool

  # @param io [#gets] line source; read until +gets+ returns a falsy value.
  # @param next_pool [#async] downstream stage; must respond to
  #   +async.convert+.
  def initialize(io, next_pool)
    @io, @next_pool = io, next_pool
  end

  # Drains +io+, dispatching every line downstream, then publishes
  # "producer_done", prints a marker line and terminates this actor.
  def run
    loop do
      current = io.gets
      break unless current

      next_pool.async.convert(current.chomp)
    end

    publish("producer_done")
    puts "DONE WITH PRODUCER"
    terminate
  end
end
# Middle stage of the pipeline: upcases each node and forwards it to the next
# stage. Subscribes to the "producer_done" notification on construction and
# shuts itself down when that notification arrives.
class Converter
  include Celluloid
  include Celluloid::Notifications

  attr_reader :next_pool

  # @param next_pool [#async] downstream stage; must respond to
  #   +async.process+.
  def initialize(next_pool)
    @next_pool = next_pool
    # will pool stop immediately or when it runs out of work?
    subscribe("producer_done", :bye)
  end

  # Upcases the string form of +node+ and passes it downstream asynchronously.
  #
  # @param node [#to_s] value to convert.
  def convert(node)
    next_pool.async.process(node.to_s.upcase)
  end

  # Notification callback for "producer_done": announces
  # "converter_stopped" and terminates this actor.
  #
  # Celluloid::Notifications invokes subscriber callbacks with the topic (and
  # any published payload) as arguments, so the method has to accept them; a
  # zero-arity callback fails with an arity error instead of shutting down.
  # The splat keeps a plain +bye+ call working as before.
  def bye(*_notification)
    puts "CONVERTER UPSTREAM STOPPED"
    publish("converter_stopped")
    # I'm assuming this won't wait until my queue is clear
    terminate
  end
end
# Final stage of the pipeline: prints each converted value with a running
# count and terminates once "converter_stopped" is published.
class Processor
  include Celluloid
  include Celluloid::Notifications

  # +done+ is never assigned in this class, so its reader returns nil; it is
  # kept only so existing callers of the reader keep working.
  attr_reader :counter, :done

  def initialize
    @counter = 0
    # The topic is published as the String "converter_stopped". Subscription
    # patterns are matched against the topic with ===, and a Symbol pattern
    # never matches a String topic, so subscribe with the String form.
    subscribe("converter_stopped", :bye)
  end

  # Counts and prints one converted value.
  #
  # @param converted [#to_s] value produced by the upstream stage.
  def process(converted)
    @counter += 1
    puts "##{counter}: #{converted}"
  end

  # Notification callback for "converter_stopped": terminates this actor.
  #
  # Celluloid::Notifications passes the topic (and any payload) to subscriber
  # callbacks, so the arguments are accepted and ignored; a plain +bye+ call
  # still works.
  def bye(*_notification)
    puts "PROCESSOR UPSTREAM STOPPED"
    # I'm assuming this won't wait until my queue is clear
    terminate
  end
end
# Demo wiring: feed the numbers 1..100, one per line, through
# Producer -> Converter pool -> Processor.
io = StringIO.new((1..100).map(&:to_s).join("\n"))
processor = Processor.new
converter_pool = Converter.pool(:args => [processor])
producer = Producer.new(io, converter_pool)
producer.async.run

# better way to await pipeline stop?
# Sleep briefly between liveness checks: a bare Thread.pass loop spins at
# full speed, burning CPU and competing with the actor threads it waits on.
# NOTE(review): Celluloid::Actor.join(processor) may offer a non-polling
# wait - confirm it exists in the Celluloid version in use.
sleep 0.01 while processor.alive?

puts "do cleanup here"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment