Created
February 16, 2022 14:56
-
-
Save atucom/4fb17ec8f0fa0b4751334b7cc2193ac8 to your computer and use it in GitHub Desktop.
Meshtastic shell server for executing commands on a remote system attached to a meshtastic radio
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
#@atucom | |
# PoC of getting a remote shell using a meshtastic radio. | |
# this works by using the meshtastic python package and the device connected over USB serial | |
# The long/slow transmit speed gives about 10 bytes/sec bandwidth which is crazy slow. It took 5.5mins to run 'ls -la' on my home dir | |
# This could be further improved by the following: | |
# - trying the short/fast mode | |
# - creating a dedicated channel | |
# - setting a non-default psk for encryption | |
# - using sendData instead of sendText | |
# - creating a binary transfer protocol to protect against interference | |
# - simulating a tty | |
# - add a text compression algo to speed up convo (gzip?) | |
# again, this was all just a PoC to see if it was possible and at what speed it oprates at in long/slow. | |
import meshtastic | |
import meshtastic.serial_interface | |
from pubsub import pub | |
from subprocess import run, PIPE | |
import time | |
interface = meshtastic.serial_interface.SerialInterface() | |
chunk_size = 150 | |
def onReceive(packet, interface): # called when a packet arrives | |
# Only take action if the "portnum" of the meessage is a text message | |
if packet['decoded']['portnum'] == 'TEXT_MESSAGE_APP': #meshtastic ios app for testing. | |
from_id = packet['from'] | |
msg_payload = packet['decoded']['payload'] | |
msg_payload_utf8 = msg_payload.decode('utf8') | |
print(f"Received: {msg_payload} from {from_id}") | |
print(msg_payload.decode('utf8')) | |
ret = run(str(msg_payload_utf8), stdout=PIPE, shell=True) | |
output = ret.stdout | |
chunks_to_send = [output[index : index + chunk_size] for index in range(0, len(output), chunk_size)] | |
for chunk in chunks_to_send: | |
chunk = chunk.decode('utf8') | |
print(f"IM SENDING THIS NOW: {chunk}") | |
interface.sendText(f"RESULT: {chunk}", destinationId=from_id) | |
time.sleep(10) | |
while True: | |
pub.subscribe(onReceive, "meshtastic.receive") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment