Skip to content

Instantly share code, notes, and snippets.

@ekawahyu
Last active October 1, 2024 21:14
Show Gist options
  • Save ekawahyu/f1bd1bf64aedfed3354128938f01f278 to your computer and use it in GitHub Desktop.
Save ekawahyu/f1bd1bf64aedfed3354128938f01f278 to your computer and use it in GitHub Desktop.
Resilient USB Serial AT Command - Waiting for AT Port, reconnecting feature, and exception handling
import glob
import time
import serial
import serial.threaded
import threading
import queue
# find alternative to define queue in user space, not here please
ATResponse = queue.Queue()
class ATReader(serial.threaded.LineReader):
'AT+Command Reader Class'
def connection_made(self, transport):
super(ATReader, self).connection_made(transport)
self.received_lines = []
print('ATReader: connection open')
def handle_line(self, data):
data = data.replace('\r', '')
data = data.replace('', '')
self.received_lines.append(data)
ATResponse.put(data)
print('ATReader: received line: %s' % data)
def connection_lost(self, exc):
print('ATReader: connection close')
class ATCommand(threading.Thread):
'AT+Command Class extends Threading'
def __init__(self, port, baudrate, callback=None, exc=None, timewait=0.1):
super(ATCommand, self).__init__()
self._stop_event = threading.Event()
self.port = port
self.ports = []
self.baudrate = baudrate
self.timewait = timewait
self.connection = None
self.reconnectwait = 5
self.start()
self.linereader = None
self.callback = callback
self.exc = exc
def run(self):
while not self._stop_event.is_set():
if not self.connection or not self.connection.is_open:
self.connect()
if self.linereader and not self.linereader.is_alive():
print('ATCommand: Connection lost. Reconnecting ...')
self.disconnect()
try:
if ATResponse.not_empty:
response = ATResponse.get(timeout=.1)
if self.callback:
self.callback(response)
except queue.Empty:
# queue.get() timeout raises Empty exception
pass
print("ATCommand: terminate ATCommand")
def disconnect(self):
if self.linereader:
self.linereader.close()
self.linereader = None
if self.connection:
self.connection.close()
self.connection = None
def connect(self):
try:
print('ATCommand: connecting to %s at %s baudrate' % (self.port, self.baudrate))
self.connection = serial.serial_for_url(self.port, self.baudrate)
self.linereader = serial.threaded.ReaderThread(self.connection, ATReader)
self.linereader.start()
self.transport, self.protocol = self.linereader.connect()
print('ATCommand: connected to %s successfully' % self.port)
except serial.SerialException as e:
print('ATCommand: failed to connect: %s' % e)
time.sleep(self.reconnectwait)
def handle_connection_lost(self):
if self.linereader and self.linereader.is_alive():
self.linereader.close()
if self.connection and self.connection.is_open:
self.connection.close()
print('ATCommand: attempting to reconnect in %s seconds ...' % self.reconnectwait)
time.sleep(self.reconnectwait)
self.connect()
def stop(self):
self._stop_event.set()
if self.linereader and self.linereader.is_alive():
self.linereader.close()
if self.connection and self.connection.is_open:
self.connection.close()
print('ATCommand: serial connection closed')
def send_at_command(self, command, timewait=0.1):
try:
self.protocol.received_lines = []
self.protocol.write_line('%s' % command)
if (timewait):
time.sleep(timewait)
else:
time.sleep(self.timewait)
return self.protocol.received_lines
except serial.SerialException as e:
if self.exc:
self.exc(e, command)
except AttributeError as e:
print('ATCommmand: %s' % e)
def at_response(response):
print('Got AT response: %s' % response)
pass
def at_exception(e, command):
print('Got AT command exception: %s' % command)
pass
def serialSearchATPort():
return glob.glob('/dev/ttyUSB*')
print('Waiting for any serial port ...')
while True:
serial_ports = serialSearchATPort()
time.sleep(1)
if serial_ports:
print(serial_ports)
break
# serial_ports.reverse()
for serial_port in serial_ports:
try:
global EC25
print('Probing ... %s', serial_port)
EC25 = ATCommand(serial_port, 115200, callback=at_response, exc=at_exception)
time.sleep(0.5)
EC25.send_at_command('ATE1')
responses = EC25.send_at_command('AT')
if responses:
if responses[-1] == 'OK':
print('Found AT port: %s', serial_port)
serial_ports = []
break
else:
EC25.stop()
except:
print('Error probing %s', serial_port)
time.sleep(3)
EC25.send_at_command('ate1')
# Test reconnection by plug/unplug USB serial device here for multiple times
time.sleep(20)
EC25.send_at_command('at')
# Test reconnection by plug/unplug USB serial device here for multiple times
time.sleep(20)
EC25.send_at_command('AT')
# Test reconnection by plug/unplug USB serial device here for multiple times
EC25.stop()
@ekawahyu
Copy link
Author

ekawahyu commented Oct 1, 2024

This gist seems to have issue when serial_ports.reverse() is uncommented. Especially, when there are bunch of data flow during serial port probing. Not sure how to solve this problem.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment