Created
May 28, 2010 23:22
-
-
Save adamgreig/417883 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# Modules from the Python standard library. | |
import datetime, math, sys, os, logging, calendar, optparse, json, subprocess | |
# We use Pydap from http://pydap.org/. | |
import pydap.exceptions, pydap.client, pydap.lib | |
# Output logger format | |
log = logging.getLogger('main') | |
console = logging.StreamHandler() | |
console.setFormatter(logging.Formatter('%(levelname)s: %(message)s')) | |
log.addHandler(console) | |
progress_f = '' | |
progress = { | |
'gfs_percent': 0, | |
'gfs_timeremaining': '', | |
'gfs_complete': False, | |
'pred_running': False, | |
'pred_complete': False, | |
'progress_error': '', | |
} | |
def update_progress(**kwargs): | |
global progress_f | |
global progress | |
for arg in kwargs: | |
progress[arg] = kwargs[arg] | |
try: | |
progress_f.truncate(0) | |
progress_f.write(json.dumps([progress])) | |
progress_f.flush() | |
os.fsync(progress_f.fileno()) | |
except IOError: | |
global log | |
log.error('Could not update progress file') | |
def main(): | |
""" | |
The main program routine. | |
""" | |
# Set up our command line options | |
parser = optparse.OptionParser() | |
parser.add_option('-t', '--timestamp', dest='timestamp', | |
help='search for dataset covering the POSIX timestamp TIME \t[default: now]', | |
metavar='TIME', type='int', | |
default=calendar.timegm(datetime.datetime.utcnow().timetuple())) | |
parser.add_option('-o', '--output', dest='output', | |
help='file to write GFS data to with \'%(VAR)\' replaced with the the value of VAR [default: %default]', | |
metavar='FILE', | |
default='gfs/gfs_%(time)_%(lat)_%(lon)_%(latdelta)_%(londelta).dat') | |
parser.add_option('-v', '--verbose', action='count', dest='verbose', | |
help='be verbose. The more times this is specified the more verbose.', default=False) | |
parser.add_option('-p', '--past', dest='past', | |
help='window of time to save data is at most HOURS hours in past [default: %default]', | |
metavar='HOURS', | |
type='int', default=3) | |
parser.add_option('-f', '--future', dest='future', | |
help='window of time to save data is at most HOURS hours in future [default: %default]', | |
metavar='HOURS', | |
type='int', default=9) | |
parser.add_option('--hd', dest='hd', action="store_true", | |
help='use higher definition GFS data (default: no)') | |
parser.add_option('--preds', dest='preds_path', | |
help='path that contains uuid folders for predictions [default: %default]', | |
default='.', metavar='PATH') | |
parser.add_option('--predictor', dest='predictor', | |
help='path to predictor binary [default: %default]', | |
default='./pred', metavar='PATH') | |
group = optparse.OptionGroup(parser, "Location specifiers", | |
"Use these options to specify a particular tile of data to download.") | |
group.add_option('--lat', dest='lat', | |
help='tile centre latitude in range (-90,90) degrees north [default: %default]', | |
metavar='DEGREES', | |
type='float', default=52) | |
group.add_option('--lon', dest='lon', | |
help='tile centre longitude in degrees east [default: %default]', | |
metavar='DEGREES', | |
type='float', default=0) | |
group.add_option('--latdelta', dest='latdelta', | |
help='tile radius in latitude in degrees [default: %default]', | |
metavar='DEGREES', | |
type='float', default=5) | |
group.add_option('--londelta', dest='londelta', | |
help='tile radius in longitude in degrees [default: %default]', | |
metavar='DEGREES', | |
type='float', default=5) | |
parser.add_option_group(group) | |
#group = optparse.OptionGroup(parser, "Tile specifiers", | |
#"Use these options to specify how many tiles to download.") | |
#group.add_option('--lattiles', dest='lattiles', | |
#metavar='TILES', | |
#help='number of tiles along latitude to download [default: %default]', | |
#type='int', default=1) | |
#group.add_option('--lontiles', dest='lontiles', | |
#metavar='TILES', | |
#help='number of tiles along longitude to download [default: %default]', | |
#type='int', default=1) | |
#parser.add_option_group(group) | |
(options, args) = parser.parse_args() | |
# Check we got a UUID in the arguments | |
if len(args) != 1: | |
log.error('Exactly one positional argument should be supplied (uuid).') | |
sys.exit(1) | |
uuid = args[0] | |
uuid_path = options.preds_path + "/" + uuid + "/" | |
# Make the UUID directory if non existant | |
if not os.path.exists(uuid_path): | |
os.mkdir(uuid_path, 0770) | |
# Open the progress.json file for writing, creating it and closing again to flush | |
global progress_f | |
global progress | |
try: | |
progress_f = open(uuid_path+"progress.json", "w") | |
update_progress(gfs_percent=0) | |
except IOError: | |
log.error('Error opening progress.json file') | |
sys.exit(1) | |
# Check the predictor binary exists | |
if not os.path.exists(options.predictor): | |
log.error('Predictor binary does not exist.') | |
sys.exit(1) | |
# Check the latitude is in the right range. | |
if (options.lat < -90) | (options.lat > 90): | |
log.error('Latitude %s is outside of the range (-90,90).') | |
sys.exit(1) | |
# Check the delta sizes are valid. | |
if (options.latdelta <= 0.5) | (options.londelta <= 0.5): | |
log.error('Latitiude and longitude deltas must be at least 0.5 degrees.') | |
sys.exit(1) | |
if options.londelta > 180: | |
log.error('Longitude window sizes greater than 180 degrees are meaningless.') | |
sys.exit(1) | |
# We need to wrap the longitude into the right range. | |
options.lon = canonicalise_longitude(options.lon) | |
# How verbose are we being? | |
if options.verbose > 0: | |
log.setLevel(logging.INFO) | |
if options.verbose > 1: | |
log.setLevel(logging.DEBUG) | |
if options.verbose > 2: | |
logging.basicConfig(level=logging.INFO) | |
if options.verbose > 3: | |
logging.basicConfig(level=logging.DEBUG) | |
log.debug('Using cache directory: %s' % pydap.lib.CACHE) | |
timestamp_to_find = options.timestamp | |
time_to_find = datetime.datetime.utcfromtimestamp(timestamp_to_find) | |
log.info('Looking for latest dataset which covers %s' % time_to_find.ctime()) | |
try: | |
dataset = dataset_for_time(time_to_find, options.hd) | |
except: | |
print('Could not locate a dataset for the requested time.') | |
sys.exit(1) | |
dataset_times = map(timestamp_to_datetime, dataset.time) | |
dataset_timestamps = map(datetime_to_posix, dataset_times) | |
log.info('Found appropriate dataset:') | |
log.info(' Start time: %s (POSIX %s)' % \ | |
(dataset_times[0].ctime(), dataset_timestamps[0])) | |
log.info(' End time: %s (POSIX %s)' % \ | |
(dataset_times[-1].ctime(), dataset_timestamps[-1])) | |
log.info(' Latitude: %s -> %s' % (min(dataset.lat), max(dataset.lat))) | |
log.info(' Longitude: %s -> %s' % (min(dataset.lon), max(dataset.lon))) | |
update_progress(gfs_percent=5) | |
# for dlat in range(0,options.lattiles): | |
# for dlon in range(0,options.lontiles): | |
window = ( \ | |
options.lat + options.latdelta*2, options.latdelta, \ | |
options.lon + options.londelta*2, options.londelta) | |
write_file(options.output, dataset, \ | |
window, \ | |
time_to_find - datetime.timedelta(hours=options.past), \ | |
time_to_find + datetime.timedelta(hours=options.future)) | |
purge_cache() | |
update_progress(gfs_percent=100, gfs_timeremaining='Done', gfs_complete=True, pred_running=True) | |
subprocess.call([options.predictor, '-igfs', '-v', '-o'+uuid_path+'flight_path.csv', uuid_path+'scenario.ini']) | |
update_progress(pred_running=False, pred_complete=True) | |
def purge_cache(): | |
""" | |
Purge the pydap cache (if set). | |
""" | |
if pydap.lib.CACHE is None: | |
return | |
log.info('Purging PyDAP cache.') | |
for file in os.listdir(pydap.lib.CACHE): | |
log.debug(' Deleting %s.' % file) | |
os.remove(pydap.lib.CACHE + file) | |
def write_file(output_format, data, window, mintime, maxtime): | |
log.info('Downloading data in window (lat, lon) = (%s +/- %s, %s +/- %s).' % window) | |
# Firstly, get the hgtprs variable to extract the times we're going to use. | |
hgtprs_global = data['hgtprs'] | |
# Check the dimensions are what we expect. | |
assert(hgtprs_global.dimensions == ('time', 'lev', 'lat', 'lon')) | |
# Work out what times we want to download | |
times = filter(lambda x: (x >= mintime) & (x <= maxtime), | |
map(timestamp_to_datetime, hgtprs_global.maps['time'])) | |
num_times = len(times) | |
current_time = 0 | |
start_time = min(times) | |
end_time = max(times) | |
log.info('Downloading from %s to %s.' % (start_time.ctime(), end_time.ctime())) | |
# Filter the longitudes we're actually going to use. | |
longitudes = filter(lambda x: longitude_distance(x[1], window[2]) <= window[3] , | |
enumerate(hgtprs_global.maps['lon'])) | |
# Filter the latitudes we're actually going to use. | |
latitudes = filter(lambda x: math.fabs(x[1] - window[0]) <= window[1] , | |
enumerate(hgtprs_global.maps['lat'])) | |
update_progress(gfs_percent=10) | |
starttime = datetime.datetime.now() | |
# Write one file for each time index. | |
for timeidx, time in enumerate(hgtprs_global.maps['time']): | |
timestamp = datetime_to_posix(timestamp_to_datetime(time)) | |
if (timestamp < datetime_to_posix(start_time)) | (timestamp > datetime_to_posix(end_time)): | |
continue | |
current_time += 1 | |
log.info('Downloading data for %s.' % (timestamp_to_datetime(time).ctime())) | |
downloaded_data = { } | |
current_var = 0 | |
time_per_var = datetime.timedelta() | |
for var in ('hgtprs', 'ugrdprs', 'vgrdprs'): | |
current_var += 1 | |
grid = data[var] | |
log.info('Processing variable \'%s\' with shape %s...' % (var, grid.shape)) | |
# Check the dimensions are what we expect. | |
assert(grid.dimensions == ('time', 'lev', 'lat', 'lon')) | |
# See if the longitude ragion wraps... | |
if (longitudes[0][0] == 0) & (longitudes[-1][0] == hgtprs_global.maps['lat'].shape[0]-1): | |
# Download the data. Unfortunately, OpeNDAP only supports remote access of | |
# contiguous regions. Since the longitude data wraps, we may require two | |
# windows. The 'right' way to fix this is to download a 'left' and 'right' window | |
# and munge them together. In terms of download speed, however, the overhead of | |
# downloading is so great that it is quicker to download all the longitude | |
# data in our slice and do the munging afterwards. | |
selection = grid[\ | |
timeidx, | |
:, \ | |
latitudes[0][0]:(latitudes[-1][0]+1), | |
: ] | |
else: | |
selection = grid[\ | |
timeidx, | |
:, \ | |
latitudes[0][0]:(latitudes[-1][0]+1), | |
longitudes[0][0]:(longitudes[-1][0]+1) ] | |
# Cache the downloaded data for later | |
downloaded_data[var] = selection | |
log.info(' Downloaded data has shape %s...', selection.shape) | |
if len(selection.shape) != 3: | |
log.error(' Expected 3-d data.') | |
return | |
now = datetime.datetime.now() | |
time_elapsed = now - starttime | |
num_vars = (current_time - 1)*3 + current_var | |
time_per_var = time_elapsed / num_vars | |
total_time = num_times * 3 * time_per_var | |
time_left = total_time - time_elapsed | |
update_progress(gfs_percent=int( | |
10 + | |
(((current_time - 1) * 90) / num_times) + | |
((current_var * 90) / (3 * num_times)) | |
), gfs_timeremaining=time_left.__str__()) | |
# Check all the downloaded data has the same shape | |
target_shape = downloaded_data['hgtprs'] | |
assert( all( map( lambda x: x == target_shape, | |
filter( lambda x: x.shape, downloaded_data.itervalues() ) ) ) ) | |
log.info('Writing output...') | |
hgtprs = downloaded_data['hgtprs'] | |
ugrdprs = downloaded_data['ugrdprs'] | |
vgrdprs = downloaded_data['vgrdprs'] | |
log.debug('Using longitudes: %s' % (map(lambda x: x[1], longitudes),)) | |
output_filename = output_format | |
output_filename = output_filename.replace('%(time)', str(timestamp)) | |
output_filename = output_filename.replace('%(lat)', str(window[0])) | |
output_filename = output_filename.replace('%(latdelta)', str(window[1])) | |
output_filename = output_filename.replace('%(lon)', str(window[2])) | |
output_filename = output_filename.replace('%(londelta)', str(window[3])) | |
log.info(' Writing \'%s\'...' % output_filename) | |
output = open(output_filename, 'w') | |
# Write the header. | |
output.write('# window centre latitude, window latitude radius, window centre longitude, window longitude radius, POSIX timestamp\n') | |
header = window + (timestamp,) | |
output.write(','.join(map(str,header)) + '\n') | |
# Write the axis count. | |
output.write('# num_axes\n') | |
output.write('3\n') # FIXME: HARDCODED! | |
# Write each axis, a record showing the size and then one with the values. | |
output.write('# axis 1: pressures\n') | |
output.write(str(hgtprs.maps['lev'].shape[0]) + '\n') | |
output.write(','.join(map(str,hgtprs.maps['lev'][:])) + '\n') | |
output.write('# axis 2: latitudes\n') | |
output.write(str(len(latitudes)) + '\n') | |
output.write(','.join(map(lambda x: str(x[1]), latitudes)) + '\n') | |
output.write('# axis 3: longitudes\n') | |
output.write(str(len(longitudes)) + '\n') | |
output.write(','.join(map(lambda x: str(x[1]), longitudes)) + '\n') | |
# Write the number of lines of data. | |
output.write('# number of lines of data\n') | |
output.write('%s\n' % (hgtprs.shape[0] * len(latitudes) * len(longitudes))) | |
# Write the number of components in each data line. | |
output.write('# data line component count\n') | |
output.write('3\n') # FIXME: HARDCODED! | |
# Write the data itself. | |
output.write('# now the data in axis 3 major order\n') | |
output.write('# data is: ' | |
'geopotential height [gpm], u-component wind [m/s], ' | |
'v-component wind [m/s]\n') | |
for pressureidx, pressure in enumerate(hgtprs.maps['lev']): | |
for latidx, latitude in enumerate(hgtprs.maps['lat']): | |
for lonidx, longitude in enumerate(hgtprs.maps['lon']): | |
if longitude_distance(longitude, window[2]) > window[3]: | |
continue | |
record = ( hgtprs.array[pressureidx,latidx,lonidx], \ | |
ugrdprs.array[pressureidx,latidx,lonidx], \ | |
vgrdprs.array[pressureidx,latidx,lonidx] ) | |
output.write(','.join(map(str,record)) + '\n') | |
def canonicalise_longitude(lon): | |
""" | |
The GFS model has all longitudes in the range 0.0 -> 359.5. Canonicalise | |
a longitude so that it fits in this range and return it. | |
""" | |
lon = math.fmod(lon, 360) | |
if lon < 0.0: | |
lon += 360.0 | |
assert((lon >= 0.0) & (lon < 360.0)) | |
return lon | |
def longitude_distance(lona, lonb): | |
""" | |
Return the shortest distance in degrees between longitudes lona and lonb. | |
""" | |
distances = ( \ | |
math.fabs(lona - lonb), # Straightforward distance | |
360 - math.fabs(lona - lonb), # Other way 'round. | |
) | |
return min(distances) | |
def datetime_to_posix(time): | |
""" | |
Convert a datetime object to a POSIX timestamp. | |
""" | |
return calendar.timegm(time.timetuple()) | |
def timestamp_to_datetime(timestamp): | |
""" | |
Convert a GFS fractional timestamp into a datetime object representing | |
that time. | |
""" | |
# The GFS timestmp is a floating point number fo days from the epoch, | |
# day '0' appears to be January 1st 1 AD. | |
(fractional_day, integer_day) = math.modf(timestamp) | |
# Unfortunately, the datetime module uses a different epoch. | |
ordinal_day = int(integer_day - 1) | |
# Convert the integer day to a time and add the fractional day. | |
return datetime.datetime.fromordinal(ordinal_day) + \ | |
datetime.timedelta(days = fractional_day) | |
def possible_urls(time, hd): | |
""" | |
Given a datetime object representing a date and time, return a list of | |
possible data URLs which could cover that period. | |
The list is ordered from latest URL (i.e. most likely to be correct) to | |
earliest. | |
We assume that a particular data set covers a period of P days and | |
hence the earliest data set corresponds to time T - P and the latest | |
available corresponds to time T given target time T. | |
""" | |
period = datetime.timedelta(days = 7.5) | |
earliest = time - period | |
latest = time | |
if hd: | |
url_format = 'http://nomads.ncep.noaa.gov:9090/dods/gfs_hd/gfs_hd%i%02i%02i/gfs_hd_%02iz' | |
else: | |
url_format = 'http://nomads.ncep.noaa.gov:9090/dods/gfs/gfs%i%02i%02i/gfs_%02iz' | |
# Start from the latest, work to the earliest | |
proposed = latest | |
possible_urls = [] | |
while proposed >= earliest: | |
for offset in ( 18, 12, 6, 0 ): | |
possible_urls.append(url_format % \ | |
(proposed.year, proposed.month, proposed.day, offset)) | |
proposed -= datetime.timedelta(days = 1) | |
return possible_urls | |
def dataset_for_time(time, hd): | |
""" | |
Given a datetime object, attempt to find the latest dataset which covers that | |
time and return pydap dataset object for it. | |
""" | |
url_list = possible_urls(time, hd) | |
for url in url_list: | |
try: | |
log.debug('Trying dataset at %s.' % url) | |
dataset = pydap.client.open_url(url) | |
start_time = timestamp_to_datetime(dataset.time[0]) | |
end_time = timestamp_to_datetime(dataset.time[-1]) | |
if start_time <= time and end_time >= time: | |
log.info('Found good dataset at %s.' % url) | |
return dataset | |
except pydap.exceptions.ServerError: | |
# Skip server error. | |
pass | |
raise RuntimeError('Could not find appropriate dataset.') | |
# If this is being run from the interpreter, run the main function. | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment