Last active
October 23, 2020 13:56
-
-
Save RoganDawes/247c503257776ff8fe426131e1b5987d to your computer and use it in GitHub Desktop.
Some quick-and-dirty Python code for reading from two serial ports at once, plus a Teensy sketch that does the same using two hardware UARTs simultaneously.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import serial, time, sys, threading | |
from colorama import Fore, Style, init as colorama_init | |
from termcolor import colored | |
colorama_init() | |
# lock to serialize console output | |
lock = threading.Lock() | |
class Highlight:
    """Context manager that prints a colour code on entry and a reset on exit.

    `clazz` selects which reset sequence is printed on exit: `Fore` prints
    `Fore.RESET`, otherwise `Style` is asserted and `Style.RESET_ALL` is
    printed. `color` is the value printed on entry.
    """

    def __init__(self, clazz, color):
        self.clazz = clazz
        self.color = color

    def __enter__(self):
        # No newline, so the following output continues on the same line.
        print(self.color, end="")

    def __exit__(self, type, value, traceback):
        if self.clazz != Fore:
            assert self.clazz == Style
            reset = Style.RESET_ALL
        else:
            reset = Fore.RESET
        print(reset, end="")
        sys.stdout.flush()
# Expect <baud> <port1> and an optional <port2> after the script name.
if len(sys.argv) not in (3, 4):
    sys.stderr.write("Usage: %s <baud> <port1> [<port2>]\n" % (sys.argv[0]))
    # sys.exit rather than the site-provided exit(), which is only meant for
    # interactive sessions and is missing when site is not loaded.
    sys.exit(1)
def hexdump(start, src, length=16, sep='.'):
    '''
    @brief Return {src} in hex dump.
    @param[in] start {Int} Address printed for the first byte of {src}.
    @param[in] src {Bytes|Str} Data to dump; str items are converted with ord().
    @param[in] length {Int} Nb Bytes by row.
    @param[in] sep {Char} For the text part, {sep} will be used for non ASCII char.
    @return {Str} The hexdump, one row per {length} bytes, joined with newlines
            (empty string for empty {src}).
    @note An extra space is inserted before byte index length // 2 of each row.
          Integer division is used so odd row lengths also get the gap on
          Python 3, where length / 2 is a float and never matched.
    '''
    half = length // 2
    # Two hex digits plus one separator per byte, plus the mid-row gap.
    width = length * (2 + 1) + 1
    rows = []
    for offset in range(0, len(src), length):
        # Iterating bytes yields ints on Python 3; str items need ord().
        values = [b if isinstance(b, int) else ord(b)
                  for b in src[offset:offset + length]]
        cells = []
        for index, value in enumerate(values):
            if index == half:
                cells.append(' ')
            cells.append('%02x ' % value)
        hexa = ''.join(cells).strip(' ')
        text = ''.join(chr(v) if 0x20 <= v < 0x7F else sep for v in values)
        rows.append('%08X: %-*s |%s|' % (start + offset, width, hexa, text))
    return '\n'.join(rows)
def read_serial(port, baud, color, binary):
    '''
    @brief Read from one serial port in a loop and echo the data to stdout.
    @param[in] port {Str} Name of the serial port to open.
    @param[in] baud {Int|Str} Baud rate, assigned to the port as given.
    @param[in] color {Str} Colour name passed to colored() for hex dump rows.
    @param[in] binary {Bool} True: print a coloured hex dump of each chunk.
                             False: copy the raw bytes to stdout unchanged.
    @note Does not return normally: it leaves via SystemExit when the port
          cannot be opened or communication fails. The port is closed on the
          way out of the read loop.
    '''
    ser = serial.Serial()
    ser.port = port
    ser.baudrate = baud
    ser.bytesize = serial.EIGHTBITS     # number of bits per byte
    ser.parity = serial.PARITY_NONE     # no parity check
    ser.stopbits = serial.STOPBITS_ONE  # number of stop bits
    ser.timeout = 0.1                   # short read timeout instead of blocking
    ser.xonxoff = False                 # disable software flow control
    ser.rtscts = False                  # disable hardware (RTS/CTS) flow control
    ser.dsrdtr = False                  # disable hardware (DSR/DTR) flow control
    ser.write_timeout = 2               # timeout for write

    try:
        ser.open()
    except Exception as e:
        print("error open serial port: " + str(e))
        sys.exit()

    if not ser.is_open:
        print("cannot open serial port ")
        sys.exit()

    pos = 0  # running offset shown in the hex dump address column
    try:
        ser.dtr = True
        while True:
            c = ser.read(size=16)
            if not c:
                continue
            # Hold the console lock so output from the two reader threads
            # is not interleaved within a chunk.
            with lock:
                if binary:
                    print(colored(hexdump(pos, c, 16), color))
                    pos += len(c)
                else:
                    sys.stdout.buffer.write(c)
                    sys.stdout.buffer.flush()
    except Exception as e1:
        print("error communicating...: " + str(e1))
        sys.exit()
    finally:
        # The loop only ends through an exception, so close the port here.
        ser.close()
# Start one reader thread per port given on the command line.
try:
    t1 = threading.Thread(target=read_serial, args=(sys.argv[2], sys.argv[1], 'green', True))
    t1.daemon = True  # thread dies when main thread (only non-daemon thread) exits.
    t1.start()
except RuntimeError:
    # Without the first reader there is nothing to wait on below.
    print("Error: unable to start thread")
    sys.exit(1)

if len(sys.argv) == 4:
    try:
        t2 = threading.Thread(target=read_serial, args=(sys.argv[3], sys.argv[1], 'red', True))
        t2.daemon = True  # thread dies when main thread (only non-daemon thread) exits.
        t2.start()
    except RuntimeError:
        # Keep going with the first port only.
        print("Error: unable to start thread")

# Wait on the first reader; Ctrl-C ends the program and the daemon threads.
try:
    t1.join()
except KeyboardInterrupt:
    sys.exit()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import serial, time, sys, threading | |
# lock to serialize console output | |
lock = threading.Lock() | |
# Expect <baud> <port1> and an optional <port2> after the script name.
if len(sys.argv) not in (3, 4):
    sys.stderr.write("Usage: %s <baud> <port1> [<port2>]\n" % (sys.argv[0]))
    # sys.exit rather than the site-provided exit(), which is only meant for
    # interactive sessions and is missing when site is not loaded.
    sys.exit(1)
def read_serial(port, baud):
    """Read from one serial port in a loop and copy the raw bytes to stdout.

    Args:
        port: Name of the serial port to open.
        baud: Baud rate, assigned to the port as given.

    Does not return normally: it leaves via SystemExit when the port cannot
    be opened or communication fails. The port is closed on the way out of
    the read loop.
    """
    ser = serial.Serial()
    ser.port = port
    ser.baudrate = baud
    ser.bytesize = serial.EIGHTBITS     # number of bits per byte
    ser.parity = serial.PARITY_NONE     # no parity check
    ser.stopbits = serial.STOPBITS_ONE  # number of stop bits
    ser.timeout = 0.1                   # short read timeout instead of blocking
    ser.xonxoff = False                 # disable software flow control
    ser.rtscts = False                  # disable hardware (RTS/CTS) flow control
    ser.dsrdtr = False                  # disable hardware (DSR/DTR) flow control
    ser.write_timeout = 2               # timeout for write

    try:
        ser.open()
    except Exception as e:
        print("error open serial port: " + str(e))
        sys.exit()

    if not ser.is_open:
        print("cannot open serial port ")
        sys.exit()

    try:
        ser.dtr = True
        while True:
            c = ser.read(size=65)
            if not c:
                continue
            # Hold the console lock so output from the two reader threads
            # is not interleaved within a chunk.
            with lock:
                sys.stdout.buffer.write(c)
                sys.stdout.buffer.flush()
    except Exception as e1:
        print("error communicating...: " + str(e1))
        sys.exit()
    finally:
        # The loop only ends through an exception, so close the port here.
        ser.close()
# Start one reader thread per port given on the command line.
try:
    t1 = threading.Thread(target=read_serial, args=(sys.argv[2], sys.argv[1]))
    t1.daemon = True  # thread dies when main thread (only non-daemon thread) exits.
    t1.start()
except RuntimeError:
    # Without the first reader there is nothing to wait on below.
    print("Error: unable to start thread")
    sys.exit(1)

if len(sys.argv) == 4:
    try:
        t2 = threading.Thread(target=read_serial, args=(sys.argv[3], sys.argv[1]))
        t2.daemon = True  # thread dies when main thread (only non-daemon thread) exits.
        t2.start()
    except RuntimeError:
        # Keep going with the first port only.
        print("Error: unable to start thread")

# Wait on the first reader; Ctrl-C ends the program and the daemon threads.
try:
    t1.join()
except KeyboardInterrupt:
    sys.exit()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <elapsedMillis.h> | |
#define PIN_D2 2 | |
#define MAX_BUFFER 16 | |
uint8_t buffer1[MAX_BUFFER], buffer1pos = 0, buffer3[MAX_BUFFER], buffer3pos = 0; | |
char buffer1prefix[] = "E> ", buffer3prefix[] = "A> "; | |
elapsedMillis TimeSinceRead; | |
// One-time initialisation: bring up the host link, wait for the host to
// assert DTR, then start both capture UARTs and drive PIN_D2 high.
void setup() {
  const unsigned long baud = 250000;

  Serial.begin(baud);
  // Spin until the host side raises DTR.
  while (!Serial.dtr()) {
  }

  Serial1.begin(baud);
  Serial3.begin(baud);

  pinMode(PIN_D2, OUTPUT);
  digitalWrite(PIN_D2, HIGH);

  Serial.println("In process");
}
void PrintBuffer(char *prefix, uint8_t buffer[], uint8_t *buffer_pos, bool flush) { | |
if (*buffer_pos == 0 || (*buffer_pos < MAX_BUFFER && !flush)) | |
return; | |
char buff[12 + strlen(prefix) + MAX_BUFFER*4+5]; | |
uint8_t o = 0; | |
o += sprintf(buff+o, "%010lu: ", millis()); | |
o += sprintf(buff+o, "%s", prefix); | |
for (uint8_t i=0; i<MAX_BUFFER; i++) { | |
if (i < *buffer_pos) { | |
o += sprintf(buff+o, "%02X ", buffer[i]); | |
} else { | |
o += sprintf(buff+o, " "); | |
} | |
} | |
o += sprintf(buff+o, " |"); | |
for (uint8_t i=0; i<MAX_BUFFER; i++) { | |
o += sprintf(buff+o, "%c", i<*buffer_pos ? (buffer[i] >31 && buffer[i] <= 0x7F ? buffer[i] : '.') : ' '); | |
} | |
o += sprintf(buff+o, "|\r\n"); | |
*buffer_pos = 0; | |
Serial.write(buff, o); | |
} | |
// Print the Serial1 capture buffer through PrintBuffer, tagged with
// buffer1prefix; `flush` is passed through unchanged.
void PrintBuffer1(bool flush)
{
  uint8_t *fill = &buffer1pos;
  PrintBuffer(buffer1prefix, buffer1, fill, flush);
}
// Print the Serial3 capture buffer through PrintBuffer, tagged with
// buffer3prefix; `flush` is passed through unchanged.
void PrintBuffer3(bool flush)
{
  uint8_t *fill = &buffer3pos;
  PrintBuffer(buffer3prefix, buffer3, fill, flush);
}
void loop() { | |
int incomingByte; | |
if (Serial.available() > 0) { | |
incomingByte = Serial.read(); | |
if (incomingByte == 32) { | |
Serial.print("\r\nReset!\r\n"); | |
digitalWrite(PIN_D2, LOW); | |
delay(250); | |
digitalWrite(PIN_D2, HIGH); | |
delay(250); | |
} | |
} | |
if (Serial1.available() > 0) { | |
buffer1[buffer1pos++] = Serial1.read(); | |
TimeSinceRead = 0; | |
PrintBuffer1(false); | |
} | |
if (Serial3.available() > 0) { | |
buffer3[buffer3pos++] = Serial3.read(); | |
TimeSinceRead = 0; | |
PrintBuffer3(false); | |
} | |
if (TimeSinceRead > 100) { | |
PrintBuffer1(true); | |
PrintBuffer3(true); | |
TimeSinceRead = 0; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment