Last active
October 2, 2016 12:58
-
-
Save zultron/7390e5f689e43a17bb2b to your computer and use it in GitHub Desktop.
Mealy FSM implemented with Python coroutines in a ZMQ IOLoop
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
''' | |
Copyright (c) 2014 John Morris <[email protected]> | |
Permission is hereby granted, free of charge, to any person obtaining a copy | |
of this software and associated documentation files (the "Software"), to deal | |
in the Software without restriction, including without limitation the rights | |
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
copies of the Software, and to permit persons to whom the Software is | |
furnished to do so, subject to the following conditions: | |
The above copyright notice and this permission notice shall be included in | |
all copies or substantial portions of the Software. | |
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
THE SOFTWARE. | |
mealy-coroutine.py | |
Python 'co-routines' [1] are a neat way to implement FSMs. The basic | |
control mechanisms can be implemented in a ZMQ ioloop[2] context. The | |
mechanisms are: | |
- Relinquishing control to the ioloop via 'yield' | |
- Returning control back to the FSM via passing the co-routine's | |
'next()' method as a callback to: | |
- 'add_timeout()': The FSM may relinquish and regain control after a | |
period | |
- 'add_callback()': External events pass control to FSM for | |
servicing via subclass methods | |
My previous attempts at FSMs in an ioloop involved trying to maintain | |
coherency among a confusing stack of state variables, sorting those | |
with verbose logic to determine the correct next state and its output, | |
and trying to make up meaningful names for the various | |
state-transition callbacks. | |
The below system abstracts much of the control infrastructure into the | |
'MealyCoroutine' base class, and a specific state machine subclass | |
co-routine looks like a regular Python function: simple, intuitive and | |
easy to read, with state and control flow naturally maintained in the | |
co-routine across states where control is relinquished to the ioloop | |
to handle `Poller()` events and other asynchronous, simultaneous FSMs. | |
This is a complete example with the base 'MealyCoroutine' class and | |
two demo subclasses: 'MasterD', a model of an application that starts | |
two subsystems, handles signal and control events while they run, and | |
shuts them down; and 'Coprocess', a model of managing the life-cycle | |
of a coprocess fork, monitor and shutdown stages. | |
These demos are not complete or correct in this form, but they do | |
provide complete examples of control flow for FSMs running as | |
co-routines in a ZMQ or Tornado ioloop. | |
[1]: http://eli.thegreenplace.net/2009/08/29/co-routines-as-an-alternative-to-state-machines | |
[2]: http://zeromq.github.io/pyzmq/api/generated/zmq.eventloop.ioloop.html | |
''' | |
import datetime
import functools
import logging

from zmq.eventloop import ioloop
class MealyCoroutineError(RuntimeError):
    '''
    Error raised by the 'MealyCoroutine' machinery, e.g. when a
    subclass fails to override 'state_machine()'
    '''
def resume_state_machine(func):
    '''
    Decorator for FSM event-handler methods

    The wrapped method is run first; afterwards the object's
    'return_next_loop()' is called to schedule control to be passed
    back to the FSM on the next loop.  The method's own return value
    is handed back to the caller unchanged.

    'functools.wraps' keeps the decorated method's name and docstring
    intact, so logs and introspection show the real method rather than
    the wrapper.
    '''
    @functools.wraps(func)
    def schedule_and_call(obj, *args, **kwargs):
        # Run the handler before scheduling, so the FSM sees whatever
        # state the handler has just changed
        res = func(obj, *args, **kwargs)
        obj.return_next_loop()
        return res
    return schedule_and_call
def resume_state_machine_clear_timeout(func):
    '''
    Decorator for FSM event-handler methods that supersede a timeout

    The wrapped method is run first; afterwards the object's
    'return_next_loop(clear_timeout=True)' is called to schedule
    control to be passed back to the FSM on the next loop, clearing
    irrelevant timeouts.  The method's own return value is handed back
    to the caller unchanged.

    'functools.wraps' keeps the decorated method's name and docstring
    intact, so logs and introspection show the real method rather than
    the wrapper.
    '''
    @functools.wraps(func)
    def schedule_and_call(obj, *args, **kwargs):
        # Run the handler before scheduling, so the FSM sees whatever
        # state the handler has just changed
        res = func(obj, *args, **kwargs)
        obj.return_next_loop(clear_timeout=True)
        return res
    return schedule_and_call
class MealyCoroutine(object):
    '''
    Base class for a Mealy FSM that coordinates state-changing events
    from a ZMQ IOLoop

    A subclass implements 'state_machine()' as a generator.  The
    generator is created in '__init__()' and advanced one step each
    time the IOLoop invokes 'state_machine_next()', whether from a
    timeout set with 'return_after_timeout()' or from an event handler
    that calls 'return_next_loop()'.  A subclass may also define a
    'shutdown_state_machine()' generator, which replaces the main one
    once 'shutdown()' has been requested.
    '''

    log = logging.getLogger(__name__)

    def __init__(self):
        '''
        Initialize state machine
        '''
        # Track shutdown state
        self.shutdown_requested = False
        self.shutdown_begun = False

        # The IOLoop singleton instance for coordinating control
        self.loop = ioloop.IOLoop.current()

        # The opaque timeout object and a flag indicating whether
        # control was returned from the timeout or an intervening
        # event
        self.timeout = None
        self.timeout_expired = False

        # Init state machine object and add to loop
        self.state_machine_obj = self.state_machine()
        self.loop.add_callback(self.state_machine_next)

    @resume_state_machine
    def shutdown(self, msg=None):
        '''
        'shutdown' callback for external control

        Set 'shutdown_requested' flag; detect repeated requests

        The 'resume_state_machine' decorator schedules control to be
        returned to the FSM on the next loop (this happens for
        repeated requests as well).

        'msg', if given, is used as a prefix in the log message
        describing where the request came from.
        '''
        if msg is None:
            msg = "External shutdown request"
        if self.shutdown_requested:
            self.log.debug("%s: Shutdown already in progress", msg)
        else:
            self.log.info("%s: Starting shutdown sequence", msg)
            self.shutdown_requested = True

    def clear_timeout(self):
        '''
        Clear any timeout

        Timeout events may be preempted by other events; the timeout
        may be cleared if control is not expected to be returned to
        the FSM by the timeout.

        The 'resume_state_machine_clear_timeout' decorator can be used
        to clear the timeout before passing control back to the FSM
        upon such an event.
        '''
        if self.timeout is not None:
            self.loop.remove_timeout(self.timeout)
            self.timeout = None

    def state_machine_next(self, from_wrapper=False, clear_timeout=False):
        '''
        Used as a callback to pass control back to the FSM by
        advancing the co-routine with 'next()'

        If a separate 'shutdown_state_machine' FSM is defined, replace
        the main 'state_machine' FSM with that if not already done.

        This method is called from other methods, which may request
        the timeout be cleared ('clear_timeout=True'), and which may
        request to do their own debug logging ('from_wrapper=True').

        Returns the value yielded by the FSM, or None if the FSM
        generator has already run to completion.
        '''
        if not from_wrapper:
            if clear_timeout:
                self.log.debug(" control returned; clearing timeout")
            else:
                self.log.debug(" control returned")

        # If shutdown requested and there's a separate shutdown state
        # machine coroutine, set it up as new state machine
        if (self.shutdown_requested and not self.shutdown_begun
                and hasattr(self, 'shutdown_state_machine')):
            self.log.debug(" initializing shutdown state machine")
            self.state_machine_obj = self.shutdown_state_machine()
            self.shutdown_begun = True  # Don't do this twice

        # Clear timeout, if requested
        if clear_timeout:
            self.clear_timeout()

        # Return control to state machine.  The builtin next() works
        # on both Python 2.6+ and Python 3, unlike the generator's
        # Python-2-only '.next()' method.
        try:
            return next(self.state_machine_obj)
        except StopIteration:
            # A late callback arrived after the FSM ran off its end;
            # don't let the exception escape into the IOLoop
            self.log.debug(" state machine has finished; nothing to resume")
            return None

    def state_machine_next_timeout(self):
        '''
        Used as a callback from a timeout to pass control back to the
        FSM; sets the 'timeout_expired' flag

        In some instances, a state machine needs to know whether
        control was returned from a timeout or from an intervening
        event, indicated by the 'timeout_expired' flag.
        '''
        self.log.debug(" control returned: timeout expired")
        # The timeout has fired, so its handle is stale; drop it so a
        # later clear_timeout() doesn't try to remove it again
        self.timeout = None
        self.timeout_expired = True
        return self.state_machine_next(from_wrapper=True)

    def return_after_timeout(self, seconds):
        '''
        Sets a timeout, after which the FSM may 'yield' control

        The FSM will regain control after the specified number of
        seconds, upon which 'timeout_expired' will be set.  If another
        intervening event occurs first, 'timeout_expired' will be
        unset.

        Any previously set timeout is cleared first, so at most one
        timeout is pending per FSM.
        '''
        self.log.debug(" setting timeout for %s seconds", seconds)
        self.clear_timeout()
        self.timeout_expired = False
        self.timeout = self.loop.add_timeout(
            datetime.timedelta(seconds=seconds),
            self.state_machine_next_timeout)

    def return_next_loop(self, clear_timeout=False):
        '''
        Schedule return of control to the FSM at the next loop

        If 'clear_timeout' is true, any pending timeout is cleared
        when control is returned.
        '''
        self.log.debug(" returning control in next loop")
        self.loop.add_callback(self.state_machine_next,
                               clear_timeout=clear_timeout)

    def stop(self):
        '''
        Schedule the main IOLoop to be stopped at the next loop
        '''
        self.log.info("Stopping loop")
        self.loop.add_callback(self.loop.stop)

    def state_machine(self):
        '''
        The main state machine method

        A subclass must override this method to implement its FSM.
        This base implementation logs an error and raises
        'MealyCoroutineError'.
        '''
        err = "Subclasses must override 'state_machine' method"
        self.log.error(err)
        raise MealyCoroutineError(err)
class MasterD(MealyCoroutine):
    '''
    Demo FSM modelling a much-simplified Machinekit master daemon

    This makes no attempt to be a complete or accurate 'masterd'
    model; its only purpose is to show how control moves between the
    IOLoop and the co-routine.
    '''

    def __init__(self):
        '''
        Set up the base FSM and start with no config selected
        '''
        super(MasterD, self).__init__()

        # Filled in later by set_config()
        self.config = None

    @resume_state_machine_clear_timeout
    def set_config(self, config):
        '''
        Callback through which the 'config picker' service hands the
        chosen config to the masterd

        The decorator clears any pending timeout and returns control
        to the masterd FSM on the next loop.
        '''
        notice = "External request, set config to %s" % config
        self.log.info(notice)
        self.config = config

    def config_choice(self):
        '''
        Report whether a configuration has been chosen

        Returns True (after logging the setup) when a config is
        present, False while still waiting for one.
        '''
        chosen = self.config is not None
        if chosen:
            self.log.info("Setting up chosen config %s" % self.config)
        else:
            # Tell the state machine there is nothing to set up yet
            self.log.debug(" Waiting for config choice")
        return chosen

    def do_something(self, desc):
        '''
        Stand-in for real work: just logs the description
        '''
        self.log.info(desc)

    def state_machine(self):
        '''
        The masterd session state machine

        Demonstrates FSM control in a ZMQ IOLoop:

        - Wait until a config has been picked; the 'config picker'
          service calls 'set_config(filename)'
        - Start up RT
        - Start up app (linuxcncsrvr, milltask, etc. co-routines)
        - Optionally schedule periodic service activities while the
          app runs

        Once 'shutdown()' is called, this FSM is taken out of the loop
        and 'shutdown_state_machine' runs in its place.
        '''
        self.log.info("Init server")

        # This polling loop only illustrates how the FSM can wake up
        # periodically; a bare 'yield' would work equally well, since
        # an external call to 'self.set_config(config)' returns
        # control here anyway
        while True:
            if self.config_choice():
                break
            # Still no config: do some work, then hand control to the
            # ioloop for two seconds
            self.do_something("Periodically do something while waiting for "
                              "config to be picked")
            yield self.return_after_timeout(2)

        for step in ("Starting RT", "Starting app"):
            self.do_something(step)

        # If the FSM needed periodic control while the app runs, a
        # loop like this could go here:
        #
        # while True:
        #     self.do_something("Running periodic checks while app runs")
        #     self.return_after_timeout(2)
        #     yield

        # Absorb one trailing next() call
        yield True

    def shutdown_state_machine(self):
        '''
        Shutdown the session

        Whenever 'shutdown()' is called, from a 'shutdown' button or a
        signal, this FSM takes the place of the 'state_machine' FSM.
        That can happen at any moment, so the steps below should cope
        with an app or RT that was only partly started, or never
        started.

        - Stop app (co-routines)
        - Stop RT
        - Stop IOLoop

        (A proper Machinekit session flow would restart the
        'state_machine' FSM rather than stop the IOLoop.)
        '''
        # Tear the session down in reverse start order
        for step in ("Stopping app", "Stopping RT"):
            self.do_something(step)

        # Then ask the IOLoop to stop
        self.stop()

        # Absorb any trailing next() call
        yield True
class Coprocess(MealyCoroutine):
    '''
    FSM managing the life-cycle of a coprocess

    Demo only: no real process is ever forked.  It exists to show how
    the IOLoop + co-routine control flows work, and would normally be
    started and managed from the 'masterd' FSM.
    '''

    def __init__(self, cmd):
        '''
        Init coprocess state machine for the command given as a list
        of argument strings
        '''
        command_line = ' '.join(cmd)
        self.log.info("Initializing coprocess, command: '%s'" % command_line)
        super(Coprocess, self).__init__()

        self.cmd = cmd
        self.received_term_signal = False

    def send_signal(self, sig):
        '''
        Signal handler; the demo treats every signal as terminating
        '''
        # FIXME Pretend all signals are SIGTERM
        self.received_term_signal = True

        # shutdown() also schedules the state machine to resume
        reason = "Caught signal %d" % sig
        self.shutdown(reason)

    def fork(self):
        '''
        Fork the coprocess (only logged in this demo)
        '''
        self.log.info("Forking coprocess")

    def poll(self):
        '''
        Report whether the coprocess is still running

        The demo coprocess never exits on its own, so this always
        returns True.
        '''
        self.log.debug(" poll(): coprocess still running")
        return True

    def term(self):
        '''
        Send SIGTERM to coprocess (only logged in this demo)
        '''
        self.log.debug(" sending SIGTERM to coprocess")

    def kill(self):
        '''
        Send SIGKILL to coprocess (only logged in this demo)
        '''
        self.log.debug(" sending SIGKILL to coprocess")

    def state_machine(self):
        '''
        The coprocess management state machine

        - Fork a coprocess (pretend)
        - While the coprocess runs, regain control every two seconds
          to poll for its exit (catching SIGCHLD events and polling
          the coprocess would be better); leave the loop when the
          coprocess exits or a shutdown request or terminating signal
          arrives
        - If the coprocess is still running, send SIGTERM and set a
          5-second timeout
        - Loop until either the 5-second timeout expires or the
          coprocess exits (SIGCHLD would return control)
        - If the coprocess is still running, send SIGKILL
        - Stop the IOLoop (demo only; the 'masterd' FSM would normally
          handle this)
        '''
        self.fork()

        # Monitoring phase: poll process and signals
        while True:
            if (not self.poll()
                    or self.received_term_signal
                    or self.shutdown_requested):
                break
            self.log.info(" Do some periodic coprocess maintenance task")
            yield self.return_after_timeout(2)

        # A coprocess that is still alive here means a kill signal or
        # shutdown request arrived; try a graceful stop first: SIGTERM,
        # then up to five seconds of grace before SIGKILL
        if self.poll():
            self.log.debug("Sending SIGTERM and waiting 5 seconds...")
            self.term()
            self.return_after_timeout(5)

            # Sit out the grace period, leaving early if the coprocess
            # exits
            while True:
                if self.timeout_expired or not self.poll():
                    break
                self.log.debug(
                    "Waiting for grace period timeout or coprocess exit")
                yield

            if not self.poll():
                self.log.error("BUG: Timeout expired but coprocess ended; "
                               "check signal handler flow")
            else:
                self.log.info(
                    "Coprocess failed to terminate within grace period: "
                    "Killing")
                self.kill()

        # Stop loop
        self.stop()

        # Absorb the final next()
        yield True
import sys

# Demo logging setup: emit everything from DEBUG level upwards on
# stdout so the FSM control flow can be followed on the console
log_level = logging.DEBUG
logging.basicConfig(level=log_level, stream=sys.stdout)

# Module-level handle on the current IOLoop; the demo functions below
# use each FSM object's own 'loop' attribute rather than this name
loop = ioloop.IOLoop.current()
def masterd_session():
    '''
    Run the over-simplified masterd 'session' FSM demo

    Creates a 'MasterD' FSM, schedules two pretend external events on
    its IOLoop and then starts the loop; the call returns once the
    loop has been stopped.
    '''
    m = MasterD()
    delay = lambda secs: datetime.timedelta(seconds=secs)

    # Pretend someone picked a config file after 3 seconds
    m.loop.add_timeout(delay(3), lambda: m.set_config('foo.ini'))

    # Pretend someone hit the 'shutdown' button after 6 seconds
    m.loop.add_timeout(
        delay(6), lambda: m.shutdown("Main program shutdown request"))

    # Run the ioloop
    m.loop.start()
def run_coprocess():
    '''
    Run the simplified coprocess FSM demo

    Nothing is actually executed; a 'Coprocess' FSM is created, a few
    pretend events are scheduled on its IOLoop, and the loop is
    started.
    '''
    m = Coprocess(['sleep', '60'])
    delay = lambda secs: datetime.timedelta(seconds=secs)
    send_sig = lambda: m.send_signal(15)

    # Pretend events:
    #
    # A SIGTERM is raised after 3 seconds
    m.loop.add_timeout(delay(3), send_sig)

    # A second SIGTERM after 5 seconds; should be ignored
    m.loop.add_timeout(delay(5), send_sig)

    # A shutdown request after 6 seconds; should be ignored
    m.loop.add_timeout(
        delay(6), lambda: m.shutdown("Main program shutdown request"))

    # Run the ioloop
    m.loop.start()
# Script entry: run the masterd demo.  To try the coprocess demo
# instead, comment out the first call and uncomment the second.
masterd_session()
# run_coprocess()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment