@heisters
Created July 7, 2011 21:05
polling ruote wait_for
require 'timeout' # provides Timeout::Error, the superclass of TimeoutError below

# A set of tools for pausing the current thread until a Ruote process reaches a
# certain state. See Workflow::Patient::wait_for for the main entry point.
module Workflow::Patient
  # the number of seconds between polling Ruote
  TICK = 0.1
  # the maximum number of seconds to wait
  TIMEOUT = 10

  # raised if Workflow::Patient::TIMEOUT is exceeded
  class TimeoutError < Timeout::Error; end

  class Waiter
    attr_accessor :wfid, :signal
    attr_reader :wi_then, :wi_now, :ps_then, :start_time, :state

    def initialize wfid, signal, &block
      @wfid, @signal = wfid, signal
      @ps_then, @wi_then = fetch_ps, fetch_wi
      @cancel = false
      yield self if block_given?
      @start_time = Time.now
    end

    # Immediately cancel the wait
    #
    #   Workflow::Patient.wait_for wfid do |waiter|
    #     if something_is_true?
    #       engine.reply wfid
    #     else
    #       waiter.cancel
    #     end
    #   end
    def cancel
      @cancel = true
    end

    # Get the workitems for +wfid+ from Ruote.
    def fetch_wi
      return [] unless @wfid
      Workflow.workitems.by_wfid(@wfid)
    end

    # Get the process status for +wfid+ from Ruote.
    def fetch_ps
      return nil unless @wfid
      Workflow.engine.ps(@wfid)
    end

    # Are the workitems in +wi_now+ different from the workitems in +wi_then+?
    # They count as different if the +params+ field of any workitem differs.
    def wis_changed?
      f = ->(wi) { wi && wi.fields["params"] }
      #p [wi_now.map(&f), wi_then.map(&f)]
      wi_now.map(&f).compact != wi_then.map(&f).compact
    end

    # Returns the current "state" of this waiter, which incorporates the state
    # of the process in Ruote. Possible "states" are:
    #
    # [<tt>[:canceled]</tt>]
    #   The waiter was manually canceled.
    #
    # [<tt>["foo", "bar", :next]</tt>]
    #   The process is currently on the "foo" and "bar" participants. Any time
    #   the participants have changed from the original, this method will
    #   include +:next+ in its return value.
    #
    # [<tt>[:killed]</tt>]
    #   The process has gone away (or maybe doesn't exist yet?).
    #
    # [<tt>[:error]</tt>]
    #   The process has an error.
    #
    # [<tt>false</tt>]
    #   None of the above.
    def get_state
      return [:canceled] if @cancel

      # try wis first, since they're faster to fetch than the ps
      @wi_now = fetch_wi
      ps = nil
      return wi_now.map(&:participant_name) + [:next] if
        wis_changed? && wi_now.size > 0 &&
        (wi_now.size == num_parts(ps_then) ||
         wi_now.size == num_parts(ps = fetch_ps))

      ps ||= fetch_ps
      if ps.nil?
        [:killed]
      elsif ps.errors.size > 0
        [:error]
      else
        false
      end
    end

    # Should we continue execution of the current thread, or sleep for one more
    # tick?
    def continue?
      @state = get_state
      # always continue on :error and :canceled. Don't always continue on
      # :killed because it may be that the process hasn't been created yet.
      state && (s = state & [signal, :error, :canceled]).size > 0 ? s : false
    end

    def check_timeout # :nodoc:
      raise TimeoutError if (Time.now - start_time) > TIMEOUT
    end
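
    private

    # Count the participant expressions in a process status. Returns 0 when
    # there is no process status, so callers can compare it against a size.
    def num_parts ps
      return 0 unless ps
      ps.expressions.select { |e| e.name == "participant" }.size
    end
  end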

  class << self
    # Pause the current thread until the process identified by +wfid+ has
    # reached the state specified by +signal+.
    #
    # A simple example:
    #
    #   Workflow::Patient.wait_for @wfid
    #
    #
    # == Signals
    #
    # [+:next+]    (Default) wait until the process moves to the next
    #              participant, whatever it may be.
    # [a String]   wait until the process reaches a participant with the name
    #              specified by the string.
    # [+:killed+]  wait until the process disappears (e.g. due to
    #              Ruote::Engine::kill_process)
    #
    #
    # == Passing a block
    #
    # +wait_for+ can also take a block that will be executed after the wait has
    # already been initialized. This allows +wait_for+ to record the state
    # before your code changes it, so that comparisons of state while waiting
    # have an accurate picture of the original state.
    #
    # For instance, if you were to do the following:
    #
    #   wfid = engine.launch my_pdef
    #   Workflow::Patient.wait_for wfid
    #
    # ...it might fail with a timeout because the Ruote worker could perform
    # the launch work before Ruby initializes the +wait_for+. In that case,
    # +wait_for+ would be waiting for something that had already happened. The
    # error would be intermittent and dependent entirely upon uncontrollable
    # variables (e.g. CPU scheduling). So, instead, do:
    #
    #   Workflow::Patient.wait_for nil do |waiter|
    #     waiter.wfid = engine.launch my_pdef
    #   end
    #
    # As you can see in the above example, the block passed to +wait_for+ will
    # be yielded a Workflow::Patient::Waiter object. One other important thing
    # you can do with this object is cancel the wait:
    #
    #   Workflow::Patient.wait_for wfid do |waiter|
    #     if something_is_true?
    #       engine.reply wfid
    #     else
    #       waiter.cancel
    #     end
    #   end
    #
    # When +something_is_true?+ returns +false+, the wait will be canceled
    # immediately. See Workflow::Patient::Waiter for full documentation.
    #
    #
    # == Return values
    #
    # +wait_for+ will return the signal(s) that caused the wait to continue. In
    # the usual cases, this might look like:
    #
    #   Workflow::Patient.wait_for wfid, :next    # => [:next]
    #   Workflow::Patient.wait_for wfid, "foo"    # => ["foo"]
    #   Workflow::Patient.wait_for wfid, :killed  # => [:killed]
    #
    # In addition, the wait will always abort immediately if there is an error
    # on the Ruote process, or if the wait is canceled.
    #
    #   Workflow::Patient.wait_for wfid, :next          # => [:error]
    #   Workflow::Patient.wait_for(wfid){|w|w.cancel}   # => [:canceled]
    #
    #
    # == Errors
    #
    # +wait_for+ will raise an instance of Workflow::Patient::TimeoutError if
    # the amount of time waiting exceeds Workflow::Patient::TIMEOUT seconds.
    def wait_for wfid, signal=:next, &block
      waiter = Waiter.new wfid, signal, &block
      loop do
        value = waiter.continue?
        break value if value
        waiter.check_timeout
        tick
      end
    end

    # Sleep the current thread for the proper amount of time. This is mostly a
    # separate method in order to facilitate stubbing :P
    def tick
      sleep TICK
    end
  end
end
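
The module above needs a running Ruote engine, so it is hard to try in isolation. The following is a minimal, Ruote-free sketch of the same idea: snapshot the state first, let the caller's block trigger the change, then poll until the state differs or a timeout elapses. The names `FakeProcess` and `wait_for_change` are hypothetical and are not part of the gist or of Ruote.

```ruby
require 'timeout'

# Hypothetical stand-in for a Ruote process: it sits on one participant and
# moves to another when `advance` is called.
class FakeProcess
  attr_reader :participant

  def initialize
    @participant = "alpha"
  end

  def advance
    @participant = "beta"
  end
end

# Poll `read` until its value differs from the value seen before the block
# ran. Raises Timeout::Error if nothing changes within `timeout` seconds.
def wait_for_change(read, tick: 0.01, timeout: 1.0)
  baseline = read.call      # snapshot BEFORE the caller acts
  yield if block_given?     # the caller triggers the change here
  started = Time.now
  loop do
    current = read.call
    return current if current != baseline
    raise Timeout::Error, "no change after #{timeout}s" if Time.now - started > timeout
    sleep tick
  end
end

process = FakeProcess.new

# The change happens inside the block, after the baseline was recorded, so
# the first poll already sees it.
result = wait_for_change(-> { process.participant }) { process.advance }

# Nothing changes the process this time, so the wait times out.
timed_out = begin
  wait_for_change(-> { process.participant }, timeout: 0.05)
  false
rescue Timeout::Error
  true
end
```

Taking the snapshot before yielding is the point of the design: if the caller advanced the process first and only then started waiting, the baseline would already be `"beta"` and the wait could only end in a timeout.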