Skip to content

Instantly share code, notes, and snippets.

@aleneum
Last active April 10, 2019 15:03
Show Gist options
  • Save aleneum/8d49358d53bddd68cdeaa5372fcb2f28 to your computer and use it in GitHub Desktop.
Save aleneum/8d49358d53bddd68cdeaa5372fcb2f28 to your computer and use it in GitHub Desktop.
This gist evaluates how Django and transitions can work together. It is based on work by jxskiss; see his [gist](https://gist.github.com/jxskiss/01816eec9a2b64bae341f4d07f58646e) for more details.
# Add the other files to your Django app and migrate.
# Enter the Django shell with `python manage.py shell` and execute the following.
# I called my app 'issues'; change 'issues' to your app name if you chose another one.
# There are 6 models which realize state machine bindings to Django models in different ways:
# Item: add a transitions.Machine instance to each record/item
# ItemSingleton: add each model to a global transitions.Machine
# ItemSingletonSet: extend transitions.Machine to use a set instead of a list to store models; this increases look-up speed in add_model
# ItemNoMachine: just a minimal model without a state machine (for reference purposes)
# ItemFysom: add a customized fysom machine to each record/item
# ItemFysomClass: share one customized fysom machine as a class attribute
from issues.tools import *
# Test basic functionality of the three transitions-based models;
# returns silently if no assert fails.
test_items()
# Test the memory usage.
# parameters:
# cls (required) -- either Item, ItemSingleton, ItemSingletonSet, ItemNoMachine, ItemFysom, ItemFysomClass
# limit (default 250) -- process memory limit in MB at which the test stops
# step_size (default 10) -- how many items will be added at once
# returns:
# a list of tuples containing (number of created items, memory usage, time to add 'step_size' objects)
x = test_memory_usage(cls=Item)
# In case you want to store the result using pandas:
import pandas as pd
# x is a list of 3-tuples, so build the frame directly from the records
# (the original `pd.Dataframe.from_csv(x)` was a misspelled class and a file-reading API).
df = pd.DataFrame(x, columns=['items', 'memory_mb', 'seconds'])
df.to_csv('./item.csv')
# See the plots at https://github.com/pytransitions/transitions/issues/146
# -*- coding:utf-8 -*-
# File copied from: https://gist.github.com/jxskiss/01816eec9a2b64bae341f4d07f58646e
class StateMachineError(Exception):
    """Root of all errors raised by the state machine."""


class TransitionInvalidError(StateMachineError):
    """Raised when an event is fired from a state it is not registered for."""


class TransitionCanceledError(StateMachineError):
    """Raised when a condition or an ``on_before_<event>`` hook vetoes an event."""
class StateMachine(object):
    """
    Stateless finite state machine operating on external objects.

    The machine stores only the transition table. Every generated event
    handler takes the target object as its first argument and reads/writes
    the attribute named by ``state_field`` on that object, so one machine
    can serve any number of objects.

    Transitions are given as ``[trigger, source, dest]`` sequences or as
    dicts with the keys ``trigger``, ``source``, ``dest`` and optionally
    ``conditions``. ``source`` is a state name, a list of state names or
    ``WILDCARD``; ``dest`` may be ``SAME_DEST`` to stay in the current state.

    A condition is either the name of a method on the object (which must
    return ``True``) or a dict ``{"true": name}`` / ``{"false": name}`` with
    an optional ``"else"`` key naming an alternative destination that is
    used instead of cancelling when the condition is not met.

    Conditions and callbacks execution order:
        conditions
        on_before_<event>
        on_exit_<state>
        <<STATE CHANGE>>
        on_change_state
        on_enter_<state>
        on_after_<event>

    When dest state is same with source state:
        conditions
        on_before_<event>
        on_reenter_<state>
        on_after_<event>
    """

    WILDCARD = '*'
    SAME_DEST = '='

    def __init__(self, state_field, states, transitions):
        """
        Build the machine.

        :param state_field: name of the attribute holding the state on the
            objects passed to the event handlers.
        :param states: collection of known state names (stored as-is).
        :param transitions: iterable of transition definitions, see the
            class docstring.
        :raises StateMachineError: if a trigger name is registered twice.
        """
        self.state_field = state_field
        # events registry =>
        # {
        #     event_name: {
        #         "source": set(),
        #         "dest": dest,
        #         "conditions": [
        #             {"true/false": "condition_method"},
        #             {"true/false": "condition_method", "else": alt_dest},
        #         ],
        #     },
        # }
        self._map = {}
        self.events = []
        self.states = states
        cfg = {
            'transitions': transitions,
        }
        self._build_machine(cfg)

    def is_state(self, obj, state):
        """Return True if ``obj`` currently is in ``state``."""
        return getattr(obj, self.state_field) == state

    def can(self, obj, event):
        """Return True if ``event`` is registered and allowed in obj's state."""
        return (
            event in self._map and
            ((getattr(obj, self.state_field) in self._map[event]['source']) or
             self.WILDCARD in self._map[event]['source'])
        )

    def cannot(self, obj, event):
        """Return the negation of :meth:`can`."""
        return not self.can(obj, event)

    def _normalize_conditions(self, conditions):
        """
        Return ``conditions`` as a list of condition dicts.

        A single condition (string or dict) is wrapped into a list and plain
        method names are turned into ``{"true": name}`` so the event handler
        only ever has to deal with one format.
        """
        if self._is_base_string(conditions) or isinstance(conditions, dict):
            conditions = [conditions]
        return [
            {'true': cond} if self._is_base_string(cond) else cond
            for cond in conditions
        ]

    def _build_machine(self, cfg):
        """Register every transition of ``cfg`` and attach its handler."""
        transitions = cfg.get('transitions', [])

        def _add(trans):
            event = trans['trigger']
            if event in self._map:
                raise StateMachineError(
                    'Improperly configured transitions, event %s '
                    'already registered' % event)
            src = trans['source']
            # A single state name (including the wildcard) becomes a
            # one-element list; otherwise set() would split it into chars.
            if src == self.WILDCARD or self._is_base_string(src):
                src = [src]
            ev = {'source': set(src), 'dest': trans['dest']}
            conditions = trans.get('conditions')
            if conditions:
                ev['conditions'] = self._normalize_conditions(conditions)
            self._map[event] = ev

        # Construct all transition handlers
        for trans in transitions:
            if isinstance(trans, (list, tuple)):
                trans = {'trigger': trans[0],
                         'source': trans[1], 'dest': trans[2]}
            _add(trans)
            event = trans['trigger']
            self.events.append(event)
            # The handler is stored on the machine instance under the
            # event's name, e.g. ``machine.sm_update(obj)``.
            setattr(self, event, self._build_event(event))

    def _build_event(self, event):
        """
        For every event in the state machine, prepares the event handler.

        The returned function has the signature ``fn(obj, *args, **kwargs)``.
        It raises ``TransitionInvalidError`` when the event is not allowed in
        the object's current state and ``TransitionCanceledError`` when a
        condition without ``"else"`` fails or ``on_before_<event>`` returns
        ``False``.
        """

        def fn(obj, *args, **kwargs):
            current_state = getattr(obj, self.state_field)

            # Check if this event can be triggered in the current state.
            if not self.can(obj, event):
                raise TransitionInvalidError(
                    'event %s inappropriate in current state %s'
                    % (event, current_state))

            # On event occurrence, source will always be the current state.
            src = current_state
            # dest may change during checking conditions
            dst = self._map[event]['dest']
            if dst == self.SAME_DEST:
                dst = src

            # Check transition conditions first.
            for cond in self._map[event].get('conditions', ()):
                target = 'true' if 'true' in cond else 'false'
                if self._check_condition(obj, target, cond[target]):
                    continue
                if 'else' not in cond:
                    raise TransitionCanceledError(
                        'Cannot trigger event {0} because the {1} '
                        'condition returns False'.format(
                            event, cond[target])
                    )
                # A failed condition with an alternative destination
                # redirects the transition and ends condition checking.
                dst = cond['else']
                break

            # Prepares the object with all the meta data to be passed to
            # callbacks.
            e = self._event_obj()
            e.fsm, e.obj, e.event, e.src, e.dst = self, obj, event, src, dst
            e.args, e.kwargs = args, kwargs
            # used to share object saving status between callbacks
            e.obj_has_saved = False

            # Try to trigger the before event, unless it gets canceled.
            if self._before_event(obj, e) is False:
                raise TransitionCanceledError(
                    'Cannot trigger event {0} because the on_before_{0} '
                    'handler returns False'.format(event)
                )

            # Wraps the activities that must constitute a single successful
            # transaction.
            if src != dst:
                def _trans():
                    delattr(obj, '_sm_transition')
                    setattr(obj, self.state_field, dst)
                    self._change_state(obj, e)
                    self._enter_state(obj, e)
                    self._after_event(obj, e)

                obj._sm_transition = _trans

                # Hook to perform asynchronous transition: when
                # on_exit_<state> returns False the pending transition stays
                # available as ``obj._sm_transition`` for a later call.
                if self._exit_state(obj, e) is not False:
                    obj._sm_transition()
            else:
                self._reenter_state(obj, e)
                self._after_event(obj, e)

        fn.__name__ = str(event)
        # List the registered source states (not the registry's dict keys).
        fn.__doc__ = (
            "Event handler for an {event} event. This event can be "
            "fired if the machine is in {states} states.".format(
                event=event,
                states=', '.join(
                    sorted(str(s) for s in self._map[event]['source']))))

        return fn

    @staticmethod
    def _check_condition(obj, target, func):
        """
        Call ``obj.<func>()`` and compare the result with ``target``.

        The comparison is by identity: only the booleans ``True`` / ``False``
        satisfy a ``'true'`` / ``'false'`` target, other truthy or falsy
        values do not.

        :raises AttributeError: if ``obj`` has no attribute ``func``.
        """
        if not hasattr(obj, func):
            raise AttributeError("obj %s hasn't method %s" % (obj, func))
        r = getattr(obj, func)()
        if target == 'true':
            return r is True
        else:  # false
            return r is False

    @staticmethod
    def _before_event(obj, ev):
        """Call ``obj.on_before_<event>(ev)`` if present and return its result."""
        fn_name = 'on_before_' + ev.event
        if hasattr(obj, fn_name):
            return getattr(obj, fn_name)(ev)

    @staticmethod
    def _after_event(obj, ev):
        """Call ``obj.on_after_<event>(ev)`` if present and return its result."""
        fn_name = 'on_after_' + ev.event
        if hasattr(obj, fn_name):
            return getattr(obj, fn_name)(ev)

    @staticmethod
    def _enter_state(obj, ev):
        """Call ``obj.on_enter_<dst>(ev)`` if present and return its result."""
        fn_name = 'on_enter_' + ev.dst
        if hasattr(obj, fn_name):
            return getattr(obj, fn_name)(ev)

    @staticmethod
    def _exit_state(obj, ev):
        """Call ``obj.on_exit_<src>(ev)`` if present and return its result."""
        fn_name = 'on_exit_' + ev.src
        if hasattr(obj, fn_name):
            return getattr(obj, fn_name)(ev)

    @staticmethod
    def _reenter_state(obj, ev):
        """Call ``obj.on_reenter_<dst>(ev)`` if present and return its result."""
        fn_name = 'on_reenter_' + ev.dst
        if hasattr(obj, fn_name):
            return getattr(obj, fn_name)(ev)

    @staticmethod
    def _change_state(obj, ev):
        """Call ``obj.on_change_state(ev)`` if present and return its result."""
        fn_name = 'on_change_state'
        if hasattr(obj, fn_name):
            return getattr(obj, fn_name)(ev)

    def trigger(self, obj, event, *args, **kwargs):
        """
        Triggers the given event.

        The event can be triggered by calling the event handler directly,
        for ex: fsm.eat(obj), but this method will come in handy if the
        event is determined dynamically and you have the event name to
        trigger as a string.

        :raises StateMachineError: if the machine has no attribute ``event``.
        """
        if not hasattr(self, event):
            raise StateMachineError(
                "There isn't any event registered as %s" % event)
        # Forward only the caller's arguments; the handler already knows its
        # own event name, so passing it again would pollute ``e.args``.
        return getattr(self, event)(obj, *args, **kwargs)

    @staticmethod
    def _is_base_string(obj):
        """
        Returns if the object is an instance of basestring.

        On Python 2 this covers both ``str`` and ``unicode``; on Python 3,
        where ``basestring`` does not exist, it falls back to ``str``.
        """
        try:
            string_types = basestring  # noqa: F821 -- Python 2 only
        except NameError:
            string_types = str
        return isinstance(obj, string_types)

    class _event_obj(object):
        """
        Event object.

        Attributes:
            fsm, obj, event, src, dst, args, kwargs, obj_has_saved
        """
        pass
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import models
from transitions import Machine
from fysom import StateMachine
from functools import partial
# nothing changed here
class ItemStatus(object):
    """State names and the transition table shared by all item models."""

    # Individual state names.
    NEW = 'new'
    NEED_INFO = 'need_info'
    REVIEWING = 'reviewing'
    REDOING = 'redoing'
    CONFLICT = 'conflict'
    VERIFIED = 'verified'
    DELETED = 'deleted'

    # Every state known to the machines.
    SM_STATES = [
        NEW,
        NEED_INFO,
        REVIEWING,
        REDOING,
        CONFLICT,
        VERIFIED,
        DELETED,
    ]

    # Entries are either [trigger, source, destination] lists or dicts
    # (needed as soon as a transition carries conditions).
    SM_TRANSITIONS = [
        ['sm_prepare_new', NEW, NEED_INFO],
        {
            'trigger': 'sm_commit_review',
            'source': NEED_INFO,
            'dest': REVIEWING,
            'conditions': ['check_review_ready'],
        },
        {
            'trigger': 'sm_done_verified',
            'source': [REVIEWING, REDOING],
            'dest': VERIFIED,
            'conditions': [
                'check_required_fields',
                'check_barcodes_valid',
                'check_no_conflict',
            ],
        },
        ['sm_mark_conflict', [REVIEWING, REDOING], CONFLICT],
        ['sm_revert_verified', [VERIFIED, CONFLICT], REDOING],
        ['sm_require_info', [REVIEWING, REDOING], NEED_INFO],
        {
            'trigger': 'sm_mark_deleted',
            'source': [
                NEW,
                NEED_INFO,
                REVIEWING,
                REDOING,
                CONFLICT,
                VERIFIED,
            ],
            'dest': DELETED,
        },
        ['sm_revert_deleted', DELETED, REDOING],
        {
            'trigger': 'sm_update',
            'source': [NEW, NEED_INFO, REVIEWING, REDOING, VERIFIED],
            # '=' keeps the item in whatever state it currently has
            'dest': '=',
        },
    ]
class ItemMachineMixin(object):
    """Mixin giving every instance its own ``transitions.Machine``."""

    def __init__(self, *args, **kwargs):
        """Initialise the base class, bind a new machine to self and save."""
        super(ItemMachineMixin, self).__init__(*args, **kwargs)
        machine_options = dict(
            model=self,
            states=ItemStatus.SM_STATES,
            # start the machine in the state the record already carries
            initial=self.state,
            transitions=ItemStatus.SM_TRANSITIONS,
            auto_transitions=False,
            send_event=True,
        )
        self.machine = Machine(**machine_options)
        self.save()
# instead of adding a machine to each entry, we use a global machine here
# the constructor is only called ONCE and ignored after the global
# instance has been created.
# Note that we do not add a model or set an initial state;
# this will be done for each model entry instead
class ItemSingletonMachineMixin(object):
    """Mixin registering every instance with the shared ``GlobalMachine``."""

    def __init__(self, *args, **kwargs):
        """Initialise the base class, save, then add self to the machine."""
        super(ItemSingletonMachineMixin, self).__init__(*args, **kwargs)
        # No model and no initial state here: both are supplied per
        # instance through add_model() below.
        machine_options = dict(
            model=None,
            states=ItemStatus.SM_STATES,
            transitions=ItemStatus.SM_TRANSITIONS,
            initial=None,
            auto_transitions=False,
            send_event=True,
        )
        shared_machine = GlobalMachine(**machine_options)
        self.save()
        shared_machine.add_model(self, initial=self.state)
# Basically the same as ItemSingletonMachineMixin, but we
# use another machine class here.
# GlobalSetMachine stores its models in a set instead of a list, which
# makes adding new models to the machine much faster.
class ItemSingletonSetMachineMixin(object):
    """Mixin registering every instance with the shared ``GlobalSetMachine``."""

    def __init__(self, *args, **kwargs):
        """Initialise the base class, save, then add self to the machine."""
        super(ItemSingletonSetMachineMixin, self).__init__(*args, **kwargs)
        # Model and initial state are supplied per instance via add_model().
        machine_options = dict(
            model=None,
            states=ItemStatus.SM_STATES,
            transitions=ItemStatus.SM_TRANSITIONS,
            initial=None,
            auto_transitions=False,
            send_event=True,
        )
        shared_machine = GlobalSetMachine(**machine_options)
        # NOTE(review): saving first presumably matters because the machine
        # keeps its models in a set and unsaved Django instances are not
        # hashable -- confirm before reordering.
        self.save()
        shared_machine.add_model(self, initial=self.state)
class ItemFysomMachineMixin(object):
    """Mixin giving every instance its own fysom-style ``StateMachine``."""

    def __init__(self, *args, **kwargs):
        """Initialise the base class, create the machine as ``self.sm``, save."""
        super(ItemFysomMachineMixin, self).__init__(*args, **kwargs)
        machine_options = dict(
            # the machine reads and writes the state through this attribute
            state_field='state',
            states=ItemStatus.SM_STATES,
            transitions=ItemStatus.SM_TRANSITIONS,
        )
        self.sm = StateMachine(**machine_options)
        self.save()
class ItemFysomClassMachineMixin(object):
    """
    Mixin sharing one fysom-style ``StateMachine`` between all instances.

    The machine lives on the class as ``sm``. Attribute names starting with
    ``sm_`` that are not found on the instance are looked up on the machine
    and returned bound to the instance, so ``item.sm_update()`` is a
    shortcut for ``item.sm.sm_update(item)``.
    """

    sm = StateMachine(
        state_field='state',
        states=ItemStatus.SM_STATES,
        transitions=ItemStatus.SM_TRANSITIONS,
    )

    def __init__(self, *args, **kwargs):
        super(ItemFysomClassMachineMixin, self).__init__(*args, **kwargs)

    def __getattr__(self, item):
        """
        Proxy unknown ``sm_*`` attributes to the shared state machine.

        ``__getattr__`` is only consulted after regular attribute lookup has
        failed, so ordinary attribute access is not intercepted.

        :raises AttributeError: for unknown names without the ``sm_`` prefix,
            or when the machine itself has no such attribute.
        """
        if item.startswith('sm_'):
            return partial(getattr(self.sm, item), self)
        raise AttributeError(
            "'%s' object has no attribute '%s'" % (type(self).__name__, item))
class Item(ItemMachineMixin, models.Model):
    """Model variant whose machine binding comes from ``ItemMachineMixin``."""

    state = models.CharField(
        max_length=16,
        default='new',
    )
# Uses ItemSingletonMachineMixin instead of ItemMachineMixin
class ItemSingleton(ItemSingletonMachineMixin, models.Model):
    """Model variant whose machine binding comes from ``ItemSingletonMachineMixin``."""

    state = models.CharField(
        max_length=16,
        default='new',
    )
# Uses ItemSingletonSetMachineMixin instead of ItemMachineMixin
class ItemSingletonSet(ItemSingletonSetMachineMixin, models.Model):
    """Model variant whose machine binding comes from ``ItemSingletonSetMachineMixin``."""

    state = models.CharField(
        max_length=16,
        default='new',
    )
class ItemNoMachine(models.Model):
    """Plain model with a state column and no state machine attached."""

    state = models.CharField(
        max_length=16,
        default='new',
    )
class ItemFysom(ItemFysomMachineMixin, models.Model):
    """Model variant whose machine binding comes from ``ItemFysomMachineMixin``."""

    state = models.CharField(
        max_length=16,
        default='new',
    )
class ItemFysomClass(ItemFysomClassMachineMixin, models.Model):
    """Model variant whose machine binding comes from ``ItemFysomClassMachineMixin``."""

    state = models.CharField(
        max_length=16,
        default='new',
    )
# The singleton meta class which makes sure that always the same
# instance of machine is returned
class Singleton(type):
    """Metaclass that creates at most one instance per class."""

    # class -> its single instance; shared by all classes using this metaclass
    _instances = {}

    def __call__(cls, *args, **kwargs):
        """Return the cached instance of ``cls``, constructing it on first use.

        Constructor arguments are only used by the call that creates the
        instance; later calls ignore them.
        """
        registry = cls._instances
        if cls in registry:
            return registry[cls]
        instance = super(Singleton, cls).__call__(*args, **kwargs)
        registry[cls] = instance
        return instance
# Global machine uses the Singleton metaclass which means there will
# only be one instance of this machine ever
# A ``__metaclass__`` class attribute is honoured by Python 2 only; Python 3
# silently ignores it, which would give every caller a fresh machine.
# Building the base class by calling the metaclass works on both versions.
# str() keeps the class name a native string under unicode_literals.
class GlobalMachine(Singleton(str('_GlobalMachineBase'), (Machine,), {})):
    """``transitions.Machine`` of which only one instance is ever created."""
# Same as GlobalMachine but uses a set to store models, which makes adding
# new models much faster. Reason: add_model always checks whether the model
# has been added before, to prevent accidentally adding a model multiple times;
# this lookup takes MUCH longer once the model list becomes huge.
# A ``__metaclass__`` class attribute is honoured by Python 2 only; Python 3
# silently ignores it, which would give every caller a fresh machine.
# Building the base class by calling the metaclass works on both versions.
# str() keeps the class name a native string under unicode_literals.
class GlobalSetMachine(Singleton(str('_GlobalSetMachineBase'), (Machine,), {})):
    """Single-instance ``transitions.Machine`` keeping its models in a set."""

    def __init__(self, *args, **kwargs):
        """Initialise the machine, then replace ``models`` with a ``SetWrapper``."""
        super(GlobalSetMachine, self).__init__(*args, **kwargs)
        # Swapped in after the base initialiser has run, so whatever
        # container it created is discarded.
        self.models = SetWrapper()
# Since transitions expects a list, extend set by a method 'append'
class SetWrapper(set):
    """A ``set`` that additionally offers the list method ``append``."""

    def append(self, item):
        """Insert ``item`` into the set; a no-op if it is already present."""
        set.add(self, item)
import psutil
import os
from models import Item, ItemSingleton, ItemSingletonSet
from models import ItemNoMachine, ItemFysom, ItemFysomClass
from transitions import MachineError
import timeit
def memory_usage():
    """Return the resident set size of the current process in MB (MiB)."""
    bytes_per_mb = float(2 ** 20)
    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    return rss_bytes / bytes_per_mb
def create_items(cls, amount=10):
    """Return a list of ``amount`` objects, each created by calling ``cls()``."""
    return [cls() for _ in range(amount)]
def test_memory_usage(cls, limit=250, step_size=10):
    """
    Create instances of ``cls`` until the process exceeds ``limit`` MB.

    :param cls: class to instantiate (called without arguments).
    :param limit: process memory usage in MB at which the loop stops.
    :param step_size: number of objects created per iteration.
    :return: list of ``(total objects, memory usage in MB, seconds needed to
        create the last ``step_size`` objects)`` tuples, one per iteration.
        Empty if the process is already at or above ``limit``.
    """
    res = []
    # Keep every batch referenced so the objects are not garbage collected.
    lists = []
    mem = memory_usage()
    total = 0
    while mem < limit:
        start_time = timeit.default_timer()
        lists.append(create_items(cls, step_size))
        # Stop the clock before sampling memory so only object creation is
        # timed.
        elapsed = timeit.default_timer() - start_time
        total += step_size
        mem = memory_usage()
        print("Currently holding {0} objects "
              "with {1} MB memory usage.".format(total, mem))
        res.append((total, mem, elapsed))
    return res
def test_items():
    """
    Smoke-test the three transitions-based models.

    Creates a batch of ``Item``, ``ItemSingleton`` and ``ItemSingletonSet``
    objects and drives the first object of each batch through the same
    sequence of events. Returns silently when every check passes.

    :raises AssertionError: if any model deviates from the expected behaviour.
    """
    items1 = create_items(Item)
    items2 = create_items(ItemSingleton)
    items3 = create_items(ItemSingletonSet)
    batches = (items1, items2, items3)
    i1, i2, i3 = firsts = tuple(batch[0] for batch in batches)

    # item should all have the same initial state
    assert i1.state == 'new'
    assert i1.state == i2.state
    assert i2.state == i3.state

    # every model should contain the transition
    for item in firsts:
        item.sm_prepare_new()

    # and it should also end in new model states
    assert i1.state != 'new'
    assert i1.state == i2.state
    assert i2.state == i3.state

    # this transition is invalid and should cause a MachineError
    # for all kinds of Item
    for item in firsts:
        try:
            item.sm_done_verified()
        except MachineError:
            pass
        else:
            raise AssertionError(
                'sm_done_verified did not raise MachineError for %r' % item)

    # again another working transition
    for item in firsts:
        item.sm_mark_deleted()
    assert i1.state == i2.state
    assert i2.state == i3.state

    # there should not be side effects with other items from the same kind
    for batch in batches:
        assert batch[0].state != batch[1].state
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment