Last active
September 22, 2018 15:57
-
-
Save jueti/b379eb6d8cdec0acfeccd8fa3b3d43cf to your computer and use it in GitHub Desktop.
[Snippet - tool class]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
`Device` is a utility class for operating serial ports.
"""
import json | |
import logging | |
import time | |
from typing import List, Dict, Tuple | |
import serial.tools.list_ports | |
# When True, log records go to the console; otherwise they are written to 'device.log'.
DEBUG = True
# Port names returned by get_ignore_devices().
Ignore = ['COM1']

if not DEBUG:
    # filemode='w' truncates the log file on every run
    logging.basicConfig(filename='device.log', filemode='w', level=logging.INFO)
else:
    logging.basicConfig(level=logging.INFO)
# list all serial ports | |
def get_ports_info() -> List[Dict[str, str]]:
    """
    Describe every serial port as a plain dictionary.

    The result can be rendered as formatted text with the following code:

    >>> import json
    >>> json.dumps(get_ports_info(), ensure_ascii=False, sort_keys=False, indent=4, separators=(',', ':'))
    ... # doctest: +ELLIPSIS
    '[...]'

    :return: port info with a json-like structure: [{}, {}, ...]
    """
    # attributes copied from each port description into its dictionary
    fields = ('device', 'name', 'description', 'hwid', 'vid', 'pid', 'serial_number', 'location', 'manufacturer',
              'product', 'interface')
    # serial.tools.list_ports.comports() returns an iterable that yields ListPortInfo objects;
    # each one is flattened into a dict keyed by attribute name
    return [
        {field: getattr(entry, field) for field in fields}
        for entry in serial.tools.list_ports.comports()
    ]
def sort_by_devices(items: str) -> str:
    """
    Build a sort key that orders port names by their trailing number.

    The trailing run of digits is moved to the front of the name and
    zero-padded to a fixed width, so that comparing keys as plain strings
    orders the numbers numerically ('COM2' before 'COM10') and only then by
    the remaining prefix. A name without trailing digits is returned
    unchanged.

    :param items: port name, for example 'COM3'
    :return: string key intended for ``sorted(..., key=sort_by_devices)``
    """
    pad_width = 10  # keys only compare numerically while the number fits in this many digits

    # count the digits at the end of the name
    digit_count = 0
    for char in reversed(items):
        if not char.isdigit():
            break
        digit_count += 1

    if digit_count == 0:
        return items

    split_at = len(items) - digit_count
    # without the padding '10COM' < '2COM', which would put COM10 before COM2
    return items[split_at:].zfill(pad_width) + items[:split_at]
def get_all_devices() -> Tuple[str, ...]:
    """
    List the device names of all serial ports.

    The names are read from serial.tools.list_ports.comports() and ordered
    with sort_by_devices as the sort key.

    :return: tuple of device names; empty when no port is reported
    """
    names = [port_info.device for port_info in serial.tools.list_ports.comports()]
    return tuple(sorted(names, key=sort_by_devices))
def get_ignore_devices() -> Tuple[str, ...]:
    """
    Return the module-level ``Ignore`` list as a tuple.

    :return: tuple copy of ``Ignore``; later changes to the list do not affect it
    """
    return tuple(Ignore)
class Device(object):
    """
    Tool for operating serial ports.

    Available tools: get, start, stop, cmd, msg, info, self_check, query_name

    Every opened port is stored on the instance as an attribute named after
    the lower-cased device name (for example ``self.com3`` for 'COM3').
    """

    # pre setup
    _current = ''  # device selected by the last successful get(). example: 'com1' or 'com2'
    _ignore = []  # com which you want to ignore. example: ['com1', 'com2'] (not read by any method here)

    def __init__(self):
        self.devices = get_all_devices()
        self.connected = []  # Keep connected devices. example: ['COM2', 'COM3']

    # start and keep the connections of devices
    def start(self):
        """
        Open every device in ``self.devices`` and remember the ones that opened.

        A device that fails to open is logged and skipped. After the loop the
        method sleeps for 1.5 seconds to give the ports time to initialize.
        """
        logging.debug('Start connect '.ljust(60, '='))
        for i in self.devices:
            if i in self.connected:
                # A duplicate entry in self.connected would make stop() look up
                # an attribute it has already deleted.
                continue
            logging.debug('[{}]:Connect'.format(i))
            ser = serial.Serial(timeout=0.1)
            ser.port = i
            try:
                ser.open()
            except IOError as e:
                logging.error('[{}]:{}'.format(i, e))
                continue
            if ser.in_waiting:
                logging.debug('[{}]:Invalid chars in the receive buffer'.format(i))
                logging.debug('[{}]:Clear receive buffer'.format(i))
                ser.readall()
            logging.debug('[{}]:Keep connection'.format(i))
            setattr(self, i.lower(), ser)
            self.connected.append(i)
        logging.warning('Waiting for port initialization')
        time.sleep(1.5)  # REVIEW[Jason] how to confirm how many seconds to wait
        logging.debug('Connected Successfully '.ljust(60, '='))

    # stop the connections of devices
    def stop(self):
        """Drain and close every connected port, then forget the connections."""
        for i in self.connected:
            ser = getattr(self, i.lower())
            if ser.in_waiting:
                ser.readall()
            ser.close()
            delattr(self, i.lower())
            logging.debug('[{}]:Closed connection'.format(i))
        self.connected = []

    # get a connection
    def get(self, device: str):
        """
        Select a connected device and return its serial object.

        The selection is only stored once the device has been validated, so a
        failed call leaves the previously selected device in place.

        :param device: device name, compared case-insensitively
        :return: the serial object kept for that device
        :raises AssertionError: if the device is unknown or is not connected
        """
        key = device.lower()
        # Raised explicitly (instead of ``assert``) so the checks survive ``python -O``;
        # the exception type is the one the assert statements used to raise.
        if key not in (name.lower() for name in self.devices):
            raise AssertionError("[{}] doesn't exist".format(device))
        if key not in (name.lower() for name in self.connected):
            raise AssertionError('[{}] connect failed'.format(device))
        self._current = device
        return getattr(self, key)

    # show devices' information
    @staticmethod
    def info():
        """Return the output of get_ports_info() as an indented JSON string."""
        return json.dumps(get_ports_info(), ensure_ascii=False, sort_keys=False, indent=4, separators=(',', ':'))

    # send command to com
    def cmd(self, command: bytes):
        """
        Write ``command`` to the device selected with get().

        :return: whatever the serial object's ``write`` returns
        """
        ser = getattr(self, self._current.lower())
        return ser.write(command)

    # get message from com
    def msg(self):
        """Read one line from the device selected with get()."""
        ser = getattr(self, self._current.lower())
        return ser.readline()

    # send self check signal to arduino module
    def self_check(self, signal: bytes = b'nice to meet you', target: bytes = b'nice to meet you too'):
        """
        Send ``signal`` to every connected device and compare the reply to ``target``.

        Each connected device gets up to three attempts. Devices that are not
        connected are reported as 'error' without being contacted.

        :param signal: bytes written to the device
        :param target: expected reply, without the line terminator
        :return: dict mapping every device in ``self.devices`` to 'ready' or 'error'
        """
        self_check_result = {}
        logging.debug('Start self check '.ljust(60, '='))
        for i in self.devices:
            # parameters
            passed = False
            try_times = 0
            if i in self.connected:
                logging.debug('[{}]:Get connection'.format(i))
                ser = getattr(self, i.lower())
                logging.debug('[{}]:Start self check'.format(i))
                while not passed and try_times < 3:
                    ser.write(signal)
                    line = ser.readline()
                    # strip the terminator instead of chopping two bytes, so
                    # both '\r\n' and a bare '\n' are accepted
                    passed = line.rstrip(b'\r\n') == target
                    if not passed:
                        unit = 'time' if try_times == 0 else 'times'
                        try_times += 1
                        logging.debug('[{}]:Self check fail {} {}'.format(i, try_times, unit))
            else:
                logging.debug('[{}]:No connection'.format(i))
            logging.debug('[{}]:Record results'.format(i))
            self_check_result[i] = 'ready' if passed else 'error'
        logging.info('Self check Result: {}'.format(self_check_result))
        logging.debug('Self check Completed '.ljust(60, '='))
        return self_check_result

    # query com name with special message
    def query_name(self, signal: bytes = b"what's your name", prefix: bytes = b'my name is '):
        """
        Query the name of every connected device with a special message.

        Assume signal=b"what's your name" and prefix=b'my name is ';
        then the reply b'my name is PH001' shows the com name is 'PH001'.

        :param signal: bytes written to the device
        :param prefix: bytes the reply has to start with
        :return: dict mapping each connected device to its name, or to None
                 when the reply does not start with ``prefix``
        """
        modules = {}
        logging.debug('Start query name '.ljust(60, '='))
        for i in self.devices:
            if i in self.connected:
                logging.debug('[{}]:Get connection'.format(i))
                ser = getattr(self, i.lower())
                logging.debug('[{}]:Start query name'.format(i))
                ser.write(signal)
                line = ser.readline()
                logging.debug('[{}]:Record results'.format(i))
                # The name is sliced from the end of the prefix, so the prefix
                # has to be at the start of the line, not merely somewhere in it.
                if line.startswith(prefix):
                    modules[i] = line[len(prefix):].rstrip(b'\r\n').decode('utf-8')
                else:
                    modules[i] = None
        logging.info('Query name Result: {}'.format(modules))
        logging.debug('Query name Completed '.ljust(60, '='))
        return modules
if __name__ == '__main__':
    # Demo run: open every port, run the self check and the name query,
    # then close the ports again.
    cur = Device()
    cur.start()
    try:
        cur.self_check()
        cur.query_name()
    finally:
        # release the ports even when one of the checks raises
        cur.stop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment