@zultron
Last active October 2, 2016 12:58
Mealy FSM implemented with Python coroutines in a ZMQ IOLoop
#!/usr/bin/python
'''
Copyright (c) 2014 John Morris <[email protected]>

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

mealy-coroutine.py

Python 'co-routines' [1] are a neat way to implement FSMs. The basic
control mechanisms can be implemented in a ZMQ ioloop [2] context. The
mechanisms are:

- Relinquishing control to the ioloop via 'yield'
- Returning control back to the FSM via passing the co-routine's
  'next()' method as a callback to:
  - 'add_timeout()': The FSM may relinquish and regain control after a
    period
  - 'add_callback()': External events pass control to FSM for
    servicing via subclass methods
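
As a rough illustration of the mechanisms listed above, the sketch below
swaps the ZMQ ioloop for a plain callback queue. 'ToyLoop', 'blinker' and
'run_blinker' are hypothetical names used only for this illustration; they
are not part of pyzmq or of the classes in this file. Only an
'add_callback()'-like method is mimicked, and timeouts are left out.

```python
import collections


class ToyLoop(object):
    # Hypothetical stand-in for an ioloop: a FIFO queue of callbacks
    def __init__(self):
        self.callbacks = collections.deque()

    def add_callback(self, callback):
        self.callbacks.append(callback)

    def start(self):
        # Run queued callbacks until none remain
        while self.callbacks:
            self.callbacks.popleft()()


def blinker(loop, trace, resume):
    # A small FSM written as a generator; 'resume' re-enters it
    for state in ('on', 'off', 'on'):
        trace.append(state)
        loop.add_callback(resume)   # ask the loop to hand control back
        yield                       # relinquish control to the loop
    trace.append('done')


def run_blinker():
    loop, trace = ToyLoop(), []

    def resume():
        # next() passes control back into the generator; the default
        # argument absorbs the final StopIteration
        next(fsm, None)

    fsm = blinker(loop, trace, resume)
    loop.add_callback(resume)
    loop.start()
    return trace
```

Calling 'run_blinker()' should return the visited states in order, ending
with 'done' once the generator has run to completion.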

My previous attempts at FSMs in an ioloop involved trying to maintain
coherency among a confusing stack of state variables, sorting those
with verbose logic to determine the correct next state and its output,
and trying to make up meaningful names for the various
state-transition callbacks.

The below system abstracts much of the control infrastructure into the
'MealyCoroutine' base class, and a specific state machine subclass
co-routine looks like a regular Python function: simple, intuitive and
easy to read, with state and control flow naturally maintained in the
co-routine across states where control is relinquished to the ioloop
to handle `Poller()` events and other asynchronous, simultaneous FSMs.
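
To make the "looks like a regular Python function" point concrete, here
is a minimal sketch of a Mealy-style machine written as a generator. The
turnstile, its inputs and its outputs are invented for this illustration
and do not appear in the demos below. The current state is simply the
position in the function; inputs arrive through a shared list, much as
the classes below share flags, and control is passed back with 'next()'.

```python
def turnstile(inbox, outputs):
    # Mealy-style FSM: each output depends on the current state (the
    # position in this function) and on the latest input in 'inbox'
    while True:
        # State: locked
        while inbox[-1] != 'coin':
            outputs.append('stay locked')
            yield
        outputs.append('unlock')
        yield
        # State: unlocked
        while inbox[-1] != 'push':
            outputs.append('stay unlocked')
            yield
        outputs.append('lock')
        yield


def feed(events):
    inbox, outputs = [], []
    fsm = turnstile(inbox, outputs)
    for event in events:
        inbox.append(event)   # an external event records its input...
        next(fsm)             # ...and passes control back to the FSM
    return outputs
```

No explicit state variable or transition table is needed here; the nested
loops carry that information.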

This is a complete example with the base 'MealyCoroutine' class and
two demo subclasses: 'MasterD', a model of an application that starts
two subsystems, handles signal and control events while they run, and
shuts them down; and 'Coprocess', a model of managing the life-cycle
of a coprocess fork, monitor and shutdown stages.

These demos are not complete or correct in this form, but they do
provide complete examples of control flow for FSMs running as
co-routines in a ZMQ or Tornado ioloop.
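
The monitor and shutdown stages can be sketched without any ioloop at
all. In the sketch below, 'FakeProcess', 'lifecycle' and the 'flags'
dictionary are hypothetical stand-ins invented for this illustration;
the real demo classes use instance attributes and ioloop timeouts
instead. The fake process is assumed to ignore SIGTERM, so the
grace-period path is exercised.

```python
class FakeProcess(object):
    # Hypothetical stand-in for a child process that ignores SIGTERM
    def __init__(self):
        self.running = True
        self.signals = []

    def send(self, name):
        self.signals.append(name)
        if name == 'SIGKILL':
            self.running = False


def lifecycle(proc, flags):
    # Monitor stage: wait for a shutdown request or for process exit
    while proc.running and not flags['shutdown_requested']:
        yield 'monitoring'
    # Shutdown stage: ask politely, then wait out the grace period
    if proc.running:
        proc.send('SIGTERM')
        while proc.running and not flags['timeout_expired']:
            yield 'waiting'
        if proc.running:
            proc.send('SIGKILL')
    yield 'stopped'


def run_lifecycle():
    proc = FakeProcess()
    flags = {'shutdown_requested': False, 'timeout_expired': False}
    fsm = lifecycle(proc, flags)
    states = [next(fsm)]                 # 'monitoring'
    flags['shutdown_requested'] = True   # pretend a shutdown request arrived
    states.append(next(fsm))             # 'waiting'
    flags['timeout_expired'] = True      # pretend the grace period ran out
    states.append(next(fsm))             # 'stopped'
    return states, proc.signals
```

Each 'next()' call here plays the role of an event or timeout handing
control back to the FSM.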
[1]: http://eli.thegreenplace.net/2009/08/29/co-routines-as-an-alternative-to-state-machines
[2]: http://zeromq.github.io/pyzmq/api/generated/zmq.eventloop.ioloop.html
'''

import datetime
import logging

from zmq.eventloop import ioloop


class MealyCoroutineError(RuntimeError):
    pass


def resume_state_machine(func):
    '''
    This decorator runs the method, and then calls
    'return_next_loop()' to schedule control to be passed back to the
    FSM on the next loop
    '''
    def schedule_and_call(obj, *args, **kwargs):
        res = func(obj, *args, **kwargs)
        obj.return_next_loop()
        return res
    return schedule_and_call


def resume_state_machine_clear_timeout(func):
    '''
    This decorator runs the method, and then calls
    'return_next_loop(clear_timeout=True)' to schedule control to be
    passed back to the FSM on the next loop, clearing irrelevant
    timeouts
    '''
    def schedule_and_call(obj, *args, **kwargs):
        res = func(obj, *args, **kwargs)
        obj.return_next_loop(clear_timeout=True)
        return res
    return schedule_and_call


class MealyCoroutine(object):
    '''
    Base class for a Mealy FSM that coordinates state-changing events
    from a ZMQ IOLoop
    '''
    log = logging.getLogger(__name__)

    def __init__(self):
        '''
        Initialize state machine
        '''
        # Track shutdown state
        self.shutdown_requested = False
        self.shutdown_begun = False

        # The IOLoop singleton instance for coordinating control
        self.loop = ioloop.IOLoop.current()

        # The opaque timeout object and a flag indicating whether
        # control was returned from the timeout or an intervening
        # event
        self.timeout = None
        self.timeout_expired = False

        # Init state machine object and add to loop
        self.state_machine_obj = self.state_machine()
        self.loop.add_callback(self.state_machine_next)

    @resume_state_machine
    def shutdown(self, msg=None):
        '''
        'shutdown' callback for external control

        Set 'shutdown_requested' flag; detect repeated requests

        The 'resume_state_machine' decorator schedules control to be
        returned to the FSM on the next loop
        '''
        if msg is None:
            msg = "External shutdown request"
        if self.shutdown_requested:
            self.log.debug("%s: Shutdown already in progress" % msg)
        else:
            self.log.info("%s: Starting shutdown sequence" % msg)
            self.shutdown_requested = True

    def clear_timeout(self):
        '''
        Clear any timeout

        Timeout events may be preempted by other events; the timeout
        may be cleared if control is not expected to be returned to
        the FSM by the timeout.

        The 'resume_state_machine_clear_timeout' decorator can be used
        to clear the timeout before passing control back to the FSM
        upon such an event.
        '''
        if self.timeout is not None:
            self.loop.remove_timeout(self.timeout)
            self.timeout = None

    def state_machine_next(self, from_wrapper=False, clear_timeout=False):
        '''
        Used as a callback to pass control back to the FSM by
        advancing the co-routine with 'next()'

        If a separate 'shutdown_state_machine' FSM is defined, replace
        the main 'state_machine' FSM with that if not already done.

        This method is called from other methods, which may request
        the timeout be cleared, and which may request to do their own
        debug logging.
        '''
        if not from_wrapper:
            if clear_timeout:
                self.log.debug(" control returned; clearing timeout")
            else:
                self.log.debug(" control returned")

        # If shutdown requested and there's a separate shutdown state
        # machine coroutine, set it up as new state machine
        if self.shutdown_requested and not self.shutdown_begun and \
                hasattr(self, 'shutdown_state_machine'):
            self.log.debug(
                " initializing shutdown state machine")
            self.state_machine_obj = self.shutdown_state_machine()
            self.shutdown_begun = True  # Don't do this twice

        # Clear timeout, if requested
        if clear_timeout:
            self.clear_timeout()

        # Return control to state machine; the built-in next() works
        # with generators on both Python 2.6+ and Python 3
        return next(self.state_machine_obj)

    def state_machine_next_timeout(self):
        '''
        Used as a callback from a timeout to pass control back to the
        FSM; sets the 'timeout_expired' flag

        In some instances, a state machine needs to know whether
        control was returned from a timeout or from an intervening
        event, indicated by the 'timeout_expired' flag.
        '''
        self.log.debug(" control returned: timeout expired")
        self.timeout_expired = True
        return self.state_machine_next(from_wrapper=True)

    def return_after_timeout(self, seconds):
        '''
        Sets a timeout, after which the FSM may 'yield' control

        The FSM will regain control after the specified number of
        seconds, upon which 'timeout_expired' will be set. If another
        intervening event occurs first, 'timeout_expired' will be
        unset.
        '''
        self.log.debug(" setting timeout for %s seconds" % seconds)
        self.clear_timeout()
        self.timeout_expired = False
        self.timeout = self.loop.add_timeout(
            datetime.timedelta(seconds=seconds),
            self.state_machine_next_timeout)

    def return_next_loop(self, clear_timeout=False):
        '''
        Schedule return of control to the FSM at the next loop
        '''
        self.log.debug(" returning control in next loop")
        self.loop.add_callback(self.state_machine_next,
                               clear_timeout=clear_timeout)

    def stop(self):
        '''
        Schedule the main IOLoop to be stopped at the next loop
        '''
        self.log.info("Stopping loop")
        self.loop.add_callback(self.loop.stop)

    def state_machine(self):
        '''
        The main state machine method

        A subclass must override this method to implement its FSM.
        '''
        err = "Subclasses must override 'state_machine' method"
        self.log.error(err)
        raise MealyCoroutineError(err)


class MasterD(MealyCoroutine):
    '''
    A FSM subclass demonstrating control flow for a simplified
    Machinekit master daemon

    This is not meant to be a complete or accurate 'masterd' state
    machine model. It is intended only to demonstrate how the IOLoop +
    co-routine control flows work.
    '''

    def __init__(self):
        '''
        Initialize the master daemon FSM
        '''
        super(MasterD, self).__init__()

        # The config set by set_config()
        self.config = None

    @resume_state_machine_clear_timeout
    def set_config(self, config):
        '''
        A callback for the 'config picker' service to pass the chosen
        config to the masterd

        The 'resume_state_machine_clear_timeout' clears any timeouts
        and passes control to the masterd FSM on the next loop.
        '''
        self.log.info("External request, set config to %s" % config)
        self.config = config

    def config_choice(self):
        '''
        If a configuration has been chosen, set it up and return True,
        else False
        '''
        if self.config is None:
            self.log.debug(" Waiting for config choice")
            # Signal to state machine that no config was found
            return False

        self.log.info("Setting up chosen config %s" % self.config)
        return True

    def do_something(self, desc):
        '''
        A placeholder method for doing something real
        '''
        self.log.info(desc)

    def state_machine(self):
        '''
        The masterd session state machine

        A simple demo of FSM control in a ZMQ IOLoop:

        - Wait for a config to be picked; the 'config picker' service
          will call 'set_config(filename)'
        - Start up RT
        - Start up app (linuxcncsrvr, milltask, etc. co-routines)
        - Optionally schedule periodic service activities while app
          runs

        When 'shutdown()' is called, this FSM will be pulled out of
        the loop and replaced with the 'shutdown_state_machine' FSM.
        '''
        self.log.info("Init server")

        # The following loop is just to demonstrate how the FSM can
        # regain periodic control for whatever reason; if it were
        # replaced with a simple 'yield', control will be returned
        # once 'self.set_config(config)' is called from an external
        # event
        while not self.config_choice():
            # No config chosen; do something, and then yield to ioloop
            # for two seconds
            self.do_something("Periodically do something while waiting for "
                              "config to be picked")
            yield self.return_after_timeout(2)

        self.do_something("Starting RT")

        self.do_something("Starting app")

        # This could be used if the FSM needs to regain control periodically
        # for periodic checks or other service
        #
        # while True:
        #     self.do_something("Running periodic checks while app runs")
        #     self.return_after_timeout(2)
        #     yield

        # Catch any last next() call
        yield True

    def shutdown_state_machine(self):
        '''
        Shutdown the session

        When 'shutdown()' is called, whether from a 'shutdown' button
        or a signal, this FSM will be swapped in place of the
        'state_machine' FSM. This may happen at any time, and the
        below routines should be prepared for cases where the app or
        RT were only partially started or not started at all.

        - Stop app (co-routines)
        - Stop RT
        - Stop IOLoop

        (For a proper Machinekit session flow, instead of stopping the
        IOLoop, the 'state_machine' FSM should be restarted.)
        '''
        # Stop the session
        self.do_something("Stopping app")
        self.do_something("Stopping RT")

        # Stop the IOLoop
        self.stop()

        # Catch any last next() calls
        yield True


class Coprocess(MealyCoroutine):
    '''
    A coprocess life-cycle management FSM

    This is just a demo, and does not fork any real processes. It is
    intended to show how the IOLoop + co-routine control flows work.

    This would normally be started and managed from the 'masterd' FSM.
    '''

    def __init__(self, cmd):
        '''
        Init coprocess state machine
        '''
        self.log.info("Initializing coprocess, command: '%s'" % ' '.join(cmd))
        super(Coprocess, self).__init__()

        self.cmd = cmd
        self.received_term_signal = False

    def send_signal(self, sig):
        '''
        Handle signals (pretend all are terminating signals for the demo)
        '''
        # FIXME Pretend all signals are SIGTERM
        self.received_term_signal = True
        # Calling shutdown() will resume the state machine
        self.shutdown("Caught signal %d" % sig)

    def fork(self):
        '''
        Fork the coprocess
        '''
        self.log.info("Forking coprocess")

    def poll(self):
        '''
        Poll whether the coprocess is still running

        For this demo, pretend it's always running
        '''
        # Pretend the thing never exits by itself
        self.log.debug(" poll(): coprocess still running")
        return True

    def term(self):
        '''
        Send SIGTERM to coprocess
        '''
        self.log.debug(" sending SIGTERM to coprocess")

    def kill(self):
        '''
        Send SIGKILL to coprocess
        '''
        self.log.debug(" sending SIGKILL to coprocess")

    def state_machine(self):
        '''
        The coprocess management state machine

        - Fork a coprocess (pretend)
        - While coprocess runs, schedule return of control every two
          seconds to poll for coprocess exit (this would be better done by
          catching SIGCHLD events and polling coprocess); when the
          coprocess exits or a shutdown request or terminating signal is
          received, break the loop
        - If the coprocess is still running, send SIGTERM and set 5-second
          timeout
        - Run a loop, stopping either after the 5-second timeout or the
          coprocess exits (SIGCHLD would return control)
        - If the coprocess is still running, send SIGKILL
        - Stop the IOLoop (for demo purposes; this would normally be
          handled by the 'masterd' FSM)
        '''
        # Fork coprocess
        self.fork()

        # Poll process and signals
        while self.poll() and \
                not self.received_term_signal and \
                not self.shutdown_requested:
            self.log.info(" Do some periodic coprocess maintenance task")
            yield self.return_after_timeout(2)

        # If coprocess is still running, we got a kill signal or
        # shutdown request; try to kill gracefully: send SIGTERM and
        # wait up to five seconds before killing forcefully
        if self.poll():
            self.log.debug("Sending SIGTERM and waiting 5 seconds...")
            self.term()
            self.return_after_timeout(5)

            # Wait for either termination grace period timeout or
            # coprocess to exit
            while not self.timeout_expired and self.poll():
                self.log.debug("Waiting for grace period timeout or coprocess exit")
                yield

            if self.poll():
                self.log.info(
                    "Coprocess failed to terminate within grace period: Killing")
                self.kill()
            else:
                self.log.error("BUG: Timeout expired but coprocess ended; "
                               "check signal handler flow")

        # Stop loop
        self.stop()

        # Yield to final next()
        yield True


import sys

log_level = logging.DEBUG
logging.basicConfig(level=log_level, stream=sys.stdout)
loop = ioloop.IOLoop.current()


def masterd_session():
    # This is an over-simplified masterd 'session' FSM to demonstrate
    # basic control flow elements
    m = MasterD()

    # Pretend someone picked a config file after 3 seconds
    def set_config():
        m.set_config('foo.ini')
    m.loop.add_timeout(
        datetime.timedelta(seconds=3),
        set_config)

    # Pretend someone hit the 'shutdown' button after 6 seconds
    def shutdown():
        m.shutdown("Main program shutdown request")
    m.loop.add_timeout(
        datetime.timedelta(seconds=6),
        shutdown)

    # Run the ioloop
    m.loop.start()


def run_coprocess():
    # This doesn't actually run anything, but just demonstrates the
    # basic control flow of a simplified coprocess FSM
    m = Coprocess(['sleep', '60'])

    # Set up pretend events:
    #
    # Pretend a SIGTERM is raised after 3 seconds
    def send_sig():
        m.send_signal(15)
    m.loop.add_timeout(
        datetime.timedelta(seconds=3),
        send_sig)

    # Pretend a second SIGTERM after 5 seconds; should be ignored
    m.loop.add_timeout(
        datetime.timedelta(seconds=5),
        send_sig)

    # Pretend shutdown request after 6 seconds; should be ignored
    def shutdown():
        m.shutdown("Main program shutdown request")
    m.loop.add_timeout(
        datetime.timedelta(seconds=6),
        shutdown)

    # Run the ioloop
    m.loop.start()

masterd_session()
# run_coprocess()