Skip to content

Instantly share code, notes, and snippets.

@saimonmoore
Last active December 18, 2015 15:48
Show Gist options
  • Save saimonmoore/5806467 to your computer and use it in GitHub Desktop.
Save saimonmoore/5806467 to your computer and use it in GitHub Desktop.
require 'celluloid/autostart'
class FetcherTimeoutError < StandardError; end
class ProjectMessage
attr_accessor :project
def initialize(project)
@project = project
end
end
class InterruptMessage; end
class Fetcher
include Celluloid
attr_accessor :project, :timer
INACTIVITY_TIMEOUT = 5
def initialize
puts "Fetcher inited"
@@should_process = true
async.fetch
end
def touch
@timer.reset
end
def process_project(project)
@timer = after(INACTIVITY_TIMEOUT) do
raise FetcherTimeoutError.new("Worker timed out...")
end
puts "Working on project #{project}"
how_long = rand(10).to_i
emails_to_process = Array.new(rand(7).to_i, 'a')
puts "Processing #{emails_to_process.length} emails...."
emails_to_process.each_with_index do |email, i|
email = "#{email}_#{i}"
if @@should_process
puts "processing: #{email}"
if (rand(10) == 0)
raise "Boom! Worker died"
end
Kernel.sleep(how_long)
puts "processed: #{email}"
else
puts "Exiting email processing loop"
break
end
end
puts "Worker done work for project: #{project}"
rescue FetcherTimeoutError
p "Worker expired: #{$!}..."
rescue
p "Worker died: #{$!}..."
ensure
p "Worker done."
touch
end
def fetch
while @@should_process do
message = receive { |msg| true }
puts "Got a MyMessage: #{message.inspect} #{message.class}"
case message
when ProjectMessage
process_project(message)
when InterruptMessage
@@should_process = false
end
end
end
end
PROJECTS = %w(a b c d e)
@@should_fetch = true
pool_size = PROJECTS.length.divmod(Celluloid.cores).reduce(:+)
supervisor = Celluloid::SupervisionGroup.run!
supervisor.pool(Fetcher, as: :fetchers, size: pool_size)
Signal.trap('INT') do
@@should_fetch = false
p "Trapped INT..."
p "No more jobs will be processed"
end
while @@should_fetch do
if pool = Celluloid::Actor[:fetchers]
PROJECTS.each do |project|
if @@should_fetch
pool.mailbox << ProjectMessage.new(project)
else
puts "Stop fetching..."
pool.mailbox << InterruptMessage.new
puts "Exiting...."
exit
end
end
Kernel.sleep 5
p "Next tick..."
else
p "Pool finalized. Exiting..."
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment