Created
September 20, 2012 21:22
-
-
Save schmurfy/3758436 to your computer and use it in GitHub Desktop.
Celluloid workers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'celluloid' | |
WORKERS_COUNT = (ARGV[0] || 1).to_i | |
class Output | |
include Celluloid | |
def puts(msg) | |
Kernel.puts(msg) | |
end | |
end | |
class Worker | |
include Celluloid | |
def initialize(source) | |
async.run(source) | |
end | |
def run(source) | |
loop do | |
job = source.get_job() | |
break unless job | |
Actor[:out].puts "[#{thread_id}] Got job '#{job}'" | |
sleep(1) | |
Actor[:out].puts "[#{thread_id}] Completed job '#{job}'" | |
end | |
Actor[:out].puts "[#{thread_id}] Done." | |
source.async.work_completed() | |
end | |
private | |
def thread_id | |
'%x' % Thread.current.object_id | |
end | |
end | |
class Store | |
include Celluloid | |
def initialize(*content) | |
@left = content | |
@workers_left = WORKERS_COUNT | |
end | |
def get_job | |
@left.shift | |
end | |
def work_completed | |
@workers_left -= 1 | |
if @workers_left == 0 | |
signal(:all_job_completed) | |
end | |
end | |
def wait_completion | |
wait(:all_job_completed) | |
Actor[:out].puts "Shutting down..." | |
end | |
end | |
Output.supervise_as(:out) | |
started_at = Time.now | |
store = Store.new("a", 45, "hu", 6, 8, 9) | |
if WORKERS_COUNT == 1 | |
Worker.new(store) | |
else | |
Worker.pool(size: WORKERS_COUNT, args: [store]) | |
end | |
store.wait_completion() | |
elapsed = '%.2f' % (Time.now - started_at) | |
puts "Completd in #{elapsed} seconds" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'rubygems' | |
require 'bundler/setup' | |
require 'celluloid' | |
WORKERS_COUNT = (ARGV[0] || 1).to_i | |
class Output | |
include Celluloid | |
def puts(msg) | |
Kernel.puts(msg) | |
end | |
end | |
class Worker | |
include Celluloid | |
def initialize(source) | |
async.run(source) | |
end | |
def run(source) | |
loop do | |
job = source.get_job() | |
break unless job | |
Actor[:out].puts "[#{thread_id}] Got job '#{job}'" | |
sleep(0.5) | |
Actor[:out].puts "[#{thread_id}] Completed job '#{job}'" | |
end | |
Actor[:out].puts "[#{thread_id}] Done." | |
source.async.work_completed() | |
end | |
private | |
def thread_id | |
'%x' % Thread.current.object_id | |
end | |
end | |
class Store | |
include Celluloid | |
def initialize(*content) | |
@left = content | |
@done = false | |
@workers_left = WORKERS_COUNT | |
end | |
def get_job | |
if !@done && @left.empty? | |
wait(:new_jobs) | |
end | |
@left.shift | |
end | |
def add_job(value) | |
@left << value | |
end | |
def resume | |
signal(:new_jobs) | |
end | |
def done | |
@done = true | |
resume() | |
end | |
def work_completed | |
@workers_left -= 1 | |
if @workers_left == 0 | |
signal(:all_job_completed) | |
end | |
end | |
def wait_completion | |
wait(:all_job_completed) | |
Actor[:out].puts "Shutting down..." | |
end | |
end | |
Output.supervise_as(:out) | |
started_at = Time.now | |
store = Store.new("job 0", "job 1", "job 3", "job 4") | |
if WORKERS_COUNT == 1 | |
Worker.new(store) | |
else | |
Worker.pool(size: WORKERS_COUNT, args: [store]) | |
end | |
Thread.new do | |
sleep 2 | |
10.times{|n| store.add_job("new job #{n}") } | |
store.resume() | |
sleep 2 | |
store.done() | |
end | |
store.wait_completion() | |
elapsed = '%.2f' % (Time.now - started_at) | |
puts "Completd in #{elapsed} seconds" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'rubygems' | |
require 'bundler/setup' | |
require 'celluloid' | |
WORKERS_COUNT = (ARGV[0] || 1).to_i | |
class Output | |
include Celluloid | |
def puts(msg) | |
Kernel.puts(msg) | |
end | |
end | |
class Worker | |
include Celluloid | |
def initialize(source) | |
async.run(source) | |
end | |
def run(source) | |
loop do | |
job = source.get_job() | |
if job == :exit | |
break | |
elsif job != nil | |
Actor[:out].puts "[#{thread_id}] Got job '#{job}'" | |
sleep(0.5) | |
Actor[:out].puts "[#{thread_id}] Completed job '#{job}'" | |
end | |
end | |
Actor[:out].puts "[#{thread_id}] Done." | |
source.async.work_completed() | |
end | |
private | |
def thread_id | |
'%x' % Thread.current.object_id | |
end | |
end | |
class Store | |
include Celluloid | |
def initialize(*content) | |
@left = content | |
@done = false | |
@workers_left = WORKERS_COUNT | |
end | |
def get_job | |
if @left.empty? && @done | |
:exit | |
else | |
if @left.empty? | |
wait(:new_jobs) | |
end | |
@left.shift | |
end | |
end | |
def add_job(value) | |
@left << value | |
signal(:new_jobs) | |
end | |
def done | |
@done = true | |
signal(:new_jobs) | |
end | |
def work_completed | |
@workers_left -= 1 | |
if @workers_left == 0 | |
signal(:all_job_completed) | |
end | |
end | |
def wait_completion | |
wait(:all_job_completed) | |
Actor[:out].puts "Shutting down..." | |
end | |
end | |
Output.supervise_as(:out) | |
started_at = Time.now | |
store = Store.new("job 0", "job 1", "job 3", "job 4") | |
if WORKERS_COUNT == 1 | |
Worker.new(store) | |
else | |
Worker.pool(size: WORKERS_COUNT, args: [store]) | |
end | |
Thread.new do | |
sleep 2 | |
10.times{|n| store.add_job("new job #{n}") } | |
sleep 2 | |
store.done() | |
end | |
store.wait_completion() | |
elapsed = '%.2f' % (Time.now - started_at) | |
puts "Completd in #{elapsed} seconds" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment