Last active
August 29, 2015 14:07
-
-
Save schmurfy/d3ba8cdbc4c1f943cdb2 to your computer and use it in GitHub Desktop.
concurrent-ruby experimentations
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'rubygems' | |
require 'bundler/setup' | |
require 'concurrent' | |
logger = Logger.new($stderr) # | |
# Concurrent.configuration.logger = lambda do |level, progname, message = nil, &block| | |
# logger.add level, message, progname, &block | |
# end # | |
class Master < Concurrent::Actor::RestartingContext | |
def initialize | |
p [:master, :created] | |
@listeners = [] | |
end | |
def on_message(msg_parts) | |
cmd, *args = *msg_parts | |
case cmd | |
when :subscribe | |
@listeners << envelope.sender | |
when :crash | |
raise "wtf" | |
when :data_received | |
@listeners.each{|actor| actor << msg_parts } | |
when :listeners | |
@listeners | |
else | |
pass | |
end | |
end | |
end | |
class Listener < Concurrent::Actor::RestartingContext | |
def initialize(master) | |
p [:listener, Concurrent::Actor.name, :started] | |
master << :link | |
master << :subscribe | |
end | |
def on_message(msg_parts) | |
p [:ss, msg_parts] | |
cmd, *args = *msg_parts | |
case cmd | |
when :data_received | |
p [:listener, Concurrent::Actor.name, :data, args[0]] | |
else | |
pass | |
end | |
end | |
end # | |
master = Master.spawn(name: 'master', supervise: true) | |
listener = Listener.spawn(name: 'listener1', supervise: true, args: [master]) | |
master << [:data_received, 42] | |
master << :crash | |
sleep 0.1 | |
master << [:data_received, 32] | |
master << :crash | |
master << [:data_received, 32] | |
sleep |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'concurrent' | |
class DataWriter < Concurrent::Actor::Context | |
def initialize(dispatcher) | |
dispatcher << :subscribe | |
end | |
def on_message(msg) | |
case msg[0] | |
when :msg | |
p [:yeah, msg[1]] | |
end | |
end | |
end | |
dispatcher = Concurrent::Actor::Utils::Broadcast.spawn! 'event-dispatcher' | |
writer1 = DataWriter.spawn(:one, dispatcher) | |
writer2 = DataWriter.spawn(:two, dispatcher) | |
dispatcher << [:msg, 'asd'] | |
sleep |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'rubygems' | |
require 'bundler/setup' | |
require 'concurrent' | |
class Crasher < Concurrent::Actor::RestartingContext | |
def initialize | |
p [:create] | |
end | |
def on_message(msg) | |
case msg | |
when :crash | |
raise "arrggggg" | |
when :print | |
p [:hello] | |
end | |
end | |
end | |
actor = Crasher.spawn!(:bob) | |
actor << :print | |
actor << :crash | |
sleep 1 | |
actor << :print | |
sleep |
To fix it you need to see:
- https://github.com/ruby-concurrency/concurrent-ruby/blob/master/lib/concurrent/actor/core.rb#L38
- https://github.com/ruby-concurrency/concurrent-ruby/blob/master/lib/concurrent/actor/behaviour/supervised.rb
then:
class Crasher < Concurrent::Actor::RestartingContext
def initialize
p [:create]
end
def on_message(msg)
case msg
when :crash
raise "arrggggg"
when :print
p [:hello]
end
end
end
actor = Crasher.spawn!(name: 'bob', supervise: true)
actor << :print
actor << :crash
sleep 0.1
actor << :print
should fix it
I've polished just a little bit:
class Master < Concurrent::Actor::RestartingContext
def initialize
p [path, :created]
@listeners = []
end
def on_message(msg_parts)
cmd, *args = *msg_parts
case cmd
when :subscribe
@listeners << envelope.sender
when :crash
raise "wtf"
when :broadcast
@listeners.each { |actor| actor << msg_parts }
when :listeners
@listeners
else
pass
end
end
end
class Listener < Concurrent::Actor::RestartingContext
def initialize(master)
p [path, :created]
master << :link << :subscribe
end
def on_message(msg_parts)
p [:listener_got, msg_parts]
cmd, *args = *msg_parts
case cmd
when :reset
# reset! when master does
behaviour!(Concurrent::Actor::Behaviour::Pausing).reset!
when StandardError
# pause! when master does
behaviour!(Concurrent::Actor::Behaviour::Pausing).pause!
when :broadcast
p [path, :data, args[0]]
else
pass
end
end
end #
master = Master.spawn(name: 'master', supervise: true)
# gets auto subscribed again on reset
listener = Listener.spawn(name: 'listener1', supervise: true, args: [master])
master << [:broadcast, 42] << :crash
sleep 0.1
master << [:broadcast, 32] << :crash << [:broadcast, 32]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
does following work for you?