Last active
May 7, 2020 00:08
-
-
Save netskink/36f77518d5abe8679c9072c117db606c to your computer and use it in GitHub Desktop.
state machine demo
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" the actions for the lora mqtt state machine | |
""" | |
class ns_LoraMqttAction(): | |
""" | |
Attributes: | |
TODO: TODO | |
Implementation Notes: | |
TODO | |
""" | |
# action is string. | |
def __init__(self, action): | |
self.action = action | |
# This is supposedly for displaying an object. It did not work | |
# for me though | |
def __str__(self): | |
return self.action | |
#a_string = '{}\r\n'.format(self.action) | |
#return a_string | |
# This is used to print an object. This did not work either | |
def __repr__(self): | |
a_string = '{}\r\n'.format(self.action) | |
return a_string | |
#return self.action | |
# Need this so that the state action comparisons will work. | |
def __eq__(self, other): | |
#print("comparing self **%s**" % (self.action)) | |
#print("and other **%s**" % (other.action)) | |
return self.action == other.action | |
# Necessary when __cmp__ or __eq__ is defined | |
# in order to make this class usable as a | |
# dictionary key: | |
def __hash__(self): | |
print("hash called") | |
return hash(self.action) | |
ns_LoraMqttAction.mqttConfigReceived = ns_LoraMqttAction("MQTT config received") | |
ns_LoraMqttAction.switchPressed = ns_LoraMqttAction("switch pressed") | |
ns_LoraMqttAction.loraAckReceived = ns_LoraMqttAction("LoRa ack received") | |
ns_LoraMqttAction.mqttAckReceived = ns_LoraMqttAction("MQTT ack received") | |
ns_LoraMqttAction.mqttConfigTimeout = ns_LoraMqttAction("MQTT config timeout") | |
ns_LoraMqttAction.mqttAckTimeout = ns_LoraMqttAction("MQTT ack timeout") | |
ns_LoraMqttAction.loraAckTimeout = ns_LoraMqttAction("LoRa ack timeout") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ns_LoraMqttDemo.py | |
"""Demos priority queue and state machine | |
""" | |
# This is example usage of the ns_State and ns_StateMachine | |
# template class. | |
#import sys | |
from ns_State import ns_State | |
from ns_StateMachine import ns_StateMachine | |
from ns_LoraMqttAction import ns_LoraMqttAction | |
#import ns_stop | |
print("** ns_LoraMqttDemo has started **") | |
### | |
# | |
# The States | |
# | |
################# | |
# | |
# A different subclass for each state: | |
# Each of these classes derive from ns_State | |
# | |
# For the runAll() routine in the statemachine, it calls next() first | |
# and then it calls run(). This ensures that the transition to the next | |
# state is performed and then the action for the new state occurs. | |
################ | |
class WaitingForMqttConfig(ns_State): | |
""" | |
Attributes: | |
TODO: TODO | |
Implementation Notes: | |
TODO | |
""" | |
def run(self): | |
"""executes a state | |
:param self: the heap containing the node to max heap | |
:type name: ns_Heap. | |
:param i: index of a node in the binary tree where max heap occurs | |
:type state: int. | |
:returns: nothing -- nothing | |
:raises: heap_index_min, heap_index_max | |
""" | |
print("STATE: WaitingForMqttConfig: Waiting for MQTT config message:") | |
return ns_LoraMqtt.waitingForMqttConfig | |
def next(self, input_action): | |
#print("***%s****" % (ns_LoraMqttAction.mqttConfigReceived)) | |
if input_action == ns_LoraMqttAction.mqttConfigReceived: | |
return ns_LoraMqtt.waitingForSwitchPress | |
# This might not be needed. TODO: Determine how to | |
# request a config message. | |
if input_action == ns_LoraMqttAction.mqttConfigTimeout: | |
return ns_LoraMqtt.waitingForMqttConfig | |
return ns_LoraMqtt.waitingForMqttConfig | |
class WaitingForSwitchPress(ns_State): | |
""" | |
Attributes: | |
TODO: TODO | |
Implementation Notes: | |
TODO | |
""" | |
def run(self): | |
print("STATE: WaitingForSwitchPress: Waiting for switch press: blink led?") | |
return ns_LoraMqtt.waitingForSwitchPress | |
def next(self, input_action): | |
if input_action == ns_LoraMqttAction.switchPressed: | |
return ns_LoraMqtt.waitingForLoraAck | |
return ns_LoraMqtt.waitingForSwitchPress | |
class WaitingForLoraAck(ns_State): | |
""" | |
Attributes: | |
TODO: TODO | |
Implementation Notes: | |
TODO | |
""" | |
def run(self): | |
print("STATE: WaitingForLoraAck: Waiting for LoRa Ack: Send LoRa message") | |
return ns_LoraMqtt.waitingForLoraAck | |
def next(self, input_action): | |
if input_action == ns_LoraMqttAction.loraAckReceived: | |
return ns_LoraMqtt.waitingForMqttAck | |
if input_action == ns_LoraMqttAction.loraAckTimeout: | |
return ns_LoraMqtt.waitingForLoraAck | |
return ns_LoraMqtt.waitingForLoraAck | |
class WaitingForMqttAck(ns_State): | |
""" | |
Attributes: | |
TODO: TODO | |
Implementation Notes: | |
TODO | |
""" | |
def run(self): | |
print("STATE: WaitingForMqttAck: Waiting for MQTT Ack: Send MQTT message") | |
return ns_LoraMqtt.waitingForMqttAck | |
def next(self, input_action): | |
if input_action == ns_LoraMqttAction.mqttAckReceived: | |
return ns_LoraMqtt.waitingForSwitchPress | |
if input_action == ns_LoraMqttAction.mqttAckTimeout: | |
return ns_LoraMqtt.waitingForMqttAck | |
return ns_LoraMqtt.waitingForMqttAck | |
### | |
# | |
# The State Machine | |
# | |
################# | |
# A different subclass for each state: | |
################ | |
# | |
# ns_LoraMqtt inherits from ns_StateMachine | |
# | |
class ns_LoraMqtt(ns_StateMachine): | |
""" | |
Attributes: | |
TODO: TODO | |
Implementation Notes: | |
TODO | |
""" | |
def __init__(self): | |
# Initial state | |
ns_StateMachine.__init__(self, ns_LoraMqtt.waitingForMqttConfig) | |
#sef restart(self): | |
# ns_StateMachine.set_state(self, ns_LoraMqtt.waitingForMqttConfig) | |
# Static variable initialization: | |
ns_LoraMqtt.waitingForMqttConfig = WaitingForMqttConfig() | |
ns_LoraMqtt.waitingForSwitchPress = WaitingForSwitchPress() | |
ns_LoraMqtt.waitingForLoraAck = WaitingForLoraAck() | |
ns_LoraMqtt.waitingForMqttAck = WaitingForMqttAck() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# a max heap priority queue implementation for use with micropython | |
"""Code for a simple max heap. This algorithm came from the CERS book. | |
A max heap is simply each node has the max value at its root and each | |
node of the subtree maintains the same property. | |
Here is an example tree of the values 1-5 | |
5 | |
/ \ | |
/ \ | |
4 2 | |
/ \ | |
1 3 | |
""" | |
#import time | |
#from ns_LoraMqttAction import ns_LoraMqttAction | |
class ns_PriorityNode(): | |
"""" | |
Attributes: | |
node: has a value/priority and object | |
""" | |
def __init__(self, priority, a_thing, num_times_run=0): | |
""" TODO | |
""" | |
if priority is None: | |
return | |
self.priority = priority | |
self.a_thing = a_thing | |
self.num_times_run = num_times_run | |
def decrease_priority(self, amount=-1): | |
""" TODO | |
""" | |
self.priority = self.priority - amount | |
# TODO: why does it print a_thing like this: | |
# a_thing=<ns_LoraMqttAction object at 3f952fd0> ??? | |
def __repr__(self): | |
""" TODO | |
""" | |
a_string = \ | |
'\r\n ns_PriorityNode: \tpriority={}\r\n\t\t\ta_thing={!r}\r\n' \ | |
. format(self.priority, self.a_thing) | |
return a_string | |
class ns_PriorityQueue(): | |
""" | |
Attributes: | |
heap: List representation of a max heap | |
Implementation Notes: | |
The heap is implemented as a list. The algorithm uses list indexes | |
counting from 1 to n. Where the size of the heap is n. The list | |
has a null data values at the normal list position of 0. The heap is | |
sorted so that the root containing the max value is position 1. The numbering | |
continues so that left is at postion 2 and right is at position 3. Likewise | |
the left and right of node 2 is 4 and 5. Hence left = 2i and right is 2*i + 1. | |
This relationship is why list position 0 is not used. | |
""" | |
def __init__(self, heap=None): | |
""" TODO | |
""" | |
# the algo book uses indexes which start at 1 instead of 0 | |
# add a dummy value at the first item | |
self.heap_list = [ns_PriorityNode(-3367, 'xxx')] | |
if heap is None: | |
return | |
# append the list parameter after the dummy value | |
self.heap_list.extend(heap) | |
# we will not use len() of the list but instead use size | |
self.heap_size = len(self.heap_list) - 1 | |
# Go ahead do make sure max heap property is applied | |
self.build_max_heap() | |
# Used to print/dump the heap as a list | |
def __repr__(self): | |
""" TODO | |
""" | |
return 'ns_Heap({!r})'.format(self.heap_list[1:]) | |
def size(self): | |
""" TODO | |
""" | |
return self.heap_size | |
def parent(self, i): | |
""" TODO | |
""" | |
if i < 1: | |
assert 0, "heap_index_min" | |
if i > self.heap_size: | |
assert 0, "heap_index_max" | |
return i // 2 | |
def left(self, i): | |
""" TODO | |
""" | |
if i < 1: | |
assert 0, "heap_index_min" | |
if i > self.heap_size: | |
assert 0, "heap_index_max" | |
return 2 * i | |
def right(self, i): | |
""" TODO | |
""" | |
if i < 1: | |
assert 0, "heap_index_min" | |
if i > self.heap_size: | |
assert 0, "heap_index_max" | |
return 2 * i + 1 | |
def max_heapify(self, i): | |
"""max heaps from index i | |
:param self: the heap containing the node to max heap | |
:type name: ns_Heap. | |
:param i: index of a node in the binary tree where max heap occurs | |
:type state: int. | |
:returns: nothing -- nothing | |
:raises: heap_index_min, heap_index_max | |
""" | |
if i < 1: | |
assert 0, "heap_index_min" | |
if i > self.heap_size: | |
assert 0, "heap_index_max" | |
left_index = self.left(i) | |
right_index = self.right(i) | |
if left_index <= self.heap_size and \ | |
self.heap_list[left_index].priority > self.heap_list[i].priority: | |
largest_index = left_index | |
else: | |
largest_index = i | |
if right_index <= self.heap_size and \ | |
self.heap_list[right_index].priority > self.heap_list[largest_index].priority: | |
largest_index = right_index | |
if largest_index != i: | |
# swap heap_list[i] with heap_list[largest] | |
temp_value = self.heap_list[i] | |
self.heap_list[i] = self.heap_list[largest_index] | |
self.heap_list[largest_index] = temp_value | |
self.max_heapify(largest_index) | |
def build_max_heap(self): | |
""" TODO | |
""" | |
for i in range(self.heap_size//2, 0, -1): # can also wrap with reversed() | |
self.max_heapify(i) | |
def peek_max(self): | |
""" Returns the node with max priority | |
:returns: ns_PriorityQueue_Node | |
:raises: heap empty | |
""" | |
if self.heap_size < 1: | |
assert 0, "heap_empty" | |
return self.heap_list[1] | |
def extract_max(self): | |
""" TODO | |
""" | |
if self.heap_size < 1: | |
assert 0, "heap_underflow" | |
the_max = self.heap_list[1] | |
if self.heap_size == 1: | |
# pop and toss the last one | |
self.heap_list.pop() | |
self.heap_size = 0 | |
else: | |
self.heap_list[1] = self.heap_list.pop() | |
self.heap_size = len(self.heap_list) - 1 | |
self.max_heapify(1) | |
return the_max | |
def increase_value(self, i, priority): | |
""" for a given index, modify the key/value to be larger. | |
Afterwards, reflow the heap with modified key. | |
Keep in mind, the index is one based. | |
:param i: the index to increase priority value | |
:type i: integer. | |
:param priority: value | |
:type int: the value of the new priority specified by index i | |
:returns: nothing -- nothing | |
:raises: value/key is less than existing value | |
""" | |
if priority < self.heap_list[i].priority: | |
assert 0, "value/key is less than existing value" | |
self.heap_list[i].priority = priority | |
while i > 1 and self.heap_list[self.parent(i)].priority < self.heap_list[i].priority: | |
# swap heap_list[i] with heap_list[parent(i)] | |
temp_node = self.heap_list[i] | |
self.heap_list[i] = self.heap_list[self.parent(i)] | |
self.heap_list[self.parent(i)] = temp_node | |
i = self.parent(i) | |
def insert(self, priority_node): | |
""" TODO | |
xxx | |
""" | |
self.heap_size = self.heap_size + 1 | |
#tic = time.perf_counter() | |
# method #1 (me) | |
#self.heap_list.append(value) | |
#self.build_max_heap() | |
# method #2 (book) *** This is faster *** | |
# set the intial value to be -10000 and then use | |
# increase value routine to adjust priority and reflow heap | |
self.heap_list.append(ns_PriorityNode(-100000, \ | |
priority_node.a_thing, priority_node.num_times_run)) | |
self.increase_value(self.heap_size, priority_node.priority) | |
#toc = time.perf_counter() | |
#print("execution time is %f" % (toc - tic)) | |
# no need if doing method 2 self.build_max_heap() | |
def walk_tree(self, i=1): | |
""" Visits each node of the tree. | |
Starts with root node, then goes left path to leaf | |
then returns to last non leaf node and does right path. | |
After this node it will start again on left path. Basically | |
it visits each node in sorted order if the heap was sorted. | |
:param i: the index to start node walk. Defaults to first, the root node. | |
:type i: integer. | |
:returns: nothing -- nothing | |
:raises: none | |
""" | |
if i < 1: | |
return | |
if i > self.heap_size: | |
return | |
depth = 0 | |
current = self.parent(i) | |
while current > 0: | |
depth = depth + 1 | |
current = self.parent(current) | |
print("Node (depth:{}) is : {}" . format(depth, self.heap_list[i])) | |
self.walk_tree(self.left(i)) | |
self.walk_tree(self.right(i)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# NS_State | |
"""TODO | |
""" | |
# | |
# Implements a state class as part of a state mfachine | |
# | |
# pulled from | |
# https://python-3-patterns-idioms-test.readthedocs.io/en/latest/StateMachine.html | |
# | |
class ns_State: | |
""" | |
Attributes: | |
TODO: TODO | |
Implementation Notes: | |
TODO | |
""" | |
# by default run and next raise exceptions. A user must | |
# define these entry points. | |
# The state classes which inherit from this class will | |
# override these functions. | |
# | |
# Each time the state machine gets an input, It will run the | |
# overriden code implemented here. So this ouptut is performed | |
# each time. | |
def run(self): | |
"""executes a state | |
:param self: the heap containing the node to max heap | |
:type name: ns_Heap. | |
:param i: index of a node in the binary tree where max heap occurs | |
:type state: int. | |
:returns: nothing -- nothing | |
:raises: heap_index_min, heap_index_max | |
""" | |
assert 0, "run not implemented" | |
# After the code above is run in the derived class, this overriden | |
# function in the derived class is called and based upon the input match | |
# it will determine what the next state is. | |
def next(self, input_action): | |
"""executes a state | |
:param self: the heap containing the node to max heap | |
:type name: ns_Heap. | |
:param i: index of a node in the binary tree where max heap occurs | |
:type state: int. | |
:returns: nothing -- nothing | |
:raises: heap_index_min, heap_index_max | |
""" | |
assert 0, "next not implemented" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# NS_StateMachine.py | |
# Takes a list of Inputs to move from State to State using a template method | |
# | |
# Taken from | |
# | |
# https://python-3-patterns-idioms-test.readthedocs.io/en/latest/StateMachine.html | |
class ns_StateMachine: | |
""" ns_StateMachine | |
""" | |
def __init__(self, initialState): | |
""" init | |
""" | |
self.currentState = initialState | |
self.currentState.run() | |
#def set_state(self, state): | |
# self.currentState = state | |
# Template method | |
# | |
# run all, takes a set of input actions and exercises | |
# the state machine for each of the inputs. | |
# | |
# This routine is not overriden and when the derived class calls it, | |
# it uses this code to transition to the next state and then run | |
# the code correpsonding to "run" for the new state. | |
def runAll(self, inputs): | |
""" runAll | |
""" | |
for i in inputs: | |
# print the input | |
# print(i) | |
print("Input Event is {}".format(i)) | |
self.currentState = self.currentState.next(i) | |
self.currentState.run() | |
def run(self, input): | |
""" run | |
""" | |
# print the input | |
print("Input Event is {}".format(input)) | |
self.currentState = self.currentState.next(input) | |
# self.currentState is something like WaitingForSwitchPress | |
print(type(self.currentState)) | |
return self.currentState.run() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
stuff deleted | |
... | |
.... | |
... | |
.... | |
.... stuff deleted | |
.... |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pylint3 -v flash/ns_LoraMqttDemo.py flash/lib/ns_State.py flash/ns_LoraMqttAction.py flash/lib/ns_PriorityQueue.py flash/lib/ns_StateMachine.py | |
************* Module ns_LoraMqttDemo | |
flash/ns_LoraMqttDemo.py:1:0: C0103: Module name "ns_LoraMqttDemo" doesn't conform to snake_case naming style (invalid-name) | |
flash/ns_LoraMqttDemo.py:13:0: E0001: Cannot import 'ns_StateMachine' due to syntax error 'inconsistent use of tabs and spaces in indentation (<unknown>, line 14)' (syntax-error) | |
flash/ns_LoraMqttDemo.py:128:0: C0103: Class name "ns_LoraMqtt" doesn't conform to PascalCase naming style (invalid-name) | |
flash/ns_LoraMqttDemo.py:128:0: R0903: Too few public methods (0/2) (too-few-public-methods) | |
************* Module ns_State | |
flash/lib/ns_State.py:1:0: C0103: Module name "ns_State" doesn't conform to snake_case naming style (invalid-name) | |
flash/lib/ns_State.py:10:0: C0103: Class name "ns_State" doesn't conform to PascalCase naming style (invalid-name) | |
flash/lib/ns_State.py:26:4: R0201: Method could be a function (no-self-use) | |
flash/lib/ns_State.py:44:19: W0613: Unused argument 'input_action' (unused-argument) | |
flash/lib/ns_State.py:44:4: R0201: Method could be a function (no-self-use) | |
************* Module ns_LoraMqttAction | |
flash/ns_LoraMqttAction.py:1:0: C0103: Module name "ns_LoraMqttAction" doesn't conform to snake_case naming style (invalid-name) | |
flash/ns_LoraMqttAction.py:4:0: C0103: Class name "ns_LoraMqttAction" doesn't conform to PascalCase naming style (invalid-name) | |
************* Module ns_PriorityQueue | |
flash/lib/ns_PriorityQueue.py:43:6: W0511: TODO: why does it print a_thing like this: (fixme) | |
flash/lib/ns_PriorityQueue.py:1:0: C0103: Module name "ns_PriorityQueue" doesn't conform to snake_case naming style (invalid-name) | |
flash/lib/ns_PriorityQueue.py:22:0: C0103: Class name "ns_PriorityNode" doesn't conform to PascalCase naming style (invalid-name) | |
flash/lib/ns_PriorityQueue.py:54:0: C0103: Class name "ns_PriorityQueue" doesn't conform to PascalCase naming style (invalid-name) | |
************* Module ns_StateMachine | |
flash/lib/ns_StateMachine.py:14:0: E0001: inconsistent use of tabs and spaces in indentation (<unknown>, line 14) (syntax-error) | |
------------------------------------------------------------------ | |
Your code has been rated at 8.64/10 (previous run: 8.64/10, +0.00) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment