Last active
August 29, 2015 14:23
-
-
Save objarni/aa0407ea24e25dace506 to your computer and use it in GitHub Desktop.
Sketch of pytddmon re-engineered to a FSM architecture
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Queue | |
import logging | |
logging.basicConfig( | |
format='%(funcName)s: %(message)s', | |
level=logging.DEBUG | |
) | |
class MachineRunner(object): | |
def __init__(self): | |
self.machines = [] | |
self.signal_queue = Queue.Queue() | |
self.ticks = 0 | |
def add_machine(self, machine): | |
logging.info("Adding machine %s.", machine.__class__.__name__) | |
self.machines.append(machine) | |
def rtc(self): | |
self.ticks += 1 | |
# Always publish a tick signal. This means machines that | |
# want to do some piecewise work every rtc step can hook | |
# onto this signal and do their work | |
self.publish('tick', self.ticks) | |
# Count number of signals before processing. | |
# This is to avoid 'infinite rtc step' if any machine | |
# puslishes a signal to itself, which would mean the queue | |
# would never end in a while loop. | |
signals_to_process = len(self.signal_queue.queue) | |
logging.debug("Tick %d. %d signal(s) to process.", | |
self.ticks, signals_to_process) | |
for _ in range(signals_to_process): | |
(sig, par) = self.signal_queue.get() | |
logging.debug('Dispatching signal ("%s", %s).', sig, par) | |
for m in self.machines: | |
logging.debug('%s -> %s', | |
sig, m.__class__.__name__) | |
m.state(sig, par) | |
def publish(self, sig, par=None): | |
assert type(sig) == type('') | |
logging.debug('Enqueueing signal ("%s", %s)', | |
sig, repr(par)) | |
self.signal_queue.put((sig, par)) | |
mr = MachineRunner() | |
class TestRunner(object): | |
def __init__(self, publish): | |
self.state = self.dormant | |
self.publish = publish | |
def dormant(self, sig, par): | |
if sig == 'file_change': | |
logging.debug('Got file change signal, transitioning to running.') | |
self.state = self.running | |
self.total = 10 | |
self.green = 0 | |
def running(self, sig, par): | |
if sig == 'tick': | |
self.green += 1 | |
if self.green == self.total: | |
logging.debug('All tests run. Publishing result and dormenting.') | |
self.publish('test_run_finished', (self.green, self.total)) | |
self.state = self.dormant | |
else: | |
logging.debug("Running test %d of %d.", self.green, | |
self.total) | |
class Pytddmon(object): | |
def __init__(self): | |
self.state = self.green | |
def green(self, sig, par): | |
if sig == 'test_run_finished': | |
(green, total) = par | |
if green < total: | |
logging.debug("It's a failed test run; going red.") | |
self.state = self.red | |
else: | |
logging.debug("It's a successful test run; staying green.") | |
def red(self, sig, par): | |
if sig == 'test_run_finished': | |
(green, total) = par | |
if green == total: | |
logging.debug("It's a successful test run; going green.") | |
self.state = self.green | |
else: | |
logging.debug("It's a failed test run; staying red.") | |
print '--------------------' | |
mr.add_machine(TestRunner(mr.publish)) | |
mr.add_machine(Pytddmon()) | |
import random | |
mr.publish('file_change') | |
for r in range(11): | |
mr.rtc() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment