Skip to content

Instantly share code, notes, and snippets.

@cynici
Last active April 12, 2018 05:47
Show Gist options
  • Save cynici/3126874 to your computer and use it in GitHub Desktop.
De-duplicate MODIS fire pixels in CSV format downloaded from FIRMS
#!/usr/bin/env python
import sys, os, re
import datetime, time
import csv
import psycopg2, psycopg2.extras
import logging
logger = logging.getLogger(__name__)
# Usage string passed to optparse's OptionParser (%prog expands to the
# program name at runtime).
# NOTE(review): the text mentions a 'do-insert' switch, but the script's
# actual option for inserting is --tablename — confirm which name is current.
help_text = """De-duplicate MODIS fire downloaded from FIRMS in CSV format.
Output unseen fire pixels CSV only unless 'do-insert' is set.
%prog {firms_csv} [options, -h for details]"""
# http://www.dabeaz.com/coroutines/coroutine.py
# http://www.dabeaz.com/coroutines/grepclose.py
def coroutine(func):
    """Decorator that instantiates a generator-based coroutine and advances
    it to its first ``yield`` so callers can ``.send()`` to it immediately.

    Pattern from David Beazley's coroutine tutorial (see URLs above).
    """
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        # Use the next() builtin (Python 2.6+) instead of the Py2-only
        # generator .next() method, so the decorator also works on Python 3.
        next(cr)
        return cr
    return start
class ModisFireDb:
    """Thin psycopg2 wrapper around the PostGIS table of MODIS fire pixels."""

    def __init__(self, **kwargs):
        """Connect to the database.

        kwargs must provide ``host``, ``port``, ``dbname``, ``user`` and
        ``pwd``; they are assembled into a libpq DSN such as
        'host=localhost port=5432 dbname=openafis user=afisuser password=afisdb#'

        Raises ValueError when the connection cannot be established.
        """
        self.dsn = "host=%(host)s port=%(port)s dbname=%(dbname)s user=%(user)s password=%(pwd)s" % kwargs
        logger.debug("dsn=%s" % self.dsn)
        try:
            self.conn = psycopg2.connect(self.dsn)
        except Exception as err:  # 'except E, err' was Py2-only syntax
            raise ValueError("Invalid database connection string '%s': %s" % (self.dsn, err))

    def __del__(self):
        # Close the connection only if __init__ got far enough to create it.
        if hasattr(self, 'conn') and self.conn:
            self.conn.close()

    def fetch_daterange(self, startdt, enddt, satellite=None):
        '''Query database for MODIS fire in given date range.
        Return row count and a dictionary keyed by point geometry (WKT)'''
        recdic = {}
        sql = """\
SELECT obs_time_stamp dt, ST_AsText(the_geom) geom \
FROM af_modis \
WHERE obs_time_stamp >= %(startdt)s AND obs_time_stamp <= %(enddt)s"""
        if satellite:
            sql += " AND satellite = %(satellite)s"
        # Named cursor => server-side cursor; rows stream instead of being
        # fetched all at once.
        cur = self.conn.cursor('my_cursor', cursor_factory=psycopg2.extras.DictCursor)
        params = dict(startdt=startdt, enddt=enddt, satellite=satellite)
        logger.debug("sql=%s" % cur.mogrify(sql, params))
        cur.execute(sql, params)
        count = 0
        for rec in cur:
            count += 1
            recdic.setdefault(rec['geom'], []).append(rec['dt'])
        logger.debug("sql returned %d" % count)
        cur.close()
        return count, recdic

    def executemany(self, tablename, paramlist):
        """Bulk-insert fire-pixel parameter tuples into ``tablename``.

        NOTE(review): tablename is interpolated directly into the SQL.  It
        comes from the command line here, so it is trusted input, but this
        is not safe against SQL injection in general.
        """
        add_sql = "INSERT INTO %s " % tablename + """\
(brightness, frp, track, acqdate, acqtime, satellite, confidence, the_geom, obs_time_stamp) \
VALUES (%s, %s, %s, %s, %s, %s, %s, ST_GeometryFromText(%s), %s);"""
        cur = self.conn.cursor()
        cur.executemany(add_sql, paramlist)
        self.conn.commit()
        cur.close()

    @staticmethod
    def convert_to_params(paramlist, csvarray):
        '''Append one INSERT parameter tuple built from a FIRMS CSV row.

        csvarray columns (all strings):
        latitude, longitude, brightness, scan, track, acq_date, acq_time,
        satellite, confidence, version, bright_t31, frp.
        Output tuple order:
        brightness, frp, track, acqdate, acqtime, satellite, confidence,
        the_geom, obs_time_stamp.
        scan, version and bright_t31 are unused.'''
        (lat, lon, brightness, scan, track, acqdate, acqtime, satellite,
         confidence, version, bright_t31, frp) = csvarray
        # Set seconds to '01' to differentiate from pre-existing records in
        # the database (FIRMS acq_time is HHMM only).
        obs_time_stamp = datetime.datetime.strptime(
            '%s %04d01' % (acqdate, int(acqtime)), "%Y-%m-%d %H%M%S")
        # WKT is POINT(x y), i.e. POINT(lon lat)
        wkt = 'SRID=4326;POINT(%s %s)' % (lon, lat)
        paramlist.append((
            brightness,
            frp,
            track,
            obs_time_stamp,                    # acqdate column
            obs_time_stamp.strftime('%H:%M'),  # acqtime column
            satellite,
            confidence,
            wkt,
            obs_time_stamp,
        ))
def daterange(start_date, end_date):
    '''Generator of day-spaced values from start_date to end_date, both
    inclusive.

    Accepts dates or datetimes.  When the endpoints are datetimes whose
    difference is not a whole number of days, one extra step is yielded so
    the last calendar day is still covered: e.g. 23:30 -> 00:30 next day
    touches two calendar days even though the difference is under 24h.
    (Previously that trailing day was silently skipped, so overpasses that
    straddled midnight could be missed by DatetimeDict lookups.)
    '''
    delta = end_date - start_date
    if delta < datetime.timedelta(0):
        return  # empty range, as before
    # Extra step covers a trailing partial day; always 0 for plain dates.
    extra = 1 if (delta.seconds or delta.microseconds) else 0
    for n in range(delta.days + 1 + extra):
        yield start_date + datetime.timedelta(days=n)
class DatetimeDict:
    """Dictionary keyed by datetimes where two keys closer together than
    ``tolerance`` are treated as the same key (i.e. the same satellite
    overpass).  Entries are additionally partitioned by an arbitrary
    ``subkey`` (the satellite name in this script).

    Internally every canonical key is indexed under each ordinal date
    ('%Y%j') that its tolerance window touches, so near-miss lookups only
    scan the keys of the relevant day(s).
    """

    def __init__(self, tolerance=datetime.timedelta(hours=1), *args, **kwargs):
        if not isinstance(tolerance, datetime.timedelta):
            raise ValueError("%s: tolerance must be a datetime.timedelta value" % type(self).__name__)
        self.doydict = {}       # subkey -> doy ('%Y%j') -> canonical key -> hit count
        self.datetimedict = {}  # canonical key -> list of stored data rows
        self.tolerance = tolerance

    @staticmethod
    def _doys_touched(startdt, enddt):
        """Yield the '%Y%j' string of every calendar day in [startdt, enddt].

        Fix: iterating by whole timedelta days used to skip the second
        calendar day when a sub-24h window straddled midnight; walking the
        dates themselves covers every day the window touches.
        """
        day = startdt.date()
        last = enddt.date()
        while day <= last:
            yield day.strftime('%Y%j')
            day += datetime.timedelta(days=1)

    def getitem(self, key, subkey=None):
        '''Returns a list of data of the same overpass'''
        sk = self._get_similar_key(key, subkey)
        if sk is None:
            raise KeyError("%s: No entry for %s" % (type(self).__name__, key))
        return self.datetimedict[sk]

    def getitem_by_exact_key(self, key):
        # No tolerance matching: key must be a canonical key.
        return self.datetimedict[key]

    def setitem(self, key, val, subkey=None):
        """Append ``val`` under the canonical key within tolerance of
        ``key``, creating a new canonical key when none is close enough."""
        sk = self._get_similar_key(key, subkey)
        if sk is None:
            sk = key
        for doy in self._doys_touched(key - self.tolerance, key + self.tolerance):
            daydic = self.doydict.setdefault(subkey, {}).setdefault(doy, {})
            daydic[sk] = daydic.get(sk, 0) + 1
        self.datetimedict.setdefault(sk, []).append(val)

    def __repr__(self):
        return '%s(doydict=%s dtdict=%s)' % (type(self).__name__, self.doydict, self.datetimedict.keys())

    def __len__(self):
        return len(self.datetimedict)

    def __iter__(self):
        for k in self.datetimedict:
            yield k

    def iteritems(self):
        """Yield each (canonical key, subkey) pair exactly once.

        Fix: a key indexed under more than one ordinal day used to be
        yielded once per day, which made main() process (and insert) the
        same overpass repeatedly."""
        seen = set()
        for subkey in self.doydict:
            for doy in self.doydict[subkey]:
                for k in self.doydict[subkey][doy]:
                    if (k, subkey) not in seen:
                        seen.add((k, subkey))
                        yield k, subkey

    def _get_similar_key(self, dtval, subkey):
        """Return an existing canonical key within tolerance of ``dtval``
        for this subkey, or None.  Raises ValueError for non-datetime keys."""
        if not isinstance(dtval, datetime.datetime):
            raise ValueError("%s: Dictionary key must be a datetime.datetime value." % type(self).__name__)
        if subkey not in self.doydict:
            return None
        similar_key = None
        for doy in self._doys_touched(dtval - self.tolerance, dtval + self.tolerance):
            if doy not in self.doydict[subkey]:
                continue
            for k in self.doydict[subkey][doy]:
                delta = (k - dtval) if k >= dtval else (dtval - k)
                if delta <= self.tolerance:
                    similar_key = k
                    break  # first match of this day wins; later days may override
        return similar_key
#! Main
#! ----
def main(argv=None):
    """De-duplicate a FIRMS MODIS fire CSV against the database.

    Groups the CSV rows into overpasses (same satellite, timestamps within
    one hour), queries the database for existing pixels around each
    overpass, and either prints the unseen rows as CSV or, when --tablename
    is given, inserts them.  Returns 0 on success.
    """
    if argv is None:
        argv = sys.argv
    debuglevelD = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'critical': logging.CRITICAL
    }
    defvals = {
        'port': 5432,
    }
    from optparse import OptionParser
    parser = OptionParser(usage=help_text)
    parser.add_option("--host", dest="host", type="string", help="Database hostname")
    parser.add_option("--port", dest="port", type="int", help="Database port (%d)" % defvals['port'])
    parser.add_option("-u", "--user", dest="user", type="string", help="Database username")
    parser.add_option("-d", "--dbname", dest="dbname", type="string", help="Database name")
    parser.add_option("-p", "--pwd", dest="pwd", type="string", help="Database password", metavar='PASSWORD')
    parser.add_option("--analyze-only", dest="analyze_only", action="store_true", help="Count overpasses")
    parser.add_option("--tablename", dest="tablename", type="string", help="Insert fire pixels to database table")
    parser.add_option("--loglevel", dest="debug", type="string", help="Verbosity %s" % debuglevelD.keys(), metavar='LOGLEVEL')
    parser.set_defaults(**defvals)
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.error("Requires exactly one MODIS fire downloaded from FIRMS in CSV format.")
    csvfile = args[0]
    if options.debug:
        if options.debug not in debuglevelD:
            raise AssertionError("Verbosity level must be one of: %s" % debuglevelD.keys())
        dbglvl = debuglevelD[options.debug]
    else:
        dbglvl = logging.WARNING
    logger.setLevel(dbglvl)
    ch = logging.StreamHandler()
    ch.setFormatter(logging.Formatter('%(asctime)s %(lineno)d %(name)s %(funcName)s %(message)s'))
    ch.setLevel(dbglvl)
    logger.addHandler(ch)
    if not options.analyze_only:
        # Connect up front so credential problems fail fast.
        modis = ModisFireDb(host=options.host, port=options.port, dbname=options.dbname,
                            user=options.user, pwd=options.pwd)
        logger.info("Connected to %s@%s" % (options.dbname, options.host))
    #
    # First, aggregate the fire pixels by overpass: rows of the same
    # satellite whose timestamps fall within one hour share one key.
    # Next, fetch existing fire pixels for each overpass from the database
    # and keep only the CSV rows whose point geometry is not already there.
    #
    passdic = DatetimeDict()
    header = None
    logger.debug("Loading input from %s" % csvfile)
    # 'with' ensures the CSV file is closed (it used to leak).
    with open(csvfile) as fh:
        csvreader = csv.reader(fh, delimiter=',')
        for row in csvreader:
            if csvreader.line_num == 1:  # skip header
                header = row
                continue
            # row[5] = yyyy-mm-dd, row[6] = hhmm, row[7] = satellite
            passdt = datetime.datetime.strptime('%s %04d' % (row[5], int(row[6])), "%Y-%m-%d %H%M")
            passdic.setitem(passdt, row, subkey=row[7])
    logger.debug("Loaded %d overpasses from input" % (len(passdic)))
    if options.analyze_only:
        # Single-argument print(...) works identically on Python 2 and 3.
        print("Found %d overpasses in %s" % (len(passdic), csvfile))
        return 0
    dedup_count = 0
    for passdt, satellite in passdic.iteritems():
        startdt = passdt - datetime.timedelta(minutes=30)
        enddt = passdt + datetime.timedelta(minutes=30)
        logger.info("Query for existing MODIS %s fire from %s till %s" % (satellite, startdt, enddt))
        count, existing_pixels = modis.fetch_daterange(startdt, enddt, satellite=satellite)
        logger.info("Found %d existing records" % (count))
        paramlist = []
        for row in passdic.getitem_by_exact_key(passdt):
            # col 0 = latitude, col 1 = longitude; WKT is POINT(x y)
            geomwkt = 'POINT(%s %s)' % (str(float(row[1])), str(float(row[0])))
            if geomwkt in existing_pixels:
                dedup_count += 1
            else:
                if not options.tablename:
                    if header:
                        print(",".join(header))
                        header = None
                    print(",".join(row))
                ModisFireDb.convert_to_params(paramlist, row)
        existing_pixels = None  # release before the next (possibly large) fetch
        if options.tablename and paramlist:
            modis.executemany(options.tablename, paramlist)
    logger.info("Removed %d duplicates" % dedup_count)
    return 0


if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment