Last active
December 17, 2015 06:18
-
-
Save klenwell/5563852 to your computer and use it in GitHub Desktop.
Stateful Model (2 files):
stateful_model.py: a subclass of Google App Engine db.Model class that functions as a state machine.
test_stateful_model.py: unit test for StatefulModel.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
Google App Engine Stateful Model

Subclasses the App Engine db.Model class so that it can function as a
flexible state machine.

REFERENCES
    http://blog.notdot.net/2010/04/Pre--and-post--put-hooks-for-Datastore-models
""" | |
import logging | |
from google.appengine.ext import db | |
class StateError(Exception):
    """Raised when a state or action is not defined in a model's states."""
class StatefulModel(db.Model):
    """Datastore model that doubles as a simple state machine.

    Subclasses override ``states`` with a dict that maps each state name to
    a dict of ``{action_name: callable(record)}``.  The optional ``'enter'``
    and ``'exit'`` actions are invoked by ``change_state``; any action of
    the current state can be invoked by calling the record itself, e.g.
    ``record('persist')``.
    """

    # default model properties
    state = db.StringProperty(multiline=False)
    created = db.DateTimeProperty(auto_now_add=True)
    updated = db.DateTimeProperty(auto_now=True)

    # define states and actions
    states = {
        'sample': {
            'enter': lambda record: logging.debug('entering sample state'),
            'exit': lambda record: logging.debug('exiting sample state'),
            'persist': lambda record: record
        }
    }

    def __call__(self, action):
        """Invoke ``action`` as defined for the record's current state.

        Returns whatever the action callable returns.

        Raises StateError if the current state does not define ``action``.
        """
        actions = self.states[self.state]
        if action in actions:
            return actions[action](self)
        # Interpolate the message here; passing the tuple as a second
        # exception argument would leave the placeholders unformatted.
        raise StateError("invalid action '%s' request for state: %s" % (
            action, self.state))

    def change_state(self, new_state, put_to_datastore=True):
        """Move the record to ``new_state``, running exit/enter actions.

        The old state's ``'exit'`` action (if any) runs first, then the new
        state's ``'enter'`` action (if any); only afterwards is ``state``
        updated.  The record is saved by default whenever the state changes;
        pass ``put_to_datastore=False`` to skip the ``put()``.

        Returns the record itself.

        Raises StateError if ``new_state`` is not a key of ``states``.
        """
        # Validate the target before running any hook so that a rejected
        # transition does not leave behind the old state's exit side effects.
        if new_state not in self.states:
            raise StateError('state %s not set in states dict' % (new_state))

        old_state = self.state

        # old_state may be unset (new record) or unknown: then nothing to exit
        exit_action = self.states.get(old_state, {}).get('exit')
        if exit_action is not None:
            exit_action(self)

        enter_action = self.states[new_state].get('enter')
        if enter_action is not None:
            enter_action(self)

        self.state = new_state
        logging.debug('change state: %s to %s', old_state, new_state)

        if put_to_datastore:
            self.put()

        return self

    @classmethod
    def new(ModelClass, state):
        """Factory method to instantiate new class.  As a rule, this should
        be overridden to accommodate additional arguments.

        Creates a record, moves it into ``state`` and saves it once.
        """
        record = ModelClass()
        # defer saving to the explicit put() below
        record.change_state(state, False)
        record.put()
        return record

    @classmethod
    def find_by_state(ModelClass, state):
        """Return an iterable of records whose ``state`` equals ``state``."""
        gql = "WHERE state = :state"
        records = ModelClass.gql(gql, state=state).run()
        return records
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
StatefulModel Test | |
""" | |
# | |
# IMPORTS | |
# | |
# Python Imports | |
import logging | |
import unittest | |
# App Engine Imports | |
from google.appengine.ext import (db) | |
# Project Imports | |
from project.models.stateful_model import (StatefulModel, StateError) | |
# | |
# Dev Objects | |
# | |
class ThreatLevel(StatefulModel):
    """Test fixture: a three-level threat indicator (green, yellow, red).

    Entering any state, and leaving 'red', stores a message in ``output``.
    """

    # additional properties
    output = db.StringProperty(multiline=False)

    # states and actions
    states = {
        'green': {
            'enter': lambda record: setattr(record, 'output',
                                            "everything's cool"),
            'elevate': lambda record: record.change_state('yellow'),
            # already at the lowest level: hand the record back unchanged
            'lower': lambda record: record
        },
        'yellow': {
            # A single lambda: a nested ``lambda record: lambda record: ...``
            # would only return the inner function and never set ``output``.
            'enter': lambda record: setattr(record, 'output',
                                            "getting tense"),
            'elevate': lambda record: record.change_state('red'),
            'lower': lambda record: record.change_state('green')
        },
        'red': {
            'enter': lambda record: record.panic(),
            'exit': lambda record: setattr(record, 'output',
                                           "downgrading from red"),
            # already at the highest level: hand the record back unchanged
            'elevate': lambda record: record,
            'lower': lambda record: record.change_state('yellow')
        }
    }

    # additional methods
    def panic(self):
        """Set ``output`` to the red-alert message."""
        self.output = 'EVERYBODY PANIC!'

    def elevate(self):
        """Run the current state's 'elevate' action and return its result."""
        return self('elevate')

    def lower(self):
        """Run the current state's 'lower' action and return its result."""
        return self('lower')
class PatientCondition(StatefulModel):
    """Test fixture: a patient whose condition is stable, serious or critical.

    None of the states defines 'enter' or 'exit' actions, so transitions
    only update ``state``.
    """

    name = db.StringProperty(multiline=False)

    states = {
        'stable': {
            'downgrade': lambda record: record.change_state('serious'),
            # best condition: log the error instead of changing state
            'upgrade': lambda record: logging.error(
                "can't upgrade from stable")
        },
        'serious': {
            'downgrade': lambda record: record.change_state('critical'),
            'upgrade': lambda record: record.change_state('stable'),
        },
        'critical': {
            # worst condition: log the error instead of changing state
            # (message corrected: this is the *downgrade* action)
            'downgrade': lambda record: logging.error(
                "can't downgrade from critical"),
            'upgrade': lambda record: record.change_state('serious'),
        },
    }

    def downgrade(self):
        """Run the current state's 'downgrade' action; return its result."""
        return self('downgrade')

    def upgrade(self):
        """Run the current state's 'upgrade' action; return its result."""
        return self('upgrade')

    @classmethod
    def new(ModelClass, name, condition):
        """Factory method to instantiate new class.

        Creates a record named ``name`` and moves it into the state
        ``condition``.
        """
        record = ModelClass(name=name)
        record.change_state(condition)  # will be autosaved
        return record
# | |
# Test Class | |
# | |
class StatefulModelDevTest(unittest.TestCase):
    """Tests StatefulModel through the PatientCondition and ThreatLevel
    fixture models."""

    #
    # Harness
    #
    def setUp(self):
        """No per-test fixtures are prepared."""

    def tearDown(self):
        """Nothing is cleaned up after a test."""

    #
    # Test
    #
    def testStateError(self):
        """Unknown states and unknown actions both raise StateError."""
        martin = PatientCondition.new('Martin', 'stable')

        with self.assertRaises(StateError):
            martin.change_state('fantastic')

        with self.assertRaises(StateError):
            martin('deceased')

    def testPatientConditionModel(self):
        """State changes are saved by default and can skip the save."""
        john = PatientCondition.new('John', 'serious')

        # downgrade (will be autosaved)
        john.downgrade()
        critical_records = list(PatientCondition.find_by_state('critical'))
        stored = critical_records[0]
        self.assertEqual(stored.state, 'critical')
        self.assertEqual(stored.name, 'John')

        # change state (without autosaving)
        john.change_state('stable', False)
        stable_records = list(PatientCondition.find_by_state('stable'))
        stable_count = len(stable_records)
        critical_query = PatientCondition.gql("WHERE state = :state",
                                              state='critical')
        critical_count = critical_query.count()

        # the in-memory record moved on; the stored one is still critical
        self.assertEqual(john.state, 'stable')
        self.assertEqual(stable_count, 0)
        self.assertEqual(critical_count, 1)

    def testThreatLevelModel(self):
        """Elevating from yellow reaches red and runs red's enter action."""
        level = ThreatLevel.new('yellow')
        level('elevate')
        self.assertEqual(level.state, 'red')

        stored_level = list(ThreatLevel.find_by_state('red'))[0]
        self.assertEqual(stored_level.state, 'red')
        self.assertEqual(stored_level.output, 'EVERYBODY PANIC!')

        stored_level.lower()
        self.assertEqual(stored_level.state, 'yellow')

    #
    # Smoke Tests
    #
    def testInstance(self):
        """The test case itself is a unittest.TestCase."""
        self.assertIsInstance(self, unittest.TestCase)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment