Skip to content

Instantly share code, notes, and snippets.

@chatgris
Created October 18, 2012 15:02
Show Gist options
  • Save chatgris/3912406 to your computer and use it in GitHub Desktop.
Save chatgris/3912406 to your computer and use it in GitHub Desktop.
Stack powered by celluloid and zmq
require 'celluloid/zmq'
Celluloid::ZMQ.init
# Maps each pipeline step name to the ZeroMQ endpoint used for that step.
# Insertion order is the processing order (:one through :ten).
# Frozen so the shared routing table cannot be mutated at runtime.
STACK = {
  :one => 'tcp://127.0.0.1:3436',
  :two => 'tcp://127.0.0.1:3437',
  :three => 'tcp://127.0.0.1:3438',
  :four => 'tcp://127.0.0.1:3439',
  :five => 'tcp://127.0.0.1:3440',
  :six => 'tcp://127.0.0.1:3441',
  :seven => 'tcp://127.0.0.1:3442',
  :eight => 'tcp://127.0.0.1:3443',
  :nine => 'tcp://127.0.0.1:3444',
  :ten => 'tcp://127.0.0.1:3445'
}.freeze
# Worker actor for a single pipeline step.
#
# It pulls messages from the endpoint registered for its step in STACK,
# increments the integer payload, and pushes a two-part message
# (step name, new payload) to the address given at construction time.
class Client
  include Celluloid::ZMQ

  attr_reader :step

  # address - endpoint the push socket connects to (where results are sent).
  # step    - key of STACK naming the endpoint this worker pulls from.
  #
  # Raises KeyError when step is not a key of STACK, and re-raises IOError
  # when either socket fails to connect (after closing every opened socket).
  def initialize(address, step)
    # Resolve the endpoint first so an unknown step fails before any
    # socket has been allocated.
    pull_address = STACK.fetch(step)
    @step = step
    @push = PushSocket.new
    connect_or_cleanup(@push, address)
    @pull = PullSocket.new
    connect_or_cleanup(@pull, pull_address)
  end

  # Reads messages forever, handing each one to #handle_message asynchronously.
  def run
    loop { async.handle_message @pull.read }
  end

  # Increments the payload and forwards it tagged with this worker's step.
  # Non-numeric payloads are treated as 0 by String#to_i.
  def handle_message(message)
    message = message.to_i + 1
    @push.write @step.to_s, message.to_s
  end

  private

  # Connects socket to address. On IOError, closes every socket created so
  # far (not only the failing one) so nothing is leaked, then re-raises.
  def connect_or_cleanup(socket, address)
    socket.connect(address)
  rescue IOError
    [@push, @pull].compact.each(&:close)
    raise
  end
end
require_relative './client'
# Endpoint the workers push their results to.
addr = 'tcp://127.0.0.1:3435'
# Start five workers for the :one step. `async.run` replaces the legacy
# bang form (`run!`) and matches the `async` proxy used by Client and DStack.
5.times do
  Client.new(addr, :one).async.run
end
# Sleep forever so the main thread stays alive while the actors work.
sleep
require_relative './client'
# Endpoint the workers push their results to.
addr = 'tcp://127.0.0.1:3435'
# Start five workers for the :ten step. `async.run` replaces the legacy
# bang form (`run!`) and matches the `async` proxy used by Client and DStack.
5.times do
  Client.new(addr, :ten).async.run
end
# Sleep forever so the main thread stays alive while the actors work.
sleep
require_relative './client'
# Endpoint the workers push their results to.
addr = 'tcp://127.0.0.1:3435'
# Start five workers for the :two step. `async.run` replaces the legacy
# bang form (`run!`) and matches the `async` proxy used by Client and DStack.
5.times do
  Client.new(addr, :two).async.run
end
# Sleep forever so the main thread stays alive while the actors work.
sleep
require_relative './client'
# Endpoint the workers push their results to.
addr = 'tcp://127.0.0.1:3435'
# Start five workers for the :three step. `async.run` replaces the legacy
# bang form (`run!`) and matches the `async` proxy used by Client and DStack.
5.times do
  Client.new(addr, :three).async.run
end
# Sleep forever so the main thread stays alive while the actors work.
sleep
require_relative './client'
# Endpoint the workers push their results to.
addr = 'tcp://127.0.0.1:3435'
# Start five workers for the :four step. `async.run` replaces the legacy
# bang form (`run!`) and matches the `async` proxy used by Client and DStack.
5.times do
  Client.new(addr, :four).async.run
end
# Sleep forever so the main thread stays alive while the actors work.
sleep
require_relative './client'
# Endpoint the workers push their results to.
addr = 'tcp://127.0.0.1:3435'
# Start five workers for the :five step. `async.run` replaces the legacy
# bang form (`run!`) and matches the `async` proxy used by Client and DStack.
5.times do
  Client.new(addr, :five).async.run
end
# Sleep forever so the main thread stays alive while the actors work.
sleep
require_relative './client'
# Endpoint the workers push their results to.
addr = 'tcp://127.0.0.1:3435'
# Start five workers for the :six step. `async.run` replaces the legacy
# bang form (`run!`) and matches the `async` proxy used by Client and DStack.
5.times do
  Client.new(addr, :six).async.run
end
# Sleep forever so the main thread stays alive while the actors work.
sleep
require_relative './client'
# Endpoint the workers push their results to.
addr = 'tcp://127.0.0.1:3435'
# Start five workers for the :seven step. `async.run` replaces the legacy
# bang form (`run!`) and matches the `async` proxy used by Client and DStack.
5.times do
  Client.new(addr, :seven).async.run
end
# Sleep forever so the main thread stays alive while the actors work.
sleep
require_relative './client'
# Endpoint the workers push their results to.
addr = 'tcp://127.0.0.1:3435'
# Start five workers for the :eight step. `async.run` replaces the legacy
# bang form (`run!`) and matches the `async` proxy used by Client and DStack.
5.times do
  Client.new(addr, :eight).async.run
end
# Sleep forever so the main thread stays alive while the actors work.
sleep
require_relative './client'
# Endpoint the workers push their results to.
addr = 'tcp://127.0.0.1:3435'
# Start five workers for the :nine step. `async.run` replaces the legacy
# bang form (`run!`) and matches the `async` proxy used by Client and DStack.
5.times do
  Client.new(addr, :nine).async.run
end
# Sleep forever so the main thread stays alive while the actors work.
sleep
require 'celluloid/zmq'
require 'benchmark'
Celluloid::ZMQ.init
# Actor owning the push socket bound to one step's endpoint from STACK.
# Payloads handed to #push are written to that socket as strings.
class Step
  include Celluloid::ZMQ

  # step - key of STACK whose endpoint this actor binds to.
  #
  # Re-raises IOError when the bind fails, after closing the socket.
  def initialize(step)
    @step = step
    @push = PushSocket.new
    bind_or_close(STACK[step])
  end

  # Writes the payload, converted with #to_s, to the bound push socket.
  def push(payload)
    @push.write(payload.to_s)
  end

  private

  # Binds the push socket to endpoint; closes it and re-raises on IOError.
  def bind_or_close(endpoint)
    @push.bind(endpoint)
  rescue IOError
    @push.close
    raise
  end
end
# Coordinator actor for the pipeline.
#
# It binds a pull socket on which workers report (step name, payload)
# messages, supervises one Step actor per entry of STACK, and routes each
# reported payload to the step that follows the reporting one. Payloads
# reported by the last step are counted as fully processed.
class DStack
  include Celluloid::ZMQ

  # NOTE: @push is never assigned in this class, so #push returns nil; the
  # reader is kept only to leave the public interface unchanged.
  attr_reader :location, :push, :pull

  # address - endpoint the pull socket binds to.
  #
  # Re-raises IOError when the bind fails, after closing the socket.
  def initialize(address)
    @location = address
    @pull = PullSocket.new
    @processed = 0
    begin
      @pull.bind(address)
    rescue IOError
      @pull.close
      raise
    end
    # Register one supervised Step actor under each step's own name.
    STACK.each_key do |step|
      Step.supervise_as step, step
    end
  end

  # Reads multipart messages forever and dispatches each one asynchronously.
  #
  # stack - ordered list of step names (symbols) defining the pipeline.
  def run(stack)
    @stack = stack
    loop do
      message = [@pull.read]
      # Collect the remaining frames of the same multipart message.
      message << @pull.read while @pull.more_parts?
      async.handle_message message
    end
  end

  # Injects a payload into the first step of the pipeline.
  # Requires #run to have been started, since it sets the step list.
  def process(payload)
    Celluloid::Actor[@stack.first].push payload
  end

  # Routes a reported message to the next step, or counts it as done.
  #
  # message - array whose first element is the reporting step's name and
  #           whose second element is the payload.
  def handle_message(message)
    step, payload = message
    return unless step

    index = @stack.find_index(step.to_sym)
    unless index
      # An unknown step name used to raise NoMethodError (nil + 1) and
      # crash this actor; report it and keep serving instead.
      warn "DStack: ignoring message from unknown step #{step.inspect}"
      return
    end

    next_step = @stack.at(index + 1)
    if next_step
      Celluloid::Actor[next_step].async.push payload
    else
      # No step after this one: the payload went through the whole stack.
      @processed += 1
      p @processed
      p "Full of win" if @processed == 20000
    end
  end
end
# Endpoint the DStack server binds its pull socket to.
addr = 'tcp://127.0.0.1:3435'
# Maps each pipeline step name to the ZeroMQ endpoint used for that step.
# Insertion order is the processing order (:one through :ten).
# Frozen so the shared routing table cannot be mutated at runtime.
STACK = {
  :one => 'tcp://127.0.0.1:3436',
  :two => 'tcp://127.0.0.1:3437',
  :three => 'tcp://127.0.0.1:3438',
  :four => 'tcp://127.0.0.1:3439',
  :five => 'tcp://127.0.0.1:3440',
  :six => 'tcp://127.0.0.1:3441',
  :seven => 'tcp://127.0.0.1:3442',
  :eight => 'tcp://127.0.0.1:3443',
  :nine => 'tcp://127.0.0.1:3444',
  :ten => 'tcp://127.0.0.1:3445'
}.freeze
# Initial value pushed into the first step for every job.
seed = 0
# Boot the coordinator and start its read loop over all steps, in order.
pipeline = DStack.new(addr)
pipeline.async.run(STACK.keys)
# Feed 20000 jobs into the pipeline.
20_000.times { pipeline.process(seed) }
# Sleep forever so the main thread stays alive while the actors work.
sleep
# jruby 1.6.8: ruby d_stack.rb 301,47s user 107,00s system 135% cpu 5:00,87 total
# mri 1.9.3 ruby d_stack.rb 128,23s user 31,37s system 88% cpu 2:59,76 total
# Use the explicit HTTPS source; the `:rubygems` shorthand is deprecated by
# Bundler because it resolves to an insecure HTTP URL.
source 'https://rubygems.org'
# Fork of celluloid-zmq providing `more_parts?`, used to read multipart messages.
gem 'celluloid-zmq', github: "chatgris/celluloid-zmq", branch: "more_parts"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment