Created October 18, 2012 15:02
Stack powered by celluloid and zmq
# client.rb
require 'celluloid/zmq'

Celluloid::ZMQ.init

# Address of each step's socket, keyed by step name.
STACK = {
  :one => 'tcp://127.0.0.1:3436',
  :two => 'tcp://127.0.0.1:3437',
  :three => 'tcp://127.0.0.1:3438',
  :four => 'tcp://127.0.0.1:3439',
  :five => 'tcp://127.0.0.1:3440',
  :six => 'tcp://127.0.0.1:3441',
  :seven => 'tcp://127.0.0.1:3442',
  :eight => 'tcp://127.0.0.1:3443',
  :nine => 'tcp://127.0.0.1:3444',
  :ten => 'tcp://127.0.0.1:3445'
}

class Client
  include Celluloid::ZMQ

  attr_reader :step

  def initialize(address, step)
    @step = step

    # Results are pushed back to the stack listening on `address`.
    @push = PushSocket.new
    begin
      @push.connect(address)
    rescue IOError
      @push.close
      raise
    end

    # Work for this step is pulled from the step's own socket.
    @pull = PullSocket.new
    begin
      @pull.connect(STACK[step])
    rescue IOError
      @pull.close
      raise
    end
  end

  # Reads messages forever and handles each one asynchronously.
  def run
    loop { async.handle_message @pull.read }
  end

  # Increments the payload and sends it back as two parts:
  # the step name, then the new payload.
  def handle_message(message)
    message = message.to_i + 1
    @push.write @step.to_s, message.to_s
  end
end
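The reply that `Client#handle_message` writes can be sketched without any sockets. This is a minimal illustration, assuming the two-part layout used above (step name first, incremented payload second). The helper name `reply_frames` is hypothetical and is not part of the gist or of celluloid-zmq.

```ruby
# Hypothetical helper (not in the gist): builds the two frames that
# Client#handle_message passes to @push.write.
def reply_frames(step, message)
  # The payload arrives as a string, is incremented as an integer,
  # and is sent back as a string next to the step name.
  [step.to_s, (message.to_i + 1).to_s]
end

# Frames a client registered for step :one would send for payload "0".
p reply_frames(:one, '0')
```

The stack reads these two parts back as `step, payload` in `handle_message`, which is why the step name goes first.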
# Launcher: starts five clients for step :one.
require_relative './client'

addr = 'tcp://127.0.0.1:3435'
5.times do
  Client.new(addr, :one).run!
end
sleep # keep the process alive while the clients run
# Launcher: starts five clients for step :ten.
require_relative './client'

addr = 'tcp://127.0.0.1:3435'
5.times do
  Client.new(addr, :ten).run!
end
sleep # keep the process alive while the clients run
# Launcher: starts five clients for step :two.
require_relative './client'

addr = 'tcp://127.0.0.1:3435'
5.times do
  Client.new(addr, :two).run!
end
sleep # keep the process alive while the clients run
# Launcher: starts five clients for step :three.
require_relative './client'

addr = 'tcp://127.0.0.1:3435'
5.times do
  Client.new(addr, :three).run!
end
sleep # keep the process alive while the clients run
# Launcher: starts five clients for step :four.
require_relative './client'

addr = 'tcp://127.0.0.1:3435'
5.times do
  Client.new(addr, :four).run!
end
sleep # keep the process alive while the clients run
# Launcher: starts five clients for step :five.
require_relative './client'

addr = 'tcp://127.0.0.1:3435'
5.times do
  Client.new(addr, :five).run!
end
sleep # keep the process alive while the clients run
# Launcher: starts five clients for step :six.
require_relative './client'

addr = 'tcp://127.0.0.1:3435'
5.times do
  Client.new(addr, :six).run!
end
sleep # keep the process alive while the clients run
# Launcher: starts five clients for step :seven.
require_relative './client'

addr = 'tcp://127.0.0.1:3435'
5.times do
  Client.new(addr, :seven).run!
end
sleep # keep the process alive while the clients run
# Launcher: starts five clients for step :eight.
require_relative './client'

addr = 'tcp://127.0.0.1:3435'
5.times do
  Client.new(addr, :eight).run!
end
sleep # keep the process alive while the clients run
# Launcher: starts five clients for step :nine.
require_relative './client'

addr = 'tcp://127.0.0.1:3435'
5.times do
  Client.new(addr, :nine).run!
end
sleep # keep the process alive while the clients run
# d_stack.rb
require 'celluloid/zmq'
require 'benchmark'

Celluloid::ZMQ.init

# One actor per step: binds the step's socket and pushes payloads to its clients.
class Step
  include Celluloid::ZMQ

  def initialize(step)
    @step = step
    @push = PushSocket.new
    begin
      @push.bind(STACK[step])
    rescue IOError
      @push.close
      raise
    end
  end

  def push(payload)
    @push.write payload.to_s
  end
end

# Collects client replies and forwards each payload to the next step.
class DStack
  include Celluloid::ZMQ

  attr_reader :location, :push, :pull

  def initialize(address)
    @location = address
    @pull = PullSocket.new
    @processed = 0
    begin
      @pull.bind(address)
    rescue IOError
      @pull.close
      raise
    end

    # Register one supervised Step actor under each step name.
    STACK.each do |step, addr|
      Step.supervise_as step, step
    end
  end

  # Reads multi-part messages forever and handles each one asynchronously.
  def run(stack)
    @stack = stack
    loop do
      message = [Celluloid::Actor.current.pull.read]
      while @pull.more_parts? do
        message << Celluloid::Actor.current.pull.read
      end
      Celluloid::Actor.current.async.handle_message message
    end
  end

  # Injects a payload at the first step.
  def process(payload)
    Celluloid::Actor[@stack.first].push payload
  end

  # A reply is [step name, payload]: forward it to the next step,
  # or count it as done when the last step has replied.
  def handle_message(message)
    step, payload = message
    if step
      next_step = @stack.at(@stack.find_index(step.to_sym) + 1)
      if next_step
        Celluloid::Actor[next_step].async.push payload
      else
        @processed += 1
        p @processed
        p "Full of win" if @processed == 20000
      end
    end
  end
end

addr = 'tcp://127.0.0.1:3435'

STACK = {
  :one => 'tcp://127.0.0.1:3436',
  :two => 'tcp://127.0.0.1:3437',
  :three => 'tcp://127.0.0.1:3438',
  :four => 'tcp://127.0.0.1:3439',
  :five => 'tcp://127.0.0.1:3440',
  :six => 'tcp://127.0.0.1:3441',
  :seven => 'tcp://127.0.0.1:3442',
  :eight => 'tcp://127.0.0.1:3443',
  :nine => 'tcp://127.0.0.1:3444',
  :ten => 'tcp://127.0.0.1:3445'
}

payload = 0

server = DStack.new(addr)
server.async.run(STACK.keys)

20000.times do
  server.process(payload)
end

sleep

# jruby 1.6.8: ruby d_stack.rb 301,47s user 107,00s system 135% cpu 5:00,87 total
# mri 1.9.3: ruby d_stack.rb 128,23s user 31,37s system 88% cpu 2:59,76 total
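The routing in `DStack#handle_message` can be followed without ZeroMQ or actors. The sketch below is a plain-Ruby simulation, assuming the step order of `STACK.keys` and clients that add 1 at every step. `STEPS`, `next_step` and `simulate` are hypothetical names introduced for illustration only. Unlike the gist, `next_step` returns nil for an unknown step instead of raising.

```ruby
# Same order as STACK.keys in the gist.
STEPS = [:one, :two, :three, :four, :five,
         :six, :seven, :eight, :nine, :ten].freeze

# Hypothetical helper: the step that follows `step`,
# or nil when `step` is the last one (or is unknown).
def next_step(steps, step)
  index = steps.find_index(step.to_sym)
  index && steps.at(index + 1)
end

# Hypothetical helper: walks one payload through every step in order,
# incrementing it once per step as a Client would.
def simulate(steps, payload)
  step = steps.first
  while step
    payload = payload.to_i + 1
    step = next_step(steps, step)
  end
  payload
end

p next_step(STEPS, 'one')
p simulate(STEPS, 0)
```

Under these assumptions a payload that starts at 0 leaves the last step as 10, one increment per step. The real stack counts such completed payloads in `@processed`.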
# Gemfile
source 'https://rubygems.org'

gem 'celluloid-zmq', github: "chatgris/celluloid-zmq", branch: "more_parts"