@danhammer
Created June 19, 2013 17:38
#!/usr/bin/env python
"""
This script updates our MODIS data archive stored on S3 based on what's
available on NASA's MODIS data FTP server (e4ftl01u.ecs.nasa.gov).

The MODIS FTP server is organized by product, then date, in the format
/MODIS_Composites/MOLT/MOD13A3.005/2000-02-01. This corresponds to the Terra
(MOLT) vegetation indices product at 1000m resolution, 32-day composites,
for the month of February 2000.

This script gets a list of all date folders (e.g. 2000-02-01), then checks
whether we have any files in the corresponding date folder on S3
(e.g. s3://modisfiles/MOD13A3/2000-02-01/). With the quick-and-dirty
comparator (see simpleS3FileCheck()), the script downloads nothing if there
is even one file in that directory. If there are no files in that directory,
the script downloads all files from the FTP server in that date directory
and uploads them to the corresponding directory on S3.

A more comprehensive script could one day check whether all files for a given
date range have indeed been downloaded. See notes in exhaustiveS3FileCheck().
"""
import os
from datetime import date, timedelta
import ftplib
from ftp import FTPConn
import common
import hipchat, transfer
#server = d.modis_ftp
#server = "e4ftl01u.ecs.nasa.gov"
#server = "e4ftl01.cr.usgs.gov" # as of September 13, 2011
class Results():
    error = dict()
    error["dates"] = dict()
    error["files"] = dict()
    success = dict()
    success["dates"] = dict()
    success["files"] = dict()
    get = dict()
    get["dates"] = dict()

def getFtpModisDatesList(server, ftp_base_path):
    # get a list of all the date directories on the MODIS server;
    # returns a list of the dates and an active FTPConn instance
    ftp = FTPConn(host=server, user="anonymous", passwd="anonymous@")
    print "\nGetting date directories on MODIS server"
    # first list element is the size of the directory - not useful
    dirlist = ftp.listdir(ftp_base_path, include_path=False)[1:]
    # only keep the name of the directory, which comes at the
    # end after a bunch of metadata
    dates = [dname[-10:] for dname in dirlist]
    return ftp, dates

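# Hypothetical illustration of the slicing above: a raw listing entry such as
#   "drwxr-xr-x   2 90 129024  4096 Jan 15  2009 2000.02.01"
# ends with the ten-character directory name, so dname[-10:] yields "2000.02.01".
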
def cleanDates(dates, filter):
    a, b, c = filter.split("-")
    date_filter = date(int(a), int(b), int(c))
    # Make sure dates are valid by casting as date types. This is really just
    # insurance against putting weird new files on our S3 account. Anything
    # that doesn't match the date pattern will be skipped (and logged as such).
    print "\nCleaning dates"
    outdates = []
    for modisdate in dates:
        try:
            # if the directories are named as we expect,
            # this should work just fine
            yyyy, mm, dd = [int(i) for i in modisdate.split(".")]
            dt = date(yyyy, mm, dd)
            if dt >= date_filter:
                outdates.append(dt.isoformat())
        except Exception:
            # if the split or date formatting doesn't work, or something else
            # goes wrong, we've got an unexpected directory structure on the server
            print "Invalid date:", modisdate
    return outdates

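# Worked example with hypothetical inputs:
#   cleanDates(["2000.02.01", "README"], "2000-01-01") -> ["2000-02-01"]
# "README" fails the split/int cast, so it is logged and skipped.
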
def genPaths(modisfile, iso_date, product_prefix):
    """
    Generate paths for transfers based on date, MODIS filename, etc.
    Having this as a separate function makes testing easier.
    """
    product_prefix = product_prefix.split(".")[0]  # drop .005 from raw prefix
    # e.g. MODIS_Composites/MOLT/MOD13A3.005/2000.01.01/MOD13A3.A2000001.h08v06.005.2007111065956.hdf
    ftppath = modisfile
    # e.g. /mnt/temp/2000-01-01/MOD13A3.A2000001.h08v06.005.2007111065956.hdf
    localpath = os.path.join(common.paths["temp"], iso_date,
                             os.path.split(ftppath)[1])
    # e.g. /mnt/temp/2000-01-01/
    localbasepath = os.path.split(localpath)[0]
    # e.g. s3://modisfiles/MOD13A3/2000-01-01/MOD13A3.A2000001.h08v06.005.2007111065956.hdf
    s3path = "%s/%s/%s" % (product_prefix, iso_date,
                           os.path.split(localpath)[1])
    return ftppath, localpath, localbasepath, s3path

def isUpdatedFile(s3path, staging_bucket_conn, bucket_conn):
    # have to check whether this new file is an update of an old one
    staging_old_existing = transfer.get_s3_file_list(s3path.split(".005.")[0],
                                                     staging_bucket_conn)
    #old_existing = transfer.get_s3_file_list(s3path.split(".005.")[0], bucket_conn)
    #if old_existing:
    #    delete_old = True
    if staging_old_existing:
        delete_old = True
    else:
        return False
    if delete_old:
        #if old_existing:
        #    b = bucket_conn
        #    old_delete = old_existing[0]
        if staging_old_existing:
            b = staging_bucket_conn
            old_delete = staging_old_existing[0]
        fname = os.path.split(s3path)[1]
        create_date = int(fname.split(".005.")[1].split(".hdf")[0])
        old_fname = os.path.split(old_delete)[1]
        old_create_date = int(old_fname.split(".005.")[1].split(".hdf")[0])
        if create_date > old_create_date:
            return [old_delete, b]
    return False

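# Worked example, using the sample filename from genPaths() above: for
# MOD13A3.A2000001.h08v06.005.2007111065956.hdf, the production timestamp
# parsed here is 2007111065956. A reprocessed granule for the same tile and
# date carries a later timestamp, so the older key is returned for deletion.
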
def checkModisFileOnS3(staging, bucket, staging_bucket, iso_date, product_prefix, modisfile):
    # only get actual hdf files, not jpg previews or xml metadata
    if modisfile[-4:] == ".hdf":
        ftppath, localpath, localbasepath, s3path = genPaths(modisfile,
                                                             iso_date,
                                                             product_prefix)
        if (not transfer.s3_exists(s3path, bucket_conn=bucket)
                and not transfer.s3_exists(s3path, bucket_conn=staging_bucket)):
            is_updated = isUpdatedFile(s3path, staging_bucket, bucket)
            if is_updated:
                old_s3path, bucket_delete_from = is_updated
                bucket_delete_from.delete_key(old_s3path)
                print "Deleted old version:", old_s3path
            return [ftppath, localpath, s3path]
    return None

def getFtpFileList(ftp, ftp_base_path, modisdate, server=None):
    # have to go back to period separators ...
    path = ftp_base_path + modisdate.replace("-", ".") + "/*.hdf"
    print "Getting list of files for", modisdate
    try:
        # get a list of all files in this date directory
        filelist = ftp.list_nometa(path, include_path=False)
    except ftplib.error_temp:
        # a timeout happens despite all the downloads because the
        # FileObj class actually stores an internal connection to the
        # FTP server, so there's no need to log back in there. Because
        # we're using another connection to handle the directory
        # crawling, there's a timeout when you're downloading lots of
        # stuff (like 500m data). This wasn't an issue with 1000m data.
        print "FTP timeout - logging back in\n"
        ftp = FTPConn(host=server, user="anonymous", passwd="anonymous@")
        filelist = ftp.list_nometa(path, include_path=False)
    return ftp, filelist

def exhaustiveS3FileCheck(product_prefix, ftp, ftp_base_path, staging, bucket, staging_bucket, dates, server=None):
    """
    Instead of just checking whether any files at all have been uploaded to
    S3, we might want to know whether all files of interest have been uploaded.
    """
    print "Checking whether we already have data for dates on server ..."
    to_get = {}
    m = 0
    n = 0
    for modisdate in dates:
        print "Checking", modisdate
        to_get[modisdate] = []
        # pass server along so getFtpFileList can reconnect after a timeout
        ftp, filelist = getFtpFileList(ftp, ftp_base_path, modisdate, server=server)
        # increment within a filelist for a given date
        i = 0
        filelist = sorted(filelist)
        for modisfile in filelist:
            m += 1
            i += 1
            tile = common.getModisTileFromFname(os.path.split(modisfile)[1])
            # handle the case where there is more than one file for a given tile
            # in a given date. This happens occasionally when a tile is updated,
            # as in mid-April 2012. In this case, the next file will be for the
            # same tile, so we ignore the current file.
            try:
                next_tile = common.getModisTileFromFname(os.path.split(filelist[i])[1])
                if next_tile == tile:
                    continue
            except IndexError:
                pass
            if tile in common.tiles:
                paths = checkModisFileOnS3(staging, bucket, staging_bucket,
                                           modisdate, product_prefix, modisfile)
                if paths:
                    to_get[modisdate].append(paths)
                    n += 1
    print "\nChecked %i files for %i dates\nAcquiring %i new file(s)" % (m, len(dates), n)
    return to_get, m

def getModisFiles(to_get, ftp, bucket_conn):
    # paths is defined in checkModisFileOnS3();
    # s3path will be in the staging bucket or the regular bucket,
    # depending on the results of checkModisFileOnS3()
    acquired = list()
    for modisdate, paths in to_get.items():
        if paths:
            for ftp_local_s3 in paths:
                ftppath, localpath, s3path = ftp_local_s3
                local_base_path = os.path.split(localpath)[0]
                if not os.path.exists(local_base_path):
                    os.makedirs(local_base_path)
                if not os.path.exists(localpath):
                    ftp.get(ftppath, localpath)
                transfer.put(localpath, s3path, bucket_conn=bucket_conn, rrs=True)
                os.remove(localpath)
                acquired.append(s3path)
    return acquired

def sendUpdateStatusEmail(email, product_prefix, to_get, dates, checked, acquired):
    body = "%s data update status\n\n" % product_prefix
    body += "%i files checked\n" % checked
    body += "%i files acquired\n" % len(acquired)
    body += "\n%i date(s) checked:\n" % len(to_get.keys())
    body += "\n".join(dates)
    if acquired:
        body += "\nAcquired:\n"
        body += "\n".join(acquired)
    for address in email.split(" "):
        common.sendStatusEmail(to_email=address,
                               subject="[forma-data-update] %s: %s new files acquired" % (product_prefix, len(acquired)),
                               body=body)
    return body

def parseCL():
    from optparse import OptionParser
    parser = OptionParser()
    parser.add_option("-r", "--resolution", default=None, help="Nominal resolution", dest="resolution")
    parser.add_option("-i", "--interval", default=None, help="Interval, in days, between datasets", dest="interval")
    parser.add_option("-f", "--filter", default=(date.today() - timedelta(90)).isoformat(), help="Filter out dates before filter string", dest="filter")
    parser.add_option("-b", "--bucket", default="modisfiles", help="S3 bucket to use for file checking", dest="bucket")
    parser.add_option("-a", "--staging-bucket", default="formastaging", help="S3 bucket to use for staging new files", dest="staging_bucket")
    parser.add_option("-g", "--staging", default=True, help="Upload missing files to staging bucket?", dest="staging")
    parser.add_option("-s", "--server", default="e4ftl01.cr.usgs.gov", help="MODIS FTP server", dest="server")
    parser.add_option("-k", "--kiss", default=True, help="Checking type - keep it simple (KISS) or exhaustive", dest="kiss")
    parser.add_option("-p", "--product-prefix", default=None, help="MODIS product prefix", dest="product_prefix")
    parser.add_option("-e", "--email", default="[email protected]", help="Address(es) for status emails - space-separated string", dest="email")
    options, args = parser.parse_args()
    # resolution/interval are optional when a product prefix is given,
    # so only cast them to int when they were actually supplied
    resolution = int(options.resolution) if options.resolution else None
    interval = int(options.interval) if options.interval else None
    filter = options.filter
    bucket = options.bucket
    staging_bucket = options.staging_bucket
    staging = options.staging
    server = options.server
    kiss = options.kiss
    product_prefix = options.product_prefix
    email = options.email
    return bucket, staging_bucket, server, filter, product_prefix, staging, kiss, resolution, interval, email

def main(bucket="modisfiles", staging_bucket="formastaging", server=common.modis_ftp, filter="2000-01-01",
         product_prefix=None, staging=True, kiss=False, resolution=None, interval=None, email="[email protected]"):
    # Settings: derive the product prefix from resolution/interval
    # only when it was not provided directly
    if not product_prefix:
        if resolution and interval:
            product_prefix = common.modis_products[resolution][interval]
        else:
            raise Exception("Must provide MODIS product prefix or resolution and interval")
    bucket = transfer.get_bucket_conn(bucket)
    staging_bucket = transfer.get_bucket_conn(staging_bucket)
    ftp_base_path = "/MODIS_Composites/MOLT/%s/" % product_prefix
    # do actual work
    ftp, dates = getFtpModisDatesList(server, ftp_base_path)
    dates = cleanDates(dates, filter)
    to_get, checked = exhaustiveS3FileCheck(product_prefix, ftp, ftp_base_path,
                                            staging, bucket, staging_bucket, dates,
                                            server=server)
    if staging:
        bucket_conn = staging_bucket
    else:
        bucket_conn = bucket
    acquired = getModisFiles(to_get, ftp, bucket_conn)
    # cleanup
    status = sendUpdateStatusEmail(email, product_prefix, to_get, dates, checked, acquired)
    if len(acquired) > 0:
        hipchat.send_message("%s: %i files acquired" % (product_prefix, len(acquired)))
    return

if __name__ == "__main__":
    bucket, staging_bucket, server, filter, product_prefix, staging, kiss, resolution, interval, email = parseCL()
    main(bucket, staging_bucket, server, filter, product_prefix, staging, kiss, resolution, interval, email)
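
# Example invocations (hypothetical; assumes this script is saved as
# update_modis.py and that AWS credentials are configured for transfer/boto):
#   python update_modis.py -p MOD13A3.005
#   python update_modis.py -r 1000 -i 32 -f 2013-01-01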