Created
August 10, 2016 14:45
-
-
Save prathmeshranaut/c12b3d1380eb38f3394ca12b1999ad16 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Celluloid | |
module Supervision | |
class Container | |
# Manages a fixed-size pool of actors | |
# Delegates work (i.e. methods) and supervises actors | |
# Don't use this class directly. Instead use MyKlass.pool | |
class Pool | |
include Celluloid | |
trap_exit :__crash_handler__ | |
finalizer :__shutdown__ | |
attr_reader :size, :actors | |
# Builds the pool and spawns its initial workers.
#
# options[:actors] - worker class; must respond to new_link
# options[:size]   - number of workers; defaults to the larger of
#                    Celluloid.cores (or 2 when that is nil) and 2
# options[:args]   - argument(s) forwarded to every worker constructor
def initialize(options={})
  # Bookkeeping first, so the pool is in a consistent state even if
  # spawning below suspends or crashes.
  @idle = []
  @busy = []
  @actors = Set.new
  @mutex = Mutex.new

  @klass = options[:actors]
  @size = options[:size] || [Celluloid.cores || 2, 2].max

  worker_args = options[:args]
  @args = worker_args ? Array(worker_args) : []

  # Spawning happens last because it can suspend and/or crash.
  @idle = @size.times.map { __spawn_actor__ }
end
# Finalizer: asks every known worker to terminate and then waits for
# each termination to complete.
def __shutdown__
  # @actors may be missing or nil if the initializer crashed early.
  return unless defined?(@actors) && @actors

  # First pass: fire off all terminate requests without waiting.
  pending = []
  @actors.each do |worker|
    begin
      future = worker.future(:terminate)
      pending << future unless future.nil?
    rescue DeadActorError
      # Worker is already dead; nothing to terminate.
    end
  end

  # Second pass: wait for each one, ignoring failures (best effort).
  pending.each do |future|
    begin
      future.value
    rescue StandardError
      nil
    end
  end
end
# Runs +method+ on a worker taken from the pool and returns its result.
#
# If the worker turns out to be dead, waits for the :respawn_complete
# signal (sent by __crash_handler__), takes another worker and retries.
# Any other exception is handed to abort.
def _send_(method, *args, &block)
  worker = __provision_actor__
  begin
    worker._send_(method, *args, &block)
  rescue DeadActorError
    # We were handed a dead worker: wait for its replacement, then retry
    # the call (only the begin body is re-run, hence the re-provision here).
    wait(:respawn_complete)
    worker = __provision_actor__
    retry
  rescue ::Exception => error
    abort(error)
  ensure
    # A live worker goes back to the idle list; dead ones are left alone here.
    if worker.alive?
      @idle << worker
      @busy.delete(worker)
      # Tell waiters that a worker is idle again.
      signal(:actor_idle)
    end
  end
end
# Returns whatever a pooled worker answers to :name.
#
# The method symbol must be the first argument to _send_, as in the
# sibling delegators (to_s, inspect, ...). The previous code passed
# @mailbox (never assigned in this class) as the method name and :name
# as an argument.
def name
  _send_(:name)
end
# Forwards the type check to a pooled worker instead of answering for
# the pool object itself.
def is_a?(klass)
  _send_(:is_a?, klass)
end
# Forwards kind_of? to a pooled worker instead of answering for the
# pool object itself.
def kind_of?(klass)
  _send_(:kind_of?, klass)
end
# Returns the method list reported by a pooled worker.
#
# include_ancestors - passed through unchanged to the worker's #methods
def methods(include_ancestors = true)
  _send_(:methods, include_ancestors)
end
# Returns the string representation produced by a pooled worker.
def to_s
  _send_(:to_s)
end
# Returns the inspect output produced by a pooled worker.
def inspect
  _send_(:inspect)
end
# Resizes the pool to +new_size+ workers (negative values are treated as 0).
#
# Growing spawns the missing workers straight into the idle list.
# Shrinking (or an unchanged size) takes workers through
# __provision_actor__, unlinks them, drops them from the bookkeeping
# and terminates them.
def size=(new_size)
  target = [0, new_size].max

  if target > size
    (target - size).times { @idle << __spawn_actor__ }
  else
    (size - target).times do
      surplus = __provision_actor__
      # Unlinked before termination, and removed from both collections.
      unlink(surplus)
      @busy.delete(surplus)
      @actors.delete(surplus)
      surplus.terminate
    end
  end

  @size = target
end
# Number of workers currently in the busy list, read under the pool mutex.
def busy_size
  @mutex.synchronize do
    @busy.size
  end
end
# Number of workers currently in the idle list, read under the pool mutex.
def idle_size
  @mutex.synchronize do
    @idle.size
  end
end
# True when +actor+ is in the idle list (checked under the pool mutex).
def __idle?(actor)
  @mutex.synchronize do
    @idle.member?(actor)
  end
end
# True when +actor+ is in the busy list (checked under the pool mutex).
def __busy?(actor)
  @mutex.synchronize do
    @busy.member?(actor)
  end
end
# Returns the busy list itself (not a copy); the read happens under the
# pool mutex.
def __busy
  @mutex.synchronize do
    @busy
  end
end
# Returns the idle list itself (not a copy); the read happens under the
# pool mutex.
def __idle
  @mutex.synchronize do
    @idle
  end
end
# Classifies +actor+ as :busy, :idle or :missing. The busy list is
# consulted first.
def __state(actor)
  if __busy?(actor)
    :busy
  elsif __idle?(actor)
    :idle
  else
    :missing
  end
end
# Instantiate a linked worker via @klass.new_link(*@args), register it
# in the actor Set, and return it.
#
# The registration happens exactly once, under the pool mutex. The
# previous code added the worker a second time outside the lock, which
# was redundant and bypassed the synchronization.
def __spawn_actor__
  actor = @klass.new_link(*@args)
  @mutex.synchronize { @actors.add(actor) }
  actor
end
# Provision a worker: take the first idle one, record it as busy, and
# return it. While no worker is idle, processes incoming
# Internals::Response messages until one becomes available.
#
# NOTE(review): @mutex stays held while the loop below blocks in
# receive and while handle_message runs - confirm nothing reached from
# there needs the same mutex.
def __provision_actor__
  Task.current.guard_warnings = true

  @mutex.synchronize do
    while @idle.empty?
      # Wait for a response from one of the busy workers and let the
      # current actor process it.
      response = exclusive do
        receive { |message| message.is_a?(Internals::Response) }
      end
      Thread.current[:celluloid_actor].handle_message(response)
    end

    @idle.shift.tap { |worker| @busy << worker }
  end
end
# Exit trap: forget the worker that exited and, if it exited with a
# reason (i.e. crashed), spawn a replacement and announce it.
#
# actor  - the worker that exited
# reason - the exit reason; nil/false means no replacement is spawned
def __crash_handler__(actor, reason)
  [@busy, @idle, @actors].each { |collection| collection.delete(actor) }
  return unless reason

  replacement = __spawn_actor__
  @idle << replacement
  # Wakes callers blocked in _send_ waiting for a respawn.
  signal(:respawn_complete)
end
# Reports whether +meth+ can be called on the pool.
#
# meth            - method name, as a Symbol or a String
# include_private - when true, any method that can be found counts;
#                   otherwise only public and protected methods count
#
# Returns false when no such method exists (method raises NameError).
def respond_to?(meth, include_private = false)
  # NOTE: use method() here since this class
  # shouldn't be used directly, and method() is less
  # likely to be "reimplemented" inconsistently
  # with other Object.*method* methods.
  found = method(meth)

  # method() either returns a method object or raises, so reaching this
  # point means the method exists.
  return true if include_private

  # The visibility lists below contain Symbols. Use the looked-up
  # method's own name so that a String +meth+ matches too.
  name = found.name

  if found.is_a?(UnboundMethod)
    owner = found.owner
    owner.public_instance_methods.include?(name) ||
      owner.protected_instance_methods.include?(name)
  else
    receiver = found.receiver
    receiver.public_methods.include?(name) ||
      receiver.protected_methods.include?(name)
  end
rescue NameError
  false
end
# Forwards any call the pool responds to (see respond_to?) to a pooled
# worker via _send_; everything else falls through to the default
# method_missing.
def method_missing(method, *args, &block)
  return super unless respond_to?(method)

  _send_(method, *args, &block)
end
# Since Pool allocates worker objects only just before calling them,
# we can still help Celluloid::Call detect passing invalid parameters to
# async methods by checking for those methods on the worker class.
#
# Returns the pool's own method object when one exists, otherwise the
# worker class's instance method of that name (an UnboundMethod).
# A NameError from that second lookup propagates to the caller.
def method(meth)
  begin
    super
  rescue NameError
    worker_class = @klass
    worker_class.instance_method(meth.to_sym)
  end
end
end | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment