Last active
January 1, 2024 06:03
-
-
Save ttsiodras/fe32284ac204907249d479b4225eb83c to your computer and use it in GitHub Desktop.
The cleanest way (I've ever seen) to implement state machines in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# Based on a great article from Eli Bendersky:
#
# http://eli.thegreenplace.net/2009/08/29/co-routines-as-an-alternative-to-state-machines/
#
# I just "ported" it to Python 3 (still using plain old yield) and added the proper type
# annotations, since Python 3.5 can now support them (and mypy can check them):
#
# $ mypy --disallow-untyped-defs protocol_via_coroutines.py
# $
#
from typing import Callable, Generator, List, Optional
# A protocol decoder:
#
# - yields nothing (None),
# - expects ints to be passed in via `send` at each of its yields,
# - and doesn't return anything.
ProtocolDecodingCoroutine = Generator[None, int, None]

# A frame consumer (passed as an argument to a protocol decoder):
#
# - yields nothing (None),
# - expects a List[int] to be passed in via `send` at each of its yields,
# - and doesn't return anything.
FrameConsumerCoroutine = Generator[None, List[int], None]
def unwrap_protocol(header: int = 0x61,
                    footer: int = 0x62,
                    dle: int = 0xAB,
                    after_dle_func: Callable[[int], int] = lambda x: x,
                    target: Optional[FrameConsumerCoroutine] = None) -> ProtocolDecodingCoroutine:
    """
    Simplified framing (protocol unwrapping) co-routine.

    Bytes are fed in one at a time with `send` (after priming the
    generator with `next`). Everything outside a frame is ignored; a frame
    starts after a `header` byte and ends at the next unescaped `footer`
    byte. Inside a frame, a `dle` byte escapes the byte that follows it:
    that byte is passed through `after_dle_func` and stored as payload, so
    it is never interpreted as a footer or as another escape.

    Args:
        header: Byte value that opens a frame.
        footer: Byte value that closes a frame.
        dle: Escape byte; it is dropped and the next byte is taken as data.
        after_dle_func: Transformation applied to each escaped byte before
            it is appended to the frame.
        target: Already-primed consumer that receives each completed frame
            (header, footer and escape bytes excluded) via `send`. When it
            is None, completed frames are discarded.
    """
    # Outer loop: skip bytes until a frame header shows up.
    while True:
        byte = (yield)
        if byte != header:
            continue

        # Inner loop: capture the full frame. A fresh list is created per
        # frame, so a consumer may safely keep a reference to what it got.
        frame = []  # type: List[int]
        while True:
            byte = (yield)
            if byte == footer:
                # Without a consumer there is nowhere to deliver the
                # frame; drop it instead of failing on `None.send`.
                if target is not None:
                    target.send(frame)
                break
            if byte == dle:
                # The escape byte itself is not payload; the next one is.
                byte = (yield)
                frame.append(after_dle_func(byte))
            else:
                frame.append(byte)
def frame_receiver() -> FrameConsumerCoroutine:
    """
    A simple co-routine "sink" for receiving full frames.

    Prime it with `next`, then `send` it one frame (a list of ints) at a
    time; each frame is printed to stdout as lowercase hex, at least two
    digits per value, with no separators.
    """
    while True:
        frame = (yield)
        # %-formatting (rather than an f-string) keeps this runnable on
        # Python 3.5, the version the file's type annotations target.
        hex_digits = ''.join('%02x' % value for value in frame)
        print('Got frame:', hex_digits)
# Demo input: two well-formed frames surrounded by stray bytes. With the
# default header/footer/escape values the run below prints
# "Got frame: 99afd1" and then "Got frame: ab14".
bytestream = bytes([
    0x70, 0x24,                      # noise before the first header
    0x61, 0x99, 0xAF, 0xD1, 0x62,    # header, payload, footer
    0x56, 0x62,                      # noise (a footer outside any frame)
    0x61, 0xAB, 0xAB, 0x14, 0x62,    # frame with an escaped 0xAB in it
    0x7,                             # trailing noise
])

# Generators must be advanced to their first `yield` before `send` can
# deliver a value to them.
frame_consumer = frame_receiver()
next(frame_consumer)  # Get to the yield

unwrapper = unwrap_protocol(target=frame_consumer)
next(unwrapper)  # Get to the yield

# Iterating over `bytes` yields ints, which is what the decoder expects.
for byte in bytestream:
    unwrapper.send(byte)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment