Last active
June 26, 2020 23:30
-
-
Save DaelonSuzuka/342afd3460f5a67276fe9d9824335833 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Registry of device profiles: maps each class's ``profile_name`` to the class.
# Only the classes returned by ``UnknownDevice.__subclasses__()`` at the moment
# this line runs are registered.
profiles = {
    profile_cls.profile_name: profile_cls
    for profile_cls in UnknownDevice.__subclasses__()
}
class DeviceManager:
    """Track serial ports, enumerate newly attached devices, and poll known ones.

    ``scan()`` diffs the current ``comports()`` listing against the previous
    one; ``update()`` drives the enumeration handshake of pending devices and
    then calls ``communicate()`` on every identified device.

    Attributes:
        devices: identified devices, keyed by their guid.
        new_devices: ``UnknownDevice`` instances still waiting for a reply to
            the handshake.
        ports: the port listing seen by the most recent ``scan()``.
        layout: the ``DeviceBoxGrid`` that devices are added to / removed from.
    """

    # Seconds a freshly connected device has to answer the handshake before
    # enumeration is abandoned.
    ENUMERATION_TIMEOUT = 1

    def __init__(self):
        log.info("Initializing DeviceManager...")

        self.devices = {}
        self.new_devices = []
        self.last_scan_time = 0
        self.ports = []

        self.layout = DeviceBoxGrid()

    def device_connected(self, port):
        """Open ``port`` as an ``UnknownDevice`` and queue it for enumeration."""
        log.debug(f"New device connected at ({port.device}), enumerating...")
        device = UnknownDevice(port)
        device.handshake()
        self.new_devices.append(device)

    def device_disconnected(self, port):
        """Forget the identified device (if any) that was attached to ``port``."""
        log.debug(f"Existing device removed from ({port.device}), cleaning up...")

        # look up guid of removed device (the last match wins, as before)
        guid = 0
        for device in list(self.devices.values()):
            if device.port == port:
                guid = device.guid

        if guid:
            self.layout.remove_device(guid)
            self.devices.pop(guid)

    def scan(self):
        """Compare the current port listing with the previous one and react."""
        new_ports = sorted(comports())

        added = [p for p in new_ports if p not in self.ports]
        for port in added:
            self.device_connected(port)

        removed = [p for p in self.ports if p not in new_ports]
        for port in removed:
            self.device_disconnected(port)

        self.ports = new_ports

    def update(self):
        """Advance pending enumerations, then poll every identified device."""
        # Iterate over a snapshot: entries are removed from self.new_devices
        # inside the loop, and removing from a list while iterating it makes
        # the iterator skip the element that follows each removal.
        for device in list(self.new_devices):
            if (time.time() - device.time_created) > self.ENUMERATION_TIMEOUT:
                device.close()
                self.new_devices.remove(device)
                log.debug(f"Enumeration failed on ({device.port_name})")
                continue

            device.communicate()

            # A non-empty guid means the device_info reply has been processed.
            if device.guid and device.name in profiles:
                # Release the port so the profile class can reopen it.
                device.close()

                new_device = profiles[device.name](device.port)
                new_device.name = device.name
                new_device.guid = device.guid

                self.devices[new_device.guid] = new_device
                self.new_devices.remove(device)
                self.layout.add_device(new_device)
                log.debug(f"Enumeration succeeded on ({new_device.port_name}), ")

        # Copy the values so the loop does not depend on the dict staying
        # unchanged while devices are being polled.
        for device in list(self.devices.values()):
            device.communicate()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class SerialDevice:
    """Base class for a device that exchanges JSON messages over a serial port.

    Outgoing strings are queued by ``send()`` and written by ``communicate()``;
    incoming characters are fed into ``self.msg`` until it reports a completed
    message, which is then dispatched through ``self.message_table``.

    Args:
        port: port-info object exposing a ``device`` attribute, or ``None`` to
            create an inactive device that never opens a port.
    """

    def __init__(self, port=None):
        self.msg = JSONBuffer()
        self.queue = Queue()
        self.ser = None
        self.time_created = time.time()

        self.port = port
        # ``port`` is optional, so it must not be dereferenced unconditionally.
        self.port_name = port.device if port else None
        self.active = False

        # Nested dict mirroring the message structure; leaves are callables.
        self.message_table = {}

        if self.port:
            self.open()

    def open(self):
        """Open the serial port; on PermissionError log it and stay inactive."""
        try:
            self.ser = Serial(self.port.device, timeout=0)
            self.active = True
        except PermissionError as e:
            log.warn('PermissionError' + str(e))

    def close(self):
        """Reset the receive buffer and close the port; no-op when inactive."""
        if not self.active:
            return

        self.msg.reset()
        self.ser.close()
        self.active = False

    def send(self, string):
        """Queue ``string`` for transmission; ignored when inactive."""
        if not self.active:
            return

        log.debug(f"({self.port.device}:{self.__class__.__name__}) TX: {string}")
        self.queue.put(string)

    def process_message(self, msg, table):
        """Dispatch a decoded message through a (possibly nested) handler table.

        For every key of ``msg``: if ``table`` maps it to a callable, the
        callable is invoked with the message value; if both the message value
        and the table entry are dicts, dispatch recurses into them. Keys the
        table does not know, and table entries that are neither callable nor
        dict, are ignored.

        Args:
            msg: decoded JSON value; anything other than a dict is ignored.
            table: handler table for this nesting level.
        """
        if not isinstance(msg, dict) or not isinstance(table, dict):
            return

        for key, value in msg.items():
            # .get() instead of table[key]: unknown keys must not raise.
            handler = table.get(key)

            # does the table list an action?
            if callable(handler):
                handler(value)
            # can we go deeper?
            elif isinstance(value, dict) and isinstance(handler, dict):
                self.process_message(value, handler)

    def recieve(self, string):
        """Log a received message and, if a table is set, decode and dispatch it."""
        log.debug(f"({self.port.device}:{self.__class__.__name__}) RX: {string}")

        if self.message_table:
            try:
                msg = json.loads(string)
                self.process_message(msg, self.message_table)
            except json.decoder.JSONDecodeError as e:
                log.warn('JSONDecodeError' + str(e))

    def communicate(self):
        """Write at most one queued string, then read until one message completes."""
        if not self.active:
            return

        if not self.queue.empty():
            self.ser.write(self.queue.get().encode())

        while self.ser.in_waiting:
            # Bytes are decoded one at a time, so a strict decode would raise
            # UnicodeDecodeError on any non-ASCII byte; substitute U+FFFD and
            # let JSON decoding reject the message instead.
            self.msg.insert_char(self.ser.read(1).decode(errors="replace"))
            if self.msg.completed():
                break

        if self.msg.completed():
            msg = self.msg.buffer
            self.recieve(msg)
            self.msg.reset()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class UnknownDevice(SerialDevice):
    """Serial device whose concrete profile has not been determined.

    Registers handlers that copy the ``name`` and ``serial`` fields of a
    ``device_info`` message into ``self.name`` and ``self.guid``.
    """

    profile_name = "no profile"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._msg_count = 0
        self.name = ""
        self.guid = ""
        self.ready = False

        # Handlers for the fields of an incoming "device_info" message.
        device_info_handlers = {
            'name': lambda value: setattr(self, 'name', value),
            'serial': lambda value: setattr(self, 'guid', value),
        }
        self.message_table.update({'device_info': device_info_handlers})

    @property
    def msg_count(self):
        """Return the current counter value and advance it by one."""
        current = self._msg_count
        self._msg_count = current + 1
        return current

    def ping(self):
        """Queue a ping message carrying the next message counter value."""
        payload = '{"ping":"%s"}' % (self.msg_count)
        self.send(payload)

    def handshake(self):
        """Queue a request for the device's short info record."""
        self.send('{"request_device_info":"short"}')

    @property
    def title(self):
        """Display title: the device name followed by its port."""
        # Alternative, more verbose form kept for reference:
        # return f"{self.name} ({self.port}:{self.__class__.__name__}) <{self.guid}>"
        return f"{self.name} ({self.port.device})"

    @property
    def layout(self):
        """Build a new vertical layout containing a single placeholder label."""
        box = QVBoxLayout()
        box.addWidget(QLabel("This device doesn't have any useful features right now."))
        return box
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class VariableCapacitor(UnknownDevice):
    """Device profile with a capacitor readout and bypass/input/output relays."""

    profile_name = "VariableCapacitor"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        def relay_toggle(relay):
            # Callback that forwards a toggle button's state to the named relay.
            return lambda x: self.set_single_relay(relay, x)

        self.label_capacitors = label("?")
        self.bypass_button = toggle_btn("Bypass", relay_toggle("bypass"))
        self.input_button = toggle_btn("Input", relay_toggle("input"))
        self.output_button = toggle_btn("Output", relay_toggle("output"))

        def relay_handlers():
            # A fresh handler dict per call, so "response" and "update" each
            # get their own table.
            return {
                'capacitors': lambda s: self.label_capacitors.setText(str(s)),
                'bypass': lambda s: self.bypass_button.setChecked(s),
                'input': lambda s: self.input_button.setChecked(s),
                'output': lambda s: self.output_button.setChecked(s),
            }

        self.message_table.update({
            'response': {
                'ok': 0,
                'relays': relay_handlers(),
            },
            'update': {
                'relays': relay_handlers(),
            },
        })

        # Ask the device for its current relay state straight away.
        self.check_relays()

    def relays_cup(self):
        """Queue the "cup" relay command."""
        self.send('{"relays":"cup"}')

    def relays_cdn(self):
        """Queue the "cdn" relay command."""
        self.send('{"relays":"cdn"}')

    def relays_reset(self):
        """Queue the relay "reset" command."""
        self.send('{"relays":"reset"}')

    def set_single_relay(self, relay, state):
        """Queue an on/off command for ``relay`` depending on ``state``."""
        template = '{"relays":{"%s":"on"}}' if state else '{"relays":{"%s":"off"}}'
        self.send(template % (relay))

    def check_relays(self):
        """Queue a request to read the relay state."""
        self.send('{"relays":"read"}')

    def request_device_info(self):
        """Queue a request for the device's full info record."""
        self.send('{"request_device_info":"full"}')

    @property
    def layout(self):
        """Build a new layout holding the "Relays" group box."""
        outer = QVBoxLayout()
        relays_box = QVBoxLayout()

        # First row: capacitor readout and step/reset buttons.
        # NOTE(review): the "LUP"/"LDN" captions drive relays_cup/relays_cdn;
        # possibly carried over from an inductor profile - confirm intended text.
        readout_row = QHBoxLayout()
        readout_row.addWidget(label("Capacitors: "))
        readout_row.addWidget(self.label_capacitors)
        readout_row.addWidget(btn("LUP", self.relays_cup))
        readout_row.addWidget(btn("LDN", self.relays_cdn))
        readout_row.addWidget(btn("Reset", self.relays_reset))
        relays_box.addLayout(readout_row)

        # Second row: the three relay toggle buttons.
        toggle_row = QHBoxLayout()
        toggle_row.addWidget(self.bypass_button)
        toggle_row.addWidget(self.input_button)
        toggle_row.addWidget(self.output_button)
        relays_box.addLayout(toggle_row)

        outer.addWidget(QGroupBox("Relays", layout = relays_box))
        return outer
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment