Skip to content

Instantly share code, notes, and snippets.

@jueti
Last active September 22, 2018 15:57
Show Gist options
  • Save jueti/b379eb6d8cdec0acfeccd8fa3b3d43cf to your computer and use it in GitHub Desktop.
Save jueti/b379eb6d8cdec0acfeccd8fa3b3d43cf to your computer and use it in GitHub Desktop.
[Snippet - tool class]
#!/usr/bin/python 3
# -*- coding: utf-8 -*-
"""
`Device` is a tool class to operate serial ports.
"""
import json
import logging
import time
from typing import List, Dict, Tuple
import serial.tools.list_ports
DEBUG = True
Ignore = ['COM1']
if DEBUG:
logging.basicConfig(level=logging.INFO)
else:
logging.basicConfig(filename='device.log', filemode='w', level=logging.INFO)
# list all serial ports
def get_ports_info() -> List[Dict[str, str]]:
"""
list all serial ports.
you can format the output string literals using the follow code:
>>> import json
>>> json.dumps(get_ports_info(), ensure_ascii=False, sort_keys=False, indent=4, separators=(',', ':'))
... # doctest: +ELLIPSIS
'[...]'
:return: return port info with json structure: [{}, {}, ...]
"""
# parameters
ports_info = []
attribute = ['device', 'name', 'description', 'hwid', 'vid', 'pid', 'serial_number', 'location', 'manufacturer',
'product', 'interface']
# save com info with json structure
# serial.tools.list_ports.comports() returns: an iterable that yields ListPortInfo objects
for port_info in serial.tools.list_ports.comports():
port_info = {key: getattr(port_info, key) for key in attribute} # change ListPortInfo object to Dict object
ports_info.append(port_info)
return ports_info
def sort_by_devices(items: str) -> str:
position = 0
for item in reversed(items):
if item.isdigit():
position -= 1
else:
break
return items[position:] + items[:position]
def get_all_devices() -> Tuple[str]:
devices_all = []
for port_info in serial.tools.list_ports.comports():
device: str = port_info.device
devices_all.append(device)
devices_all = sorted(devices_all, key=sort_by_devices)
return tuple(devices_all)
def get_ignore_devices() -> Tuple[str]:
return tuple(Ignore)
class Device(object):
"""
Tool for operating serial ports.
Available tool: get, start, stop, cmd, msg, info
"""
# pre setup
_current = '' # current com. example: 'com1' or 'com2'
_ignore = [] # com which you want to ignore. example: ['com1', 'com2']
def __init__(self):
self.devices = get_all_devices()
self.connected = list() # Keep connected devices. example: ['COM2', 'COM3']
# start and keep the connections of devices
def start(self):
logging.debug('Start connect '.ljust(60, '='))
for i in self.devices:
logging.debug('[{}]:Connect'.format(i))
ser = serial.Serial(timeout=0.1)
ser.port = i
try:
ser.open()
except IOError as e:
logging.error('[{}]:{}'.format(i, e))
continue
if ser.in_waiting:
logging.debug('[{}]:Invalid chars in the receive buffer'.format(i))
logging.debug('[{}]:Clear receive buffer'.format(i))
ser.readall()
logging.debug('[{}]:Keep connection'.format(i))
setattr(self, i.lower(), ser)
self.connected.append(i)
logging.warning('Waiting for port initialization')
time.sleep(1.5) # REVIEW[Jason] how to confirm how many seconds to wait
logging.debug('Connected Successfully '.ljust(60, '='))
# stop the connections of devices
def stop(self):
for i in self.connected:
ser = getattr(self, i.lower())
if ser.in_waiting:
ser.readall()
ser.close()
delattr(self, i.lower())
logging.debug('[{}]:Closed connection'.format(i))
self.connected = []
# get a connection
def get(self, device: str):
self._current = device
assert device.lower() in map(str.lower, self.connected), '[{}] connect failed'.format(device)
assert device.lower() in map(str.lower, self.devices), "[{}] doesn't exist".format(device)
return getattr(self, device.lower())
# show devices' information
@staticmethod
def info():
return json.dumps(get_ports_info(), ensure_ascii=False, sort_keys=False, indent=4, separators=(',', ':'))
# send command to com
def cmd(self, command: bytes):
ser = getattr(self, self._current.lower())
return ser.write(command)
# get message from com
def msg(self):
ser = getattr(self, self._current.lower())
return ser.readline()
# send self check signal to arduino module
def self_check(self, signal: bytes = b'nice to meet you', target: bytes = b'nice to meet you too'):
"""
send signal, accept result, and compare it to the target
"""
self_check_result = {}
logging.debug('Start self check '.ljust(60, '='))
for i in self.devices:
# parameters
authence = False
try_times = 0
if i in self.connected:
logging.debug('[{}]:Get connection'.format(i))
ser = getattr(self, i.lower())
logging.debug('[{}]:Start self check'.format(i))
while not authence and try_times < 3:
ser.write(signal)
line = ser.readline()
authence = True if line[0:-2] == target else False # line ends with '\r\n'
if not authence:
unit = 'time' if try_times == 0 else 'times'
try_times += 1
logging.debug('[{}]:Self check fail {} {}'.format(i, try_times, unit))
else:
logging.debug('[{}]:No connection'.format(i))
logging.debug('[{}]:Record results'.format(i))
self_check_result[i] = 'ready' if authence else 'error'
logging.info('Self check Result: {}'.format(self_check_result))
logging.debug('Self check Completed '.ljust(60, '='))
return self_check_result
# query com name with special message
def query_name(self, signal: bytes = b"what's your name", prefix: bytes = b'my name is '):
"""
query com name with special message
assume signal=b"what's your name" and prefix=b'my name is '
then b'my name if PH001' show the com name is 'PH001'
"""
modules = {}
logging.debug('Start query name '.ljust(60, '='))
for i in self.devices:
if i in self.connected:
logging.debug('[{}]:Get connection'.format(i))
ser = getattr(self, i.lower())
logging.debug('[{}]:Start query name'.format(i))
ser.write(signal)
line = ser.readline()
logging.debug('[{}]:Record results'.format(i))
length = len(prefix)
modules[i] = line[length:-2].decode('utf-8') if prefix in line else None
logging.info('Query name Result: {}'.format(modules))
logging.debug('Query name Completed '.ljust(60, '='))
return modules
if __name__ == '__main__':
cur = Device()
cur.start()
cur.self_check()
cur.query_name()
cur.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment