Last active
October 1, 2024 21:14
-
-
Save ekawahyu/f1bd1bf64aedfed3354128938f01f278 to your computer and use it in GitHub Desktop.
Resilient USB serial AT command handler: waits for an AT port, reconnects automatically, and handles exceptions
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import glob | |
import time | |
import serial | |
import serial.threaded | |
import threading | |
import queue | |
# Module-level queue shared by every reader; each received line is put here.
# TODO: let the caller supply the queue instead of defining it in this module.
ATResponse = queue.Queue(maxsize=0)  # maxsize=0 means unbounded (the default)
class ATReader(serial.threaded.LineReader):
    """Line-based protocol that records and forwards received AT lines.

    Every line handed to ``handle_line`` is stored in ``received_lines``
    (reset on each new connection) and is also put on the module-level
    ``ATResponse`` queue.
    """

    def connection_made(self, transport):
        """Initialise the per-connection line buffer once the port is open."""
        super().connection_made(transport)
        self.received_lines = []
        print('ATReader: connection open')

    def handle_line(self, data):
        """Clean one received line, store it and publish it on the queue."""
        # Drop any stray carriage returns / line feeds left inside the line.
        # (The second replace used to search for an empty string, a no-op.)
        data = data.replace('\r', '').replace('\n', '')
        self.received_lines.append(data)
        ATResponse.put(data)
        print('ATReader: received line: %s' % data)

    def connection_lost(self, exc):
        """Report that the connection ended, including the cause if any.

        The base implementation is deliberately not called, so ``exc`` is
        reported here instead of being raised from this method.
        """
        if exc is not None:
            print('ATReader: connection error: %s' % exc)
        print('ATReader: connection close')
class ATCommand(threading.Thread):
    """Worker thread that keeps a serial AT-command link open.

    The thread opens ``port`` with ``serial.serial_for_url``, attaches an
    ``ATReader`` through ``serial.threaded.ReaderThread`` and reconnects
    whenever the reader thread is no longer alive.  Lines taken from the
    module-level ``ATResponse`` queue are passed to ``callback``.

    Args:
        port: Device name or URL passed to ``serial.serial_for_url``.
        baudrate: Baud rate passed to ``serial.serial_for_url``.
        callback: Optional callable invoked as ``callback(line)`` for each
            line taken from ``ATResponse``.
        exc: Optional callable invoked as ``exc(exception, command)`` when
            sending a command raises ``serial.SerialException``.
        timewait: Fallback delay in seconds used by ``send_at_command`` when
            its own ``timewait`` argument is falsy.
    """

    def __init__(self, port, baudrate, callback=None, exc=None, timewait=0.1):
        super().__init__()
        self._stop_event = threading.Event()
        self.port = port
        self.ports = []
        self.baudrate = baudrate
        self.timewait = timewait
        self.connection = None
        self.reconnectwait = 5
        self.linereader = None
        self.transport = None
        self.protocol = None
        self.callback = callback
        self.exc = exc
        # Start the thread last: run() reads the attributes assigned above,
        # and starting earlier let __init__ overwrite a freshly created
        # reader with None (or let run() see attributes that did not exist).
        self.start()

    def run(self):
        """Maintain the connection and dispatch responses until stopped."""
        try:
            while not self._stop_event.is_set():
                if not self.connection or not self.connection.is_open:
                    self.connect()
                if self.linereader and not self.linereader.is_alive():
                    print('ATCommand: Connection lost. Reconnecting ...')
                    self.disconnect()
                try:
                    # The timeout keeps the loop responsive to stop().
                    response = ATResponse.get(timeout=.1)
                except queue.Empty:
                    continue
                if self.callback:
                    self.callback(response)
        finally:
            # Never leave the port open once the worker has finished.
            self.disconnect()
            print("ATCommand: terminate ATCommand")

    def disconnect(self):
        """Close the reader thread and the serial port, if present."""
        # Swap the attributes out first so that two threads calling this
        # concurrently cannot both act on the same object after it was reset.
        reader, self.linereader = self.linereader, None
        if reader:
            reader.close()
        connection, self.connection = self.connection, None
        if connection:
            connection.close()

    def connect(self):
        """Open the port and start the reader; wait before returning on failure."""
        try:
            print('ATCommand: connecting to %s at %s baudrate' % (self.port, self.baudrate))
            self.connection = serial.serial_for_url(self.port, self.baudrate)
            self.linereader = serial.threaded.ReaderThread(self.connection, ATReader)
            self.linereader.start()
            self.transport, self.protocol = self.linereader.connect()
            print('ATCommand: connected to %s successfully' % self.port)
        except (serial.SerialException, RuntimeError) as e:
            # ReaderThread.connect() raises RuntimeError when the reader
            # stopped before the connection was established.
            print('ATCommand: failed to connect: %s' % e)
            self.disconnect()
            # Interruptible pause: returns early as soon as stop() is called.
            self._stop_event.wait(self.reconnectwait)

    def handle_connection_lost(self):
        """Tear the link down, wait ``reconnectwait`` seconds and reconnect."""
        self.disconnect()
        print('ATCommand: attempting to reconnect in %s seconds ...' % self.reconnectwait)
        # Skip the reconnect when stop() was requested during the wait.
        if not self._stop_event.wait(self.reconnectwait):
            self.connect()

    def stop(self):
        """Stop the worker thread and close the serial connection."""
        self._stop_event.set()
        # Wait for the worker before closing, otherwise it could notice the
        # closed port and reopen it after this method returned.  The wait is
        # bounded so that a blocked callback cannot hang the caller, and it
        # is skipped when stop() is invoked from the worker thread itself.
        if self.is_alive() and threading.current_thread() is not self:
            self.join(timeout=self.reconnectwait)
        self.disconnect()
        print('ATCommand: serial connection closed')

    def send_at_command(self, command, timewait=0.1):
        """Send ``command`` and return the lines received while waiting.

        Args:
            command: Text written with the protocol's ``write_line``.
            timewait: Seconds to wait for the answer; a falsy value selects
                the instance-level ``timewait`` instead.

        Returns:
            A list with the lines collected during the wait, or ``None``
            when there is no connection yet or the write failed.
        """
        protocol = self.protocol
        if protocol is None:
            print('ATCommand: not connected, command %s not sent' % command)
            return None
        try:
            protocol.received_lines = []
            protocol.write_line('%s' % command)
            time.sleep(timewait if timewait else self.timewait)
            # Return a snapshot; the reader thread keeps appending to the
            # protocol's own list.
            return list(protocol.received_lines)
        except serial.SerialException as e:
            if self.exc:
                self.exc(e, command)
        except AttributeError as e:
            print('ATCommand: %s' % e)
        return None
def at_response(response):
    """Print one AT response line; usable as a response callback."""
    message = 'Got AT response: %s' % response
    print(message)
def at_exception(e, command):
    """Print the exception raised while sending ``command``.

    Args:
        e: The exception that was raised.
        command: The AT command that was being sent.
    """
    # Include the exception itself; previously only the command was shown.
    print('Got AT command exception: %s (command: %s)' % (e, command))
def serialSearchATPort(pattern='/dev/ttyUSB*'):
    """Return the device paths matching ``pattern`` in sorted order.

    Args:
        pattern: Glob pattern of candidate serial devices; defaults to the
            USB serial adapters under ``/dev``.

    Returns:
        A sorted list of matching paths, empty when nothing matches.
    """
    # glob returns matches in arbitrary order; sort for a stable probe order.
    return sorted(glob.glob(pattern))
# Demo script: wait for a serial port, find the one that answers AT commands,
# then send a few commands with pauses that leave time to unplug/replug.
print('Waiting for any serial port ...')
while True:
    serial_ports = serialSearchATPort()
    time.sleep(1)
    if serial_ports:
        print(serial_ports)
        break

# serial_ports.reverse()
EC25 = None
for serial_port in serial_ports:
    print('Probing ... %s' % serial_port)
    candidate = None
    responses = None
    try:
        candidate = ATCommand(serial_port, 115200, callback=at_response, exc=at_exception)
        # Give the worker thread time to open the port.
        time.sleep(0.5)
        candidate.send_at_command('ATE1')
        responses = candidate.send_at_command('AT')
    except Exception as e:
        print('Error probing %s: %s' % (serial_port, e))
    # Accept an 'OK' anywhere in the reply: other lines may arrive after it
    # while the port is busy, so it is not necessarily the last line.
    if responses and 'OK' in responses:
        print('Found AT port: %s' % serial_port)
        EC25 = candidate
        break
    # Not an AT port (or probing failed): always release the candidate.
    if candidate is not None:
        candidate.stop()

if EC25 is None:
    raise SystemExit('No AT port found among %s' % serial_ports)

time.sleep(3)
EC25.send_at_command('ate1')
# Test reconnection by plug/unplug USB serial device here for multiple times
time.sleep(20)
EC25.send_at_command('at')
# Test reconnection by plug/unplug USB serial device here for multiple times
time.sleep(20)
EC25.send_at_command('AT')
# Test reconnection by plug/unplug USB serial device here for multiple times
EC25.stop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This gist seems to have an issue when `serial_ports.reverse()` is uncommented, especially when a lot of data is flowing during serial port probing. Not sure how to solve this problem.