Skip to content

Instantly share code, notes, and snippets.

@Cadair
Last active January 19, 2024 17:02
Show Gist options
  • Save Cadair/1e0a555deac70938b099b60097acde72 to your computer and use it in GitHub Desktop.
Save Cadair/1e0a555deac70938b099b60097acde72 to your computer and use it in GitHub Desktop.
Upload QSOs from WSJT-X to Cloudlog
#!/bin/env python3
"""
WSJT-X to Cloudlog QSO uploader.
This script reads QSO events from the WSJT UDP server and uploads the ADIF records to Cloudlog.
No non-standard library packages are required to run this.
usage: wsjt_cloudlog.py [-h] [--wsjt-port WSJT_PORT] [--wsjt-host WSJT_HOST] [--verbose] url api_key station_id
positional arguments:
url URL for CloudLog.
api_key CloudLog API key.
station_id CloudLog station ID to upload QSO to.
optional arguments:
-h, --help show this help message and exit
--wsjt-port WSJT_PORT
Port to listen for WSJT messages [default: 2237]
--wsjt-host WSJT_HOST
Host to listen for WSJT messages [default: localhost]
--verbose Output debugging information.
"""
import json
import logging
import socket
import struct
import sys
import urllib.request
from functools import partial
"""
WSJT UDP Parsing.
"""
MAGIC_NUMBER = 0xadbccbda # The WSJT magic number
def unpack_with_offset(format, data, offset):
"""
Unpack the format from the data and increment the offset
"""
delta_offset = struct.calcsize(format)
unpacked = struct.unpack_from(format, data, offset=offset)
if len(unpacked) == 1:
unpacked = unpacked[0]
return unpacked, offset + delta_offset
def unpack_wsjt_utf8(data, offset):
"""
The wireformat uses a utf8 sub-format which is a uint32 number followed by
that number of bytes of utf8 encoded string.
"""
n_bytes, offset = unpack_with_offset(">I", data, offset)
utf_bytes, offset = unpack_with_offset(f">{int(n_bytes)}s", data, offset)
return utf_bytes.decode('utf8'), offset
def parse_header(data, offset):
# First parse the magic number to verify the message is one we want to parse
try:
magic, _ = unpack_with_offset(">I", data, offset)
assert magic == MAGIC_NUMBER
except Exception:
log.exception("Unable to parse message in WSJT format")
return None, offset
header_format = ">III" # Note we have put the type in the header for dispatch parsing.
return unpack_with_offset(header_format, data, offset)
def parse_heartbeat(data, offset):
heartbeat_id, offset = unpack_wsjt_utf8(data, offset)
max_schema, offset = unpack_with_offset(">I", data, offset)
version, offset = unpack_wsjt_utf8(data, offset)
revision, offset = unpack_wsjt_utf8(data, offset)
return (heartbeat_id, max_schema, version, revision), offset
def parse_logged_adif(data, offset):
unique_id, offset = unpack_wsjt_utf8(data, offset)
adif_content, offset = unpack_wsjt_utf8(data, offset)
return (unique_id, adif_content), offset
def parse_wsjt_message(data, callbacks=None):
if not callbacks:
callbacks = {}
payload_functions = {
# Heartbeat
0: parse_heartbeat,
12: parse_logged_adif,
}
header, offset = parse_header(data, 0)
if header is None:
return
magic, schema, type_id = header
log.debug(f"Got message with type {type_id}")
if type_id in payload_functions:
payload, offset = payload_functions[type_id](data, offset)
log.debug("Decoded message: %s", payload)
if payload and type_id in callbacks:
callbacks[type_id](payload)
"""
Cloudlog
"""
def test_cloudlog(base_url):
"""
Check that we can make a request to the given cloudlog URL.
"""
response = urllib.request.urlopen(f"{base_url}/index.php/api/statistics")
assert response.code == 201
data = json.loads(response.read().decode())
if 'Today' not in data:
log.warning("Unknown response from Cloudlog %s. May not be connected correctly.", data)
return data
def upload_to_cloudlog(base_url, api_key, station_id, payload):
adif = payload[1]
# Split out the record from the header
adif = adif.split("<EOH>")[1].strip()
data = {
"key": api_key,
"station_profile_id": station_id,
"type":"adif",
"string": adif
}
jsondata = json.dumps(data).encode('utf-8')
req = urllib.request.Request(f"{base_url}/index.php/api/qso")
req.add_header('Content-Type', 'application/json; charset=utf-8')
try:
response = urllib.request.urlopen(req, jsondata)
log.info("Sent QSO to cloudlog at %s, got response %s", base_url, response.read().decode())
except Exception:
log.exception("Failed to send ADIF to cloudlog")
if __name__ == "__main__":
"""
Run the script
"""
# Parse the arguments
import argparse
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('url', metavar='url', type=str,
help='URL for CloudLog.')
parser.add_argument('api_key', metavar='api_key', type=str,
help='CloudLog API key.')
parser.add_argument('station_id', metavar='station_id', type=str,
help='CloudLog station ID to upload QSO to.')
parser.add_argument('--wsjt-port', default=2237,
help='Port to listen for WSJT messages [default: 2237]')
parser.add_argument('--wsjt-host', default="localhost",
help='Host to listen for WSJT messages [default: localhost]')
parser.add_argument('--verbose', action="store_true",
help='Output debugging information.')
arguments = vars(parser.parse_args())
# Setup logger
log = logging.getLogger(__name__)
log.setLevel("INFO")
if arguments["verbose"]:
log.setLevel("DEBUG")
stream_handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
stream_handler.setFormatter(formatter)
log.addHandler(stream_handler)
# Test connection to cloudlog
try:
test_cloudlog(arguments['url'])
except Exception:
log.exception("Unable to connect to Cloudlog")
sys.exit(1)
log.info("Successfully tested connection to Cloudlog")
# Define functions which are called after certain types of messages are decoded
callbacks = {
0: lambda payload: log.info("Got heartbeat from %s version %s %s", payload[0], payload[2], payload[3]),
12: partial(upload_to_cloudlog, arguments["url"], arguments["api_key"], arguments["station_id"]),
}
# Listen for WSJT messges on UDP
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
host, port = arguments["wsjt_host"], int(arguments["wsjt_port"])
s.bind((host, port))
log.info("listening for WSJT on %s port %s", host, port)
while True:
data, addr = s.recvfrom(1024)
parse_wsjt_message(data, callbacks)
# Data for testing
# example_data = {
# 0: b'\xad\xbc\xcb\xda\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x06WSJT-X\x00\x00\x00\x03\x00\x00\x00\x052.5.2\x00\x00\x00\x00',
# 12: b'\xad\xbc\xcb\xda\x00\x00\x00\x02\x00\x00\x00\x0c\x00\x00\x00\x06WSJT-X\x00\x00\x01C\n<adif_ver:5>3.1.0\n<programid:6>WSJT-X\n<EOH>\n<call:5>YO9HP <gridsquare:4>KN35 <mode:3>FT8 <rst_sent:3>-01 <rst_rcvd:3>-14 <qso_date:8>20211116 <time_on:6>141215 <qso_date_off:8>20211116 <time_off:6>141315 <band:3>15m <freq:9>21.075047 <station_callsign:6>MW7STJ <my_gridsquare:6>IO72XO <tx_pwr:1>6 <comment:8>Bungalow <EOR>',
# }
# parse_wsjt_message(example_data[12], callbacks)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment