Skip to content

Instantly share code, notes, and snippets.

@DaelonSuzuka
Last active June 21, 2021 02:25
Show Gist options
  • Save DaelonSuzuka/144ca060c514640f8b26f0149b6b4456 to your computer and use it in GitHub Desktop.
Save DaelonSuzuka/144ca060c514640f8b26f0149b6b4456 to your computer and use it in GitHub Desktop.
SerialDevice Stuff
from devices import SerialDevice, DeviceWidget, CommonMessagesMixin
from qt import *
class AT600ProIISignals(QObject):
    """Qt signals emitted when the AT-600ProII reports updated values."""

    # values reported under update/relays and update/antenna
    capacitors = Signal(int)
    inductors = Signal(int)
    hiloz = Signal(int)
    antenna = Signal(str)

    # values reported under update/rf
    forward = Signal(float)
    forward_watts = Signal(float)
    reverse = Signal(float)
    reverse_watts = Signal(float)
    match_quality = Signal(float)
    swr = Signal(float)
    frequency = Signal(int)

    @property
    def message_tree(self):
        """Return a nested dict that maps incoming message keys to signal emitters."""
        relay_handlers = {
            "capacitors": self.capacitors.emit,
            "inductors": self.inductors.emit,
            "hiloz": self.hiloz.emit,
        }
        rf_handlers = {
            "forward": self.forward.emit,
            "forward_watts": self.forward_watts.emit,
            "reverse": self.reverse.emit,
            "reverse_watts": self.reverse_watts.emit,
            "match_quality": self.match_quality.emit,
            "swr": self.swr.emit,
            "frequency": self.frequency.emit,
        }
        update_handlers = {
            "relays": relay_handlers,
            "antenna": self.antenna.emit,
            "rf": rf_handlers,
        }
        return {"update": update_handlers}
class AT600ProII(CommonMessagesMixin, SerialDevice):
    """Serial profile for the AT-600ProII.

    Incoming updates are routed to ``self.signals`` through the message tree;
    the methods below queue JSON commands for the device via ``send``.
    """

    profile_name = "AT-600ProII"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.signals = AT600ProIISignals()
        self.message_tree.merge(self.signals.message_tree)
        self.message_tree.merge(self.common_message_tree)

    @property
    def widget(self):
        """Build and return a new AT600ProIIWidget titled after this device."""
        return AT600ProIIWidget(self.title, self.guid)

    def full_tune(self):
        """Attempt to recall memories, full tune if none are found."""
        self.send('{"command":{"tune":"full"}}')

    def memory_tune(self):
        """Attempt to recall memories, quit if none are found."""
        self.send('{"command":{"tune":"memory"}}')

    def force_tune(self):
        """Full tune without attempting to recall memories"""
        self.send('{"command":{"tune":"force"}}')

    def select_antenna(self, antenna):
        """Send a select_antenna command for the given antenna."""
        # NOTE(review): the value is inserted unquoted, so a non-numeric string
        # would produce invalid JSON -- confirm the type callers pass.
        self.send('{"command":{"select_antenna":%s}}' % antenna)

    def set_relays(self, caps=None, inds=None, z=None):
        """Send a set_relays command containing only the fields that were given.

        Args:
            caps: value for the "caps" field, or None to leave it out.
            inds: value for the "inds" field, or None to leave it out.
            z: value for the "z" field, or None to leave it out.
        """
        requested = (("caps", caps), ("inds", inds), ("z", z))
        # Compare against None so that 0 can be sent, and join with commas so
        # that the payload stays valid JSON when several fields are present.
        fields = [
            '"%s":%s' % (name, value)
            for name, value in requested
            if value is not None
        ]
        self.send('{"command":{"set_relays":{%s}}}' % ",".join(fields))

    def set_threshold(self, threshold):
        """Send a set_threshold command with the given value."""
        self.send('{"command":{"set_threshold":"%s"}}' % (threshold))

    def set_auto_mode(self, state):
        """Send a set_auto command with the given state."""
        self.send('{"command":{"set_auto":"%s"}}' % (state))

    def set_scale(self, scale):
        """Send a set_scale command with the given scale."""
        self.send('{"command":{"set_scale":"%s"}}' % (scale))

    def relays_cup(self):
        """Send the "cup" command."""
        self.send('{"command":"cup"}')

    def relays_cdn(self):
        """Send the "cdn" command."""
        self.send('{"command":"cdn"}')

    def relays_lup(self):
        """Send the "lup" command."""
        self.send('{"command":"lup"}')

    def relays_ldn(self):
        """Send the "ldn" command."""
        self.send('{"command":"ldn"}')
class AT600ProIIWidget(DeviceWidget):
    """Placeholder widget for the AT-600ProII; shows a single notice label."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        placeholder_layout = QGridLayout()
        notice = Label("Under Construction...")
        placeholder_layout.addWidget(notice)
        self.setLayout(placeholder_layout)
from qt import *
class DeviceWidget(QGroupBox):
    """Titled group box base class for device panels.

    The constructor calls three hooks in order -- create_widgets,
    connect_signals, build_layout -- which subclasses override as needed.
    """

    def __init__(self, title, guid=""):
        super().__init__(title)
        self.setSizePolicy(QSizePolicy.MinimumExpanding, QSizePolicy.Fixed)
        self.guid = guid
        self.setStyleSheet("DeviceWidget { border: 1px solid grey;}")

        # run the subclass hooks in their fixed order
        for hook in (self.create_widgets, self.connect_signals, self.build_layout):
            hook()

    def create_widgets(self):
        """Hook: create child widgets. Does nothing by default."""

    def connect_signals(self):
        """Hook: connect signals. Does nothing by default."""

    def build_layout(self):
        """Hook: assemble the layout. Does nothing by default."""
class DummySerial:
    """Do-nothing stand-in for a serial port object.

    It provides the members SerialDeviceBase uses (write, read, in_waiting,
    close) so a device can run without hardware attached.
    """

    def __init__(self, port=None, timeout=0):
        self.port = port
        # kept so the object can be inspected like a configured port
        self.timeout = timeout

    def open(self):
        """Do nothing; there is no real port to open."""

    def close(self):
        """Do nothing; there is no real port to close."""

    def write(self, string):
        """Discard the outgoing data."""

    def read(self, bytes=1):
        """Return no data.

        The result is an empty ``bytes`` object because the reader calls
        ``.decode()`` on it; a ``str`` has no such method.
        """
        return b""

    @property
    def in_waiting(self):
        """Number of bytes waiting to be read; always 0."""
        return 0
class JSONBuffer:
    """A utility class that helps keep track of an incoming JSON message.

    Characters are fed in one at a time. Everything outside a top-level
    ``{...}`` object is ignored, and the message is complete once the brace
    that opened it has been closed.
    """

    def __init__(self):
        self.buffer = ""
        self.depth = 0
        self._in_string = False
        self._escaped = False

    def reset(self):
        """Discard the buffered text and all framing state."""
        self.buffer = ""
        self.depth = 0
        self._in_string = False
        self._escaped = False

    def insert_char(self, c):
        """Insert a single char into the buffer.
        """
        if self.depth <= 0:
            # Between messages only an opening brace matters. A stray "}"
            # must not push depth negative, or every later message would be
            # framed incorrectly until the next reset.
            if c == "{":
                self.depth = 1
                self.buffer += c
            return

        self.buffer += c

        if self._in_string:
            # Braces inside a string literal are data, not structure.
            if self._escaped:
                self._escaped = False
            elif c == "\\":
                self._escaped = True
            elif c == '"':
                self._in_string = False
            return

        if c == '"':
            self._in_string = True
        elif c == "{":
            self.depth += 1
        elif c == "}":
            self.depth -= 1

    def completed(self):
        """Check if the MessageBuffer contains a completed JSON message.
        """
        return self.depth == 0 and len(self.buffer) > 0
import json
from .serial_device_base import SerialDeviceBase
from time import time
from qt import *
# Counter behind get_fake_guid(); holds the next value to hand out.
fake_guid = 0


def get_fake_guid():
    """Return the current counter value as a string, then advance the counter."""
    global fake_guid
    issued, fake_guid = str(fake_guid), fake_guid + 1
    return issued
class CommonMessagesMixin:
    """Messages shared by every device profile.

    The host class must provide ``send`` and ``msg_count``.
    """

    @property
    def common_message_tree(self):
        """Return handlers for the device_info update and the response keys."""

        def store_as(attribute):
            # build a handler that saves the incoming value on this object
            return lambda value: setattr(self, attribute, value)

        device_info = {
            'name': store_as('name'),
            'guid': store_as('guid'),
        }
        responses = {
            'ok': None,
            'error': None
        }
        return {
            'update': {'device_info': device_info},
            'response': responses,
        }

    def ping(self):
        """Send a ping request carrying the current message count."""
        payload = '{"request":{"ping":"%s"}}' % (self.msg_count)
        self.send(payload)

    def handshake(self):
        """Request for the device to report its name and guid(serial number)."""
        self.send('{"request":"device_info"}')

    def locate(self):
        """Instruct the device to blink all LEDs for 5 seconds."""
        self.send('{"command":"locate"}')
class MessageTree(dict):
    """A nested dict of message handlers that can absorb other trees."""

    def merge(self, d):
        """Recursively merge the nested dict ``d`` into this tree.

        Sub-dicts present on both sides are merged key by key at every
        level. Any other value from ``d`` replaces the existing entry.
        Dicts from ``d`` are copied, so ``d`` is never modified, either by
        this call or by later merges.
        """
        self._merge_into(self, d)

    @staticmethod
    def _merge_into(target, source):
        """Copy ``source`` into ``target``, descending into nested dicts."""
        for key, value in source.items():
            if not isinstance(value, dict):
                target[key] = value
                continue
            branch = target.get(key)
            if not isinstance(branch, dict):
                # start a fresh branch instead of aliasing the caller's dict
                branch = target[key] = {}
            MessageTree._merge_into(branch, value)
class SerialDevice(SerialDeviceBase):
    """A serial device that decodes incoming JSON and dispatches it via a message tree.

    Subclasses are expected to define ``profile_name`` (read by ``title``)
    and to merge their handlers into ``self.message_tree``.
    """

    def __init__(self, port=None, baud=9600, device=None):
        """Create the device.

        Args:
            port: port to open; passed on to SerialDeviceBase.
            baud: baud rate for the port.
            device: optional existing SerialDevice whose port, baud, name
                and guid are taken over instead of the other arguments.
        """
        self.port = port
        self.baud = baud
        self.name = ""
        self.guid = get_fake_guid()

        # a SerialDevice can be created based on an existing SerialDevice
        if device:
            self.port = device.port
            self.baud = device.baud
            self.name = device.name
            self.guid = device.guid

        super().__init__(port=self.port, baud=self.baud)

        self.signals = None
        self.time_created = time()

        # message stuff
        self.msg_count = 0
        self.message_tree = MessageTree()
        self.msg_history = []

    @property
    def title(self):
        """Human-readable label built from the profile name, port and guid."""
        return f"{self.profile_name} ({self.port}) <{self.guid}>"

    def process_message(self, msg, table):
        """Walk ``msg`` and ``table`` together, calling each matching handler.

        Args:
            msg: decoded JSON message (a dict, possibly nested).
            table: handler tree of the same shape; leaves are callables.

        Keys without a handler are ignored. Descent only happens when both
        the message value and the table entry are dicts, so a placeholder
        entry such as ``None`` cannot trigger a recursion into a non-dict.
        """
        if not isinstance(msg, dict) or not isinstance(table, dict):
            return

        for key, value in msg.items():
            handler = table.get(key)
            # does the table list an action?
            if callable(handler):
                handler(value)
            # can we go deeper?
            elif isinstance(value, dict) and isinstance(handler, dict):
                self.process_message(value, handler)

    def recieve(self, string):
        """Log the raw string, decode it as JSON and dispatch the result."""
        super().recieve(string)

        try:
            msg = json.loads(string)
        except json.decoder.JSONDecodeError as e:
            self.log.warn("JSONDecodeError" + str(e))
            return

        self.process_message(msg, self.message_tree)
from queue import Queue
from .json_buffer import JSONBuffer
from serial import Serial, SerialException
from serial.tools.list_ports_common import ListPortInfo
from .dummy_serial import DummySerial
import logger
class SerialDeviceBase:
    """Owns a serial port, an outbound queue and a JSON framing buffer.

    ``communicate`` must be called repeatedly. Each call writes at most one
    queued string and reads incoming characters until a message completes or
    no more data is waiting.
    """

    def __init__(self, port=None, baud=9600):
        """Set up the logger, queue and buffer; open the port if one is given.

        Args:
            port: a port name, a ListPortInfo (its ``device`` is used), or
                None to stay closed.
            baud: baud rate used when opening the port.
        """
        self.log = logger.get_logger(__name__, "DEBUG")
        self.queue = Queue()
        self.msg = JSONBuffer()
        self.active = False
        self.ser = None
        self.baud = baud
        # send() increments this counter, so it has to exist even when no
        # subclass defines it.
        self.msg_count = 0

        if isinstance(port, ListPortInfo):
            self.port = port.device
        else:
            self.port = port

        if self.port:
            self.open()

    def open(self):
        """ open the serial port and set the device to active """
        if self.port == "DummyPort":
            self.ser = DummySerial()
            self.active = True
            return

        try:
            self.ser = Serial(port=self.port, baudrate=self.baud, timeout=0)
            self.active = True
        except Exception as e:
            # Best effort: the device simply stays inactive. Log the actual
            # exception type rather than labelling every failure the same.
            self.log.warn(f"{type(e).__name__}: {e}")

    def close(self):
        """ close the serial port and set the device to inactive """
        if not self.active:
            return

        self.msg.reset()
        self.ser.close()
        self.active = False

    def send(self, string):
        """ add a string to the outbound queue """
        if not self.active:
            return

        self.log.debug(f"({self.port}:{self.__class__.__name__}) TX: {string}")
        self.queue.put(string)
        self.msg_count += 1

    def recieve(self, string):
        """ do something when a complete string is captured in self.communicate() """
        self.log.debug(f"({self.port}:{self.__class__.__name__}) RX: {string}")

    def communicate(self):
        """ Handle comms with the serial port. Call this often, from an event loop or something. """
        if not self.active:
            return

        # serial transmit: at most one queued string per call
        try:
            if not self.queue.empty():
                self.ser.write(self.queue.get().encode())
        except SerialException as e:
            self.log.warn(e)

        # serial recieve
        while self.ser.in_waiting:
            try:
                char = self.ser.read(1).decode()
            except UnicodeDecodeError:
                # Skip the undecodable byte and keep reading. It is already
                # consumed, so the loop still makes progress.
                continue
            self.msg.insert_char(char)
            if self.msg.completed():
                break

        # handle completed message
        if self.msg.completed():
            msg = self.msg.buffer
            self.recieve(msg)
            self.msg.reset()
import json
from .serial_device_base import SerialDeviceBase
from time import time
from qt import *
# Counter behind get_fake_guid(); holds the next value to hand out.
fake_guid = 0


def get_fake_guid():
    """Return the current counter value as a string, then advance the counter."""
    global fake_guid
    issued, fake_guid = str(fake_guid), fake_guid + 1
    return issued
class CommonMessagesMixin:
    """Messages shared by every device profile.

    The host class must provide ``send`` and ``msg_count``.
    """

    @property
    def common_message_tree(self):
        """Return handlers for the device_info update and the response keys."""

        def store_as(attribute):
            # build a handler that saves the incoming value on this object
            return lambda value: setattr(self, attribute, value)

        device_info = {
            'name': store_as('name'),
            'guid': store_as('guid'),
        }
        responses = {
            'ok': None,
            'error': None
        }
        return {
            'update': {'device_info': device_info},
            'response': responses,
        }

    def ping(self):
        """Send a ping request carrying the current message count."""
        payload = '{"request":{"ping":"%s"}}' % (self.msg_count)
        self.send(payload)

    def handshake(self):
        """Request for the device to report its name and guid(serial number)."""
        self.send('{"request":"device_info"}')

    def locate(self):
        """Instruct the device to blink all LEDs for 5 seconds."""
        self.send('{"command":"locate"}')
class MessageTree(dict):
    """A nested dict of message handlers that can absorb other trees."""

    def merge(self, d):
        """Recursively merge the nested dict ``d`` into this tree.

        Sub-dicts present on both sides are merged key by key at every
        level. Any other value from ``d`` replaces the existing entry.
        Dicts from ``d`` are copied, so ``d`` is never modified, either by
        this call or by later merges.
        """
        self._merge_into(self, d)

    @staticmethod
    def _merge_into(target, source):
        """Copy ``source`` into ``target``, descending into nested dicts."""
        for key, value in source.items():
            if not isinstance(value, dict):
                target[key] = value
                continue
            branch = target.get(key)
            if not isinstance(branch, dict):
                # start a fresh branch instead of aliasing the caller's dict
                branch = target[key] = {}
            MessageTree._merge_into(branch, value)
class SerialDevice(SerialDeviceBase):
    """A serial device that decodes incoming JSON and dispatches it via a message tree.

    Subclasses are expected to define ``profile_name`` (read by ``title``)
    and to merge their handlers into ``self.message_tree``.
    """

    def __init__(self, port=None, baud=9600, device=None):
        """Create the device.

        Args:
            port: port to open; passed on to SerialDeviceBase.
            baud: baud rate for the port.
            device: optional existing SerialDevice whose port, baud, name
                and guid are taken over instead of the other arguments.
        """
        self.port = port
        self.baud = baud
        self.name = ""
        self.guid = get_fake_guid()

        # a SerialDevice can be created based on an existing SerialDevice
        if device:
            self.port = device.port
            self.baud = device.baud
            self.name = device.name
            self.guid = device.guid

        super().__init__(port=self.port, baud=self.baud)

        self.signals = None
        self.time_created = time()

        # message stuff
        self.msg_count = 0
        self.message_tree = MessageTree()
        self.msg_history = []

    @property
    def title(self):
        """Human-readable label built from the profile name, port and guid."""
        return f"{self.profile_name} ({self.port}) <{self.guid}>"

    def process_message(self, msg, table):
        """Walk ``msg`` and ``table`` together, calling each matching handler.

        Args:
            msg: decoded JSON message (a dict, possibly nested).
            table: handler tree of the same shape; leaves are callables.

        Keys without a handler are ignored. Descent only happens when both
        the message value and the table entry are dicts, so a placeholder
        entry such as ``None`` cannot trigger a recursion into a non-dict.
        """
        if not isinstance(msg, dict) or not isinstance(table, dict):
            return

        for key, value in msg.items():
            handler = table.get(key)
            # does the table list an action?
            if callable(handler):
                handler(value)
            # can we go deeper?
            elif isinstance(value, dict) and isinstance(handler, dict):
                self.process_message(value, handler)

    def recieve(self, string):
        """Log the raw string, decode it as JSON and dispatch the result."""
        super().recieve(string)

        try:
            msg = json.loads(string)
        except json.decoder.JSONDecodeError as e:
            self.log.warn("JSONDecodeError" + str(e))
            return

        self.process_message(msg, self.message_tree)
from devices import SerialDevice, DeviceWidget, CommonMessagesMixin
from qt import *
from functools import partial
import logger
# Module-level logger for this file, obtained from the project's logger helper.
log = logger.get_logger(__name__)
class SW4USignals(QObject):
    """Qt signals emitted when the SW-4U reports updated values."""

    antenna = Signal(int)
    forward = Signal(float)
    reverse = Signal(float)
    frequency = Signal(int)
    auto_mode = Signal(bool)
    auto_table = Signal(dict)

    @property
    def message_tree(self):
        """Return a nested dict that maps incoming "update" keys to signal emitters."""
        update_handlers = {
            "auto": self.auto_mode.emit,
            "auto_table": self.auto_table.emit,
            "antenna": self.antenna.emit,
            "frequency": self.frequency.emit,
        }
        return {"update": update_handlers}
class SW4U(CommonMessagesMixin, SerialDevice):
    """Serial profile for the SW-4U antenna switch."""

    profile_name = "SW-4U"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.signals = SW4USignals()
        self.message_tree.merge(self.signals.message_tree)
        self.message_tree.merge(self.common_message_tree)

        # ask the device for its current state right away
        self.read_antenna()
        self.read_auto_table()
        self.auto_mode("read")

    @property
    def widget(self):
        """Build a new SW4UWidget and wire it to this device in both directions."""
        panel = SW4UWidget(self.title, self.guid)

        def show_frequency(value):
            panel.rf_panel.frequency.setText(str(value))

        def request_antenna(index):
            self.set_antenna(str(index))

        # device -> widget
        self.signals.antenna.connect(panel.ant_btns.select)
        self.signals.frequency.connect(show_frequency)

        # widget -> device
        panel.ant_btns.set_antenna.connect(request_antenna)
        panel.auto_btns.auto.clicked.connect(self.auto_enabled)

        return panel

    def set_antenna(self, ant):
        """Send an antenna "set" message for the given antenna."""
        payload = '{"antenna":{"set":"%s"}}' % (ant)
        self.send(payload)

    def read_antenna(self):
        """Send an antenna "read" message."""
        self.send('{"antenna":"read"}')

    def auto_mode(self, command):
        """Send an auto "mode" message carrying the given command string."""
        payload = '{"auto":{"mode":"%s"}}' % (command)
        self.send(payload)

    def auto_enabled(self, state):
        """Send auto mode "on" when state is truthy, otherwise "off"."""
        payload = '{"auto":{"mode":"on"}}' if state else '{"auto":{"mode":"off"}}'
        self.send(payload)

    def set_auto_table(self, band, ant):
        """Send an auto "table" message mapping the band to the antenna."""
        payload = '{"auto":{"table":{"%s":"%s"}}}' % (band, ant)
        self.send(payload)

    def read_auto_table(self):
        """Send an auto "table" read message."""
        self.send('{"auto":{"table":"read"}}')
class SW4UWidget(DeviceWidget):
    """Panel for the SW-4U: RF readout, auto-mode buttons and antenna buttons."""

    def create_widgets(self):
        """Create the three sub-panels."""
        self.ant_btns = AntennaButtons()
        self.rf_panel = RFPanel()
        self.auto_btns = AutoButtons()

    def connect_signals(self):
        """No internal connections are made here."""

    def build_layout(self):
        """Place the RF and auto panels on the first row, the antenna buttons below."""
        grid = QGridLayout()
        grid.setContentsMargins(10, 20, 10, 10)

        grid.addWidget(self.rf_panel, 0, 0, 1, 2)
        grid.addWidget(self.auto_btns, 0, 2, 1, 2)
        grid.addWidget(self.ant_btns, 1, 0, 1, 3)

        self.setLayout(grid)

    def select(self, antenna):
        """Check the antenna button at index ``antenna``.

        The buttons belong to the AntennaButtons panel (in its ``group``),
        and that panel has no ``buttons()`` method of its own, so the call
        is delegated to its ``select``.
        """
        self.ant_btns.select(antenna)
class RFPanel(QGroupBox):
    """Group box showing forward, reverse and frequency readouts in one row."""

    def __init__(self):
        super().__init__("RF")
        self.setSizePolicy(QSizePolicy.Minimum, QSizePolicy.Fixed)

        # value labels, showing "?" until something sets their text
        self.frequency = label("?")
        self.forward = label("?")
        self.reverse = label("?")

        # one row of caption/value pairs
        readouts = (
            ("Forward:", self.forward),
            ("Reverse:", self.reverse),
            ("Frequency:", self.frequency),
        )
        row = QGridLayout()
        for pair_index, (caption, value_label) in enumerate(readouts):
            column = 2 * pair_index
            row.addWidget(Label(caption), 0, column)
            row.addWidget(value_label, 0, column + 1)

        self.setLayout(row)
class AntennaButtons(QGroupBox):
    """Row of checkable antenna buttons; emits set_antenna(index) on click."""

    set_antenna = Signal(int)

    def __init__(self):
        super().__init__("Select Antenna")
        self.setSizePolicy(QSizePolicy.Minimum, QSizePolicy.Fixed)

        self.group = QButtonGroup()

        # one button per choice, laid out left to right
        row = QGridLayout()
        column = 0
        for caption in ("None", "One", "Two", "Three", "Four"):
            emit_choice = partial(self.set_antenna.emit, column)
            button = Button(caption, clicked=emit_choice, checkable=True)
            self.group.addButton(button)
            row.addWidget(button, 0, column)
            column += 1

        self.setLayout(row)

    def select(self, antenna):
        """Check the button whose position in the group equals ``antenna``."""
        for position, button in enumerate(self.group.buttons()):
            if position != antenna:
                continue
            button.setChecked(True)
class AutoButtons(QGroupBox):
    """Group box holding the auto-mode "On" toggle and the "Edit" button."""

    def __init__(self):
        super().__init__("Auto Mode")
        self.setSizePolicy(QSizePolicy.Minimum, QSizePolicy.Fixed)

        # update() reads both attributes. The code that would build the
        # radio-button matrix is still disabled below, so they start empty
        # to keep update() from raising AttributeError.
        self.table = []
        self.radio_buttons = []

        # self.table = [0 for i in range(10)]
        # self.bands = [i for i in range(10)]
        # self.ants = [i for i in range(5)]
        # self.radio_buttons = [[None for a in self.ants] for b in self.bands]

        # for band in self.bands:
        #     hbox = QHBoxLayout()
        #     hbox.addWidget(QLabel())  # spacing
        #     hbox.addWidget(label(str(band) + ": "))
        #     for ant in self.ants:
        #         radio = radio_btn(
        #             str(ant), partial(parent.set_auto_table, str(band), str(ant))
        #         )
        #         self.radio_buttons[band][ant] = radio
        #         hbox.addWidget(radio)
        #     hbox.addWidget(QLabel())  # spacing
        #     grid.addWidget(group_box("", layout=hbox))

        # widgets
        self.auto = Button("On", checkable=True)
        self.edit = Button("Edit")

        # layout
        grid = QGridLayout()
        grid.addWidget(self.auto, 0, 0)
        grid.addWidget(self.edit, 0, 1)
        grid.setColumnStretch(5, 1)

        self.setLayout(grid)

    def update(self, table):
        """Store the auto table and check the matching radio buttons, if any exist.

        Args:
            table: mapping whose keys convert to a band index with ``int``
                and whose values index an antenna.
        """
        # NOTE(review): this name shadows QWidget.update() for Python
        # callers -- confirm that is intended.
        if self.radio_buttons:
            for band, ant in table.items():
                self.radio_buttons[int(band)][ant].setChecked(True)
        self.table = list(table.values())
from devices import SerialDevice, CommonMessagesMixin
from qt import *
class UnknownDevice(CommonMessagesMixin, SerialDevice):
    """Device with no specific profile; it handshakes as soon as it is created."""

    profile_name = "no profile"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        shared_handlers = self.common_message_tree
        self.message_tree.merge(shared_handlers)

        # request the device's name and guid
        self.handshake()
from devices import SerialDevice, DeviceWidget, CommonMessagesMixin
from qt import *
class VariableCapacitorSignals(QObject):
    """Qt signals emitted when the VariableCapacitor reports its relay state."""

    capacitors = Signal(int)
    input = Signal(int)
    output = Signal(int)

    @property
    def message_tree(self):
        """Return a nested dict that maps update/relays keys to signal emitters."""
        relay_handlers = {
            "capacitors": self.capacitors.emit,
            "input": self.input.emit,
            "output": self.output.emit,
        }
        return {"update": {"relays": relay_handlers}}
class VariableCapacitor(CommonMessagesMixin, SerialDevice):
    """Serial profile for the VariableCapacitor device."""

    profile_name = "VariableCapacitor"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.signals = VariableCapacitorSignals()
        self.message_tree.merge(self.signals.message_tree)
        self.message_tree.merge(self.common_message_tree)

        # cache the most recently reported capacitor value
        self.capacitors = 0

        def remember_capacitors(value):
            setattr(self, "capacitors", value)

        self.signals.capacitors.connect(remember_capacitors)

        # ask the device for its current relay state right away
        self.request_current_relays()

    @property
    def widget(self):
        """Build a new VariableCapacitorWidget and wire it to this device."""
        panel = VariableCapacitorWidget(self.title, self.guid)

        def show_capacitors(value):
            panel.edit.setText(str(value))

        # widget -> device
        panel.input_button.clicked.connect(self.set_input_relay)
        panel.output_button.clicked.connect(self.set_output_relay)
        panel.cup_btn.clicked.connect(self.relays_cup)
        panel.cdn_btn.clicked.connect(self.relays_cdn)
        panel.max_btn.clicked.connect(self.relays_max)
        panel.min_btn.clicked.connect(self.relays_min)
        panel.set_relays.connect(self.set_caps)

        # device -> widget
        self.signals.capacitors.connect(show_capacitors)
        self.signals.input.connect(panel.input_button.setChecked)
        self.signals.output.connect(panel.output_button.setChecked)

        return panel

    def set_caps(self, value):
        """Send a set_capacitors command with the given value."""
        payload = '{"command":{"set_capacitors":%s}}' % (str(value))
        self.send(payload)

    def relays_max(self):
        """Send set_capacitors with the value 255."""
        self.send('{"command":{"set_capacitors":255}}')

    def relays_min(self):
        """Send set_capacitors with the value 0."""
        self.send('{"command":{"set_capacitors":0}}')

    def relays_cup(self):
        """Send the "cup" command."""
        self.send('{"command":"cup"}')

    def relays_cdn(self):
        """Send the "cdn" command."""
        self.send('{"command":"cdn"}')

    def request_current_relays(self):
        """Send a request for the current relay state."""
        self.send('{"request":"relays"}')

    def set_input_relay(self, state):
        """Send the input relay state, converted to an int."""
        payload = '{"relays":{"input":%s}}' % (int(state))
        self.send(payload)

    def set_output_relay(self, state):
        """Send the output relay state, converted to an int."""
        payload = '{"relays":{"output":%s}}' % (int(state))
        self.send(payload)
class VariableCapacitorWidget(DeviceWidget):
    """Panel for the VariableCapacitor: value entry plus step, limit and relay buttons."""

    # emitted with the integer typed into the edit box
    set_relays = Signal(int)

    def create_widgets(self):
        """Create the buttons and the capacitor value edit box."""
        self.input_button = Button("Input", checkable=True)
        self.output_button = Button("Output", checkable=True)
        self.cup_btn = Button("CUP", autoRepeat=True)
        self.cdn_btn = Button("CDN", autoRepeat=True)
        self.max_btn = Button("Max")
        self.min_btn = Button("Min")

        self.edit = QLineEdit()
        self.set_btn = Button("Set")

    def connect_signals(self):
        """Emit set_relays when Set is clicked; Return in the edit box acts as a click."""
        self.set_btn.clicked.connect(lambda: self._emit_set_relays())
        self.edit.returnPressed.connect(self.set_btn.clicked)

    def _emit_set_relays(self):
        """Emit set_relays with the edit box value, ignoring text that is not an integer."""
        try:
            value = int(self.edit.text())
        except ValueError:
            # empty or non-numeric input: nothing sensible to send
            return
        self.set_relays.emit(value)

    def build_layout(self):
        """Lay out the value box on the left and the button columns to its right."""
        hbox = QHBoxLayout()
        hbox.addWidget(label("Capacitors: "))
        hbox.addWidget(self.edit)
        hbox.addWidget(self.set_btn)
        gbox = QGroupBox("", layout=hbox)

        grid = QGridLayout()
        grid.setContentsMargins(10, 20, 10, 10)

        grid.addWidget(gbox, 0, 0, 2, 1)
        grid.addWidget(self.cup_btn, 0, 1)
        grid.addWidget(self.cdn_btn, 1, 1)
        grid.addWidget(self.max_btn, 0, 2)
        grid.addWidget(self.min_btn, 1, 2)
        grid.addWidget(self.input_button, 0, 3)
        grid.addWidget(self.output_button, 1, 3)
        grid.setColumnStretch(5, 1)

        self.setLayout(grid)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment