# Gist heisters/1070538: "polling ruote wait_for"
# Created July 7, 2011 21:05
# A set of tools for pausing the current thread until a Ruote process reaches a
# certain state. See Workflow::Patient.wait_for for the main entry point.
module Workflow::Patient
# Number of seconds to sleep between successive polls of Ruote.
TICK = 0.1

# Maximum number of seconds a wait may last before it is abandoned.
TIMEOUT = 10

# Raised when a wait lasts longer than Workflow::Patient::TIMEOUT seconds.
class TimeoutError < Timeout::Error; end
# Watches a single Ruote process (identified by +wfid+) and decides, every
# time it is polled, whether the state named by +signal+ has been reached.
#
# On construction the waiter records the process status (+ps_then+) and the
# workitems (+wi_then+) as they are at that moment, then yields itself to the
# optional block, and only afterwards records +start_time+. Time spent inside
# the block therefore does not count towards the timeout.
class Waiter
  attr_accessor :wfid, :signal
  attr_reader :wi_then, :wi_now, :ps_then, :start_time, :state

  # wfid::   the Ruote workflow id to watch; may be +nil+ and assigned later
  #          from inside the block.
  # signal:: the state to wait for (see #continue?).
  def initialize(wfid, signal, &block)
    @wfid = wfid
    @signal = signal
    # Snapshot the "before" picture prior to yielding, so that whatever the
    # block does can be detected as a change afterwards.
    @ps_then = fetch_ps
    @wi_then = fetch_wi
    @cancel = false
    yield self if block_given?
    @start_time = Time.now
  end

  # Immediately cancel the wait
  #
  #   Workflow::Patient.wait_for wfid do |waiter|
  #     if something_is_true?
  #       engine.reply wfid
  #     else
  #       waiter.cancel
  #     end
  #   end
  def cancel
    @cancel = true
  end

  # Get the workitems for +wfid+ from Ruote. Returns an empty array while no
  # +wfid+ has been set.
  def fetch_wi
    return [] unless @wfid

    Workflow.workitems.by_wfid(@wfid)
  end

  # Get the process status for +wfid+ from Ruote. Returns +nil+ while no
  # +wfid+ has been set.
  def fetch_ps
    return nil unless @wfid

    Workflow.engine.ps(@wfid)
  end

  # Are the workitems in +wi_now+ different than the workitems in +wi_then+?
  # Only the "params" field of each workitem is compared; workitems without
  # params are ignored on both sides.
  def wis_changed?
    params_of = ->(wi) { wi && wi.fields["params"] }
    wi_now.map(&params_of).compact != wi_then.map(&params_of).compact
  end

  # Returns the current "state" of this waiter, which incorporates the state
  # of the process in Ruote. Possible "states" are:
  #
  # [<tt>[:canceled]</tt>]
  #   The waiter was manually canceled.
  #
  # [<tt>["foo", "bar", :next]</tt>]
  #   The process is currently on the "foo" and "bar" participants. Any time
  #   the participants have changed from the original, this method will
  #   include +:next+ in its return value.
  #
  # [<tt>[:killed]</tt>]
  #   The process has gone away (or maybe doesn't exist yet?).
  #
  # [<tt>[:error]</tt>]
  #   The process has an error.
  #
  # [<tt>false</tt>]
  #   None of the above.
  def get_state
    return [:canceled] if @cancel

    # Try the workitems first, since they're faster to fetch than the ps.
    @wi_now = fetch_wi
    ps = nil
    if wis_changed? && wi_now.size > 0
      current = wi_now.size
      # The original process status is checked first; a fresh one is only
      # fetched when the workitem count does not match the original.
      if current == num_parts(ps_then) || current == num_parts(ps = fetch_ps)
        return wi_now.map(&:participant_name) + [:next]
      end
    end

    # Reuse the status fetched above when there is one.
    ps ||= fetch_ps
    if ps.nil?
      [:killed]
    elsif ps.errors.size > 0
      [:error]
    else
      false
    end
  end

  # Should we continue execution of the current thread, or sleep for one more
  # tick? Refreshes +state+ and returns the part of it that matches +signal+,
  # +:error+ or +:canceled+, or +false+ when nothing matches.
  def continue?
    @state = get_state
    return false unless state

    # Always continue on :error and :canceled. Don't always continue on
    # :killed because it may be that the process hasn't been created yet.
    matched = state & [signal, :error, :canceled]
    matched.empty? ? false : matched
  end

  # Raises TimeoutError once more than TIMEOUT seconds have passed since
  # +start_time+.
  def check_timeout # :nodoc:
    return unless (Time.now - start_time) > TIMEOUT

    raise TimeoutError,
          "waited more than #{TIMEOUT} seconds for #{signal.inspect} on process #{wfid.inspect}"
  end

  private

  # Number of "participant" expressions in the process status +ps+, or 0 when
  # there is no process status.
  def num_parts(ps)
    return 0 unless ps

    ps.expressions.count { |e| e.name == "participant" }
  end
end
class << self
  # Pause the current thread until the process identified by +wfid+ has
  # reached the state specified by +signal+.
  #
  # A simple example:
  #
  #   Workflow::Patient.wait_for @wfid
  #
  #
  # == Signals
  #
  # [+:next+]    (Default) wait until the process moves to the next
  #              participant, whatever it may be.
  # [a String]   wait until the process reaches a participant with the name
  #              specified by the string.
  # [+:killed+]  wait until the process disappears (eg. due to
  #              Ruote::Engine::kill_process)
  #
  #
  # == Passing a block
  #
  # +wait_for+ can also take a block that will be executed after the wait has
  # already been initialized. This allows +wait_for+ to record the state
  # before your code changes it, so that comparisons of state while waiting
  # have an accurate picture of the original state.
  #
  # For instance, if you were to do the following:
  #
  #   wfid = engine.launch my_pdef
  #   Workflow::Patient.wait_for wfid
  #
  # ...it might fail with a timeout because the Ruote worker would perform
  # the launch work before Ruby initializes the +wait_for+. Therefore,
  # +wait_for+ would be waiting for something that had already happened. Of
  # course, this error would be intermittent and dependent entirely upon
  # uncontrollable variables (eg. CPU scheduling). So, instead, do:
  #
  #   Workflow::Patient.wait_for nil do |waiter|
  #     waiter.wfid = engine.launch my_pdef
  #   end
  #
  # As you can see in the above example, the block passed to +wait_for+ will
  # be yielded a Workflow::Patient::Waiter object. One other important thing
  # you can do with this object is canceling the wait:
  #
  #   Workflow::Patient.wait_for wfid do |waiter|
  #     if something_is_true?
  #       engine.reply wfid
  #     else
  #       waiter.cancel
  #     end
  #   end
  #
  # When +something_is_true?+ returns +false+, the wait will be canceled
  # immediately. See Workflow::Patient::Waiter for full documentation.
  #
  #
  # == Return values
  #
  # +wait_for+ will return the signal(s) that caused the wait to continue. In
  # the usual cases, this might look like:
  #
  #   Workflow::Patient.wait_for wfid, :next   # => [:next]
  #   Workflow::Patient.wait_for wfid, "foo"   # => ["foo"]
  #   Workflow::Patient.wait_for wfid, :killed # => [:killed]
  #
  # In addition, the wait will always abort immediately if there is an error
  # on the Ruote process, or if the wait is canceled.
  #
  #   Workflow::Patient.wait_for wfid, :next        # => [:error]
  #   Workflow::Patient.wait_for(wfid){|w|w.cancel} # => [:canceled]
  #
  #
  # == Errors
  #
  # +wait_for+ will raise an instance of Workflow::Patient::TimeoutError if
  # the amount of time waiting exceeds Workflow::Patient::TIMEOUT seconds.
  def wait_for(wfid, signal = :next, &block)
    waiter = Waiter.new(wfid, signal, &block)
    loop do
      # Poll first, so a state that is already reached returns without
      # sleeping; the timeout is only checked when another tick is needed.
      outcome = waiter.continue?
      break outcome if outcome

      waiter.check_timeout
      tick
    end
  end

  # Sleep the current thread for TICK seconds. This is mostly a separate
  # method in order to facilitate stubbing :P
  def tick
    sleep TICK
  end
end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment