Created
December 20, 2015 18:00
-
-
Save cnelson/4a566e3c13a46df1b91d to your computer and use it in GitHub Desktop.
Display radar.weather.gov images on APA102c LEDs via /dev/spidev
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
"""Display radar.weather.gov images on APA102c LEDs via /dev/SpiDev""" | |
# stdlib | |
import argparse | |
import colorsys | |
import datetime | |
import cStringIO as StringIO | |
import os.path | |
import sys | |
import time | |
import urllib | |
# annoying stuff | |
import lxml.html | |
import numpy as np | |
from PIL import Image | |
# weird stuff | |
import click | |
import spidev | |
# TODO: Submit a patch to make click progress bars reusable without hax | |
import click._termui_impl | |
click._termui_impl._AFTER_BAR = click._termui_impl.AFTER_BAR | |
def radar_image_urls(radar_site_id): | |
"""Load base reflectivity radar images for a given ``radar_site_id`` from radar.weather.gov | |
Args: | |
radar_site_id (str): A site id. See ``list_sites()`` for the authorative list of valid ids. | |
Returns: | |
list: Absoulte URLs to the available images | |
Raises: | |
IOError: Unable to reach radar.weather.gov | |
ValueError: The ``radar_site_id`` provided was invalid. | |
""" | |
# This is the URL to get recent radar images from. | |
# The URLs are *actually documented* yay! | |
# http://www.srh.noaa.gov/jetstream/doppler/ridge_download.htm | |
base_url = 'http://radar.weather.gov/ridge/RadarImg/N0R/{0}/'.format(radar_site_id.upper()) | |
response = urllib.urlopen(base_url) | |
if response.getcode() != 200: | |
raise ValueError('{0} in an invalid radar site id'.format(radar_site_id)) | |
# parse the apache directory index, and extract all the image urls | |
dom = lxml.html.fromstring(response.read()) | |
# make the links absolute | |
dom.make_links_absolute(base_url) | |
# loop through the links, finding gifs only | |
images = [] | |
for element, attribute, link, pos in dom.iterlinks(): | |
if element.tag == 'a' and attribute == 'href' and link.endswith('.gif'): | |
images.append(link) | |
# sort newest first | |
images = sorted(set(images), reverse=True) | |
return images | |
def histogram_gif(url): | |
"""GET an indexed color image via HTTP, parse it, and generate a histogram | |
Args: | |
url (str): The url to load, it must refer to an indexed color image | |
Returns: | |
tuple (data, colors): | |
data (list): Floats (0.0-1.0) normalized values for each color in the image | |
color (dict): The colored referced by ``data`` k = the index of ``data``; v = [R, G, B] | |
Raises: | |
ValueError: The url provided was invalid, or wasn't an indexed color image | |
""" | |
# load and parse our image | |
try: | |
img = Image.open( | |
StringIO.StringIO( | |
urllib.urlopen(url).read() | |
) | |
) | |
except IOError as exc: | |
raise ValueError('Unable to load/parse {0}: {1}'.format(url, exc)) | |
if img.mode != 'P': | |
raise ValueError('{0} is not an indexed color image.'.format(url)) | |
# stash the pallet | |
pal = img.getpalette() | |
# and which color is transparent | |
trans_index = img.info["transparency"] | |
# convert to numpy | |
img = np.array(img) | |
# remove all transparent pixels | |
img = img[img != trans_index] | |
# extract the colors this gif uses | |
lookup = {} | |
colors = np.unique(img.ravel()) | |
for z in colors: | |
lookup[z] = pal[z*3:(z*3)+3] | |
# generate a histogram of the _used_ color indexes | |
data, bins = np.histogram(img, bins=range(max(colors)+2), normed=1) | |
# return the histogram of the indxes, and the colors they represent | |
return data, lookup | |
def datetime_from_url(url): | |
"""Extract the timestamp from a given url based on the file format described in: | |
http://www.srh.noaa.gov/jetstream/doppler/ridge_download.htm | |
Args: | |
url (str): A URL to a radar.weather.gov imgage | |
Returns: | |
datetime.datetime: The datetime (UTC) the image was created | |
""" | |
filename = os.path.basename(url) | |
station, udate, utime, imgtype = filename.split('_') | |
# TODO: Convert to localtime? | |
return datetime.datetime.strptime(udate+' '+utime, '%Y%m%d %H%M') | |
def hist_to_apa102c(data, colors, length): | |
"""Generate a 'stacked bar chart' from a histogram suitable for display | |
on an 1D APA102c LED strip | |
Returns: | |
list: [brightness, blue, green, red]*length | |
This list is suitable for sending directly to an APA102c | |
""" | |
frame = [] | |
# convert our colors to HSV | |
hsv_colors = {} | |
for kk, vv in colors.items(): | |
hsv_colors[kk] = colorsys.rgb_to_hsv(*[float(i)/sum(vv) for i in vv]) | |
# now sort colors by hue, so our strip looks nice | |
for kk, vv in sorted(hsv_colors.items(), key=lambda x: x[1][0], reverse=True): | |
frame += ([255]+list(reversed(colors[kk]))) * max(1, int(data[kk] * length)) | |
return frame | |
# TODO: Is there a better source for this list of sites? | |
def list_sites(): | |
"""Extract the list of radar sites from the <option> tags on the page | |
Returns: | |
list: A list of strings describing the available sites. | |
""" | |
response = urllib.urlopen('http://www.srh.noaa.gov/jetstream/doppler/ridge_download.htm') | |
if response.getcode() != 200: | |
raise ValueError('Unable to list radar sites') | |
# parse the apache directory index, and extract all the image urls | |
dom = lxml.html.fromstring(response.read()) | |
return [site.text for site in dom.get_element_by_id('bysite')] | |
if __name__ == "__main__": | |
# show sites and bounce if asked to | |
if '--sites' in sys.argv: | |
print "Radar Sites:" | |
for site in list_sites(): | |
print "\t"+site | |
raise SystemExit | |
# parse the CLI args | |
parser = argparse.ArgumentParser( | |
description='Display radar.weather.gov images on APA102c LEDs via /dev/spidev', | |
formatter_class=argparse.ArgumentDefaultsHelpFormatter | |
) | |
parser.add_argument( | |
'radar_site_id', | |
help='The radar site to load images from. Use --sites for a list of radar site ids' | |
) | |
parser.add_argument( | |
'leds', | |
type=int, | |
help='The number of LEDs connected to the SPI device' | |
) | |
parser.add_argument( | |
'--spi-bus', | |
'-sb', | |
type=int, | |
default=0, | |
help='The SPI bus to use' | |
) | |
parser.add_argument( | |
'--spi-device', | |
'-sd', | |
type=int, | |
default=0, | |
help='The SPI device to use' | |
) | |
parser.add_argument( | |
'--spi-speed', | |
'-ss', | |
type=int, | |
default=None, | |
help='The speed in hz for the SPI clock. Values are platform dependent.' | |
) | |
parser.add_argument( | |
'--delay', | |
'-d', | |
default=0.3, | |
type=float, | |
help='How long to delay between frames' | |
) | |
parser.add_argument( | |
'--refresh', | |
'-r', | |
default=900, | |
type=int, | |
help='How long (in seconds) between reloading radar images.' | |
) | |
# just here for the help, the hax above handles it | |
parser.add_argument( | |
'--sites', | |
default=False, action='store_true', | |
help="List available site ids. If specified, all other arguments are ignored." | |
) | |
args = parser.parse_args() | |
# make sure we can open the SPI device | |
try: | |
spi = spidev.SpiDev() | |
spi.open(args.spi_bus, args.spi_device) | |
if args.spi_speed is not None: | |
spi.max_speed_hz = args.spi_speed | |
except IOError as exc: | |
parser.error( | |
"Cannot open /dev/spidev{0}.{1}: {2}".format( | |
args.spi_bus, | |
args.spi_device, | |
exc | |
) | |
) | |
except Exception as exc: | |
print exc | |
raise SystemExit | |
# blank the strip | |
spi.xfer([0]*4+[255, 0, 0, 0]*args.leds) | |
# loop until interrupted | |
try: | |
while True: | |
# load iamges | |
try: | |
urls = radar_image_urls(args.radar_site_id) | |
except ValueError as exc: | |
parser.error(exc) | |
except IOError as exc: | |
parser.error('Unable to contact radar.weather.gov: {0}'.format(exc)) | |
if len(urls) == 0: | |
parser.error('No radar images found for {0}'.format(args.radar_site_id)) | |
# when did we last load the images | |
last_load = datetime.datetime.utcnow() | |
# load each image, and generate a 1d representation | |
frames = [] | |
with click.progressbar(urls) as bar: | |
for url in bar: | |
when = datetime_from_url(url) | |
bar.label = 'Fetching {0}'.format(when) | |
bar.render_progress() | |
try: | |
data, colors = histogram_gif(url) | |
except ValueError: | |
# sometime images disappear if we happen to run while the site | |
# is updating. It's ok, just ignore the frame and move on | |
continue | |
frames.append( | |
(when, hist_to_apa102c(data, colors, args.leds)) | |
) | |
# hax to make the progres bar not add a new line when finished | |
click._termui_impl.AFTER_BAR = '' | |
# push each frame to the LED strip | |
while True: | |
with click.progressbar(frames, show_eta=False) as bar: | |
for when, frame in bar: | |
bar.label = 'Displaying {0}'.format(when) | |
bar.render_progress() | |
spi.xfer([0]*4) | |
spi.xfer(list(frame)) | |
time.sleep(args.delay) | |
# reload radar images when we were told to | |
if (datetime.datetime.utcnow() - last_load).seconds >= args.refresh: | |
# fix console | |
# reset progress bar | |
click._termui_impl.AFTER_BAR = click._termui_impl._AFTER_BAR | |
break | |
except KeyboardInterrupt: | |
spi.xfer([0]*4+[255, 0, 0, 0]*args.leds) | |
# fix the console | |
raise SystemExit |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment