Last active
December 18, 2015 15:48
-
-
Save saimonmoore/5806467 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'celluloid/autostart' | |
# Error raised by a Fetcher's inactivity timer callback.
class FetcherTimeoutError < StandardError; end
# Mailbox message that carries one project to a Fetcher.
class ProjectMessage
  # The project this message refers to (readable and writable).
  attr_accessor :project

  # @param project [Object] the project to hand over to a worker
  def initialize(project)
    @project = project
  end
end
# Mailbox message telling Fetchers to stop processing; it carries no data.
class InterruptMessage; end
# Celluloid actor that takes messages off its mailbox and "processes" the
# projects they carry. The work is simulated with random sleeps and random
# failures.
class Fetcher
  include Celluloid

  attr_accessor :project, :timer

  # Seconds the inactivity timer waits before raising FetcherTimeoutError.
  INACTIVITY_TIMEOUT = 5

  # Stop flag shared by every Fetcher. It is set once when the class is loaded
  # rather than in #initialize, so a Fetcher created after an InterruptMessage
  # has been handled does not switch processing back on.
  @@should_process = true

  # Starts the receive loop asynchronously as soon as the actor is created.
  def initialize
    puts "Fetcher inited"
    async.fetch
  end

  # Restarts the inactivity countdown. Does nothing when no timer has been
  # created yet.
  def touch
    @timer.reset if @timer
  end

  # Simulates processing the emails of one project.
  #
  # Any StandardError raised while working (including the random "Boom!"
  # failure) is reported and swallowed here.
  #
  # @param project [Object] identifies the project; it is only interpolated
  #   into the log output
  def process_project(project)
    # NOTE(review): Kernel.sleep below blocks the actor, so this timer
    # presumably cannot fire while an email is being processed -- confirm
    # against Celluloid's timer semantics.
    @timer = after(INACTIVITY_TIMEOUT) do
      raise FetcherTimeoutError, "Worker timed out..."
    end

    puts "Working on project #{project}"
    how_long = rand(10)
    emails_to_process = Array.new(rand(7), 'a')
    puts "Processing #{emails_to_process.length} emails...."

    emails_to_process.each_with_index do |email, i|
      unless @@should_process
        puts "Exiting email processing loop"
        break
      end

      label = "#{email}_#{i}"
      puts "processing: #{label}"
      raise "Boom! Worker died" if rand(10) == 0

      Kernel.sleep(how_long)
      puts "processed: #{label}"
    end

    puts "Worker done work for project: #{project}"
  rescue FetcherTimeoutError => e
    p "Worker expired: #{e}..."
  rescue => e
    p "Worker died: #{e}..."
  ensure
    p "Worker done."
    # Cancel instead of reset: a reset timer is re-armed and would still go off
    # INACTIVITY_TIMEOUT seconds after the work has already finished.
    @timer.cancel if @timer
  end

  # Receive loop: blocks for the next mailbox message and dispatches on its
  # class until the shared stop flag is cleared.
  def fetch
    while @@should_process
      message = receive { |_msg| true }
      puts "Got a MyMessage: #{message.inspect} #{message.class}"

      case message
      when ProjectMessage
        # Hand over the payload, not the message wrapper.
        process_project(message.project)
      when InterruptMessage
        @@should_process = false
      end
    end
  end
end
# Driver script: starts a supervised pool of Fetchers and feeds it one
# ProjectMessage per project on every tick until SIGINT arrives.
PROJECTS = %w(a b c d e).freeze

# A local captured by the trap closure is used instead of a top-level class
# variable: accessing `@@var` at the top level raises RuntimeError on Ruby 3.0+.
should_fetch = true

# NOTE(review): quotient + remainder of projects / cores -- this looks like an
# attempt at a ceiling division but is not one; confirm the intended pool size.
pool_size = PROJECTS.length.divmod(Celluloid.cores).reduce(:+)

supervisor = Celluloid::SupervisionGroup.run!
supervisor.pool(Fetcher, as: :fetchers, size: pool_size)

Signal.trap('INT') do
  should_fetch = false
  p "Trapped INT..."
  p "No more jobs will be processed"
end

while should_fetch
  pool = Celluloid::Actor[:fetchers]
  unless pool
    p "Pool finalized. Exiting..."
    # Leave instead of spinning forever on a pool that no longer exists.
    exit
  end

  PROJECTS.each do |project|
    break unless should_fetch

    pool.mailbox << ProjectMessage.new(project)
  end

  if should_fetch
    Kernel.sleep 5
    p "Next tick..."
  end
end

# Reached only after SIGINT. The interrupt is sent here so the workers are
# told to stop no matter where in the tick the signal arrived.
puts "Stop fetching..."
pool = Celluloid::Actor[:fetchers]
pool.mailbox << InterruptMessage.new if pool
puts "Exiting...."
exit
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment