Skip to content

Instantly share code, notes, and snippets.

@cnelson
Created December 20, 2015 18:00
Show Gist options
  • Save cnelson/4a566e3c13a46df1b91d to your computer and use it in GitHub Desktop.
Save cnelson/4a566e3c13a46df1b91d to your computer and use it in GitHub Desktop.
Display radar.weather.gov images on APA102c LEDs via /dev/spidev
#!/usr/bin/env python
"""Display radar.weather.gov images on APA102c LEDs via /dev/spidev"""
# stdlib
import argparse
import colorsys
import datetime
import cStringIO as StringIO
import os.path
import sys
import time
import urllib
# annoying stuff
import lxml.html
import numpy as np
from PIL import Image
# weird stuff
import click
import spidev
# TODO: Submit a patch to make click progress bars reusable without hax
import click._termui_impl
# Keep a copy of click's stock AFTER_BAR under a private name. The main loop
# blanks AFTER_BAR while it replays frames and restores it from this copy
# right before the radar images are reloaded.
click._termui_impl._AFTER_BAR = click._termui_impl.AFTER_BAR
def radar_image_urls(radar_site_id):
    """Load base reflectivity radar image URLs for a ``radar_site_id`` from radar.weather.gov

    The directory index published for the site is fetched and every link
    in it that ends in ``.gif`` is collected.

    Args:
        radar_site_id (str): A site id. See ``list_sites()`` for the
            authoritative list of valid ids. The id is upper-cased before
            it is placed in the URL.

    Returns:
        list: De-duplicated absolute URLs of the available images, sorted
            in reverse lexicographic order. The original author relies on
            this putting the newest image first (the file names embed a
            timestamp, see ``datetime_from_url``).

    Raises:
        IOError: Unable to reach radar.weather.gov
        ValueError: The ``radar_site_id`` provided was invalid (the index
            request did not answer with HTTP 200).
    """
    # This is the URL to get recent radar images from.
    # The URLs are *actually documented* yay!
    # http://www.srh.noaa.gov/jetstream/doppler/ridge_download.htm
    base_url = 'http://radar.weather.gov/ridge/RadarImg/N0R/{0}/'.format(radar_site_id.upper())

    response = urllib.urlopen(base_url)
    try:
        if response.getcode() != 200:
            raise ValueError('{0} is an invalid radar site id'.format(radar_site_id))
        body = response.read()
    finally:
        # always release the connection, including on the error path
        response.close()

    # parse the directory index and make the links absolute
    dom = lxml.html.fromstring(body)
    dom.make_links_absolute(base_url)

    # keep the gif links only; a set drops duplicates
    images = {
        link
        for element, attribute, link, _pos in dom.iterlinks()
        if element.tag == 'a' and attribute == 'href' and link.endswith('.gif')
    }

    # sort newest first
    return sorted(images, reverse=True)
def histogram_gif(url):
    """GET an indexed color image via HTTP, parse it, and generate a histogram

    Transparent pixels (when the image declares a transparent palette index)
    are excluded before counting.

    Args:
        url (str): The url to load, it must refer to an indexed color image

    Returns:
        tuple (data, colors):
            data (numpy.ndarray): ``data[i]`` is the share (0.0-1.0) of the
                counted pixels that use palette index ``i``, for every
                index from 0 up to the highest index in use.
            colors (dict): The colors referenced by ``data``;
                k = the index into ``data`` (int); v = [R, G, B]

    Raises:
        ValueError: The url provided was invalid, wasn't an indexed color
            image, or contained nothing but transparent pixels.
    """
    # load and parse our image
    try:
        img = Image.open(
            StringIO.StringIO(
                urllib.urlopen(url).read()
            )
        )
    except IOError as exc:
        raise ValueError('Unable to load/parse {0}: {1}'.format(url, exc))

    if img.mode != 'P':
        raise ValueError('{0} is not an indexed color image.'.format(url))

    # stash the palette
    pal = img.getpalette()

    # and which color is transparent; not every image declares one, so do
    # not assume the key exists
    trans_index = img.info.get('transparency')

    # convert to a flat numpy array of palette indexes
    pixels = np.array(img).ravel()

    # remove all transparent pixels
    if trans_index is not None:
        pixels = pixels[pixels != trans_index]

    if pixels.size == 0:
        # nothing left to count; report it with the documented exception type
        raise ValueError('{0} contains no visible pixels.'.format(url))

    # extract the colors this gif uses. The indexes are converted to plain
    # ints so the arithmetic below cannot wrap around in a narrow numpy
    # integer type. np.unique returns them sorted.
    used = [int(index) for index in np.unique(pixels)]
    lookup = {}
    for index in used:
        lookup[index] = pal[index * 3:index * 3 + 3]

    # generate a histogram of the _used_ color indexes with one unit-wide
    # bin per index; with unit-wide bins ``density`` yields plain fractions
    data, _edges = np.histogram(pixels, bins=np.arange(used[-1] + 2), density=True)

    # return the histogram of the indexes, and the colors they represent
    return data, lookup
def datetime_from_url(url):
    """Parse the timestamp embedded in the file name of a radar image URL.

    The file name layout is described in:
    http://www.srh.noaa.gov/jetstream/doppler/ridge_download.htm
    It must consist of exactly four underscore-separated fields; the second
    is read as the date (``YYYYMMDD``) and the third as the time (``HHMM``).

    Args:
        url (str): A URL to a radar.weather.gov image

    Returns:
        datetime.datetime: The naive datetime the image was created
            (UTC according to the original author).

    Raises:
        ValueError: The file name does not have four fields, or the date
            and time fields cannot be parsed.
    """
    # unpacking into four names makes unexpected file names fail loudly
    _site, day, clock, _kind = os.path.basename(url).split('_')

    # TODO: Convert to localtime?
    stamp = ' '.join((day, clock))
    return datetime.datetime.strptime(stamp, '%Y%m%d %H%M')
def hist_to_apa102c(data, colors, length):
    """Generate a 'stacked bar chart' from a histogram suitable for display
    on a 1D APA102c LED strip

    Every color gets a run of LEDs proportional to its share in ``data``
    (at least one LED); the runs are ordered by descending hue. The result
    is then cut or padded with dark LEDs so that it always describes
    exactly ``length`` LEDs.

    Args:
        data (sequence): Share (0.0-1.0) of each color, indexed by the keys
            of ``colors``.
        colors (dict): k = index into ``data``; v = [R, G, B]
        length (int): The number of LEDs on the strip.

    Returns:
        list: [brightness, blue, green, red]*length
            This list is suitable for sending directly to an APA102c
    """
    def hue(rgb):
        # Hue does not depend on the scale of the components, so they are
        # simply normalized by their sum. Black sums to zero; fall back to
        # a divisor of 1 instead of dividing by zero.
        total = sum(rgb) or 1
        return colorsys.rgb_to_hsv(*[float(part) / total for part in rgb])[0]

    # sort colors by hue, so our strip looks nice
    ordered = sorted(colors.items(), key=lambda item: hue(item[1]), reverse=True)

    frame = []
    for index, rgb in ordered:
        # the strip expects brightness first, then the color as B, G, R
        led = [255] + list(reversed(rgb))
        frame += led * max(1, int(data[index] * length))

    # Rounding down can leave LEDs unaddressed and the one-LED minimum can
    # overshoot, so normalize to exactly ``length`` LEDs (4 bytes each).
    del frame[4 * max(0, length):]
    frame += [255, 0, 0, 0] * (length - len(frame) // 4)

    return frame
# TODO: Is there a better source for this list of sites?
def list_sites():
    """Scrape the radar site list from the RIDGE download documentation page.

    The page is expected to contain an element with the id ``bysite``
    (presumably the site ``<select>``); the text of each of its children
    (the ``<option>`` tags) describes one site.

    Returns:
        list: A list of strings describing the available sites.

    Raises:
        ValueError: The page did not answer with HTTP 200.
    """
    page = urllib.urlopen('http://www.srh.noaa.gov/jetstream/doppler/ridge_download.htm')
    if page.getcode() != 200:
        raise ValueError('Unable to list radar sites')

    # parse the page and locate the element that holds the sites
    document = lxml.html.fromstring(page.read())
    selector = document.get_element_by_id('bysite')

    # iterating an element yields its children
    return [option.text for option in selector]
if __name__ == "__main__":
    # Script entry point: fetch the radar images for a site, turn each one
    # into a frame for the LED strip and replay the frames in a loop,
    # reloading the images every --refresh seconds until interrupted.
    #
    # NOTE: print is called with a single parenthesised argument throughout,
    # which produces the same output as the statement form on Python 2.

    # show sites and bounce if asked to. This runs before argparse so the
    # required positional arguments do not have to be supplied with --sites
    if '--sites' in sys.argv:
        print("Radar Sites:")
        for site in list_sites():
            print("\t" + site)
        raise SystemExit

    # parse the CLI args
    parser = argparse.ArgumentParser(
        description='Display radar.weather.gov images on APA102c LEDs via /dev/spidev',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument(
        'radar_site_id',
        help='The radar site to load images from. Use --sites for a list of radar site ids'
    )
    parser.add_argument(
        'leds',
        type=int,
        help='The number of LEDs connected to the SPI device'
    )
    parser.add_argument(
        '--spi-bus',
        '-sb',
        type=int,
        default=0,
        help='The SPI bus to use'
    )
    parser.add_argument(
        '--spi-device',
        '-sd',
        type=int,
        default=0,
        help='The SPI device to use'
    )
    parser.add_argument(
        '--spi-speed',
        '-ss',
        type=int,
        default=None,
        help='The speed in hz for the SPI clock. Values are platform dependent.'
    )
    parser.add_argument(
        '--delay',
        '-d',
        default=0.3,
        type=float,
        help='How long to delay between frames'
    )
    parser.add_argument(
        '--refresh',
        '-r',
        default=900,
        type=int,
        help='How long (in seconds) between reloading radar images.'
    )
    # just here for the help, the hax above handles it
    parser.add_argument(
        '--sites',
        default=False, action='store_true',
        help="List available site ids. If specified, all other arguments are ignored."
    )
    args = parser.parse_args()

    # make sure we can open the SPI device
    try:
        spi = spidev.SpiDev()
        spi.open(args.spi_bus, args.spi_device)
        if args.spi_speed is not None:
            spi.max_speed_hz = args.spi_speed
    except IOError as exc:
        parser.error(
            "Cannot open /dev/spidev{0}.{1}: {2}".format(
                args.spi_bus,
                args.spi_device,
                exc
            )
        )
    except Exception as exc:
        print(exc)
        raise SystemExit

    # blank the strip: a start frame of four zero bytes, then one dark LED
    # frame per LED
    spi.xfer([0]*4+[255, 0, 0, 0]*args.leds)

    # loop until interrupted
    try:
        while True:
            # load images
            try:
                urls = radar_image_urls(args.radar_site_id)
            except ValueError as exc:
                # hand argparse a plain message rather than the exception object
                parser.error(str(exc))
            except IOError as exc:
                parser.error('Unable to contact radar.weather.gov: {0}'.format(exc))

            if not urls:
                parser.error('No radar images found for {0}'.format(args.radar_site_id))

            # when did we last load the images
            last_load = datetime.datetime.utcnow()

            # load each image, and generate a 1d representation
            frames = []
            with click.progressbar(urls) as bar:
                for url in bar:
                    when = datetime_from_url(url)
                    bar.label = 'Fetching {0}'.format(when)
                    bar.render_progress()
                    try:
                        data, colors = histogram_gif(url)
                    except ValueError:
                        # sometimes images disappear if we happen to run while the site
                        # is updating. It's ok, just ignore the frame and move on
                        continue
                    frames.append(
                        (when, hist_to_apa102c(data, colors, args.leds))
                    )

            # hax to make the progress bar not add a new line when finished
            click._termui_impl.AFTER_BAR = ''

            # push each frame to the LED strip
            while True:
                with click.progressbar(frames, show_eta=False) as bar:
                    for when, frame in bar:
                        bar.label = 'Displaying {0}'.format(when)
                        bar.render_progress()
                        spi.xfer([0]*4)
                        spi.xfer(list(frame))
                        time.sleep(args.delay)

                if not frames:
                    # nothing could be decoded this round; wait instead of
                    # spinning at full speed until the next reload is due
                    time.sleep(args.delay)

                # reload radar images when we were told to. total_seconds()
                # is used because timedelta.seconds drops whole days and
                # would never reach a refresh interval of a day or more
                if (datetime.datetime.utcnow() - last_load).total_seconds() >= args.refresh:
                    # fix console
                    print('')
                    # reset progress bar
                    click._termui_impl.AFTER_BAR = click._termui_impl._AFTER_BAR
                    break
    except KeyboardInterrupt:
        # blank the strip again on the way out
        spi.xfer([0]*4+[255, 0, 0, 0]*args.leds)
        # fix the console
        print('')
        raise SystemExit
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment