Skip to content

Instantly share code, notes, and snippets.

@DaelonSuzuka
Last active June 26, 2020 23:30
Show Gist options
  • Save DaelonSuzuka/342afd3460f5a67276fe9d9824335833 to your computer and use it in GitHub Desktop.
Save DaelonSuzuka/342afd3460f5a67276fe9d9824335833 to your computer and use it in GitHub Desktop.
# Registry of device profiles: maps each profile's ``profile_name`` to the
# class that implements it. Only *direct* subclasses of UnknownDevice are
# listed, because that is what ``__subclasses__()`` returns.
profiles = dict(
    (profile_cls.profile_name, profile_cls)
    for profile_cls in UnknownDevice.__subclasses__()
)
class DeviceManager:
    """Track serial devices as they are plugged in, identified and removed.

    ``scan()`` diffs the current serial port list against the previous one
    and calls ``device_connected`` / ``device_disconnected`` accordingly.
    ``update()`` advances enumeration of freshly connected devices and
    services every registered device.

    Attributes:
        devices: registered devices, keyed by the guid they reported.
        new_devices: UnknownDevice instances still waiting to report a guid.
        ports: port list seen by the most recent ``scan()``.
        layout: DeviceBoxGrid that registered devices are added to/removed from.
    """

    # Seconds a new device is given to identify itself before it is dropped.
    ENUMERATION_TIMEOUT = 1

    def __init__(self):
        log.info("Initializing DeviceManager...")
        self.devices = {}
        self.new_devices = []
        self.last_scan_time = 0
        self.ports = []
        self.layout = DeviceBoxGrid()

    def device_connected(self, port):
        """Open *port* as an UnknownDevice and start the identification handshake."""
        log.debug(f"New device connected at ({port.device}), enumerating...")
        device = UnknownDevice(port)
        device.handshake()
        self.new_devices.append(device)

    def device_disconnected(self, port):
        """Forget every device (registered or still enumerating) on *port*."""
        log.debug(f"Existing device removed from ({port.device}), cleaning up...")

        # Devices that were unplugged before they finished enumerating would
        # otherwise keep being polled on a dead port until they time out.
        still_pending = []
        for device in self.new_devices:
            if device.port == port:
                device.close()
            else:
                still_pending.append(device)
        self.new_devices = still_pending

        # Iterate over a snapshot because entries are deleted inside the loop.
        for key, device in list(self.devices.items()):
            if device.port != port:
                continue
            # Release the serial handle; close() is a no-op if already inactive.
            device.close()
            self.layout.remove_device(device.guid)
            del self.devices[key]

    def scan(self):
        """Compare the current serial ports with the last scan and react to changes."""
        new_ports = sorted(comports())

        added = [p for p in new_ports if p not in self.ports]
        for port in added:
            self.device_connected(port)

        removed = [p for p in self.ports if p not in new_ports]
        for port in removed:
            self.device_disconnected(port)

        self.ports = new_ports

    def update(self):
        """Advance enumeration of new devices, then service registered ones."""
        # Iterate over a snapshot: entries are removed from self.new_devices
        # inside the loop, and removing from a list while iterating it makes
        # the iterator skip the element that follows each removal.
        for device in list(self.new_devices):
            if (time.time() - device.time_created) > self.ENUMERATION_TIMEOUT:
                device.close()
                self.new_devices.remove(device)
                log.debug(f"Enumeration failed on ({device.port_name})")
                continue

            device.communicate()

            # A device with a guid but an unrecognised name stays pending
            # until it times out above.
            if device.guid and device.name in profiles:
                # Close the generic handle first so the profile class can
                # reopen the same port.
                device.close()
                new_device = profiles[device.name](device.port)
                new_device.name = device.name
                new_device.guid = device.guid
                self.devices[new_device.guid] = new_device
                self.new_devices.remove(device)
                self.layout.add_device(new_device)
                log.debug(f"Enumeration succeeded on ({new_device.port_name}), ")

        for device in self.devices.values():
            device.communicate()
class SerialDevice:
    """A serial port that exchanges JSON messages with a device.

    Outgoing strings are queued by ``send()`` and written one per
    ``communicate()`` call. Incoming characters are fed into a JSONBuffer;
    once it reports a completed message, the text is parsed and dispatched
    through ``message_table``.

    ``message_table`` is a (possibly nested) dict mirroring the shape of the
    incoming JSON: a callable leaf is invoked with the matching value, a dict
    entry is descended into when the matching value is also a dict.

    Args:
        port: port-info object exposing a ``device`` attribute (the name
            passed to ``Serial``). When omitted, nothing is opened and the
            instance stays inactive.
    """

    def __init__(self, port=None):
        self.msg = JSONBuffer()
        self.queue = Queue()
        self.ser = None
        self.time_created = time.time()
        self.port = port
        # ``port`` defaults to None, so it must not be dereferenced blindly.
        self.port_name = port.device if port is not None else None
        self.active = False
        self.message_table = {}

        if self.port:
            self.open()

    def open(self):
        """Open the serial port; on failure log a warning and stay inactive."""
        try:
            self.ser = Serial(self.port.device, timeout=0)
        except OSError as e:
            # pyserial reports open failures as SerialException, which is an
            # OSError subclass, as is PermissionError. For a PermissionError
            # the logged text is the same as before.
            log.warn(type(e).__name__ + str(e))
        else:
            self.active = True

    def close(self):
        """Close the port and discard any partially received message."""
        if not self.active:
            return

        self.msg.reset()
        self.ser.close()
        self.active = False

    def send(self, string):
        """Queue *string* for transmission; ignored while the port is inactive."""
        if not self.active:
            return

        log.debug(f"({self.port.device}:{self.__class__.__name__}) TX: {string}")
        self.queue.put(string)

    def process_message(self, msg, table):
        """Dispatch the entries of *msg* to the handlers listed in *table*.

        Args:
            msg: dict decoded from an incoming JSON message (or a nested part).
            table: dict mapping keys to either a callable, which is called
                with the corresponding value, or a nested dict table.

        Keys that the table does not list, and entries whose table value is
        neither callable nor a dict, are ignored.
        """
        for key, value in msg.items():
            handler = table.get(key)
            # does the table list an action?
            if callable(handler):
                handler(value)
            # can we go deeper? Both sides must be dicts to recurse safely.
            elif isinstance(value, dict) and isinstance(handler, dict):
                self.process_message(value, handler)

    def recieve(self, string):
        """Parse one complete JSON message and dispatch it via ``message_table``.

        Invalid JSON is logged and dropped. Valid JSON whose top level is not
        an object is ignored, since there are no keys to dispatch on.
        """
        log.debug(f"({self.port.device}:{self.__class__.__name__}) RX: {string}")

        if not self.message_table:
            return

        # Keep the try body to the one call that can raise the decode error.
        try:
            msg = json.loads(string)
        except json.decoder.JSONDecodeError as e:
            log.warn('JSONDecodeError' + str(e))
            return

        if isinstance(msg, dict):
            self.process_message(msg, self.message_table)

    def communicate(self):
        """Write at most one queued message, then read pending input.

        Reading stops as soon as the buffer reports a completed message, so
        at most one message is handled per call.
        """
        if not self.active:
            return

        if not self.queue.empty():
            self.ser.write(self.queue.get().encode())

        # Read one character at a time so bytes belonging to the next message
        # are left in the serial buffer.
        # NOTE(review): decode() on a single byte raises UnicodeDecodeError
        # for non-ASCII input; presumably the device only sends ASCII JSON.
        while self.ser.in_waiting:
            self.msg.insert_char(self.ser.read(1).decode())
            if self.msg.completed():
                break

        if self.msg.completed():
            msg = self.msg.buffer
            self.recieve(msg)
            self.msg.reset()
class UnknownDevice(SerialDevice):
    """Serial device whose identity has not been established yet.

    Registers ``device_info`` handlers that store the reported ``name`` and
    ``serial`` (kept as ``guid``) on the instance.
    """

    profile_name = "no profile"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.ready = False
        self.guid = ""
        self.name = ""
        self._msg_count = 0

        def store_name(value):
            self.name = value

        def store_guid(value):
            self.guid = value

        device_info_handlers = {
            'name': store_name,
            'serial': store_guid,
        }
        self.message_table.update({'device_info': device_info_handlers})

    @property
    def msg_count(self):
        """Return the current message counter, then advance it by one.

        Every read of this property increments the counter.
        """
        current = self._msg_count
        self._msg_count = current + 1
        return current

    def ping(self):
        """Queue a ping message tagged with the next message count."""
        payload = '{"ping":"%s"}' % (self.msg_count)
        self.send(payload)

    def handshake(self):
        """Queue a request for the device's short identification info."""
        self.send('{"request_device_info":"short"}')

    @property
    def title(self):
        """Display title: the device name followed by its port in parentheses."""
        port_label = self.port.device
        return f"{self.name} ({port_label})"
        # return f"{self.name} ({self.port}:{self.__class__.__name__}) <{self.guid}>"

    @property
    def layout(self):
        """Build and return a new placeholder layout holding a single label."""
        placeholder = QVBoxLayout()
        placeholder.addWidget(
            QLabel("This device doesn't have any useful features right now.")
        )
        return placeholder
class VariableCapacitor(UnknownDevice):
    """Device profile for a variable capacitor with switchable relays.

    Keeps a capacitor label and three toggle buttons (bypass, input, output)
    in sync with the relay state reported in ``response`` and ``update``
    messages, and sends relay commands when the controls are used.
    """

    profile_name = "VariableCapacitor"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.label_capacitors = label("?")
        self.bypass_button = toggle_btn(
            "Bypass", lambda checked: self.set_single_relay("bypass", checked)
        )
        self.input_button = toggle_btn(
            "Input", lambda checked: self.set_single_relay("input", checked)
        )
        self.output_button = toggle_btn(
            "Output", lambda checked: self.set_single_relay("output", checked)
        )

        def relay_handlers():
            # Fresh dict per call so 'response' and 'update' each own their
            # handler table, exactly as when both were written out literally.
            return {
                'capacitors': lambda value: self.label_capacitors.setText(str(value)),
                'bypass': lambda value: self.bypass_button.setChecked(value),
                'input': lambda value: self.input_button.setChecked(value),
                'output': lambda value: self.output_button.setChecked(value),
            }

        self.message_table.update({
            'response': {
                'ok': 0,
                'relays': relay_handlers(),
            },
            'update': {
                'relays': relay_handlers(),
            },
        })

        # Ask the device for its current relay state right away.
        self.check_relays()

    def relays_cup(self):
        """Queue the "cup" relay command."""
        self.send('{"relays":"cup"}')

    def relays_cdn(self):
        """Queue the "cdn" relay command."""
        self.send('{"relays":"cdn"}')

    def relays_reset(self):
        """Queue the relay "reset" command."""
        self.send('{"relays":"reset"}')

    def set_single_relay(self, relay, state):
        """Queue a command switching *relay* on when *state* is truthy, else off."""
        template = '{"relays":{"%s":"on"}}' if state else '{"relays":{"%s":"off"}}'
        self.send(template % (relay))

    def check_relays(self):
        """Queue a request to read back the relay state."""
        self.send('{"relays":"read"}')

    def request_device_info(self):
        """Queue a request for the device's full identification info."""
        self.send('{"request_device_info":"full"}')

    @property
    def layout(self):
        """Build and return a new layout containing the "Relays" group box."""
        # NOTE(review): the "LUP"/"LDN" captions drive the cup/cdn commands;
        # possibly meant to read "CUP"/"CDN" - confirm before changing.
        capacitor_row = QHBoxLayout()
        capacitor_row.addWidget(label("Capacitors: "))
        capacitor_row.addWidget(self.label_capacitors)
        capacitor_row.addWidget(btn("LUP", self.relays_cup))
        capacitor_row.addWidget(btn("LDN", self.relays_cdn))
        capacitor_row.addWidget(btn("Reset", self.relays_reset))

        toggle_row = QHBoxLayout()
        for button in (self.bypass_button, self.input_button, self.output_button):
            toggle_row.addWidget(button)

        relays_box = QVBoxLayout()
        relays_box.addLayout(capacitor_row)
        relays_box.addLayout(toggle_row)

        outer = QVBoxLayout()
        outer.addWidget(QGroupBox("Relays", layout=relays_box))
        return outer
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment