Created
March 20, 2024 05:59
-
-
Save sjlongland/b3a579d4c702b967884b7a648de08bd7 to your computer and use it in GitHub Desktop.
Dummy rigctld implementation for GPIO control of radio PTT
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Dummy rigctld implementation for controlling a radio PTT signal via a GPIO
pin.

This is intended for devices like the NWDR UDRC-II and DRAWS boards, which
attach to the top of a Raspberry Pi 2/3/4/5 single-board computer and provide
control signals for PTT and COS along with an audio interface for digital mode
operation.

Many software packages support rigctld as a PTT mechanism, but not all support
GPIO control. This script mimics enough of the rigctld protocol to fool QSSTV
into thinking it's talking to rigctld, and thus allow it to control PTT via
the GPIO pin.

Usage (PTT on GPIO #23; as per UDRC-II mini-din6):

    python3 dummyrigctld.py 23
""" | |
# © 2024 Stuart Longland VK4MSL | |
# All rights reserved. | |
# | |
# Redistribution and use in source and binary forms, with or without | |
# modification, are permitted provided that the following conditions are met: | |
# | |
# 1. Redistributions of source code must retain the above copyright notice, | |
# this list of conditions and the following disclaimer. | |
# 2. Redistributions in binary form must reproduce the above copyright | |
# notice, this list of conditions and the following disclaimer in the | |
# documentation and/or other materials provided with the distribution. | |
# | |
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDER AND CONTRIBUTORS "AS IS" | |
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | |
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | |
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE | |
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR | |
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF | |
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS | |
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN | |
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) | |
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE | |
# POSSIBILITY OF SUCH DAMAGE. | |
import asyncio | |
import argparse | |
import logging | |
import weakref | |
import os.path | |
from functools import partial | |
class GPIOPin(object):
    """
    Thin wrapper around a single pin of the Linux sysfs GPIO interface.

    The pin is identified by its kernel GPIO number.  Nothing is touched on
    disk until :meth:`setup` is awaited; :meth:`teardown` releases the pin
    again.
    """

    _SYSFS_PATH = "/sys/class/gpio"
    _SYSFS_NAME = "gpio%d"
    _EXPORT = os.path.join(_SYSFS_PATH, "export")
    _UNEXPORT = os.path.join(_SYSFS_PATH, "unexport")

    @staticmethod
    def _read_from(file):
        """Return the full text content of ``file``."""
        with open(file, "r") as f:
            return f.read()

    @staticmethod
    def _write_to(file, data):
        """Write the string ``data`` to ``file``; return the count written."""
        with open(file, "w") as f:
            return f.write(data)

    def __init__(self, nbr, direction="out", state=False):
        """
        Remember the pin configuration; no I/O is performed here.

        :param nbr: GPIO number, used to build the ``gpio<nbr>`` sysfs path.
        :param direction: string written to the pin's ``direction`` file.
        :param state: initial output level applied by :meth:`setup` when
                      ``direction`` is ``"out"``.
        """
        self._nbr = nbr
        self._direction = direction
        self._init_state = state
        self._path = os.path.join(self._SYSFS_PATH, self._SYSFS_NAME % nbr)
        self._dir = os.path.join(self._path, "direction")
        self._value = os.path.join(self._path, "value")

    @property
    def value(self):
        """Current pin level as a ``bool``, read from the ``value`` file."""
        return bool(int(self._read_from(self._value)))

    @value.setter
    def value(self, value):
        self._write_to(self._value, "1" if value else "0")

    async def setup(self):
        """
        Export the pin, set its direction and apply the initial state.

        The direction write is retried every 0.5 s, without limit, for as
        long as it raises :class:`OSError`.
        """
        # A pin left exported by an earlier unclean exit cannot be exported
        # a second time, so only request the export when it is missing.
        if not os.path.isdir(self._path):
            self._write_to(self._EXPORT, str(self._nbr))

        # Wait for the device to be exported and become writable
        while True:
            try:
                self._write_to(self._dir, self._direction)
                break
            except OSError:
                await asyncio.sleep(0.5)

        # Set initial state.  This must go through the ``value`` property so
        # that it actually reaches the pin; it is only meaningful for a plain
        # output (other direction strings either forbid writes or already
        # carry a level).
        if self._direction == "out":
            self.value = self._init_state

    def teardown(self):
        """Drive an output pin low, then unexport the pin."""
        try:
            if self._direction != "in":
                self.value = False
        finally:
            # Always release the pin, even if the final write failed.
            self._write_to(self._UNEXPORT, str(self._nbr))
class RigCtlServer(object):
    """
    TCP server answering a small subset of the rigctld text protocol.

    PTT set/get commands are mapped onto the ``value`` attribute of the
    ``gpio`` object given to the constructor; the remaining supported
    commands return fixed replies.
    """

    ADDRESS = "127.0.0.1"
    PORT = 4532

    def __init__(
        self, gpio, address="127.0.0.1", port=4532, loop=None, log=None
    ):
        """
        :param gpio: object whose ``value`` attribute is the PTT state.
        :param address: address to bind the listening socket to.
        :param port: TCP port to listen on.
        :param loop: event loop; defaults to ``asyncio.get_event_loop()``.
        :param log: logger; defaults to one named after this class.
        """
        if log is None:
            log = logging.getLogger(self.__class__.__name__)
        if loop is None:
            loop = asyncio.get_event_loop()

        self._gpio = gpio
        self._log = log
        self._loop = loop
        self._address = address
        self._port = port

    async def start(self):
        """
        Open the listening socket and serve until cancelled.

        A failure to create the socket is logged and re-raised.
        """
        # Only socket creation is guarded: a cancellation arriving while
        # serving is normal shutdown and must not be reported as a failure
        # to open the socket.
        try:
            server = await self._loop.create_server(
                self._on_connect_start, self._address, self._port
            )
        except Exception:
            self._log.exception("Failed to open listening socket")
            raise

        self._log.info(
            "Server listening %s port %s", self._address, self._port
        )
        async with server:
            await server.serve_forever()

    def _on_connect_start(self):
        """Protocol factory: one RigCtlClient per incoming connection."""
        return RigCtlClient(self)

    def _exec(self, client, cmd, args):
        """
        Execute one command and write its reply to ``client``.

        :param client: object providing ``write(str)`` and a
                       ``_chk_vfo_executed`` flag.
        :param cmd: command word, long (``\\name``) or short form.
        :param args: remainder of the command line (may be empty).
        :raises ValueError: for an unknown command, or a PTT argument that
                            is not an integer.
        """
        if cmd in ("\\set_ptt", "T"):
            self._set_ptt(bool(int(args)))
            client.write("RPRT 0\n")
        elif cmd in ("\\get_ptt", "t"):
            # "t" is the short form, mirroring "T" for set_ptt above
            res = self._get_ptt()
            client.write("%d\n" % (1 if res else 0))
        elif cmd == "\\get_powerstat":
            # Always report powered on
            client.write("1\n")
        elif cmd == "\\chk_vfo":
            # Client is asking us for VFO data
            client._chk_vfo_executed = True
            client.write("0\n")
        elif cmd == "\\dump_state":
            # Report state
            client.write(
                "1\n"  # Protocol v1
                "6\n"  # Model 6: Dummy No VFO
                "0\n"  # ITU region -- not used
                "0 0 0 0 0 0 0\n"  # end of rx range list
                "0 0 0 0 0 0 0\n"  # end of tx range list
                "0 0\n"  # end of tuning steps
                "0 0\n"  # end of filters
                "0\n"  # max_rit
                "0\n"  # max_xit
                "0\n"  # max_ifshift
                "0\n"  # announces
                "0 0 0 0 0 0 0 0\n"  # preamp
                "0 0 0 0 0 0 0 0\n"  # attenuator
                "0x00000000\n"  # has_get_func
                "0x00000000\n"  # has_set_func
                "0x00000000\n"  # has_get_level
                "0x00000000\n"  # has_set_level
                "0x00000000\n"  # has_get_parm
                "0x00000000\n"  # has_set_parm
            )
            # The key=value section is only sent to clients that issued
            # \chk_vfo earlier on this connection.
            if client._chk_vfo_executed:
                client.write(
                    "vfo_opts=0x00000000\n"
                    "ptt_type=0x00000001\n"  # PTT=RIG
                    "targetable_vfo=0x00000000\n"
                    "has_set_vfo=0\n"
                    "has_get_vfo=0\n"
                    "has_set_freq=0\n"
                    "has_get_freq=0\n"
                    "has_set_conf=0\n"
                    "has_get_conf=0\n"
                    "has_power2mW=0\n"
                    "has_mw2power=0\n"
                    "timeout=0\n"
                    "rig_model=6\n"
                    "rigctl_version=4.5.5\n"
                    "agc_levels=\n"
                    "done\n"
                )
            client.write("0\n")  # OK
        else:
            raise ValueError("Unknown command %r (args %r)" % (cmd, args))

    def _set_ptt(self, state):
        """Write ``state`` to the GPIO pin and log the change."""
        self._gpio.value = state
        self._log.info("PTT <= %r", state)

    def _get_ptt(self):
        """Read, log and return the GPIO pin state."""
        state = self._gpio.value
        self._log.info("PTT => %r", state)
        return state
class RigCtlClient(asyncio.Protocol):
    """
    Protocol instance handling one client connection.

    Received bytes are buffered, split into newline-terminated commands and
    handed to the owning server's ``_exec`` method one at a time.
    """

    def __init__(self, server):
        """
        :param server: owning server; only a weak reference is kept.  Its
                       ``_log`` and ``_loop`` attributes are used.
        """
        self._server = weakref.ref(server)
        self._transport = None
        self._log = server._log.getChild("new")
        self._loop = server._loop
        self._rx_buffer = bytearray()
        self._chk_vfo_executed = False

    def connection_made(self, transport):
        """Store the transport and switch to a per-peer child logger."""
        self._transport = transport

        server = self._server()
        if server is None:
            transport.close()
            return

        # The peer name is (host, port) for IPv4 but carries extra fields
        # for IPv6, so only the first two elements are used.
        peer = transport.get_extra_info("peername")
        if isinstance(peer, (tuple, list)) and len(peer) >= 2:
            name = "peer:%s#%d" % (peer[0], peer[1])
        else:
            name = "peer:%s" % (peer,)

        self._log = server._log.getChild(name)
        self._log.info("Connected")

    def data_received(self, data):
        """Append ``data`` to the receive buffer and schedule a scan."""
        self._log.debug("Received %r", data)
        self._rx_buffer += data
        self._loop.call_soon(self._scan_buffer)

    def _scan_buffer(self):
        """
        Extract and execute at most one complete command from the buffer.

        If more data remains after a successfully handled command, another
        scan is scheduled.  Any failure closes the connection and discards
        whatever is still buffered.
        """
        server = self._server()
        if server is None:
            self._transport.close()
            return

        # Rigctl is a line-oriented protocol, end of command is a newline.
        try:
            end = self._rx_buffer.index(b"\n")
        except ValueError:
            self._log.debug("Partial content: %r", bytes(self._rx_buffer))
            return

        cmd = bytes(self._rx_buffer[:end])
        del self._rx_buffer[: end + 1]
        self._log.debug(
            "Received %r, remainder: %r", cmd, bytes(self._rx_buffer)
        )

        try:
            # Tolerate CRLF line endings
            cmd = cmd.decode("US-ASCII").rstrip("\r")
            if " " in cmd:
                (cmd, args) = cmd.split(" ", 1)
            else:
                (cmd, args) = (cmd, "")

            self._log.info("Received %r args %r", cmd, args)
            server._exec(self, cmd, args)
        except Exception:
            self._log.exception("Failed to handle %r", cmd)
            # The connection is going away: do not run commands that were
            # queued up behind the one that failed.
            self._rx_buffer.clear()
            self._transport.close()
            return

        if self._rx_buffer:
            self._loop.call_soon(self._scan_buffer)

    def connection_lost(self, exc):
        """Log the disconnection."""
        self._log.info("Disconnected (exc=%r)", exc)

    def write(self, data):
        """Send the string ``data`` to the peer, encoded as US-ASCII."""
        self._log.debug("Sending %r", data)
        self._transport.write(data.encode("US-ASCII"))
if __name__ == "__main__":

    async def main():
        """
        Command-line entry point.

        Parses the options, configures logging, sets up the PTT GPIO pin
        and runs the server; the pin is torn down again when the server
        stops for any reason.
        """
        parser = argparse.ArgumentParser(
            description="Mock rigctld for controlling a radio via GPIO"
        )
        parser.add_argument(
            "--log-level",
            default="info",
            choices=("debug", "info", "warning", "error"),
            type=str,
            help="Debugging level",
        )
        parser.add_argument(
            "--address",
            default=RigCtlServer.ADDRESS,
            type=str,
            help="Bind address for server",
        )
        parser.add_argument(
            "--port",
            default=RigCtlServer.PORT,
            type=int,
            help="Bind port number for server",
        )
        parser.add_argument(
            "gpio", type=int, help="GPIO pin number for PTT"
        )
        opts = parser.parse_args()

        # basicConfig accepts the level by its upper-case name
        logging.basicConfig(level=opts.log_level.upper())

        ptt_pin = GPIOPin(opts.gpio)
        await ptt_pin.setup()
        try:
            rig_server = RigCtlServer(
                gpio=ptt_pin, address=opts.address, port=opts.port
            )
            await rig_server.start()
        finally:
            ptt_pin.teardown()

    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment