Skip to content

Instantly share code, notes, and snippets.

@blhoward2
Forked from Cadair/wsjt_cloudlog.py
Last active July 9, 2025 04:07
Show Gist options
  • Select an option

  • Save blhoward2/396e05a7e653851862ca5d43350e1316 to your computer and use it in GitHub Desktop.

Select an option

Save blhoward2/396e05a7e653851862ca5d43350e1316 to your computer and use it in GitHub Desktop.
Upload QSOs from WSJT-X to Cloudlog
#!/bin/env python3
"""
WSJT-X to Cloudlog QSO uploader.
This script reads QSO events from the WSJT UDP server and uploads the ADIF records to Cloudlog.
No non-standard library packages are required to run this.
usage: wsjt_cloudlog.py [-h] [--wsjt-port WSJT_PORT] [--wsjt-host WSJT_HOST] [--verbose] url api_key station_id
positional arguments:
url URL for CloudLog.
api_key CloudLog API key.
station_id CloudLog station ID to upload QSO to.
optional arguments:
-h, --help show this help message and exit
--wsjt-port WSJT_PORT
Port to listen for WSJT messages [default: 2237]
--wsjt-host WSJT_HOST
Host to listen for WSJT messages [default: localhost]
--verbose Output debugging information.
"""
import json
import logging
import socket
import struct
import sys
import urllib.request
from functools import partial
from datetime import datetime, timedelta, timezone
from systemd.journal import JournalHandler
"""
WSJT UDP Parsing.
"""
# parse_header() reads the first four bytes of every datagram as a big-endian
# unsigned 32-bit integer and rejects the message unless it equals this value.
MAGIC_NUMBER = 0xadbccbda # The WSJT magic number
def unpack_with_offset(format, data, offset):
    """
    Decode ``format`` from ``data`` at ``offset`` and report where to read next.

    Returns ``(value, next_offset)``. When the format describes exactly one
    item the bare item is returned instead of a one-element tuple; otherwise
    the tuple produced by ``struct`` is returned unchanged.
    """
    values = struct.unpack_from(format, data, offset)
    next_offset = offset + struct.calcsize(format)
    result = values[0] if len(values) == 1 else values
    return result, next_offset
def unpack_wsjt_utf8(data, offset):
    """
    Read one length-prefixed UTF-8 string from a WSJT-X datagram.

    The wire format is a big-endian uint32 byte count followed by that many
    bytes of UTF-8 text. Qt encodes a *null* string as the sentinel count
    0xFFFFFFFF with no payload bytes; that case is returned as an empty
    string so callers always receive a ``str``.

    Returns ``(text, next_offset)``.

    Raises ``struct.error`` if ``data`` is too short for the length prefix or
    the announced payload, and ``UnicodeDecodeError`` if the payload is not
    valid UTF-8.
    """
    null_string_length = 0xFFFFFFFF

    (n_bytes,) = struct.unpack_from(">I", data, offset)
    offset += 4
    if n_bytes == null_string_length:
        # Null string: nothing follows the length prefix.
        return "", offset

    (utf_bytes,) = struct.unpack_from(f">{n_bytes}s", data, offset)
    return utf_bytes.decode('utf8'), offset + n_bytes
def safe_unpack_wsjt_utf8(data, offset):
    """
    Best-effort variant of unpack_wsjt_utf8 for optional trailing fields.

    Returns ``(None, offset)`` with the offset untouched when fewer than four
    bytes remain for the length prefix, or when decoding fails for any
    reason; otherwise returns whatever unpack_wsjt_utf8 returns.
    """
    missing = (None, offset)
    if len(data) < offset + 4:
        return missing
    try:
        decoded = unpack_wsjt_utf8(data, offset)
    except Exception:
        return missing
    return decoded
def parse_header(data, offset):
    """
    Parse the fixed 12-byte header at ``offset``.

    The header is three big-endian uint32 values: the magic number, the
    schema number and the message type (kept in the header so the caller can
    dispatch on it).

    Returns ``((magic, schema, type_id), next_offset)`` on success. Returns
    ``(None, offset)`` and logs the problem when the datagram is too short to
    hold a header or does not start with MAGIC_NUMBER.
    """
    header_format = ">III"  # Note we have put the type in the header for dispatch parsing.
    try:
        header, next_offset = unpack_with_offset(header_format, data, offset)
    except struct.error:
        # Datagram shorter than a full header: not something we can parse.
        log.exception("Unable to parse message in WSJT format")
        return None, offset

    # Explicit comparison instead of `assert`, which is removed under `python -O`.
    if header[0] != MAGIC_NUMBER:
        log.error("Unable to parse message in WSJT format")
        return None, offset
    return header, next_offset
def parse_heartbeat(data, offset):
    """
    Decode the payload of a heartbeat message (type 0).

    Reads, in order: the sender id string, a uint32 maximum schema number,
    the version string and, only if bytes remain, the revision string.

    Returns ``((id, max_schema, version, revision), next_offset)``, with
    ``revision`` left as ``None`` when absent. On any decoding error a
    warning is logged and ``(None, offset)`` is returned, where ``offset`` is
    however far parsing got before the failure.
    """
    try:
        heartbeat_id, offset = unpack_wsjt_utf8(data, offset)
        max_schema, offset = unpack_with_offset(">I", data, offset)
        version, offset = unpack_wsjt_utf8(data, offset)
        # The revision is only read when bytes remain after the version.
        revision = None
        if len(data) > offset:
            revision, offset = unpack_wsjt_utf8(data, offset)
    except Exception as err:
        log.warning("Skipping malformed heartbeat: %s", err)
        return None, offset
    return (heartbeat_id, max_schema, version, revision), offset
# date.toordinal() == julian_day_number - 1721425 in the proleptic Gregorian
# calendar (e.g. 2000-01-01 is Julian day 2451545 and ordinal 730120).
_JULIAN_DAY_OF_ORDINAL_ZERO = 1721425
_MILLIS_PER_DAY = 86_400_000


def _read_qdatetime(data, offset):
    """
    Read a serialized QDateTime and return ADIF date/time strings.

    Wire layout:
    - qint64  Julian day number
    - quint32 milliseconds since midnight
    - quint8  timespec (0=local, 1=UTC, 2=offset from UTC, 3=time zone)
    - qint32  offset in seconds, present only when timespec == 2

    Returns ``(("YYYYMMDD", "HHMMSS"), next_offset)``. For timespec 2 the
    offset is consumed (so later fields stay aligned) and subtracted to give
    UTC. If the Julian day cannot be represented as a ``datetime`` (for
    example an invalid/null date), the current UTC date is used instead.
    """
    julian_day, offset = unpack_with_offset(">q", data, offset)
    millis, offset = unpack_with_offset(">I", data, offset)
    timespec, offset = unpack_with_offset(">B", data, offset)
    utc_offset_seconds = 0
    if timespec == 2:
        utc_offset_seconds, offset = unpack_with_offset(">i", data, offset)
    # NOTE(review): timespec 3 is followed by time zone data that is not
    # parsed here; WSJT-X is expected to send UTC - confirm if that changes.

    if millis >= _MILLIS_PER_DAY:
        # Not a valid time of day (e.g. an invalid QTime); report midnight.
        millis = 0

    try:
        moment = datetime.fromordinal(julian_day - _JULIAN_DAY_OF_ORDINAL_ZERO)
        moment += timedelta(milliseconds=millis, seconds=-utc_offset_seconds)
    except (ValueError, OverflowError):
        today = datetime.now(timezone.utc)
        moment = datetime(today.year, today.month, today.day)
        moment += timedelta(milliseconds=millis)

    date_str = f"{moment.year:04d}{moment.month:02d}{moment.day:02d}"
    time_str = f"{moment.hour:02d}{moment.minute:02d}{moment.second:02d}"
    return (date_str, time_str), offset


def parse_qso_logged(data, offset):
    """
    Decode the payload of a "QSO Logged" message (type 5).

    Fields are read in wire order. The last three (exchange sent, exchange
    received, propagation mode) are read leniently and are ``None`` when the
    datagram ends before them.

    Returns ``(record, next_offset)`` where ``record`` is a dict keyed by:
    id, qso_date_off, time_off, dx_call, dx_grid, tx_freq, mode, rst_sent,
    rst_rcvd, tx_power, comments, name, qso_date, time_on, operator, my_call,
    my_grid, exchange_sent, exchange_rcvd, prop_mode.

    Raises ``struct.error`` or ``UnicodeDecodeError`` if a mandatory field is
    truncated or malformed.
    """
    record = {}
    record["id"], offset = unpack_wsjt_utf8(data, offset)
    (record["qso_date_off"], record["time_off"]), offset = _read_qdatetime(data, offset)
    for key in ("dx_call", "dx_grid"):
        record[key], offset = unpack_wsjt_utf8(data, offset)
    record["tx_freq"], offset = unpack_with_offset(">Q", data, offset)
    for key in ("mode", "rst_sent", "rst_rcvd", "tx_power", "comments", "name"):
        record[key], offset = unpack_wsjt_utf8(data, offset)
    (record["qso_date"], record["time_on"]), offset = _read_qdatetime(data, offset)
    for key in ("operator", "my_call", "my_grid"):
        record[key], offset = unpack_wsjt_utf8(data, offset)
    # These trailing fields may be missing from the datagram.
    for key in ("exchange_sent", "exchange_rcvd", "prop_mode"):
        record[key], offset = safe_unpack_wsjt_utf8(data, offset)
    return record, offset
def parse_wsjt_message(data, callbacks=None):
    """
    Decode one WSJT-X datagram and hand the payload to a callback.

    ``callbacks`` maps a message type id to a callable that receives the
    decoded payload. Only heartbeat (0) and QSO-logged (5) messages are
    decoded; every other type is ignored after the header is logged.

    Datagrams that fail header validation or whose payload is truncated or
    malformed are logged and skipped rather than raised, so one bad packet
    cannot stop the caller's receive loop. Always returns ``None``.
    """
    if not callbacks:
        callbacks = {}
    payload_functions = {
        # Heartbeat
        0: parse_heartbeat,
        5: parse_qso_logged,
    }
    header, offset = parse_header(data, 0)
    if header is None:
        return
    _, _, type_id = header
    log.debug("Got message with type %s", type_id)
    log.debug(header)
    log.debug(data)

    parser = payload_functions.get(type_id)
    if parser is None:
        return
    try:
        payload, offset = parser(data, offset)
    except (struct.error, UnicodeDecodeError):
        # Same policy as the heartbeat parser: log and drop the bad message.
        log.exception("Skipping malformed message of type %s", type_id)
        return
    log.debug("Decoded message: %s", payload)
    if payload and type_id in callbacks:
        callbacks[type_id](payload)
"""
Cloudlog
"""
def test_cloudlog(base_url):
    """
    Check that we can make a request to the given cloudlog URL.

    Requests ``{base_url}/index.php/api/statistics`` and returns the decoded
    JSON body. A warning is logged when the body has no 'Today' key.

    Raises ``RuntimeError`` if the response status is not 201, plus whatever
    ``urllib`` or ``json`` raise for connection or decoding failures.
    """
    # The context manager closes the connection even if reading fails.
    with urllib.request.urlopen(f"{base_url}/index.php/api/statistics") as response:
        status = response.status
        body = response.read()
    # Explicit check instead of `assert`, which is removed under `python -O`.
    if status != 201:
        raise RuntimeError(f"Unexpected HTTP status {status} from Cloudlog")
    data = json.loads(body.decode())
    if 'Today' not in data:
        log.warning("Unknown response from Cloudlog %s. May not be connected correctly.", data)
    return data
def convert_qso_to_adif(qso):
    """
    Render a decoded QSO record as an ADIF string.

    The output is a fixed header, one ``<name:length>value `` entry per
    non-empty field, and a closing ``<EOR>``; surrounding whitespace is
    stripped. Falsy values (``""`` or ``None``) are left out entirely.
    ``tx_freq`` is divided by 1e6 and written with six decimal places.
    """
    fields = (
        ("call", qso["dx_call"]),
        ("gridsquare", qso["dx_grid"]),
        ("mode", qso["mode"]),
        ("rst_sent", qso["rst_sent"]),
        ("rst_rcvd", qso["rst_rcvd"]),
        ("qso_date", qso["qso_date"]),
        ("time_on", qso["time_on"]),
        ("qso_date_off", qso["qso_date_off"]),
        ("time_off", qso["time_off"]),
        ("band", ""),  # Optional: map from frequency
        ("freq", f"{qso['tx_freq'] / 1e6:.6f}"),
        ("station_callsign", qso["my_call"]),
        ("my_gridsquare", qso["my_grid"]),
        ("tx_pwr", qso["tx_power"]),
        ("comment", qso["comments"]),
        ("name", qso["name"]),
        ("operator", qso["operator"]),
        ("prop_mode", qso["prop_mode"]),
    )
    header = "<adif_ver:5>3.1.0\n<programid:6>WSJT-X\n<EOH>\n"
    body = "".join(f"<{tag}:{len(text)}>{text} " for tag, text in fields if text)
    return (header + body + "<EOR>\n").strip()
def upload_logged_qso(base_url, api_key, station_id, qso):
    """
    Upload one decoded QSO record to Cloudlog.

    The record is converted to ADIF and POSTed as JSON to
    ``{base_url}/index.php/api/qso`` together with the API key and station
    profile id. Success and failure are both reported through ``logging``;
    HTTP errors and connection failures are logged rather than raised so a
    temporarily unreachable server does not stop the caller. Returns ``None``.
    """
    # urllib.request pulls this in implicitly; import it explicitly since the
    # exception class is referenced by name below.
    import urllib.error

    adif = convert_qso_to_adif(qso)
    data = {
        "key": api_key,
        "station_profile_id": station_id,
        "type": "adif",
        "string": adif
    }
    jsondata = json.dumps(data).encode('utf-8')
    req = urllib.request.Request(f"{base_url}/index.php/api/qso")
    req.add_header('Content-Type', 'application/json; charset=utf-8')

    # Keep the API key out of the logs.
    logging.debug("Sending payload: %s", json.dumps({**data, "key": "<redacted>"}))
    try:
        # The context manager closes the HTTP response once the upload is done.
        with urllib.request.urlopen(req, jsondata):
            pass
    except urllib.error.HTTPError as e:
        logging.error("HTTP %s - %s", e.code, e.reason)
        logging.error("Response body: %s", e.read().decode(errors="replace"))
        return
    except OSError as e:
        # Covers urllib.error.URLError (a subclass) and raw socket failures.
        logging.error("Unable to reach Cloudlog: %s", e)
        return

    call = qso.get("dx_call", "UNKNOWN")
    freq = qso.get("tx_freq", "UNKNOWN")
    date = qso.get("qso_date", "UNKNOWN")
    logging.info("Successfully uploaded QSO: %s on %s Hz, date %s.", call, freq, date)
if __name__ == "__main__":
    # Script entry point: parse arguments, set up logging, check the Cloudlog
    # connection, then forward every datagram received on the UDP socket to
    # parse_wsjt_message().
    #
    # This is deliberately left as flat module-level code: the parsing and
    # Cloudlog functions above use the module-global `log` assigned here.

    # Parse the arguments
    import argparse

    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('url', metavar='url', type=str,
                        help='URL for CloudLog.')
    parser.add_argument('api_key', metavar='api_key', type=str,
                        help='CloudLog API key.')
    parser.add_argument('station_id', metavar='station_id', type=str,
                        help='CloudLog station ID to upload QSO to.')
    # type=int makes argparse reject a non-numeric port with a usage error.
    parser.add_argument('--wsjt-port', default=2237, type=int,
                        help='Port to listen for WSJT messages [default: 2237]')
    parser.add_argument('--wsjt-host', default="localhost",
                        help='Host to listen for WSJT messages [default: localhost]')
    parser.add_argument('--verbose', action="store_true",
                        help='Output debugging information.')
    arguments = vars(parser.parse_args())

    # Setup logger
    log = logging.getLogger()
    log.addHandler(JournalHandler(SYSLOG_IDENTIFIER="wsjt_cloudlog"))
    log.setLevel("DEBUG" if arguments["verbose"] else "INFO")

    # Test connection to cloudlog
    try:
        test_cloudlog(arguments['url'])
    except Exception:
        log.exception("Unable to connect to Cloudlog")
        sys.exit(1)
    log.info("Successfully tested connection to Cloudlog")

    # Define functions which are called after certain types of messages are decoded
    callbacks = {
        5: partial(upload_logged_qso, arguments["url"], arguments["api_key"], arguments["station_id"]),
    }

    # Listen for WSJT messages on UDP
    host, port = arguments["wsjt_host"], arguments["wsjt_port"]
    # The context manager closes the socket when the loop is interrupted.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.bind((host, port))
        log.info("listening for WSJT on %s port %s", host, port)
        while True:
            # 65535 is large enough for any UDP payload; a smaller buffer
            # silently truncates longer datagrams.
            data, addr = s.recvfrom(65535)
            parse_wsjt_message(data, callbacks)

# Data for testing
# example_data = {
# 0: b'\xad\xbc\xcb\xda\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x06WSJT-X\x00\x00\x00\x03\x00\x00\x00\x052.5.2\x00\x00\x00\x00',
# 12: b'\xad\xbc\xcb\xda\x00\x00\x00\x02\x00\x00\x00\x0c\x00\x00\x00\x06WSJT-X\x00\x00\x01C\n<adif_ver:5>3.1.0\n<programid:6>WSJT-X\n<EOH>\n<call:5>YO9HP <gridsquare:4>KN35 <mode:3>FT8 <rst_sent:3>-01 <rst_rcvd:3>-14 <qso_date:8>20211116 <time_on:6>141215 <qso_date_off:8>20211116 <time_off:6>141315 <band:3>15m <freq:9>21.075047 <station_callsign:6>MW7STJ <my_gridsquare:6>IO72XO <tx_pwr:1>6 <comment:8>Bungalow <EOR>',
# }
# parse_wsjt_message(example_data[12], callbacks)
# systemd unit that runs wsjt_cloudlog.py as a service.
# Before installing, replace the /[path]/ and [api key] placeholders in
# ExecStart with real values.
[Unit]
Description=wsjt_cloudlog
After=network-online.target
Wants=network-online.target
[Service]
# Positional arguments, in order: Cloudlog URL, API key, station ID.
ExecStart=/[path]/wsjt_cloudlog.py --wsjt-host 224.0.0.1 --wsjt-port 2237 http://192.169.0.123:8086 [api key] 1
Restart=on-failure
[Install]
WantedBy=default.target
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment