Skip to content

Instantly share code, notes, and snippets.

@thewh1teagle
Last active December 23, 2023 16:24
Show Gist options
  • Select an option

  • Save thewh1teagle/15c8d7d96bbea8c1ab449e2a606e08cf to your computer and use it in GitHub Desktop.

Select an option

Save thewh1teagle/15c8d7d96bbea8c1ab449e2a606e08cf to your computer and use it in GitHub Desktop.
Baud Rate Detector
import os
import pty
import time
import random
import string
import termios
# Create a pair of pseudo-terminals (PTYs)
master, slave = pty.openpty()
name = os.ttyname(slave) # Use master, not slave
print("Dev Path:", name)
target_baud_rate = 9600
actual_baud_rate = 115200
baud_rate_ratio = actual_baud_rate / target_baud_rate
sleep_time = 1 / baud_rate_ratio
# Function to write data to the master PTY
print('Writing...')
with os.fdopen(master, 'wb') as fd:
tios = termios.tcgetattr(fd)
while True:
message = ''.join(random.choice(string.ascii_letters) for _ in range(100)) + '\n'
buf = message.encode('utf-8')
for byte in buf:
fd.write(byte.to_bytes(1, 'little'))
fd.flush()
time.sleep(sleep_time)
import sys
import time
import serial # pip3 install pyserial
from tqdm import tqdm # pip3 install tqdm
class RateDetector:
DURATION = 2 # 10 Seconds
MIN_DATA_WARN = 3 # if less than 20 bytes than warn
READ_CHUNK = 10 # Read 10 bytes
THRESOLD = 30.0 # Allow 30% to be incorrect
RATES = [110, 300, 600, 1200, 1800, 2400, 3600, 4800, 7200, 9600, 14400, 19200, 28800, 31250, 38400, 57600, 76800, 115200, 128000, 153600, 230400, 250000, 256000, 307200, 345600, 460800, 500000, 512000, 921600, 1024000, 2000000, 2500000, 3000000, 3686400]
def __init__(self, device: serial.Serial) -> None:
self.device = device
def validate_chunk(self, data: bytes) -> bool:
""" return in percentage the characters which are utf-8 valid """
try:
data.decode('utf-8')
return True
except UnicodeError:
return False
def summarize_bytes(self, data: bytes):
data = repr(data)[2:-1][:20]
return data
def correct_percentage(self, rate: int) -> float:
start = time.time()
chunk_count = 0
utf_valid_chunk_count = 0
bytes_count = 0
buf = b''
self.device.timeout = 0.2
# bytes hashmap for detecting duplicates (high rate of duplicates = may less correct)
self.device.read(10) # start read something so things going on on the wire...
while start + self.DURATION > time.time():
if self.device.in_waiting > 0: # Non blocking
buf = self.device.read(self.READ_CHUNK)
valid = self.validate_chunk(buf)
if valid:
utf_valid_chunk_count += 1
chunk_count += 1
bytes_count += len(buf)
utf_valid_percentage = 0 if not chunk_count else utf_valid_chunk_count / chunk_count * 100 # HIgh = more correct
overall_percentage = (utf_valid_percentage) / 1
if all(i == 0 for i in buf):
overall_percentage = 0
return overall_percentage, buf, bytes_count
def check_rate(self, rate: int):
self.device.baudrate = rate
percentage, last_chunk, read_count = engine.correct_percentage(rate)
if percentage >= engine.THRESOLD:
tqdm.write(f'✅ Rate {rate} looks correct by {int(percentage)}%! Last chunk: {self.summarize_bytes(last_chunk)}...')
else:
tqdm.write(f'❌ Rate {rate} looks incorrect by {100 - int(percentage)}%! {self.summarize_bytes(last_chunk)}...')
if read_count < engine.MIN_DATA_WARN:
tqdm.write(f'⚠️ Please Note that only {read_count} bytes received for rate {rate}')
if __name__ == '__main__':
port = sys.argv[1]
device = serial.Serial(port)
engine = RateDetector(device)
bar = tqdm(engine.RATES[17:19], desc='Starting...')
for rate in bar:
bar.desc = f'Checking {rate}/s'
bar.update()
try:
engine.check_rate(rate)
except serial.SerialException as e:
tqdm.write(f'❌ Error in rate {rate}: {e}')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment