Last active
June 1, 2023 20:09
-
-
Save SealtielFreak/39f0450b25d67abbeb288d7bb9659c65 to your computer and use it in GitHub Desktop.
skech scheduler, serial testing at commsnds and overflow DMA test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import enum | |
class StateMachine(enum.Enum): | |
INIT = 0 | |
RUN = 1 | |
WAIT = 2 | |
class SchedulerAsync: | |
def __init__(self): | |
self.__all_programs = {} | |
def register(self, name): | |
def inner(func): | |
self.__all_programs[name] = func | |
return func | |
return inner | |
async def run(self): | |
state = StateMachine.INIT | |
while True: | |
state | |
return 0 | |
scheduler = SchedulerAsync() | |
@scheduler.register(StateMachine.INIT) | |
async def wait_program(): | |
yield StateMachine.WAIT | |
@scheduler.register(StateMachine.WAIT) | |
async def wait_program(): | |
while True: | |
yield StateMachine.WAIT | |
if __name__ == '__main__': | |
asyncio.run(scheduler.run()) | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import typing | |
import random | |
import serial |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import collections | |
import functools | |
import typing | |
import random | |
import serial | |
DEFAULT_PORT = "COM3" | |
DEFAULT_BAUD_RATE = 9000 | |
FAILURE_ERROR = 0 | |
FunctionAt = typing.Union[typing.Callable[[str], str], typing.Callable[[None], str]] | |
class Singleton(type): | |
_instances = {} | |
def __call__(cls, *args, **kwargs): | |
if cls not in cls._instances: | |
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs) | |
return cls._instances[cls] | |
def binary_output(func: FunctionAt): | |
def inner(*args): | |
return func(*args).encode() | |
return inner | |
def token_parser(_command: str): | |
open_str = False | |
chr_token = collections.deque() | |
deque_token = collections.deque() | |
join_str = lambda _iter_str: "".join(_iter_str) | |
append_deque = lambda _deque, _iter_str: _deque.append(join_str(_iter_str)) | |
deque_token_append = functools.partial(append_deque, deque_token) | |
for i, _chr in enumerate(_command): | |
if _chr in "'\"": | |
open_str = not open_str | |
if _chr in "=," and not open_str: | |
deque_token_append(chr_token) | |
chr_token.clear() | |
else: | |
chr_token.append(_chr) | |
deque_token_append(chr_token) | |
return tuple(deque_token) | |
def token_identifier(_str: str): | |
type_command = "UNKNOWN" | |
tokens = token_parser(_str) | |
length_tokens = len(tokens) | |
if length_tokens == 1: | |
return dict(type="Testing", tokens=tokens) | |
class ATCommand: | |
def __init__(self, name: str = "", *alias: typing.Tuple[str, ...], **kwargs): | |
self.__name = name.upper() | |
self.__alias_list = tuple(map(lambda _str: _str.upper(), alias)) | |
self.command = kwargs.get("command", lambda *args: None) | |
@property | |
def name(self): | |
return self.__name | |
def __contains__(self, item): | |
return item in self.__alias_list | |
def __hash__(self): | |
return hash(self.name) | |
class RegisterCommand(metaclass=Singleton): | |
def __init__(self): | |
self.__commands = {} | |
def attach(self, name): | |
def inner(func): | |
self.__commands[name] = ATCommand(name, command=func) | |
return func | |
return inner | |
def call(self, name): | |
return self.__commands[name].command() | |
REGISTER = RegisterCommand() | |
OK_MESSAGE = "OK" | |
ERROR_MESSAGE = "ERROR" | |
def random_message_probability(_keys_probability: dict) -> str: | |
_keys_probability = dict(sorted(_keys_probability.items(), key=lambda item: item[1])) | |
max_value = max(v for _, v in _keys_probability.items()) | |
while True: | |
n = random.randint(0, max_value) | |
for msg, percent in _keys_probability.items(): | |
_str = msg | |
if n in range(0, percent): | |
return _str | |
@REGISTER.attach("AT") | |
def at_test(): | |
return random_message_probability({ | |
OK_MESSAGE: 99, | |
ERROR_MESSAGE: 1 | |
}) | |
@REGISTER.attach("AT+SGNSRST") | |
def at_sgnsrst(*opts): | |
return "" | |
@REGISTER.attach("AT+CGNSINF") | |
def at_cgnsinf(): | |
""" | |
El comando AT+CGNSINF retorna la información del GNSS, como la latitud, la longitud, la altitud, la velocidad, etc. Un ejemplo de lo que regresa es el siguiente: | |
+CGNSINF: 1,1,20210316134242.000,31.2304,121.4737,14.7,0.00,0.0,1,3.5,3.3,0.9,11,8,41, | |
Esto significa que: | |
- El GNSS está encendido y tiene una posición válida. | |
- La fecha y hora son 2021-03-16 13:42:42.000 (UTC). | |
- La latitud es 31.2304 grados norte. | |
- La longitud es 121.4737 grados este. | |
- La altitud es 14.7 metros sobre el nivel del mar. | |
- La velocidad es 0.00 metros por segundo. | |
- El rumbo es 0.0 grados. | |
- El modo de navegación es autónomo. | |
- El número de satélites usados es 11. | |
- El número de satélites en vista es 8. | |
- El HDOP es 3.5. | |
- El PDOP es 3.3. | |
- El VDOP es 0.9. | |
- El SNR promedio es 41. | |
Puedes encontrar más información sobre este comando en el manual del módulo SIM7070 GNNS¹ o en otros sitios web²³. | |
Origen: Conversación con Bing, 6/1/2023 | |
(1) SIMCom SIM7070 SIM7080 SIM7090 AT Command Manual - Techship. https://techship.com/downloads/simcom-sim7070-sim7080-sim7090-at-command-manual/. | |
(2) SIM7070G Cat-M/NB-IoT/GPRS HAT - Waveshare Wiki. https://www.waveshare.com/wiki/SIM7070G_Cat-M/NB-IoT/GPRS_HAT. | |
(3) Implement SIMCom SIM7070/SIM7080/SIM7090 Series #419 - GitHub. https://github.com/vshymanskyy/TinyGSM/issues/419. | |
:return: str | |
""" | |
# return ",".join(map(str, (random.randint(0, 10000) for _ in range(16)))) | |
return "1,1,20210316134242.000,31.2304,121.4737,14.7,0.00,0.0,1,3.5,3.3,0.9,11,8,41," | |
if __name__ == "__main__": | |
print(token_parser("AT")) | |
print(token_parser("AT+SGNSRST")) | |
print(token_parser("AT+SGNSRST=1")) | |
print(token_parser("AT+CSCA='+9876543210', 120")) | |
register = RegisterCommand() | |
while True: | |
command = register.call("AT") | |
print(command) | |
""" | |
with serial.Serial(port=DEFAULT_PORT, baudrate=DEFAULT_BAUD_RATE) as _port: | |
port: serial.Serial = _port | |
try: | |
while True: | |
_command = random_output_position() | |
port.write(_command) | |
except serial.SerialException: | |
pass | |
except typing.Union[InterruptedError, KeyboardInterrupt]: | |
pass | |
""" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment