Last active
December 23, 2023 16:24
-
-
Save thewh1teagle/15c8d7d96bbea8c1ab449e2a606e08cf to your computer and use it in GitHub Desktop.
Baud Rate Detector
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import pty | |
| import time | |
| import random | |
| import string | |
| import termios | |
| # Create a pair of pseudo-terminals (PTYs) | |
| master, slave = pty.openpty() | |
| name = os.ttyname(slave) # Use master, not slave | |
| print("Dev Path:", name) | |
| target_baud_rate = 9600 | |
| actual_baud_rate = 115200 | |
| baud_rate_ratio = actual_baud_rate / target_baud_rate | |
| sleep_time = 1 / baud_rate_ratio | |
| # Function to write data to the master PTY | |
| print('Writing...') | |
| with os.fdopen(master, 'wb') as fd: | |
| tios = termios.tcgetattr(fd) | |
| while True: | |
| message = ''.join(random.choice(string.ascii_letters) for _ in range(100)) + '\n' | |
| buf = message.encode('utf-8') | |
| for byte in buf: | |
| fd.write(byte.to_bytes(1, 'little')) | |
| fd.flush() | |
| time.sleep(sleep_time) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import sys | |
| import time | |
| import serial # pip3 install pyserial | |
| from tqdm import tqdm # pip3 install tqdm | |
| class RateDetector: | |
| DURATION = 2 # 10 Seconds | |
| MIN_DATA_WARN = 3 # if less than 20 bytes than warn | |
| READ_CHUNK = 10 # Read 10 bytes | |
| THRESOLD = 30.0 # Allow 30% to be incorrect | |
| RATES = [110, 300, 600, 1200, 1800, 2400, 3600, 4800, 7200, 9600, 14400, 19200, 28800, 31250, 38400, 57600, 76800, 115200, 128000, 153600, 230400, 250000, 256000, 307200, 345600, 460800, 500000, 512000, 921600, 1024000, 2000000, 2500000, 3000000, 3686400] | |
| def __init__(self, device: serial.Serial) -> None: | |
| self.device = device | |
| def validate_chunk(self, data: bytes) -> bool: | |
| """ return in percentage the characters which are utf-8 valid """ | |
| try: | |
| data.decode('utf-8') | |
| return True | |
| except UnicodeError: | |
| return False | |
| def summarize_bytes(self, data: bytes): | |
| data = repr(data)[2:-1][:20] | |
| return data | |
| def correct_percentage(self, rate: int) -> float: | |
| start = time.time() | |
| chunk_count = 0 | |
| utf_valid_chunk_count = 0 | |
| bytes_count = 0 | |
| buf = b'' | |
| self.device.timeout = 0.2 | |
| # bytes hashmap for detecting duplicates (high rate of duplicates = may less correct) | |
| self.device.read(10) # start read something so things going on on the wire... | |
| while start + self.DURATION > time.time(): | |
| if self.device.in_waiting > 0: # Non blocking | |
| buf = self.device.read(self.READ_CHUNK) | |
| valid = self.validate_chunk(buf) | |
| if valid: | |
| utf_valid_chunk_count += 1 | |
| chunk_count += 1 | |
| bytes_count += len(buf) | |
| utf_valid_percentage = 0 if not chunk_count else utf_valid_chunk_count / chunk_count * 100 # HIgh = more correct | |
| overall_percentage = (utf_valid_percentage) / 1 | |
| if all(i == 0 for i in buf): | |
| overall_percentage = 0 | |
| return overall_percentage, buf, bytes_count | |
| def check_rate(self, rate: int): | |
| self.device.baudrate = rate | |
| percentage, last_chunk, read_count = engine.correct_percentage(rate) | |
| if percentage >= engine.THRESOLD: | |
| tqdm.write(f'✅ Rate {rate} looks correct by {int(percentage)}%! Last chunk: {self.summarize_bytes(last_chunk)}...') | |
| else: | |
| tqdm.write(f'❌ Rate {rate} looks incorrect by {100 - int(percentage)}%! {self.summarize_bytes(last_chunk)}...') | |
| if read_count < engine.MIN_DATA_WARN: | |
| tqdm.write(f'⚠️ Please Note that only {read_count} bytes received for rate {rate}') | |
| if __name__ == '__main__': | |
| port = sys.argv[1] | |
| device = serial.Serial(port) | |
| engine = RateDetector(device) | |
| bar = tqdm(engine.RATES[17:19], desc='Starting...') | |
| for rate in bar: | |
| bar.desc = f'Checking {rate}/s' | |
| bar.update() | |
| try: | |
| engine.check_rate(rate) | |
| except serial.SerialException as e: | |
| tqdm.write(f'❌ Error in rate {rate}: {e}') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment