Skip to content

Instantly share code, notes, and snippets.

@SealtielFreak
Last active June 1, 2023 20:09
Show Gist options
  • Save SealtielFreak/39f0450b25d67abbeb288d7bb9659c65 to your computer and use it in GitHub Desktop.
Save SealtielFreak/39f0450b25d67abbeb288d7bb9659c65 to your computer and use it in GitHub Desktop.
skech scheduler, serial testing at commsnds and overflow DMA test
import asyncio
import enum
class StateMachine(enum.Enum):
INIT = 0
RUN = 1
WAIT = 2
class SchedulerAsync:
def __init__(self):
self.__all_programs = {}
def register(self, name):
def inner(func):
self.__all_programs[name] = func
return func
return inner
async def run(self):
state = StateMachine.INIT
while True:
state
return 0
scheduler = SchedulerAsync()
@scheduler.register(StateMachine.INIT)
async def wait_program():
yield StateMachine.WAIT
@scheduler.register(StateMachine.WAIT)
async def wait_program():
while True:
yield StateMachine.WAIT
if __name__ == '__main__':
asyncio.run(scheduler.run())
import typing
import random
import serial
import collections
import functools
import typing
import random
import serial
DEFAULT_PORT = "COM3"
DEFAULT_BAUD_RATE = 9000
FAILURE_ERROR = 0
FunctionAt = typing.Union[typing.Callable[[str], str], typing.Callable[[None], str]]
class Singleton(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
def binary_output(func: FunctionAt):
def inner(*args):
return func(*args).encode()
return inner
def token_parser(_command: str):
open_str = False
chr_token = collections.deque()
deque_token = collections.deque()
join_str = lambda _iter_str: "".join(_iter_str)
append_deque = lambda _deque, _iter_str: _deque.append(join_str(_iter_str))
deque_token_append = functools.partial(append_deque, deque_token)
for i, _chr in enumerate(_command):
if _chr in "'\"":
open_str = not open_str
if _chr in "=," and not open_str:
deque_token_append(chr_token)
chr_token.clear()
else:
chr_token.append(_chr)
deque_token_append(chr_token)
return tuple(deque_token)
def token_identifier(_str: str):
type_command = "UNKNOWN"
tokens = token_parser(_str)
length_tokens = len(tokens)
if length_tokens == 1:
return dict(type="Testing", tokens=tokens)
class ATCommand:
def __init__(self, name: str = "", *alias: typing.Tuple[str, ...], **kwargs):
self.__name = name.upper()
self.__alias_list = tuple(map(lambda _str: _str.upper(), alias))
self.command = kwargs.get("command", lambda *args: None)
@property
def name(self):
return self.__name
def __contains__(self, item):
return item in self.__alias_list
def __hash__(self):
return hash(self.name)
class RegisterCommand(metaclass=Singleton):
def __init__(self):
self.__commands = {}
def attach(self, name):
def inner(func):
self.__commands[name] = ATCommand(name, command=func)
return func
return inner
def call(self, name):
return self.__commands[name].command()
REGISTER = RegisterCommand()
OK_MESSAGE = "OK"
ERROR_MESSAGE = "ERROR"
def random_message_probability(_keys_probability: dict) -> str:
_keys_probability = dict(sorted(_keys_probability.items(), key=lambda item: item[1]))
max_value = max(v for _, v in _keys_probability.items())
while True:
n = random.randint(0, max_value)
for msg, percent in _keys_probability.items():
_str = msg
if n in range(0, percent):
return _str
@REGISTER.attach("AT")
def at_test():
return random_message_probability({
OK_MESSAGE: 99,
ERROR_MESSAGE: 1
})
@REGISTER.attach("AT+SGNSRST")
def at_sgnsrst(*opts):
return ""
@REGISTER.attach("AT+CGNSINF")
def at_cgnsinf():
"""
El comando AT+CGNSINF retorna la información del GNSS, como la latitud, la longitud, la altitud, la velocidad, etc. Un ejemplo de lo que regresa es el siguiente:
+CGNSINF: 1,1,20210316134242.000,31.2304,121.4737,14.7,0.00,0.0,1,3.5,3.3,0.9,11,8,41,
Esto significa que:
- El GNSS está encendido y tiene una posición válida.
- La fecha y hora son 2021-03-16 13:42:42.000 (UTC).
- La latitud es 31.2304 grados norte.
- La longitud es 121.4737 grados este.
- La altitud es 14.7 metros sobre el nivel del mar.
- La velocidad es 0.00 metros por segundo.
- El rumbo es 0.0 grados.
- El modo de navegación es autónomo.
- El número de satélites usados es 11.
- El número de satélites en vista es 8.
- El HDOP es 3.5.
- El PDOP es 3.3.
- El VDOP es 0.9.
- El SNR promedio es 41.
Puedes encontrar más información sobre este comando en el manual del módulo SIM7070 GNNS¹ o en otros sitios web²³.
Origen: Conversación con Bing, 6/1/2023
(1) SIMCom SIM7070 SIM7080 SIM7090 AT Command Manual - Techship. https://techship.com/downloads/simcom-sim7070-sim7080-sim7090-at-command-manual/.
(2) SIM7070G Cat-M/NB-IoT/GPRS HAT - Waveshare Wiki. https://www.waveshare.com/wiki/SIM7070G_Cat-M/NB-IoT/GPRS_HAT.
(3) Implement SIMCom SIM7070/SIM7080/SIM7090 Series #419 - GitHub. https://github.com/vshymanskyy/TinyGSM/issues/419.
:return: str
"""
# return ",".join(map(str, (random.randint(0, 10000) for _ in range(16))))
return "1,1,20210316134242.000,31.2304,121.4737,14.7,0.00,0.0,1,3.5,3.3,0.9,11,8,41,"
if __name__ == "__main__":
print(token_parser("AT"))
print(token_parser("AT+SGNSRST"))
print(token_parser("AT+SGNSRST=1"))
print(token_parser("AT+CSCA='+9876543210', 120"))
register = RegisterCommand()
while True:
command = register.call("AT")
print(command)
"""
with serial.Serial(port=DEFAULT_PORT, baudrate=DEFAULT_BAUD_RATE) as _port:
port: serial.Serial = _port
try:
while True:
_command = random_output_position()
port.write(_command)
except serial.SerialException:
pass
except typing.Union[InterruptedError, KeyboardInterrupt]:
pass
"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment