Created
June 13, 2022 13:32
-
-
Save chenzhuoyu/50888ca0a4597d01e888ddd13f751ee1 to your computer and use it in GitHub Desktop.
UART Line Monitor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys | |
import urwid | |
import signal | |
import string | |
import asyncio | |
import serial_asyncio | |
from urwid.raw_display import Screen | |
# Banner shown at the top of the transcript.  It is %-formatted with
# (device: str, baudrate: int) when the connection is made.
BANNER_TEXT = """UART Line Monitor v1.0
Device : %s
Baudrate : %d
"""
def filter_non_print(cc):
    """Map one received byte value to the text shown in the transcript.

    * carriage return (0x0d) is dropped (returns an empty string);
    * DEL (0x7f) becomes U+2421;
    * every other value below 0x20, except tab and line feed, is shifted
      into the U+2400 block (``cc + 0x2400``);
    * anything else is returned as ``chr(cc)``.
    """
    if cc == 0x0d:
        return ''
    if cc == 0x7f:
        return '\u2421'

    # Tab and line feed are kept as-is so the transcript keeps its layout.
    is_control = cc < 0x20 and cc != 0x09 and cc != 0x0a
    return chr(cc + 0x2400 if is_control else cc)
class SerialMon(asyncio.Protocol):
    """asyncio protocol that drives a urwid-based serial console.

    The screen is a transcript (``self.ret``) stacked above a one-line
    prompt (``self.cmd``).  Lines entered at the prompt are written to the
    serial transport; bytes received from it are appended to the transcript.

    State created in :meth:`connection_made`:

    * ``history`` -- previously sent, non-empty commands, oldest first.
    * ``pos``     -- index into ``history`` of the entry shown in the
      prompt; ``len(history)`` means the user's own unsent draft is shown.
    * ``editing`` -- the unsent draft, restored when scrolling back down
      past the newest history entry.
    """

    def on_enter(self):
        """Send the stripped prompt line, then reset the prompt and draft."""
        self.send_command(self.cmd.edit_text.strip())
        # Move past the newest entry and drop the old draft explicitly;
        # otherwise a draft typed before browsing history would resurface
        # after a history entry has been sent.
        self.pos = len(self.history)
        self.editing = ''
        self.cmd.set_edit_text('')

    def on_scroll(self, delta):
        """Move ``delta`` entries through the history (negative = older).

        Scrolling down past the newest entry restores the unsent draft.
        Moves that would leave the history at the old end are ignored.
        """
        target = self.pos + delta
        if delta and 0 <= target < len(self.history):
            self.pos = target
            self._show_in_prompt(self.history[target])
        elif delta > 0:
            # Capture the draft first: once ``pos`` points past the history,
            # clearing the widget fires the 'change' handler, which would
            # overwrite ``self.editing`` with an empty string.
            draft = self.editing
            self.pos = len(self.history)
            self._show_in_prompt(draft)

    def _show_in_prompt(self, text):
        """Replace the prompt contents with ``text``."""
        # Clear, then insert, as the original code did (rather than setting
        # the text in one call).
        self.cmd.set_edit_text('')
        self.cmd.insert_text(text)

    def add_history(self, cmd):
        """Append ``cmd`` unless it is empty or repeats the newest entry."""
        if cmd and not (self.history and cmd == self.history[-1]):
            self.history.append(cmd)

    def send_command(self, cmd):
        """Record ``cmd``, echo it to the transcript and write it out.

        The command is written UTF-8 encoded and terminated with CR LF.
        An empty ``cmd`` still sends the bare line terminator.
        """
        self.add_history(cmd.rstrip())
        self.ret.set_text(self.ret.text + '\n> ' + cmd + '\n')
        self.transport.write(cmd.encode('utf-8') + b'\r\n')

    def data_received(self, data):
        """Append received bytes to the transcript.

        Each byte is mapped on its own through ``filter_non_print``, so
        multi-byte UTF-8 sequences are not reassembled.
        """
        text = ''.join(map(filter_non_print, data))
        if not text:
            return  # nothing visible arrived, e.g. a lone carriage return
        self.ret.set_text(self.ret.text + text)
        # This callback is invoked by the serial transport rather than by
        # urwid's own input handling, so ask for a repaint explicitly.
        self.ui.draw_screen()

    def connection_lost(self, _exc):
        """Stop the UI and the event loop once the port is closed.

        ``_exc`` is the exception that ended the connection, or ``None``.
        """
        print()
        self.ui.stop()
        if _exc is not None:
            # Report the reason only after the UI has been stopped.
            print('connection lost: %s' % _exc, file=sys.stderr)
        self.transport.loop.stop()

    def connection_made(self, transport):
        """Build the widgets, reset the history state and start the UI."""
        self.pos = 0
        self.editing = ''
        self.history = []
        self.transport = transport

        self.ret = urwid.Text(BANNER_TEXT % (transport.serial.port, transport.serial.baudrate))
        self.cmd = urwid.Edit('> ')

        def on_change(_, text):
            # Track the draft only while the user is editing their own
            # line, not while a history entry is displayed.
            if self.pos == len(self.history):
                self.editing = text

        def unhandled_input(key):
            if key == 'enter':
                self.on_enter()
            elif key in ('up', 'down'):
                self.on_scroll(1 if key == 'down' else -1)
            elif key == 'ctrl d' and not self.cmd.edit_text:
                # Ctrl-D on an empty prompt closes the port.
                self.transport.close()

        self.ui = urwid.MainLoop(
            widget=urwid.Filler(urwid.Pile([self.ret, self.cmd]), 'top'),
            # Bind urwid to the loop that owns the transport instead of
            # relying on whichever loop asyncio considers current.
            event_loop=urwid.AsyncioEventLoop(loop=transport.loop),
            handle_mouse=False,
            unhandled_input=unhandled_input,
        )
        urwid.connect_signal(self.cmd, 'change', on_change)
        self.ui.start()
def main():
    """Parse ``argv``, open the serial port and run the monitor.

    Usage: ``prog device [baudrate]``; the baud rate defaults to 115200.
    Exits with status 1 on bad arguments or when the port cannot be opened.
    """
    def usage():
        print('usage: %s [device] [baudrate]' % sys.argv[0], file=sys.stderr)
        sys.exit(1)

    if not (2 <= len(sys.argv) <= 3):
        usage()

    device = sys.argv[1]
    baudrate = 115200
    if len(sys.argv) == 3:
        try:
            baudrate = int(sys.argv[2])
        except ValueError:
            usage()
        if baudrate <= 0:
            usage()

    # Set the encoding before the connection is created:
    # SerialMon.connection_made is what starts the UI.
    urwid.set_encoding('utf-8')

    loop = asyncio.new_event_loop()
    try:
        try:
            loop.run_until_complete(
                serial_asyncio.create_serial_connection(loop, SerialMon, device, baudrate)
            )
        except OSError as exc:
            # pyserial reports open failures with SerialException, an
            # IOError/OSError subclass; show a short message instead of a
            # traceback.
            print('%s: cannot open %s: %s' % (sys.argv[0], device, exc), file=sys.stderr)
            sys.exit(1)

        # Ignore SIGINT while the monitor runs; Ctrl-D on an empty prompt
        # is the way out.
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        loop.run_forever()
    finally:
        # Release the loop on every exit path, including errors.
        loop.close()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment