Created
October 26, 2022 07:53
-
-
Save xssfox/8b320bde5c9785408e16d3ce4208abfb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kissfix # TODO do we need to worry about the python3 issue / kissfix | |
import serial | |
import os, pty, serial, tty, termios | |
import threading | |
import logging | |
import sys, traceback | |
import fcntl | |
import time | |
# Configure the root logger to emit DEBUG and above, then keep a module-level
# handle on that same root logger.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger()
# This deals with encoding and decoding KISS frames
class KissInterface():
    """KISS endpoint exposed to other applications through a pseudo-terminal.

    A ``kissfix.SerialKISS`` object is created and its file descriptor is
    replaced with the controlling side of a freshly opened PTY. Frames read
    from that PTY are delivered to ``callback`` by a background
    ``KissThread``; frames handed to :meth:`tx` are written to the PTY.

    Attributes set by the constructor:
        k: the ``kissfix.SerialKISS`` instance doing the KISS decoding.
        control: file descriptor of the controlling (master) side of the PTY.
        user_port: file descriptor of the user (slave) side of the PTY.
        ttyname: device path of ``user_port`` for applications to open.
        rx_thread: the daemon ``KissThread`` feeding ``callback``.
    """

    def __init__(self, callback):
        """Open the PTY, wire it into kissfix and start the receive thread.

        Args:
            callback: called with the ``bytes`` of each received frame by
                the receive thread.
        """
        self.k = kissfix.SerialKISS('/dev/ptmx', 9600)
        self.k.start()

        # Override the serial interface with our own PTY file descriptor
        self.control, self.user_port = pty.openpty()
        self.ttyname = os.ttyname(self.user_port)
        # we need to override the serial port with the fd from pty
        self.k.interface.fd = self.control
        # this makes the tty act more like a serial port
        tty.setraw(self.control, termios.TCSANOW)

        # change flags to be non blocking so that buffer full doesn't cause issues
        flags = fcntl.fcntl(self.control, fcntl.F_GETFL)
        fcntl.fcntl(self.control, fcntl.F_SETFL, flags | os.O_NONBLOCK)

        self.rx_thread = KissThread(callback, self.k)
        # Thread.setDaemon() is deprecated; the attribute is the supported form.
        self.rx_thread.daemon = True
        self.rx_thread.start()

    def tx(self, bytes_in: bytes):
        """Wrap ``bytes_in`` in a KISS frame and write it to the PTY.

        The frame is ``FEND``, a single zero byte, the escaped payload, then
        ``FEND``. If the non-blocking write raises ``BlockingIOError`` the
        error is logged and the pending data on the user side of the PTY is
        discarded; the frame that triggered the error is not retried.

        Args:
            bytes_in: raw payload to send.
        """
        frame = kissfix.FEND + b'\00' + kissfix.escape_special_codes(bytes_in) + kissfix.FEND
        try:
            os.write(self.control, frame)
        except BlockingIOError:
            logging.error("PTY interface buffer is full. The connected application may have crashed or isn't reading fast enough. Data loss is likely. Alternatively you aren't using the PTY interface and should have used --no-pty. Clearing the buffer now so we can keep going")
            self._drain_user_port()

    def _drain_user_port(self):
        """Read and discard everything currently queued on ``user_port``."""
        blocking = os.get_blocking(self.user_port)  # remember what the state was before
        os.set_blocking(self.user_port, False)
        try:
            # read off the buffer until we've cleared it; an empty read means
            # end-of-file, so stop instead of spinning forever
            while os.read(self.user_port, 32):
                pass
        except BlockingIOError:
            pass  # nothing left to read: the buffer is now empty
        finally:
            # restore the state after, even if the read failed unexpectedly
            os.set_blocking(self.user_port, blocking)
class KissThread(threading.Thread):
    """Thread that polls an interface for frames and forwards them to a callback.

    Args:
        callback: called once per frame with ``bytes(frame[1:])``.
        interface: object whose ``read(readmode=False)`` returns an iterable
            of frames (in this file, a ``kissfix.SerialKISS`` instance).
    """

    def __init__(self, callback, interface):
        super().__init__()
        self.callback = callback
        self._running = True
        self.interface = interface

    def run(self):
        """Poll the interface until :meth:`terminate` has been called."""
        while self._running:
            # check TNC port
            for frame in self.interface.read(readmode=False):
                # Strip the first byte only (described by the original
                # author as the TNC port number) before handing the frame on.
                self.callback(bytes(frame[1:]))

    def terminate(self):
        """Ask :meth:`run` to exit once the current polling pass finishes."""
        self._running = False
def kiss_rx_callback(frame: bytes):
    """Forward a frame received on the TNC side to the radio and log it.

    Frames shorter than 18 bytes get 18 zero bytes appended before being
    written to the module-level ``radio``.
    """
    too_short = len(frame) < 18  # I don't know the rules yet
    if too_short:
        # NOTE(review): this appends 18 zero bytes rather than padding the
        # frame up to 18 bytes -- confirm which one the radio expects.
        frame = frame + bytes(18)
    radio.write(frame)
    logging.debug(f"Received KISS frame: {frame.hex()}")
# Open the radio link first. KissInterface starts its receive thread inside
# its constructor and that thread calls kiss_rx_callback, which writes to the
# module-level `radio`; binding `radio` beforehand avoids a NameError if a
# frame arrives straight away.
radio = kissfix.SerialKISS('/dev/ttyAMA0', 9600)
radio.start()

tnc_interface = KissInterface(kiss_rx_callback)
logger.info(f'TNC port is at : {tnc_interface.ttyname}')
def radio_rx(frame: bytes):
    """Relay a frame received from the radio to the TNC PTY, then log it.

    The frame is handed to ``tnc_interface.tx`` unchanged.
    """
    tnc_interface.tx(frame)
    logging.debug(f"Received KISS frame from radio: {frame.hex()}")
# Deliver frames read from the radio to radio_rx on a background thread.
radio_thread = KissThread(radio_rx, radio)
# Thread.setDaemon() is deprecated; set the attribute instead.
radio_thread.daemon = True
radio_thread.start()
print("running")
# Keep the main thread alive; the daemon threads stop when it exits.
while True:
    time.sleep(0.1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment