Last active
January 19, 2024 17:02
-
-
Save Cadair/1e0a555deac70938b099b60097acde72 to your computer and use it in GitHub Desktop.
Upload QSOs from WSJT-X to Cloudlog
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/env python3 | |
""" | |
WSJT-X to Cloudlog QSO uploader. | |
This script reads QSO events from the WSJT UDP server and uploads the ADIF records to Cloudlog. | |
No non-standard library packages are required to run this. | |
usage: wsjt_cloudlog.py [-h] [--wsjt-port WSJT_PORT] [--wsjt-host WSJT_HOST] [--verbose] url api_key station_id | |
positional arguments: | |
url URL for CloudLog. | |
api_key CloudLog API key. | |
station_id CloudLog station ID to upload QSO to. | |
optional arguments: | |
-h, --help show this help message and exit | |
--wsjt-port WSJT_PORT | |
Port to listen for WSJT messages [default: 2237] | |
--wsjt-host WSJT_HOST | |
Host to listen for WSJT messages [default: localhost] | |
--verbose Output debugging information. | |
""" | |
import json | |
import logging | |
import socket | |
import struct | |
import sys | |
import urllib.request | |
from functools import partial | |
""" | |
WSJT UDP Parsing. | |
""" | |
MAGIC_NUMBER = 0xadbccbda # The WSJT magic number | |
def unpack_with_offset(format, data, offset): | |
""" | |
Unpack the format from the data and increment the offset | |
""" | |
delta_offset = struct.calcsize(format) | |
unpacked = struct.unpack_from(format, data, offset=offset) | |
if len(unpacked) == 1: | |
unpacked = unpacked[0] | |
return unpacked, offset + delta_offset | |
def unpack_wsjt_utf8(data, offset): | |
""" | |
The wireformat uses a utf8 sub-format which is a uint32 number followed by | |
that number of bytes of utf8 encoded string. | |
""" | |
n_bytes, offset = unpack_with_offset(">I", data, offset) | |
utf_bytes, offset = unpack_with_offset(f">{int(n_bytes)}s", data, offset) | |
return utf_bytes.decode('utf8'), offset | |
def parse_header(data, offset): | |
# First parse the magic number to verify the message is one we want to parse | |
try: | |
magic, _ = unpack_with_offset(">I", data, offset) | |
assert magic == MAGIC_NUMBER | |
except Exception: | |
log.exception("Unable to parse message in WSJT format") | |
return None, offset | |
header_format = ">III" # Note we have put the type in the header for dispatch parsing. | |
return unpack_with_offset(header_format, data, offset) | |
def parse_heartbeat(data, offset): | |
heartbeat_id, offset = unpack_wsjt_utf8(data, offset) | |
max_schema, offset = unpack_with_offset(">I", data, offset) | |
version, offset = unpack_wsjt_utf8(data, offset) | |
revision, offset = unpack_wsjt_utf8(data, offset) | |
return (heartbeat_id, max_schema, version, revision), offset | |
def parse_logged_adif(data, offset): | |
unique_id, offset = unpack_wsjt_utf8(data, offset) | |
adif_content, offset = unpack_wsjt_utf8(data, offset) | |
return (unique_id, adif_content), offset | |
def parse_wsjt_message(data, callbacks=None): | |
if not callbacks: | |
callbacks = {} | |
payload_functions = { | |
# Heartbeat | |
0: parse_heartbeat, | |
12: parse_logged_adif, | |
} | |
header, offset = parse_header(data, 0) | |
if header is None: | |
return | |
magic, schema, type_id = header | |
log.debug(f"Got message with type {type_id}") | |
if type_id in payload_functions: | |
payload, offset = payload_functions[type_id](data, offset) | |
log.debug("Decoded message: %s", payload) | |
if payload and type_id in callbacks: | |
callbacks[type_id](payload) | |
""" | |
Cloudlog | |
""" | |
def test_cloudlog(base_url): | |
""" | |
Check that we can make a request to the given cloudlog URL. | |
""" | |
response = urllib.request.urlopen(f"{base_url}/index.php/api/statistics") | |
assert response.code == 201 | |
data = json.loads(response.read().decode()) | |
if 'Today' not in data: | |
log.warning("Unknown response from Cloudlog %s. May not be connected correctly.", data) | |
return data | |
def upload_to_cloudlog(base_url, api_key, station_id, payload): | |
adif = payload[1] | |
# Split out the record from the header | |
adif = adif.split("<EOH>")[1].strip() | |
data = { | |
"key": api_key, | |
"station_profile_id": station_id, | |
"type":"adif", | |
"string": adif | |
} | |
jsondata = json.dumps(data).encode('utf-8') | |
req = urllib.request.Request(f"{base_url}/index.php/api/qso") | |
req.add_header('Content-Type', 'application/json; charset=utf-8') | |
try: | |
response = urllib.request.urlopen(req, jsondata) | |
log.info("Sent QSO to cloudlog at %s, got response %s", base_url, response.read().decode()) | |
except Exception: | |
log.exception("Failed to send ADIF to cloudlog") | |
if __name__ == "__main__": | |
""" | |
Run the script | |
""" | |
# Parse the arguments | |
import argparse | |
parser = argparse.ArgumentParser(description=__doc__) | |
parser.add_argument('url', metavar='url', type=str, | |
help='URL for CloudLog.') | |
parser.add_argument('api_key', metavar='api_key', type=str, | |
help='CloudLog API key.') | |
parser.add_argument('station_id', metavar='station_id', type=str, | |
help='CloudLog station ID to upload QSO to.') | |
parser.add_argument('--wsjt-port', default=2237, | |
help='Port to listen for WSJT messages [default: 2237]') | |
parser.add_argument('--wsjt-host', default="localhost", | |
help='Host to listen for WSJT messages [default: localhost]') | |
parser.add_argument('--verbose', action="store_true", | |
help='Output debugging information.') | |
arguments = vars(parser.parse_args()) | |
# Setup logger | |
log = logging.getLogger(__name__) | |
log.setLevel("INFO") | |
if arguments["verbose"]: | |
log.setLevel("DEBUG") | |
stream_handler = logging.StreamHandler(sys.stdout) | |
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') | |
stream_handler.setFormatter(formatter) | |
log.addHandler(stream_handler) | |
# Test connection to cloudlog | |
try: | |
test_cloudlog(arguments['url']) | |
except Exception: | |
log.exception("Unable to connect to Cloudlog") | |
sys.exit(1) | |
log.info("Successfully tested connection to Cloudlog") | |
# Define functions which are called after certain types of messages are decoded | |
callbacks = { | |
0: lambda payload: log.info("Got heartbeat from %s version %s %s", payload[0], payload[2], payload[3]), | |
12: partial(upload_to_cloudlog, arguments["url"], arguments["api_key"], arguments["station_id"]), | |
} | |
# Listen for WSJT messges on UDP | |
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
host, port = arguments["wsjt_host"], int(arguments["wsjt_port"]) | |
s.bind((host, port)) | |
log.info("listening for WSJT on %s port %s", host, port) | |
while True: | |
data, addr = s.recvfrom(1024) | |
parse_wsjt_message(data, callbacks) | |
# Data for testing | |
# example_data = { | |
# 0: b'\xad\xbc\xcb\xda\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x06WSJT-X\x00\x00\x00\x03\x00\x00\x00\x052.5.2\x00\x00\x00\x00', | |
# 12: b'\xad\xbc\xcb\xda\x00\x00\x00\x02\x00\x00\x00\x0c\x00\x00\x00\x06WSJT-X\x00\x00\x01C\n<adif_ver:5>3.1.0\n<programid:6>WSJT-X\n<EOH>\n<call:5>YO9HP <gridsquare:4>KN35 <mode:3>FT8 <rst_sent:3>-01 <rst_rcvd:3>-14 <qso_date:8>20211116 <time_on:6>141215 <qso_date_off:8>20211116 <time_off:6>141315 <band:3>15m <freq:9>21.075047 <station_callsign:6>MW7STJ <my_gridsquare:6>IO72XO <tx_pwr:1>6 <comment:8>Bungalow <EOR>', | |
# } | |
# parse_wsjt_message(example_data[12], callbacks) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment