Skip to content

Instantly share code, notes, and snippets.

@netskink
Last active May 7, 2020 00:08
Show Gist options
  • Save netskink/36f77518d5abe8679c9072c117db606c to your computer and use it in GitHub Desktop.
Save netskink/36f77518d5abe8679c9072c117db606c to your computer and use it in GitHub Desktop.
state machine demo
""" the actions for the lora mqtt state machine
"""
class ns_LoraMqttAction():
    """An input event (action) fed to the LoRa/MQTT state machine.

    Attributes:
        action: string naming the event. It alone determines equality
            and the hash value of the object.

    Implementation Notes:
        Instances compare by value, so two objects built from the same
        text are interchangeable in comparisons and as dictionary keys.
    """

    def __init__(self, action):
        """Store the text that identifies this action.

        :param action: text identifying the action
        :type action: str.
        """
        self.action = action

    def __str__(self):
        """Return the bare action text."""
        return self.action

    def __repr__(self):
        """Return the action text followed by a CR/LF pair."""
        return '{}\r\n'.format(self.action)

    def __eq__(self, other):
        """Compare two actions by their text.

        Needed so that the state classes can match an incoming action
        against the shared action instances.

        :param other: object to compare against
        :returns: True when ``other`` carries the same ``action`` text;
            False otherwise, including when ``other`` has no ``action``
            attribute at all (a plain string, None, ...).
        """
        try:
            return self.action == other.action
        except AttributeError:
            # Comparing against a foreign object is "not equal", not an
            # error; this also keeps ``!=`` and ``in`` usable.
            return False

    def __hash__(self):
        """Hash on the action text.

        Must be defined alongside __eq__ so instances stay usable as
        dictionary keys. Hashing has no side effects.
        """
        return hash(self.action)
# Shared action instances, attached to the class after its definition so
# that callers can refer to them as ns_LoraMqttAction.<name>. Each one is
# identified by its text; ns_LoraMqttAction.__eq__ compares that text.
#
# Actions named "... received" / "pressed":
ns_LoraMqttAction.mqttConfigReceived = ns_LoraMqttAction("MQTT config received")
ns_LoraMqttAction.switchPressed = ns_LoraMqttAction("switch pressed")
ns_LoraMqttAction.loraAckReceived = ns_LoraMqttAction("LoRa ack received")
ns_LoraMqttAction.mqttAckReceived = ns_LoraMqttAction("MQTT ack received")
# Actions named "... timeout":
ns_LoraMqttAction.mqttConfigTimeout = ns_LoraMqttAction("MQTT config timeout")
ns_LoraMqttAction.mqttAckTimeout = ns_LoraMqttAction("MQTT ack timeout")
ns_LoraMqttAction.loraAckTimeout = ns_LoraMqttAction("LoRa ack timeout")
# ns_LoraMqttDemo.py
"""Demos priority queue and state machine
"""
# This is example usage of the ns_State and ns_StateMachine
# template class.
#import sys
from ns_State import ns_State
from ns_StateMachine import ns_StateMachine
from ns_LoraMqttAction import ns_LoraMqttAction
#import ns_stop
# Banner emitted at module level, i.e. as soon as this module is executed
# or imported.
print("** ns_LoraMqttDemo has started **")
###
#
# The States
#
#################
#
# A different subclass for each state:
# Each of these classes derive from ns_State
#
# For the runAll() routine in the statemachine, it calls next() first
# and then it calls run(). This ensures that the transition to the next
# state is performed and then the action for the new state occurs.
################
class WaitingForMqttConfig(ns_State):
    """State held until an MQTT config message has been received.

    Transitions:
        mqttConfigReceived -> ns_LoraMqtt.waitingForSwitchPress
        mqttConfigTimeout  -> ns_LoraMqtt.waitingForMqttConfig (stay)
        anything else      -> ns_LoraMqtt.waitingForMqttConfig (stay)
    """

    def run(self):
        """Report that this state is active.

        :returns: ns_LoraMqtt.waitingForMqttConfig
        """
        print("STATE: WaitingForMqttConfig: Waiting for MQTT config message:")
        return ns_LoraMqtt.waitingForMqttConfig

    def next(self, input_action):
        """Choose the state that follows ``input_action``.

        :param input_action: the action to react to
        :type input_action: ns_LoraMqttAction.
        :returns: the shared state instance to switch to
        """
        if input_action == ns_LoraMqttAction.mqttConfigReceived:
            following = ns_LoraMqtt.waitingForSwitchPress
        elif input_action == ns_LoraMqttAction.mqttConfigTimeout:
            # This branch might not be needed.
            # TODO: Determine how to request a config message.
            following = ns_LoraMqtt.waitingForMqttConfig
        else:
            following = ns_LoraMqtt.waitingForMqttConfig
        return following
class WaitingForSwitchPress(ns_State):
    """State held until the switch is pressed.

    Transitions:
        switchPressed -> ns_LoraMqtt.waitingForLoraAck
        anything else -> ns_LoraMqtt.waitingForSwitchPress (stay)
    """

    def run(self):
        """Report that this state is active.

        :returns: ns_LoraMqtt.waitingForSwitchPress
        """
        print("STATE: WaitingForSwitchPress: Waiting for switch press: blink led?")
        return ns_LoraMqtt.waitingForSwitchPress

    def next(self, input_action):
        """Choose the state that follows ``input_action``.

        :param input_action: the action to react to
        :type input_action: ns_LoraMqttAction.
        :returns: the shared state instance to switch to
        """
        pressed = input_action == ns_LoraMqttAction.switchPressed
        return (ns_LoraMqtt.waitingForLoraAck if pressed
                else ns_LoraMqtt.waitingForSwitchPress)
class WaitingForLoraAck(ns_State):
    """State held until a LoRa acknowledgement has been received.

    Transitions:
        loraAckReceived -> ns_LoraMqtt.waitingForMqttAck
        loraAckTimeout  -> ns_LoraMqtt.waitingForLoraAck (stay)
        anything else   -> ns_LoraMqtt.waitingForLoraAck (stay)
    """

    def run(self):
        """Report that this state is active.

        :returns: ns_LoraMqtt.waitingForLoraAck
        """
        print("STATE: WaitingForLoraAck: Waiting for LoRa Ack: Send LoRa message")
        return ns_LoraMqtt.waitingForLoraAck

    def next(self, input_action):
        """Choose the state that follows ``input_action``.

        :param input_action: the action to react to
        :type input_action: ns_LoraMqttAction.
        :returns: the shared state instance to switch to
        """
        if input_action == ns_LoraMqttAction.loraAckReceived:
            following = ns_LoraMqtt.waitingForMqttAck
        elif input_action == ns_LoraMqttAction.loraAckTimeout:
            following = ns_LoraMqtt.waitingForLoraAck
        else:
            following = ns_LoraMqtt.waitingForLoraAck
        return following
class WaitingForMqttAck(ns_State):
    """State held until an MQTT acknowledgement has been received.

    Transitions:
        mqttAckReceived -> ns_LoraMqtt.waitingForSwitchPress
        mqttAckTimeout  -> ns_LoraMqtt.waitingForMqttAck (stay)
        anything else   -> ns_LoraMqtt.waitingForMqttAck (stay)
    """

    def run(self):
        """Report that this state is active.

        :returns: ns_LoraMqtt.waitingForMqttAck
        """
        print("STATE: WaitingForMqttAck: Waiting for MQTT Ack: Send MQTT message")
        return ns_LoraMqtt.waitingForMqttAck

    def next(self, input_action):
        """Choose the state that follows ``input_action``.

        :param input_action: the action to react to
        :type input_action: ns_LoraMqttAction.
        :returns: the shared state instance to switch to
        """
        if input_action == ns_LoraMqttAction.mqttAckReceived:
            following = ns_LoraMqtt.waitingForSwitchPress
        elif input_action == ns_LoraMqttAction.mqttAckTimeout:
            following = ns_LoraMqtt.waitingForMqttAck
        else:
            following = ns_LoraMqtt.waitingForMqttAck
        return following
###
#
# The State Machine
#
#################
# A different subclass for each state:
################
#
# ns_LoraMqtt inherits from ns_StateMachine
#
class ns_LoraMqtt(ns_StateMachine):
    """The LoRa/MQTT state machine.

    Implementation Notes:
        The state instances are stored as class attributes
        (waitingForMqttConfig, waitingForSwitchPress, waitingForLoraAck,
        waitingForMqttAck). They are assigned at module level after this
        class body and must exist before an instance is created.
    """

    def __init__(self):
        """Start the machine in the waitingForMqttConfig state."""
        super().__init__(ns_LoraMqtt.waitingForMqttConfig)

    # Disabled sketch of a restart helper:
    #def restart(self):
    #    ns_StateMachine.set_state(self, ns_LoraMqtt.waitingForMqttConfig)
# Static variable initialization:
# One shared instance per state class, stored on ns_LoraMqtt. The state
# classes' run()/next() methods return these attributes, and
# ns_LoraMqtt.__init__ reads waitingForMqttConfig, so these assignments
# have to run before the first ns_LoraMqtt() is created.
ns_LoraMqtt.waitingForMqttConfig = WaitingForMqttConfig()
ns_LoraMqtt.waitingForSwitchPress = WaitingForSwitchPress()
ns_LoraMqtt.waitingForLoraAck = WaitingForLoraAck()
ns_LoraMqtt.waitingForMqttAck = WaitingForMqttAck()
# a max heap priority queue implementation for use with micropython
"""Code for a simple max heap. This algorithm came from the CLRS book.
A max heap is a binary tree in which every node's value is greater than or
equal to the values of its children, so the maximum sits at the root and
every subtree is itself a max heap.
Here is an example tree of the values 1-5
          5
         / \
        /   \
       4     2
      / \
     1   3
"""
#import time
#from ns_LoraMqttAction import ns_LoraMqttAction
class ns_PriorityNode():
    """One heap entry: a priority paired with the object it ranks.

    Attributes:
        priority: ordering key of the node
        a_thing: the object carried by the node
        num_times_run: counter stored with the node; this class only
            stores it and never changes it
    """

    def __init__(self, priority, a_thing, num_times_run=0):
        """Create a node.

        :param priority: ordering key; None is accepted and stored as is
        :param a_thing: the object carried by the node
        :param num_times_run: initial value of the run counter
        :type num_times_run: int.
        """
        # Every attribute is always set, even for priority=None, so a node
        # is never left half-built (repr() and attribute access stay safe).
        self.priority = priority
        self.a_thing = a_thing
        self.num_times_run = num_times_run

    def decrease_priority(self, amount=-1):
        """Lower the priority by the magnitude of ``amount``.

        The sign of ``amount`` is ignored, so the default of -1 and an
        explicit 1 both lower the priority by one; the priority is never
        raised by this method.

        :param amount: how far to lower the priority (sign ignored)
        :type amount: int.
        :returns: nothing -- nothing
        """
        self.priority = self.priority - abs(amount)

    # TODO: why does it print a_thing like this:
    # a_thing=<ns_LoraMqttAction object at 3f952fd0> ???
    def __repr__(self):
        """Return a multi-line dump of the priority and the carried object."""
        return '\r\n ns_PriorityNode: \tpriority={}\r\n\t\t\ta_thing={!r}\r\n' \
            .format(self.priority, self.a_thing)
class ns_PriorityQueue():
    """Max-heap priority queue whose entries are ns_PriorityNode objects.

    Attributes:
        heap_list: list representation of the max heap; slot 0 holds an
            unused placeholder node
        heap_size: number of live nodes (always set, 0 for an empty queue)

    Implementation Notes:
        The heap is implemented as a list. The algorithm uses list indexes
        counting from 1 to n, where n is the size of the heap. The list has
        a dummy value at the normal list position 0. The heap is ordered so
        that the root, containing the max value, is at position 1. The
        numbering continues so that left is at position 2 and right is at
        position 3. Likewise the left and right of node 2 are 4 and 5.
        Hence left = 2*i and right = 2*i + 1. This relationship is why list
        position 0 is not used.

        Errors are reported with AssertionError raised explicitly (not via
        the ``assert`` statement), so the checks stay active when Python
        runs with assertions disabled.
    """

    def __init__(self, heap=None):
        """Create a queue, optionally pre-loaded from a list of nodes.

        :param heap: nodes to load, in any order; None gives an empty queue
        :type heap: list.
        """
        # the algo book uses indexes which start at 1 instead of 0
        # add a dummy value at the first item
        self.heap_list = [ns_PriorityNode(-3367, 'xxx')]
        # Set the size before the early return so an empty queue is fully
        # usable (size(), insert(), peek_max(), ...).
        self.heap_size = 0
        if heap is None:
            return
        # append the list parameter after the dummy value
        self.heap_list.extend(heap)
        # we will not use len() of the list but instead use size
        self.heap_size = len(self.heap_list) - 1
        # make sure the max heap property holds for the supplied nodes
        self.build_max_heap()

    # Used to print/dump the heap as a list
    def __repr__(self):
        """Return a dump of the live nodes in list order."""
        return 'ns_Heap({!r})'.format(self.heap_list[1:])

    def _check_index(self, i):
        """Raise AssertionError unless 1 <= i <= heap_size."""
        if i < 1:
            raise AssertionError("heap_index_min")
        if i > self.heap_size:
            raise AssertionError("heap_index_max")

    def _sift_up(self, i):
        """Swap node i with its parent until the parent is not smaller."""
        nodes = self.heap_list
        while i > 1:
            parent_index = i // 2
            if not nodes[parent_index].priority < nodes[i].priority:
                break
            nodes[i], nodes[parent_index] = nodes[parent_index], nodes[i]
            i = parent_index

    def size(self):
        """Return the number of nodes in the queue."""
        return self.heap_size

    def parent(self, i):
        """Return the index of the parent of node i (0 for the root).

        :param i: one-based index of a node
        :type i: int.
        :raises: heap_index_min, heap_index_max
        """
        self._check_index(i)
        return i // 2

    def left(self, i):
        """Return the index where the left child of node i would be.

        The result may be larger than the heap size (no such child).

        :param i: one-based index of a node
        :type i: int.
        :raises: heap_index_min, heap_index_max
        """
        self._check_index(i)
        return 2 * i

    def right(self, i):
        """Return the index where the right child of node i would be.

        The result may be larger than the heap size (no such child).

        :param i: one-based index of a node
        :type i: int.
        :raises: heap_index_min, heap_index_max
        """
        self._check_index(i)
        return 2 * i + 1

    def max_heapify(self, i):
        """Restore the max heap property below index i.

        Moves the node at i downwards, swapping with its larger child,
        until neither child has a larger priority.

        :param i: one-based index of the node to push down
        :type i: int.
        :returns: nothing -- nothing
        :raises: heap_index_min, heap_index_max
        """
        self._check_index(i)
        nodes = self.heap_list
        size = self.heap_size
        while True:
            left_index = 2 * i
            right_index = left_index + 1
            largest_index = i
            if left_index <= size and \
                    nodes[left_index].priority > nodes[largest_index].priority:
                largest_index = left_index
            if right_index <= size and \
                    nodes[right_index].priority > nodes[largest_index].priority:
                largest_index = right_index
            if largest_index == i:
                return
            nodes[i], nodes[largest_index] = nodes[largest_index], nodes[i]
            i = largest_index

    def build_max_heap(self):
        """Turn the whole list into a max heap.

        Only the first half of the indexes can have children, so those are
        pushed down, last one first.
        """
        for i in range(self.heap_size // 2, 0, -1):
            self.max_heapify(i)

    def peek_max(self):
        """Return the node with max priority without removing it.

        :returns: ns_PriorityNode
        :raises: heap_empty
        """
        if self.heap_size < 1:
            raise AssertionError("heap_empty")
        return self.heap_list[1]

    def extract_max(self):
        """Remove and return the node with max priority.

        :returns: ns_PriorityNode
        :raises: heap_underflow
        """
        if self.heap_size < 1:
            raise AssertionError("heap_underflow")
        the_max = self.heap_list[1]
        last_node = self.heap_list.pop()
        self.heap_size = len(self.heap_list) - 1
        if self.heap_size > 0:
            # The former last node takes the root slot and is pushed down.
            self.heap_list[1] = last_node
            self.max_heapify(1)
        return the_max

    def increase_value(self, i, priority):
        """Raise the priority of node i and move it up as needed.

        Keep in mind, the index is one based.

        :param i: the index whose priority is increased
        :type i: int.
        :param priority: the new priority of the node at index i
        :type priority: int.
        :returns: nothing -- nothing
        :raises: heap_index_min, heap_index_max,
            value/key is less than existing value
        """
        self._check_index(i)
        if priority < self.heap_list[i].priority:
            raise AssertionError("value/key is less than existing value")
        self.heap_list[i].priority = priority
        self._sift_up(i)

    def insert(self, priority_node):
        """Add a copy of ``priority_node`` to the queue.

        The queue stores a new ns_PriorityNode built from the priority,
        a_thing and num_times_run of the argument; the argument itself is
        not stored. Any priority value is accepted.

        :param priority_node: the node to add
        :type priority_node: ns_PriorityNode.
        :returns: nothing -- nothing
        """
        self.heap_list.append(ns_PriorityNode(priority_node.priority,
                                              priority_node.a_thing,
                                              priority_node.num_times_run))
        self.heap_size = self.heap_size + 1
        # The new node sits in the last slot; move it up to its place.
        self._sift_up(self.heap_size)

    def walk_tree(self, i=1):
        """Print every node of the subtree rooted at index i.

        The node itself is printed first together with its depth, then the
        whole left subtree, then the whole right subtree. An index outside
        the heap prints nothing.

        :param i: the index to start the walk. Defaults to the root node.
        :type i: int.
        :returns: nothing -- nothing
        :raises: none
        """
        if i < 1 or i > self.heap_size:
            return
        # depth = number of ancestors between this node and the root
        depth = 0
        current = i // 2
        while current > 0:
            depth = depth + 1
            current = current // 2
        print("Node (depth:{}) is : {}" . format(depth, self.heap_list[i]))
        self.walk_tree(2 * i)
        self.walk_tree(2 * i + 1)
# NS_State
"""TODO
"""
#
# Implements a state class as part of a state machine
#
# pulled from
# https://python-3-patterns-idioms-test.readthedocs.io/en/latest/StateMachine.html
#
class ns_State:
    """Base class for one state of a state machine.

    Implementation Notes:
        By default run and next raise exceptions. A user must define these
        entry points: the state classes which inherit from this class
        override both methods.

        ns_StateMachine calls next() on the current state for every input
        and then run() on the state that next() returned.

        The errors are raised explicitly (not via the ``assert``
        statement), so a missing override is reported even when Python
        runs with assertions disabled.
    """

    def run(self):
        """Perform the work of this state; must be overridden.

        :returns: nothing -- the base implementation always raises
        :raises: AssertionError("run not implemented")
        """
        raise AssertionError("run not implemented")

    def next(self, input_action):
        """Return the state that follows ``input_action``; must be overridden.

        :param input_action: the input the state machine received
        :returns: nothing -- the base implementation always raises
        :raises: AssertionError("next not implemented")
        """
        raise AssertionError("next not implemented")
# NS_StateMachine.py
# Takes a list of Inputs to move from State to State using a template method
#
# Taken from
#
# https://python-3-patterns-idioms-test.readthedocs.io/en/latest/StateMachine.html
class ns_StateMachine:
    """Drives a set of state objects from one input action at a time.

    Attributes:
        currentState: the state object the machine is currently in
    """

    def __init__(self, initialState):
        """Enter ``initialState`` and call its run() once.

        :param initialState: state object to start in
        """
        self.currentState = initialState
        self.currentState.run()

    #def set_state(self, state):
    #    self.currentState = state

    def _advance(self, event):
        """Print the input event and switch to the state next() returns."""
        print("Input Event is {}".format(event))
        self.currentState = self.currentState.next(event)

    # Template method: not overridden by derived classes.
    def runAll(self, inputs):
        """Feed every action in ``inputs`` to the machine, in order.

        For each action the machine first transitions to the next state
        and then calls run() on that new state.

        :param inputs: iterable of input actions
        :returns: nothing -- nothing
        """
        for event in inputs:
            self._advance(event)
            self.currentState.run()

    def run(self, input):
        """Feed a single action to the machine.

        Transitions to the next state, prints the type of the new state
        (something like WaitingForSwitchPress) and calls its run().

        :param input: the input action
        :returns: whatever run() of the new state returns
        """
        self._advance(input)
        print(type(self.currentState))
        return self.currentState.run()
stuff deleted
...
....
...
....
.... stuff deleted
....
pylint3 -v flash/ns_LoraMqttDemo.py flash/lib/ns_State.py flash/ns_LoraMqttAction.py flash/lib/ns_PriorityQueue.py flash/lib/ns_StateMachine.py
************* Module ns_LoraMqttDemo
flash/ns_LoraMqttDemo.py:1:0: C0103: Module name "ns_LoraMqttDemo" doesn't conform to snake_case naming style (invalid-name)
flash/ns_LoraMqttDemo.py:13:0: E0001: Cannot import 'ns_StateMachine' due to syntax error 'inconsistent use of tabs and spaces in indentation (<unknown>, line 14)' (syntax-error)
flash/ns_LoraMqttDemo.py:128:0: C0103: Class name "ns_LoraMqtt" doesn't conform to PascalCase naming style (invalid-name)
flash/ns_LoraMqttDemo.py:128:0: R0903: Too few public methods (0/2) (too-few-public-methods)
************* Module ns_State
flash/lib/ns_State.py:1:0: C0103: Module name "ns_State" doesn't conform to snake_case naming style (invalid-name)
flash/lib/ns_State.py:10:0: C0103: Class name "ns_State" doesn't conform to PascalCase naming style (invalid-name)
flash/lib/ns_State.py:26:4: R0201: Method could be a function (no-self-use)
flash/lib/ns_State.py:44:19: W0613: Unused argument 'input_action' (unused-argument)
flash/lib/ns_State.py:44:4: R0201: Method could be a function (no-self-use)
************* Module ns_LoraMqttAction
flash/ns_LoraMqttAction.py:1:0: C0103: Module name "ns_LoraMqttAction" doesn't conform to snake_case naming style (invalid-name)
flash/ns_LoraMqttAction.py:4:0: C0103: Class name "ns_LoraMqttAction" doesn't conform to PascalCase naming style (invalid-name)
************* Module ns_PriorityQueue
flash/lib/ns_PriorityQueue.py:43:6: W0511: TODO: why does it print a_thing like this: (fixme)
flash/lib/ns_PriorityQueue.py:1:0: C0103: Module name "ns_PriorityQueue" doesn't conform to snake_case naming style (invalid-name)
flash/lib/ns_PriorityQueue.py:22:0: C0103: Class name "ns_PriorityNode" doesn't conform to PascalCase naming style (invalid-name)
flash/lib/ns_PriorityQueue.py:54:0: C0103: Class name "ns_PriorityQueue" doesn't conform to PascalCase naming style (invalid-name)
************* Module ns_StateMachine
flash/lib/ns_StateMachine.py:14:0: E0001: inconsistent use of tabs and spaces in indentation (<unknown>, line 14) (syntax-error)
------------------------------------------------------------------
Your code has been rated at 8.64/10 (previous run: 8.64/10, +0.00)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment