Created
August 9, 2016 13:15
-
-
Save prathmeshranaut/8567d838caa5e347a0027e751fdbe1fe to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "logger" | |
require "thread" | |
require "timeout" | |
require "set" | |
$CELLULOID_DEBUG = false | |
$CELLULOID_MANAGED ||= false | |
require "celluloid/version" | |
require "celluloid/notices" | |
$CELLULOID_BACKPORTED = false if defined?(CELLULOID_FUTURE) && CELLULOID_FUTURE | |
$CELLULOID_BACKPORTED = (ENV["CELLULOID_BACKPORTED"] != "false") unless defined?($CELLULOID_BACKPORTED) | |
Celluloid::Notices.backported if $CELLULOID_BACKPORTED | |
module Celluloid | |
# Expose all instance methods as singleton methods | |
extend self | |
# Linking times out after 5 seconds | |
LINKING_TIMEOUT = 5 | |
# Warning message added to Celluloid objects accessed outside their actors | |
BARE_OBJECT_WARNING_MESSAGE = "WARNING: BARE CELLULOID OBJECT " | |
class << self | |
attr_writer :actor_system # Default Actor System | |
attr_accessor :logger # Thread-safe logger class | |
attr_accessor :log_actor_crashes | |
attr_accessor :group_class # Default internal thread group to use | |
attr_accessor :task_class # Default task type to use | |
attr_accessor :shutdown_timeout # How long actors have to terminate | |
def actor_system | |
if Thread.current.celluloid? | |
Thread.current[:celluloid_actor_system] || raise(Error, "actor system not running") | |
else | |
Thread.current[:celluloid_actor_system] || @actor_system || raise(Error, "Celluloid is not yet started; use Celluloid.boot") | |
end | |
end | |
def included(klass) | |
klass.send :extend, ClassMethods | |
klass.send :include, InstanceMethods | |
klass.send :extend, Internals::Properties | |
klass.property :mailbox_class, default: Celluloid::Mailbox | |
klass.property :proxy_class, default: Celluloid::Proxy::Cell | |
klass.property :task_class, default: Celluloid.task_class | |
klass.property :group_class, default: Celluloid.group_class | |
klass.property :mailbox_size | |
klass.property :exclusive_actor, default: false | |
klass.property :exclusive_methods, multi: true | |
klass.property :execute_block_on_receiver, | |
default: [:after, :every, :receive], | |
multi: true | |
klass.property :finalizer | |
klass.property :exit_handler_name | |
singleton = class << klass; self; end | |
singleton.send(:remove_method, :trap_exit) rescue nil | |
singleton.send(:remove_method, :exclusive) rescue nil | |
singleton.send(:define_method, :trap_exit) do |*args| | |
exit_handler_name(*args) | |
end | |
singleton.send(:define_method, :exclusive) do |*args| | |
if args.any? | |
exclusive_methods(*exclusive_methods, *args) | |
else | |
exclusive_actor true | |
end | |
end | |
end | |
# Are we currently inside of an actor? | |
def actor? | |
!!Thread.current[:celluloid_actor] | |
end | |
# Retrieve the mailbox for the current thread or lazily initialize it | |
def mailbox | |
Thread.current[:celluloid_mailbox] ||= Celluloid::Mailbox.new | |
end | |
# Generate a Universally Unique Identifier | |
def uuid | |
Internals::UUID.generate | |
end | |
# Obtain the number of CPUs in the system | |
def cores | |
Internals::CPUCounter.cores | |
end | |
alias_method :cpus, :cores | |
alias_method :ncpus, :cores | |
# Perform a stack dump of all actors to the given output object | |
def stack_dump(output = STDERR) | |
actor_system.stack_dump.print(output) | |
end | |
alias_method :dump, :stack_dump | |
# Perform a stack summary of all actors to the given output object | |
def stack_summary(output = STDERR) | |
actor_system.stack_summary.print(output) | |
end | |
alias_method :summarize, :stack_summary | |
def public_registry | |
actor_system.public_registry | |
end | |
# Detect if a particular call is recursing through multiple actors | |
def detect_recursion | |
actor = Thread.current[:celluloid_actor] | |
return unless actor | |
task = Thread.current[:celluloid_task] | |
return unless task | |
chain_id = Internals::CallChain.current_id | |
actor.tasks.to_a.any? { |t| t != task && t.chain_id == chain_id } | |
end | |
# Define an exception handler for actor crashes | |
def exception_handler(&block) | |
Internals::Logger.exception_handler(&block) | |
end | |
def suspend(status, waiter) | |
task = Thread.current[:celluloid_task] | |
if task && !Celluloid.exclusive? | |
waiter.before_suspend(task) if waiter.respond_to?(:before_suspend) | |
Task.suspend(status) | |
else | |
waiter.wait | |
end | |
end | |
def boot | |
init | |
start | |
end | |
def init | |
@actor_system ||= Actor::System.new | |
end | |
def start | |
actor_system.start | |
end | |
def running? | |
actor_system && actor_system.running? | |
end | |
#de TODO Anticipate outside process finalizer that would by-pass this. | |
def register_shutdown | |
return if defined?(@shutdown_registered) && @shutdown_registered | |
# Terminate all actors at exit, unless the exit is abnormal. | |
at_exit do | |
Celluloid.shutdown unless $! | |
end | |
@shutdown_registered = true | |
end | |
# Shut down all running actors | |
def shutdown | |
actor_system.shutdown | |
end | |
def version | |
VERSION | |
end | |
end | |
# Class methods added to classes which include Celluloid | |
module ClassMethods | |
def new(*args, &block) | |
proxy = Cell.new(allocate, behavior_options, actor_options).proxy | |
proxy._send_(:initialize, *args, &block) | |
proxy | |
end | |
alias_method :spawn, :new | |
# Create a new actor and link to the current one | |
def new_link(*args, &block) | |
raise NotActorError, "can't link outside actor context" unless Celluloid.actor? | |
proxy = Cell.new(allocate, behavior_options, actor_options).proxy | |
Actor.link(proxy) | |
proxy._send_(:initialize, *args, &block) | |
proxy | |
end | |
alias_method :spawn_link, :new_link | |
# Run an actor in the foreground | |
def run(*args, &block) | |
Actor.join(new(*args, &block)) | |
end | |
def actor_system | |
Celluloid.actor_system | |
end | |
# Configuration options for Actor#new | |
def actor_options | |
{ | |
actor_system: actor_system, | |
mailbox_class: mailbox_class, | |
mailbox_size: mailbox_size, | |
task_class: task_class, | |
exclusive: exclusive_actor, | |
} | |
end | |
def behavior_options | |
{ | |
proxy_class: proxy_class, | |
exclusive_methods: exclusive_methods, | |
exit_handler_name: exit_handler_name, | |
finalizer: finalizer, | |
receiver_block_executions: execute_block_on_receiver, | |
} | |
end | |
def ===(other) | |
other.is_a? self | |
end | |
end | |
# These are methods we don't want added to the Celluloid singleton but to be | |
# defined on all classes that use Celluloid | |
module InstanceMethods | |
# Obtain the bare Ruby object the actor is wrapping. This is useful for | |
# only a limited set of use cases like runtime metaprogramming. Interacting | |
# directly with the bare object foregoes any kind of thread safety that | |
# Celluloid would ordinarily provide you, and the object is guaranteed to | |
# be shared with at least the actor thread. Tread carefully. | |
# | |
# Bare objects can be identified via #inspect output: | |
# | |
# >> actor | |
# => #<Celluloid::Actor(Foo:0x3fefcb77c194)> | |
# >> actor.bare_object | |
# => #<WARNING: BARE CELLULOID OBJECT (Foo:0x3fefcb77c194)> | |
# | |
def bare_object | |
self | |
end | |
alias_method :wrapped_object, :bare_object | |
# Are we being invoked in a different thread from our owner? | |
def leaked? | |
@celluloid_owner != Thread.current[:celluloid_actor] | |
end | |
def tap | |
yield current_actor | |
current_actor | |
end | |
# Obtain the name of the current actor | |
def registered_name | |
Actor.registered_name | |
end | |
alias_method :name, :registered_name | |
def inspect | |
return "..." if Celluloid.detect_recursion | |
str = "#<" | |
if leaked? | |
str << Celluloid::BARE_OBJECT_WARNING_MESSAGE | |
else | |
str << "Celluloid::Proxy::Cell" | |
end | |
str << "(#{self.class}:0x#{object_id.to_s(16)})" | |
str << " " unless instance_variables.empty? | |
instance_variables.each do |ivar| | |
next if ivar == Celluloid::OWNER_IVAR | |
str << "#{ivar}=#{instance_variable_get(ivar).inspect} " | |
end | |
str.sub!(/\s$/, ">") | |
end | |
def __arity | |
method(:initialize).arity | |
end | |
end | |
# | |
# The following methods are available on both the Celluloid singleton and | |
# directly inside of all classes that include Celluloid | |
# | |
# Raise an exception in sender context, but stay running | |
def abort(cause) | |
cause = case cause | |
when String then RuntimeError.new(cause) | |
when Exception then cause | |
else raise TypeError, "Exception object/String expected, but #{cause.class} received" | |
end | |
raise AbortError.new(cause) | |
end | |
# Terminate this actor | |
def terminate | |
Thread.current[:celluloid_actor].behavior_proxy.terminate! | |
end | |
# Send a signal with the given name to all waiting methods | |
def signal(name, value = nil) | |
Thread.current[:celluloid_actor].signal name, value | |
end | |
# Wait for the given signal | |
def wait(name) | |
Thread.current[:celluloid_actor].wait name | |
end | |
# Obtain the current_actor | |
def current_actor | |
Actor.current | |
end | |
# Obtain the UUID of the current call chain | |
def call_chain_id | |
Internals::CallChain.current_id | |
end | |
# Obtain the running tasks for this actor | |
def tasks | |
Thread.current[:celluloid_actor].tasks.to_a | |
end | |
# Obtain the Celluloid::Links for this actor | |
def links | |
Thread.current[:celluloid_actor].links | |
end | |
# Watch for exit events from another actor | |
def monitor(actor) | |
Actor.monitor(actor) | |
end | |
# Stop waiting for exit events from another actor | |
def unmonitor(actor) | |
Actor.unmonitor(actor) | |
end | |
# Link this actor to another, allowing it to crash or react to errors | |
def link(actor) | |
Actor.link(actor) | |
end | |
# Remove links to another actor | |
def unlink(actor) | |
Actor.unlink(actor) | |
end | |
# Are we monitoring another actor? | |
def monitoring?(actor) | |
Actor.monitoring?(actor) | |
end | |
# Is this actor linked to another? | |
def linked_to?(actor) | |
Actor.linked_to?(actor) | |
end | |
# Receive an asynchronous message via the actor protocol | |
def receive(timeout = nil, &block) | |
actor = Thread.current[:celluloid_actor] | |
if actor | |
actor.receive(timeout, &block) | |
else | |
Celluloid.mailbox.receive(timeout, &block) | |
end | |
end | |
# Sleep letting the actor continue processing messages | |
def sleep(interval) | |
actor = Thread.current[:celluloid_actor] | |
if actor | |
actor.sleep(interval) | |
else | |
Kernel.sleep interval | |
end | |
end | |
# Timeout on task suspension (eg Sync calls to other actors) | |
def timeout(duration) | |
Thread.current[:celluloid_actor].timeout(duration) do | |
yield | |
end | |
end | |
# Run given block in an exclusive mode: all synchronous calls block the whole | |
# actor, not only current message processing. | |
def exclusive(&block) | |
Thread.current[:celluloid_task].exclusive(&block) | |
end | |
# Are we currently exclusive | |
def exclusive? | |
task = Thread.current[:celluloid_task] | |
task && task.exclusive? | |
end | |
# Call a block after a given interval, returning a Celluloid::Timer object | |
def after(interval, &block) | |
Thread.current[:celluloid_actor].after(interval, &block) | |
end | |
# Call a block every given interval, returning a Celluloid::Timer object | |
def every(interval, &block) | |
Thread.current[:celluloid_actor].every(interval, &block) | |
end | |
# Perform a blocking or computationally intensive action inside an | |
# asynchronous group of threads, allowing the sender to continue processing other | |
# messages in its mailbox in the meantime | |
def defer(&block) | |
# This implementation relies on the present implementation of | |
# Celluloid::Future, which uses a thread from InternalPool to run the block | |
Future.new(&block).value | |
end | |
# Handle async calls within an actor itself | |
def async(meth = nil, *args, &block) | |
Thread.current[:celluloid_actor].behavior_proxy.async meth, *args, &block | |
end | |
# Handle calls to future within an actor itself | |
def future(meth = nil, *args, &block) | |
Thread.current[:celluloid_actor].behavior_proxy.future meth, *args, &block | |
end | |
end | |
if defined?(JRUBY_VERSION) && JRUBY_VERSION == "1.7.3" | |
raise "Celluloid is broken on JRuby 1.7.3. Please upgrade to 1.7.4+" | |
end | |
require "celluloid/exceptions" | |
Celluloid.logger = Logger.new(STDERR).tap do |logger| | |
logger.level = Logger::INFO unless $CELLULOID_DEBUG | |
end | |
Celluloid.shutdown_timeout = 10 | |
Celluloid.log_actor_crashes = true | |
require "celluloid/calls" | |
require "celluloid/condition" | |
require "celluloid/thread" | |
require "celluloid/core_ext" | |
require "celluloid/system_events" | |
require "celluloid/proxies" | |
require "celluloid/mailbox" | |
require "celluloid/mailbox/evented" | |
require "celluloid/essentials" | |
require "celluloid/group" | |
require "celluloid/group/spawner" | |
require "celluloid/group/pool" # TODO: Find way to only load this if being used. | |
require "celluloid/task" | |
require "celluloid/task/fibered" | |
require "celluloid/task/threaded" # TODO: Find way to only load this if being used. | |
require "celluloid/actor" | |
require "celluloid/cell" | |
require "celluloid/future" | |
require "celluloid/actor/system" | |
require "celluloid/actor/manager" | |
require "celluloid/multiplex" | |
require "celluloid/deprecate" unless $CELLULOID_BACKPORTED == false | |
$CELLULOID_MONITORING = false | |
Celluloid::Notices.output | |
# Configure default systemwide settings | |
Celluloid.task_class = | |
begin | |
str = ENV["CELLULOID_TASK_CLASS"] || "Fibered" | |
Kernel.const_get(str) | |
rescue NameError | |
begin | |
Celluloid.const_get(str) | |
rescue NameError | |
Celluloid::Task.const_get(str) | |
end | |
end | |
Celluloid.group_class = | |
begin | |
str = ENV["CELLULOID_GROUP_CLASS"] || "Spawner" | |
Kernel.const_get(str) | |
rescue NameError | |
begin | |
Celluloid.const_get(str) | |
rescue NameError | |
Celluloid::Group.const_get(str) | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment