Last active
January 3, 2022 03:46
-
-
Save kk7ds/bfc3c14f758274217cd7528ec4f6168c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3
# Copyright 2021 Dan Smith <[email protected]>
#
# APRS NWS alert bulletin agent
#
# This attempts to provide one APRS bulletin message per execution for
# any NWS alerts active for the given area and zone(s). The expected
# usage is as an exec beacon for aprx, scheduled every ten minutes or
# so. Each new NWS alert will be emitted once as a bulletin until it
# is changed or updated.
import argparse
import collections
import datetime
import json
import logging
import re
import sys

import iso8601
import pytz
import requests
# Shared timezone object and the logger used by every function in this file.
UTC = pytz.utc
LOG = logging.getLogger('wxbeacon')

# State used when nothing could be loaded from disk. The timestamp is taken
# once, when this module is first executed; '_nostate' marks the state as
# never having been persisted.
DEFAULT_STATE = dict(
    timestamp=datetime.datetime.now(tz=UTC).timestamp(),
    _nostate=True,
)
# Substitutions we use to shorten the bulletin text so it fits under
# the limit. We apply these in order until the text is short enough.
#
# Order matters: MIDNIGHT must come before NIGHT, otherwise NIGHT -> NITE
# would rewrite "MIDNIGHT" to "MIDNITE" and the 12AM substitution could
# never match. Each word appears only once; applying the same substitution
# twice can never shorten the text further.
COMPRESS = [
    ('...', ','),
    ('FEET', 'FT'),
    ('THROUGH', 'THRU'),
    ('EFFECT', 'EFF'),
    ('EVENING', 'EVE'),
    ('AFTERNOON', 'AFTN'),
    ('MORNING', 'MORN'),
    ('MIDNIGHT', '12AM'),
    ('NIGHT', 'NITE'),
    ('HAZARDOUS', 'HAZ'),
    ('COASTAL', 'COAST'),
    ('HIGH', 'HI'),
    ('CONDITIONS', 'CONDX'),
    ('WINTER', 'WNTR'),
    ('ADVISORY', 'ADVIS'),
    ('WARNING', 'WARN'),
    ('UNTIL', 'UNTL'),
    ('REMAINS', 'REM'),
    ('FROM', 'FRM'),
    ('ABOVE', 'ABV'),
]

# Upper-case day names; bulletins always abbreviate these to three letters.
DAYS = ['SUNDAY', 'MONDAY', 'TUESDAY', 'WEDNESDAY',
        'THURSDAY', 'FRIDAY', 'SATURDAY']
class NOAAFailure(Exception):
    """Raised when the NWS/NOAA alert service cannot be queried."""
class State(collections.UserDict):
    """Dict-like program state that is persisted to disk as JSON.

    A new instance starts out as a copy of DEFAULT_STATE and remembers the
    path it was last loaded from so that save() can write back to it.
    """

    def __init__(self):
        super().__init__(DEFAULT_STATE)
        self.statefile = '/tmp/wxbeacon.state'

    def load(self, statefile):
        """Replace the current contents with the JSON stored in statefile.

        A missing or unparseable file is only logged as a warning and the
        existing contents are kept; any other error is logged and re-raised.
        The path is remembered for later save() calls either way.
        """
        self.statefile = statefile
        try:
            with open(statefile) as f:
                self.data = json.load(f)
        except FileNotFoundError:
            LOG.warning('No statefile, assuming current')
        except json.decoder.JSONDecodeError as err:
            LOG.warning('Failed to parse statefile: %s', err)
        except Exception as err:
            LOG.exception('Failed to read statefile: %s', err)
            raise

    def save(self, statefile=None):
        """Write the state as JSON to statefile (default: the loaded path).

        Failures are logged with a traceback and re-raised.
        """
        target = statefile or self.statefile
        LOG.debug('Writing state to %s', target)
        try:
            payload = json.dumps(self.data)
            with open(target, 'w') as f:
                f.write(payload)
        except Exception as err:
            LOG.exception('Failed to write statefile: %s', err)
            raise
def get_alerts(area, zones, timeout=30):
    """Fetch the active NWS alerts for an area and filter them by zone.

    :param area: The NWS area string sent as the ``area`` query parameter
                 (i.e. 'OR')
    :param zones: A set of zone code strings; an alert is kept only if its
                  ``geocode['UGC']`` list shares at least one code with it
    :param timeout: Seconds to wait on the NWS API before giving up, so a
                    stalled connection cannot hang the caller forever
    :returns: A list of the alert "feature" dicts from the API response
              that are actual, unexpired alerts or updates for ``zones``
    :raises NOAAFailure: If the request fails, returns an error status, or
                         returns a body that cannot be parsed as JSON
    """
    try:
        resp = requests.get('https://api.weather.gov/alerts/active',
                            params={'area': area},
                            timeout=timeout)
        resp.raise_for_status()
        # Parsing is part of "talking to NOAA": a garbage body should
        # surface as NOAAFailure, not as an unexpected ValueError.
        data = resp.json()
    except (requests.RequestException, ValueError) as e:
        raise NOAAFailure('Unable to contact NOAA: %s' % e) from e

    now = datetime.datetime.now(tz=UTC)
    relevant_alerts = []
    for feature in data['features']:
        properties = feature['properties']

        if properties['messageType'] not in ('Alert', 'Update'):
            LOG.debug('Ignoring alert type %r' % properties['messageType'])
            continue

        if properties['status'] not in ('Actual',):
            LOG.debug('Ignoring alert status %r' % properties['status'])
            continue

        expires = iso8601.parse_date(properties['expires'])
        if expires < now:
            LOG.debug('Ignoring alert expired at %s' % expires)
            continue

        ugc = properties['geocode']['UGC']
        if zones & set(ugc):
            LOG.info('Found relevant alert: %s (expires %s)' % (
                properties['event'], expires))
            relevant_alerts.append(feature)
        else:
            LOG.debug('Ignoring alert for %s' % ','.join(ugc))

    return relevant_alerts
def make_bulletin(number, text):
    """Format an alert headline as a bulletin payload string.

    The text is cleaned up and progressively abbreviated until it fits in
    the 67-character limit, then hard-truncated if it still does not.

    :param number: The bulletin identifier string; only its first
                   character is used
    :param text: The alert headline text
    :returns: A string of the form ':BLN<n><group>:<text>' where the group
              is 'WXA' padded to five characters
    """
    LOG.debug('Original alert text: %r' % text)
    group = 'WXA'
    maxl = 67

    # Clean up some punctuation conventions
    text = text.replace('~', '').replace('|', '').replace('...', ', ')

    # The substitution above can leave doubled spaces and a leading or
    # trailing ", "; none of that carries information, so drop it rather
    # than spend part of the character budget on it.
    text = re.sub(r'\s+', ' ', text).strip(' ,')

    # Compress day names always - no reason to waste the bytes
    for day in DAYS:
        text = text.replace(day, day[:3])

    # Compress things like '4 AM' to '4AM'. Word boundaries (rather than
    # required surrounding whitespace) also catch times at the very start
    # or end of the text and times followed by punctuation.
    text = re.sub(r'\b([0-9]+) ([AP]M)\b', r'\1\2', text)

    # Remove timezone references (Pacific, Mountain, Central, Eastern;
    # standard or daylight), including one that ends the text.
    text = re.sub(r'\s+[PMCE][SD]T\b', '', text)

    # While we're too long, apply compression substitutions in order
    # until we are under the limit. They get more aggressive, so don't
    # apply any we don't need.
    for word, subst in COMPRESS:
        if len(text) <= maxl:
            break
        text = text.replace(word, subst)

    LOG.debug('Shortened %r' % text)

    return ':BLN%s%-5s:%s' % (number[0], group, text[:maxl])
def update_state_from_alert_ts(ts, state):
    """Store an alert's ISO 8601 timestamp in state as a UTC epoch value.

    :param ts: Timestamp string parseable by iso8601.parse_date
    :param state: Mapping whose 'timestamp' key is overwritten
    """
    sent_at = iso8601.parse_date(ts)
    epoch = sent_at.astimezone(UTC).timestamp()
    state['timestamp'] = epoch
    LOG.debug('Updating state timestamp to %s', sent_at)
def digest_alerts(state, alerts):
    """Reconcile the tracked alert records in state with the active alerts.

    Records for alerts that are no longer active are dropped, and each
    active alert not yet tracked gets a record with a bulletin number
    (0-9, A-Z). An alert that references a just-expired alert inherits
    that alert's number so an update replaces the bulletin it supersedes.
    The state is saved after expiring and again after adding records.

    :param state: A State object; its 'alerts' dict (created if missing)
                  maps alert id to a record with the keys 'transmitted',
                  'headline', 'sent', 'severity' and 'number'
    :param alerts: A list of alert feature dicts as returned by
                   get_alerts()
    """
    tracked = state.setdefault('alerts', {})

    current_ids = {alert['properties']['id'] for alert in alerts}
    stored_ids = set(tracked)

    # Delete records for alerts we are tracking but that are no longer active
    # FIXME: Should we keep this and beacon once as empty?
    expired_numbers = {}
    for expired in stored_ids - current_ids:
        LOG.info('Expiring old alert %r with number %s' % (
            tracked[expired]['headline'],
            tracked[expired]['number']))
        expired_numbers[expired] = tracked[expired]['number']
        del tracked[expired]
    state.save()

    # Figure out which bulletin numbers are being used and which we can assign
    active_numbers = {record['number'] for record in tracked.values()}
    all_numbers = set('0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ')

    # Do not assign numbers we are already using, or things we just expired
    # as they may be re-used by updates we have yet to parse
    available_numbers = sorted(all_numbers - active_numbers -
                               set(expired_numbers.values()))

    LOG.debug('Active bulletin numbers: %s',
              ','.join(str(x) for x in active_numbers))
    LOG.debug('Available bulletin numbers: %s',
              ','.join(str(x) for x in available_numbers))

    for alert in alerts:
        properties = alert['properties']
        alert_id = properties['id']
        if alert_id in tracked:
            LOG.debug('Active known alert %s has number %s' % (
                tracked[alert_id]['headline'],
                tracked[alert_id]['number']))
            continue

        # Validate the alert before spending a bulletin number on it, so
        # alerts we end up skipping do not consume one.
        try:
            parameters = properties['parameters']

            # Some alerts have no NWSheadline (Hydrologic Outlook, for
            # example). There is nothing for us to send on those, so
            # ignore.
            if 'NWSheadline' not in parameters:
                LOG.info('Alert %r has no headline; skipping',
                         properties['event'])
                continue
            headline = parameters['NWSheadline'][0]

            # Sometimes NWS issues multiple alerts for an area with the same
            # text. If this happens, do not digest the duplicates.
            if any(record['headline'] == headline
                   for record in tracked.values()):
                LOG.info('Alert %r is a duplicate string; skipping',
                         headline)
                continue

            refs = {ref['identifier'] for ref in properties['references']}
            record = {
                'transmitted': 0,
                'headline': headline,
                'sent': properties['sent'],
                'severity': properties['severity'],
            }
        except KeyError as e:
            LOG.error('Missing key in alert: %s - %s' % (e, properties))
            continue

        # Pick a persistent number for this alert, either one from a
        # previously-referenced alert, or a new one from the available
        # list. A previous number is only reused once: if another new
        # alert already took it during this run, fall through to a fresh
        # number so two bulletins never share an identifier.
        in_use = {rec['number'] for rec in tracked.values()}
        reusable = sorted(expired_numbers[ref] for ref in refs
                          if ref in expired_numbers
                          and expired_numbers[ref] not in in_use)
        if reusable:
            number = reusable[0]
            LOG.debug('Found update alert for previous bulletin %s;'
                      ' reusing', number)
        elif available_numbers:
            number = available_numbers.pop(0)
        else:
            LOG.warning('No alert numbers available for %r' % headline)
            continue

        # Add new alert to our records
        record['number'] = number
        tracked[alert_id] = record

        LOG.info('Tracking new alert %r sent at %s number %s' % (
            headline,
            iso8601.parse_date(record['sent']),
            number))

    state.save()
def get_alert(state, area, zones, intervals):
    """Get an alert to beacon, if appropriate.

    Fetches the active alerts, folds them into the state, and then picks
    the tracked alert that was transmitted longest ago and is due. An
    alert is due if it has never been transmitted, or if a resend interval
    is configured for its severity and at least that long has passed.
    Only one alert is returned per call; the chosen alert is marked as
    transmitted and the state is saved.

    :param state: A State object loaded from file
    :param area: The NWS area string (i.e. 'OR')
    :param zones: A set of NWS zone strings to limit the alerts
    :param intervals: A dict of severity:interval limits (in seconds) to
                      decide how often to re-emit alerts (may be empty).
    :returns: A string representing the bulletin-formatted payload to emit,
              or None if no alerts are due
    :raises NOAAFailure: If the alerts cannot be fetched
    """
    alerts = get_alerts(area, zones)
    digest_alerts(state, alerts)

    now = datetime.datetime.now(tz=UTC).timestamp()
    most_in_need = sorted(state['alerts'].values(),
                          key=lambda a: a['transmitted'])
    for alert in most_in_need:
        # Records may lack a severity; treat those as 'Minor' everywhere,
        # including in the log line below.
        severity = alert.get('severity', 'Minor')
        interval = intervals.get(severity)
        age = now - alert['transmitted']
        if alert['transmitted'] and (not interval or age < interval):
            LOG.debug('%s alert %r number %s not due at %i < %s seconds',
                      severity, alert['headline'], alert['number'],
                      age, interval or 'forever')
            continue

        bln = make_bulletin(alert['number'], alert['headline'])
        alert['transmitted'] = now
        update_state_from_alert_ts(alert['sent'], state)
        state.save()
        LOG.info('Will beacon %r' % bln)

        # Only beacon one alert at a time; be conservative over completeness
        return bln

    return None
def print_alert(state, area, zones, intervals):
    """Print the bulletin that is due, or an empty line if none is.

    Arguments are passed straight through to get_alert().
    """
    bulletin = get_alert(state, area, zones, intervals)
    # aprx requires a blank line to avoid logging an error
    print(bulletin if bulletin else '')
def main():
    """Parse the command line, load state, and print at most one bulletin.

    Exceptions raised while fetching alerts or reading/writing the state
    file propagate to the caller.
    """
    logging.basicConfig(level=logging.ERROR)

    parser = argparse.ArgumentParser()
    parser.add_argument('--all', help='Assume all alerts are new',
                        action='store_true')
    parser.add_argument('--area', help='NWS alert area',
                        default='OR')
    parser.add_argument('--zones', help='Filter alerts for these zones',
                        default=['ORC005', 'ORZ006'],
                        nargs='*')
    parser.add_argument('--statefile', help='Use alternate state file',
                        default='/tmp/wxbeacon.state')
    parser.add_argument('--debug', help='Enable debug output',
                        action='store_true')
    parser.add_argument('-v', '--verbose', help='Verbose logging output',
                        action='store_true')
    parser.add_argument('--log', help='Verbose logging to this file',
                        default=None)
    parser.add_argument('--nosave', help='Do not update state',
                        action='store_true')
    parser.add_argument('--severe-interval', type=int, default=None,
                        help='Resend severe alerts every this many minutes')
    parser.add_argument('--moderate-interval', type=int, default=None,
                        help='Resend moderate alerts every this many minutes')
    args = parser.parse_args()

    if args.log:
        handler = logging.FileHandler(args.log)
        formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
        handler.setFormatter(formatter)
        LOG.addHandler(handler)

    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)
    elif args.verbose:
        logging.getLogger().setLevel(logging.INFO)

    # Intervals are given in minutes on the command line but compared
    # against ages in seconds; an unset (None) interval stays None.
    intervals = {
        'Severe': args.severe_interval and 60 * args.severe_interval,
        'Moderate': args.moderate_interval and 60 * args.moderate_interval,
    }

    state = State()
    state.load(args.statefile)

    if args.nosave:
        # Keep the normal save path but send the writes nowhere.
        state.statefile = '/dev/null'

    if args.all:
        # A missing or fresh state file has no 'alerts' key yet; there is
        # simply nothing to reset in that case.
        for alert in state.get('alerts', {}).values():
            LOG.info('Ignoring stored threshold %s for %r number %s' % (
                datetime.datetime.fromtimestamp(alert['transmitted']),
                alert['headline'], alert['number']))
            alert['transmitted'] = 0
        state['timestamp'] = 0

    print_alert(state, args.area, set(args.zones), intervals)

    if state.pop('_nostate', None):
        LOG.info('Writing state because it was missing')
        state.save()
# Script entry point: run main() and exit with status 1 on any failure.
# A NOAAFailure is logged as a plain error; anything else is logged with
# its traceback.
if __name__ == '__main__':
    exit_code = 0
    try:
        main()
    except NOAAFailure as err:
        LOG.error(err)
        exit_code = 1
    except Exception as err:
        LOG.exception('Failed to run: %s', err)
        exit_code = 1
    if exit_code:
        sys.exit(exit_code)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment