Last active
June 21, 2021 02:25
-
-
Save DaelonSuzuka/144ca060c514640f8b26f0149b6b4456 to your computer and use it in GitHub Desktop.
SerialDevice Stuff
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from devices import SerialDevice, DeviceWidget, CommonMessagesMixin | |
from qt import * | |
class AT600ProIISignals(QObject): | |
capacitors = Signal(int) | |
inductors = Signal(int) | |
hiloz = Signal(int) | |
antenna = Signal(str) | |
forward = Signal(float) | |
forward_watts = Signal(float) | |
reverse = Signal(float) | |
reverse_watts = Signal(float) | |
match_quality = Signal(float) | |
swr = Signal(float) | |
frequency = Signal(int) | |
@property | |
def message_tree(self): | |
return { | |
"update": { | |
"relays": { | |
"capacitors": self.capacitors.emit, | |
"inductors": self.inductors.emit, | |
"hiloz": self.hiloz.emit, | |
}, | |
"antenna": self.antenna.emit, | |
"rf": { | |
"forward": self.forward.emit, | |
"forward_watts": self.forward_watts.emit, | |
"reverse": self.reverse.emit, | |
"reverse_watts": self.reverse_watts.emit, | |
"match_quality": self.match_quality.emit, | |
"swr": self.swr.emit, | |
"frequency": self.frequency.emit, | |
}, | |
}, | |
} | |
class AT600ProII(CommonMessagesMixin, SerialDevice): | |
profile_name = "AT-600ProII" | |
def __init__(self, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
self.signals = AT600ProIISignals() | |
self.message_tree.merge(self.signals.message_tree) | |
self.message_tree.merge(self.common_message_tree) | |
@property | |
def widget(self): | |
w = AT600ProIIWidget(self.title, self.guid) | |
return w | |
def full_tune(self): | |
"""Attempt to recall memories, full tune if none are found.""" | |
self.send('{"command":{"tune":"full"}}') | |
def memory_tune(self): | |
"""Attempt to recall memories, quit if none are found.""" | |
self.send('{"command":{"tune":"memory"}}') | |
def force_tune(self): | |
"""Full tune without attempting to recall memories""" | |
self.send('{"command":{"tune":"force"}}') | |
def select_antenna(self, antenna): | |
self.send('{"command":{"select_antenna":%s}}' % antenna) | |
def set_relays(self, caps=None, inds=None, z=None): | |
cmd = '{"command":{"set_relays":{' | |
if caps: | |
cmd += '"caps":%s' % (caps) | |
if inds: | |
cmd += '"inds":%s' % (inds) | |
if z: | |
cmd += '"z":%s' % (z) | |
cmd += "}}}" | |
self.send(cmd) | |
def set_threshold(self, threshold): | |
self.send('{"command":{"set_threshold":"%s"}}' % (threshold)) | |
def set_auto_mode(self, state): | |
self.send('{"command":{"set_auto":"%s"}}' % (state)) | |
def set_scale(self, scale): | |
self.send('{"command":{"set_scale":"%s"}}' % (scale)) | |
def relays_cup(self): | |
self.send('{"command":"cup"}') | |
def relays_cdn(self): | |
self.send('{"command":"cdn"}') | |
def relays_lup(self): | |
self.send('{"command":"lup"}') | |
def relays_ldn(self): | |
self.send('{"command":"ldn"}') | |
class AT600ProIIWidget(DeviceWidget): | |
def __init__(self, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
grid = QGridLayout() | |
grid.addWidget(Label("Under Construction...")) | |
self.setLayout(grid) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from qt import * | |
class DeviceWidget(QGroupBox): | |
def __init__(self, title, guid=""): | |
super().__init__(title) | |
self.setSizePolicy(QSizePolicy.MinimumExpanding, QSizePolicy.Fixed) | |
self.guid = guid | |
self.setStyleSheet("DeviceWidget { border: 1px solid grey;}") | |
self.create_widgets() | |
self.connect_signals() | |
self.build_layout() | |
def create_widgets(self): | |
pass | |
def connect_signals(self): | |
pass | |
def build_layout(self): | |
pass |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class DummySerial: | |
def __init__(self, port=None, timeout=0): | |
self.port = port | |
def open(self): | |
pass | |
def close(self): | |
pass | |
def write(self, string): | |
pass | |
def read(self, bytes=1): | |
return "" | |
@property | |
def in_waiting(self): | |
return 0 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class JSONBuffer: | |
"""A utility class that helps keep track of an incoming JSON message. | |
""" | |
def __init__(self): | |
self.buffer = "" | |
self.depth = 0 | |
def reset(self): | |
self.buffer = "" | |
self.depth = 0 | |
def insert_char(self, c): | |
"""Insert a single char into the buffer. | |
""" | |
if c == "{": | |
self.depth += 1 | |
if self.depth > 0: | |
self.buffer += c | |
if c == "}": | |
self.depth -= 1 | |
def completed(self): | |
"""Check if the MessageBuffer contains a completed JSON message. | |
""" | |
if (self.depth == 0) and (len(self.buffer) > 0): | |
return True | |
return False |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from .serial_device_base import SerialDeviceBase | |
from time import time | |
from qt import * | |
fake_guid = 0 | |
def get_fake_guid(): | |
global fake_guid | |
guid = fake_guid | |
fake_guid += 1 | |
return str(guid) | |
class CommonMessagesMixin: | |
@property | |
def common_message_tree(self): | |
return { | |
'update': { | |
'device_info': { | |
'name': lambda s: self.__setattr__('name', s), | |
'guid': lambda s: self.__setattr__('guid', s), | |
} | |
}, | |
'response': { | |
'ok': None, | |
'error': None | |
} | |
} | |
def ping(self): | |
self.send('{"request":{"ping":"%s"}}' % (self.msg_count)) | |
def handshake(self): | |
"""Request for the device to report its name and guid(serial number).""" | |
self.send('{"request":"device_info"}') | |
def locate(self): | |
"""Instruct the device to blink all LEDs for 5 seconds.""" | |
self.send('{"command":"locate"}') | |
class MessageTree(dict): | |
def merge(self, d): | |
for key in d: | |
if key in self: | |
self[key].update(d[key]) | |
else: | |
self[key] = d[key] | |
class SerialDevice(SerialDeviceBase): | |
def __init__(self, port=None, baud=9600, device=None): | |
self.port = port | |
self.baud = baud | |
self.name = "" | |
self.guid = get_fake_guid() | |
# a SerialDevice can be created based on an existing SerialDevice | |
if device: | |
self.port = device.port | |
self.baud = device.baud | |
self.name = device.name | |
self.guid = device.guid | |
super().__init__(port=self.port, baud=self.baud) | |
self.signals = None | |
self.time_created = time() | |
# message stuff | |
self.msg_count = 0 | |
self.message_tree = MessageTree() | |
self.msg_history = [] | |
@property | |
def title(self): | |
return f"{self.profile_name} ({self.port}) <{self.guid}>" | |
def process_message(self, msg, table): | |
# TODO: catch KeyErrors | |
for k in msg.keys(): | |
# does the table list an action? | |
if k in table.keys() and callable(table[k]): | |
table[k](msg[k]) | |
# can we go deeper? | |
elif isinstance(msg[k], dict): | |
if k in table.keys(): | |
self.process_message(msg[k], table[k]) | |
def recieve(self, string): | |
super().recieve(string) | |
try: | |
msg = json.loads(string) | |
self.process_message(msg, self.message_tree) | |
except json.decoder.JSONDecodeError as e: | |
self.log.warn("JSONDecodeError" + str(e)) | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from queue import Queue | |
from .json_buffer import JSONBuffer | |
from serial import Serial, SerialException | |
from serial.tools.list_ports_common import ListPortInfo | |
from .dummy_serial import DummySerial | |
import logger | |
class SerialDeviceBase: | |
def __init__(self, port=None, baud=9600): | |
self.log = logger.get_logger(__name__, "DEBUG") | |
self.queue = Queue() | |
self.msg = JSONBuffer() | |
self.active = False | |
self.ser = None | |
self.baud = baud | |
if isinstance(port, ListPortInfo): | |
self.port = port.device | |
else: | |
self.port = port | |
if self.port: | |
self.open() | |
def open(self): | |
""" open the serial port and set the device to active """ | |
if self.port == "DummyPort": | |
self.ser = DummySerial() | |
self.active = True | |
else: | |
try: | |
self.ser = Serial(port=self.port, baudrate=self.baud, timeout=0) | |
self.active = True | |
except Exception as e: | |
self.log.warn("PermissionError" + str(e)) | |
def close(self): | |
""" close the serial port and set the device to inactive """ | |
if not self.active: | |
return | |
self.msg.reset() | |
self.ser.close() | |
self.active = False | |
def send(self, string): | |
""" add a string to the outbound queue """ | |
if not self.active: | |
return | |
self.log.debug(f"({self.port}:{self.__class__.__name__}) TX: {string}") | |
self.queue.put(string) | |
self.msg_count += 1 | |
def recieve(self, string): | |
""" do something when a complete string is captured in self.communicate() """ | |
self.log.debug(f"({self.port}:{self.__class__.__name__}) RX: {string}") | |
def communicate(self): | |
""" Handle comms with the serial port. Call this often, from an event loop or something. """ | |
if not self.active: | |
return | |
# serial transmit | |
try: | |
if not self.queue.empty(): | |
self.ser.write(self.queue.get().encode()) | |
except SerialException as e: | |
self.log.warn(e) | |
# serial recieve | |
try: | |
while self.ser.in_waiting: | |
self.msg.insert_char(self.ser.read(1).decode()) | |
if self.msg.completed(): | |
break | |
except UnicodeDecodeError as e: | |
# self.log.warn(e) | |
pass | |
# handle completed message | |
if self.msg.completed(): | |
msg = self.msg.buffer | |
self.recieve(msg) | |
self.msg.reset() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from .serial_device_base import SerialDeviceBase | |
from time import time | |
from qt import * | |
fake_guid = 0 | |
def get_fake_guid(): | |
global fake_guid | |
guid = fake_guid | |
fake_guid += 1 | |
return str(guid) | |
class CommonMessagesMixin: | |
@property | |
def common_message_tree(self): | |
return { | |
'update': { | |
'device_info': { | |
'name': lambda s: self.__setattr__('name', s), | |
'guid': lambda s: self.__setattr__('guid', s), | |
} | |
}, | |
'response': { | |
'ok': None, | |
'error': None | |
} | |
} | |
def ping(self): | |
self.send('{"request":{"ping":"%s"}}' % (self.msg_count)) | |
def handshake(self): | |
"""Request for the device to report its name and guid(serial number).""" | |
self.send('{"request":"device_info"}') | |
def locate(self): | |
"""Instruct the device to blink all LEDs for 5 seconds.""" | |
self.send('{"command":"locate"}') | |
class MessageTree(dict): | |
def merge(self, d): | |
for key in d: | |
if key in self: | |
self[key].update(d[key]) | |
else: | |
self[key] = d[key] | |
class SerialDevice(SerialDeviceBase): | |
def __init__(self, port=None, baud=9600, device=None): | |
self.port = port | |
self.baud = baud | |
self.name = "" | |
self.guid = get_fake_guid() | |
# a SerialDevice can be created based on an existing SerialDevice | |
if device: | |
self.port = device.port | |
self.baud = device.baud | |
self.name = device.name | |
self.guid = device.guid | |
super().__init__(port=self.port, baud=self.baud) | |
self.signals = None | |
self.time_created = time() | |
# message stuff | |
self.msg_count = 0 | |
self.message_tree = MessageTree() | |
self.msg_history = [] | |
@property | |
def title(self): | |
return f"{self.profile_name} ({self.port}) <{self.guid}>" | |
def process_message(self, msg, table): | |
# TODO: catch KeyErrors | |
for k in msg.keys(): | |
# does the table list an action? | |
if k in table.keys() and callable(table[k]): | |
table[k](msg[k]) | |
# can we go deeper? | |
elif isinstance(msg[k], dict): | |
if k in table.keys(): | |
self.process_message(msg[k], table[k]) | |
def recieve(self, string): | |
super().recieve(string) | |
try: | |
msg = json.loads(string) | |
self.process_message(msg, self.message_tree) | |
except json.decoder.JSONDecodeError as e: | |
self.log.warn("JSONDecodeError" + str(e)) | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from devices import SerialDevice, DeviceWidget, CommonMessagesMixin | |
from qt import * | |
from functools import partial | |
import logger | |
log = logger.get_logger(__name__) | |
class SW4USignals(QObject): | |
antenna = Signal(int) | |
forward = Signal(float) | |
reverse = Signal(float) | |
frequency = Signal(int) | |
auto_mode = Signal(bool) | |
auto_table = Signal(dict) | |
@property | |
def message_tree(self): | |
return { | |
"update": { | |
"auto": self.auto_mode.emit, | |
"auto_table": self.auto_table.emit, | |
"antenna": self.antenna.emit, | |
"frequency": self.frequency.emit, | |
} | |
} | |
class SW4U(CommonMessagesMixin, SerialDevice): | |
profile_name = "SW-4U" | |
def __init__(self, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
self.signals = SW4USignals() | |
self.message_tree.merge(self.signals.message_tree) | |
self.message_tree.merge(self.common_message_tree) | |
# startup messages | |
self.read_antenna() | |
self.read_auto_table() | |
self.auto_mode("read") | |
@property | |
def widget(self): | |
w = SW4UWidget(self.title, self.guid) | |
# connect signals | |
self.signals.antenna.connect(w.ant_btns.select) | |
self.signals.frequency.connect(lambda s: w.rf_panel.frequency.setText(str(s))) | |
w.ant_btns.set_antenna.connect(lambda i: self.set_antenna(str(i))) | |
w.auto_btns.auto.clicked.connect(self.auto_enabled) | |
return w | |
def set_antenna(self, ant): | |
self.send('{"antenna":{"set":"%s"}}' % (ant)) | |
def read_antenna(self): | |
self.send('{"antenna":"read"}') | |
def auto_mode(self, command): | |
self.send('{"auto":{"mode":"%s"}}' % (command)) | |
def auto_enabled(self, state): | |
if state: | |
self.send('{"auto":{"mode":"on"}}') | |
else: | |
self.send('{"auto":{"mode":"off"}}') | |
def set_auto_table(self, band, ant): | |
self.send('{"auto":{"table":{"%s":"%s"}}}' % (band, ant)) | |
def read_auto_table(self): | |
self.send('{"auto":{"table":"read"}}') | |
class SW4UWidget(DeviceWidget): | |
def create_widgets(self): | |
self.ant_btns = AntennaButtons() | |
self.rf_panel = RFPanel() | |
self.auto_btns = AutoButtons() | |
def connect_signals(self): | |
pass | |
def build_layout(self): | |
grid = QGridLayout() | |
grid.setContentsMargins(10, 20, 10, 10) | |
grid.addWidget(self.rf_panel, 0, 0, 1, 2) | |
grid.addWidget(self.auto_btns, 0, 2, 1, 2) | |
grid.addWidget(self.ant_btns, 1, 0, 1, 3) | |
self.setLayout(grid) | |
def select(self, antenna): | |
for i, button in enumerate(self.ant_btns.buttons()): | |
if i == antenna: | |
button.setChecked(True) | |
class RFPanel(QGroupBox): | |
def __init__(self): | |
super().__init__("RF") | |
self.setSizePolicy(QSizePolicy.Minimum, QSizePolicy.Fixed) | |
# widgets | |
self.frequency = label("?") | |
self.forward = label("?") | |
self.reverse = label("?") | |
# layout | |
grid = QGridLayout() | |
grid.addWidget(Label("Forward:"), 0, 0) | |
grid.addWidget(self.forward, 0, 1) | |
grid.addWidget(Label("Reverse:"), 0, 2) | |
grid.addWidget(self.reverse, 0, 3) | |
grid.addWidget(Label("Frequency:"), 0, 4) | |
grid.addWidget(self.frequency, 0, 5) | |
self.setLayout(grid) | |
class AntennaButtons(QGroupBox): | |
set_antenna = Signal(int) | |
def __init__(self): | |
super().__init__("Select Antenna") | |
self.setSizePolicy(QSizePolicy.Minimum, QSizePolicy.Fixed) | |
self.group = QButtonGroup() | |
# widgets and layout | |
grid = QGridLayout() | |
for i, name in enumerate(["None", "One", "Two", "Three", "Four",]): | |
button = Button(name, clicked=partial(self.set_antenna.emit, i), checkable=True) | |
self.group.addButton(button) | |
grid.addWidget(button, 0, i) | |
self.setLayout(grid) | |
def select(self, antenna): | |
for i, button in enumerate(self.group.buttons()): | |
if i == antenna: | |
button.setChecked(True) | |
class AutoButtons(QGroupBox): | |
def __init__(self): | |
super().__init__("Auto Mode") | |
self.setSizePolicy(QSizePolicy.Minimum, QSizePolicy.Fixed) | |
# self.table = [0 for i in range(10)] | |
# self.bands = [i for i in range(10)] | |
# self.ants = [i for i in range(5)] | |
# self.radio_buttons = [[None for a in self.ants] for b in self.bands] | |
# for band in self.bands: | |
# hbox = QHBoxLayout() | |
# hbox.addWidget(QLabel()) # spacing | |
# hbox.addWidget(label(str(band) + ": ")) | |
# for ant in self.ants: | |
# radio = radio_btn( | |
# str(ant), partial(parent.set_auto_table, str(band), str(ant)) | |
# ) | |
# self.radio_buttons[band][ant] = radio | |
# hbox.addWidget(radio) | |
# hbox.addWidget(QLabel()) # spacing | |
# grid.addWidget(group_box("", layout=hbox)) | |
# widgets | |
self.auto = Button("On", checkable=True) | |
self.edit = Button("Edit") | |
# layout | |
grid = QGridLayout() | |
grid.addWidget(self.auto, 0, 0) | |
grid.addWidget(self.edit, 0, 1) | |
grid.setColumnStretch(5, 1) | |
self.setLayout(grid) | |
def update(self, table): | |
for k, v in table.items(): | |
self.radio_buttons[int(k)][v].setChecked(True) | |
self.table = [table[k] for k in table.keys()] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from devices import SerialDevice, CommonMessagesMixin | |
from qt import * | |
class UnknownDevice(CommonMessagesMixin, SerialDevice): | |
profile_name = "no profile" | |
def __init__(self, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
self.message_tree.merge(self.common_message_tree) | |
self.handshake() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from devices import SerialDevice, DeviceWidget, CommonMessagesMixin | |
from qt import * | |
class VariableCapacitorSignals(QObject): | |
capacitors = Signal(int) | |
input = Signal(int) | |
output = Signal(int) | |
@property | |
def message_tree(self): | |
return { | |
"update": { | |
"relays": { | |
"capacitors": self.capacitors.emit, | |
"input": self.input.emit, | |
"output": self.output.emit, | |
} | |
} | |
} | |
class VariableCapacitor(CommonMessagesMixin, SerialDevice): | |
profile_name = "VariableCapacitor" | |
def __init__(self, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
self.signals = VariableCapacitorSignals() | |
self.message_tree.merge(self.signals.message_tree) | |
self.message_tree.merge(self.common_message_tree) | |
self.capacitors = 0 | |
self.signals.capacitors.connect(lambda i: self.__setattr__("capacitors", i)) | |
# startup messages | |
self.request_current_relays() | |
@property | |
def widget(self): | |
w = VariableCapacitorWidget(self.title, self.guid) | |
# connect signals | |
w.input_button.clicked.connect(self.set_input_relay) | |
w.output_button.clicked.connect(self.set_output_relay) | |
w.cup_btn.clicked.connect(self.relays_cup) | |
w.cdn_btn.clicked.connect(self.relays_cdn) | |
w.max_btn.clicked.connect(self.relays_max) | |
w.min_btn.clicked.connect(self.relays_min) | |
w.set_relays.connect(self.set_caps) | |
self.signals.capacitors.connect(lambda x: w.edit.setText(str(x))) | |
self.signals.input.connect(w.input_button.setChecked) | |
self.signals.output.connect(w.output_button.setChecked) | |
return w | |
def set_caps(self, value): | |
self.send('{"command":{"set_capacitors":%s}}' % (str(value))) | |
def relays_max(self): | |
self.send('{"command":{"set_capacitors":255}}') | |
def relays_min(self): | |
self.send('{"command":{"set_capacitors":0}}') | |
def relays_cup(self): | |
self.send('{"command":"cup"}') | |
def relays_cdn(self): | |
self.send('{"command":"cdn"}') | |
def request_current_relays(self): | |
self.send('{"request":"relays"}') | |
def set_input_relay(self, state): | |
self.send('{"relays":{"input":%s}}' % (int(state))) | |
def set_output_relay(self, state): | |
self.send('{"relays":{"output":%s}}' % (int(state))) | |
class VariableCapacitorWidget(DeviceWidget): | |
set_relays = Signal(int) | |
def create_widgets(self): | |
self.input_button = Button("Input", checkable=True) | |
self.output_button = Button("Output", checkable=True) | |
self.cup_btn = Button("CUP", autoRepeat=True) | |
self.cdn_btn = Button("CDN", autoRepeat=True) | |
self.max_btn = Button("Max") | |
self.min_btn = Button("Min") | |
self.edit = QLineEdit() | |
self.set_btn = Button("Set") | |
def connect_signals(self): | |
self.set_btn.clicked.connect(lambda: self.set_relays.emit(int(self.edit.text()))) | |
self.edit.returnPressed.connect(self.set_btn.clicked) | |
def build_layout(self): | |
hbox = QHBoxLayout() | |
hbox.addWidget(label("Capacitors: ")) | |
hbox.addWidget(self.edit) | |
hbox.addWidget(self.set_btn) | |
gbox = QGroupBox("", layout=hbox) | |
grid = QGridLayout() | |
grid.setContentsMargins(10, 20, 10, 10) | |
grid.addWidget(gbox, 0, 0, 2, 1) | |
grid.addWidget(self.cup_btn, 0, 1) | |
grid.addWidget(self.cdn_btn, 1, 1) | |
grid.addWidget(self.max_btn, 0, 2) | |
grid.addWidget(self.min_btn, 1, 2) | |
grid.addWidget(self.input_button, 0, 3) | |
grid.addWidget(self.output_button, 1, 3) | |
grid.setColumnStretch(5, 1) | |
self.setLayout(grid) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment