De-duplicate MODIS fire pixels in CSV format downloaded from FIRMS
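A typical invocation looks like this (a sketch: the script file name, host and credentials are placeholders; the option names come from the script below):

    python dedup_firms.py --host localhost --port 5432 -d openafis -u afisuser -p PASSWORD --tablename af_modis --loglevel info firms_export.csv

Without --tablename the script only prints the unseen fire pixels as CSV on stdout; with --analyze-only it merely counts the satellite overpasses found in the file.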
#!/usr/bin/env python
import sys, os, re
import datetime, time
import csv
import psycopg2, psycopg2.extras
import logging

logger = logging.getLogger(__name__)

help_text = """De-duplicate MODIS fire downloaded from FIRMS in CSV format.
Output unseen fire pixels CSV only unless '--tablename' is set.
%prog {firms_csv} [options, -h for details]"""

# http://www.dabeaz.com/coroutines/coroutine.py
# http://www.dabeaz.com/coroutines/grepclose.py
def coroutine(func):
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        cr.next()
        return cr
    return start
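
# The decorator above "primes" a generator-based coroutine by advancing it to
# its first yield, so callers can send() into it immediately.  It is not used
# anywhere in this script; a minimal sketch of the intended pattern
# (hypothetical example):
#   @coroutine
#   def printer():
#       while True:
#           line = (yield)
#           print line
#   p = printer()      # already advanced to the first yield by the decorator
#   p.send('hello')    # prints 'hello'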

class ModisFireDb:
    def __init__(self, **kwargs):
        """kwargs must provide these parameters:
        'host=localhost port=5432 dbname=openafis user=afisuser password=afisdb#'
        """
        self.dsn = "host=%(host)s port=%(port)s dbname=%(dbname)s user=%(user)s password=%(pwd)s" % kwargs
        logger.debug("dsn=%s" % self.dsn)
        try:
            self.conn = psycopg2.connect(self.dsn)
        except Exception, err:
            raise ValueError("Invalid database connection string '%s': %s" % (self.dsn, err))

    def __del__(self):
        if hasattr(self, 'conn') and self.conn:
            self.conn.close()

    def fetch_daterange(self, startdt, enddt, satellite=None):
        '''Query database for MODIS fire in given date range.
        Return row count and a dictionary keyed by point geometry (WKT)'''
        recdic = {}
        sql = """\
SELECT obs_time_stamp dt, ST_AsText(the_geom) geom \
FROM af_modis \
WHERE obs_time_stamp >= %(startdt)s AND obs_time_stamp <= %(enddt)s"""
        if satellite:
            sql += " AND satellite = %(satellite)s"
        # Named (server-side) cursor, so large result sets are streamed
        cur = self.conn.cursor('my_cursor', cursor_factory=psycopg2.extras.DictCursor)
        logger.debug("sql=%s" % cur.mogrify(sql, dict(startdt=startdt, enddt=enddt, satellite=satellite)))
        cur.execute(sql, dict(startdt=startdt, enddt=enddt, satellite=satellite))
        count = 0
        for rec in cur:
            count += 1
            if rec['geom'] not in recdic:
                recdic[rec['geom']] = []
            recdic[rec['geom']].append(rec['dt'])
        logger.debug("sql returned %d" % count)
        cur.close()
        cur = None
        return count, recdic
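
    # A sketch of the recdic returned above, assuming one location detected
    # once and another detected on two consecutive days (illustrative values,
    # not real data):
    #   {'POINT(101.23 4.56)': [datetime(2018, 4, 10, 2, 30, 1)],
    #    'POINT(102.00 5.00)': [datetime(2018, 4, 10, 2, 30, 1),
    #                           datetime(2018, 4, 11, 2, 15, 1)]}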

    def executemany(self, tablename, paramlist):
        #logger.debug("%s" % paramlist)
        add_sql = "INSERT INTO %s " % tablename + """\
(brightness, frp, track, acqdate, acqtime, satellite, confidence, the_geom, obs_time_stamp) \
VALUES (%s, %s, %s, %s, %s, %s, %s, ST_GeometryFromText(%s), %s);"""
        cur = self.conn.cursor()
        # mogrify() expects a single parameter tuple, not a list of tuples,
        # so logging the full statement here raises TypeError
        #logger.debug("%s" % cur.mogrify(add_sql, paramlist))
        cur.executemany(add_sql, paramlist)
        self.conn.commit()
        cur.close()

    @staticmethod
    def convert_to_params(paramlist, csvarray):
        '''Adapt this:
        csvarray = (latitude,longitude,brightness,scan,track,acq_date,acq_time,satellite,confidence,version,bright_t31,frp)
        To this:
        brightness, frp, track, acqdate, acqtime, satellite, confidence, the_geom, obs_time_stamp'''
        # All column values from CSV are given as strings
        lat, lon, brightness, scan, track, acqdate, acqtime, satellite, confidence, version, bright_t31, frp = csvarray
        #
        # Set second to '01' to differentiate from pre-existing records in database
        #
        obs_time_stamp = datetime.datetime.strptime('%s %04d01' % (acqdate, int(acqtime)), "%Y-%m-%d %H%M%S")
        # (brightness, frp, track, acqdate, acqtime, satellite, confidence, the_geom, obs_time_stamp)
        wkt = 'SRID=4326;POINT(%s %s)' % (lon, lat)
        paramlist.append((
            brightness,
            frp,
            track,
            obs_time_stamp,
            obs_time_stamp.strftime('%H:%M'),
            satellite,
            confidence,
            wkt,
            obs_time_stamp,
        ))
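
    # For an illustrative row with acq_date='2018-04-10' and acq_time='230',
    # obs_time_stamp becomes 2018-04-10 02:30:01 (note the forced :01 seconds),
    # acqtime becomes '02:30', and the_geom becomes
    # 'SRID=4326;POINT(<longitude> <latitude>)'.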

def daterange(start_date, end_date):
    '''A generator to return dates between given date range, both inclusive.'''
    for n in range((end_date - start_date).days + 1):
        yield start_date + datetime.timedelta(days=n)
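
# Example (sketch): daterange(date(2018, 4, 10), date(2018, 4, 12)) yields
# April 10, 11 and 12 -- both endpoints included.  Note that when called with
# datetimes whose difference is under a day, as DatetimeDict does below,
# (end_date - start_date).days is 0 and only start_date itself is yielded.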

class DatetimeDict:
    def __init__(self, tolerance=datetime.timedelta(hours=1), *args, **kwargs):
        if type(tolerance) is not datetime.timedelta:
            raise ValueError("%s: tolerance must be a datetime.timedelta value" % type(self).__name__)
        self.doydict = {}       # keyed by subkey, then ordinal date, then overpass datetime
        self.datetimedict = {}  # keyed by overpass datetime, each value is a list of data
        self.tolerance = tolerance

    def getitem(self, key, subkey=None):
        '''Returns a list of data of the same overpass'''
        sk = self._get_similar_key(key, subkey)
        if sk is None:
            raise KeyError("%s: No entry for %s" % (type(self).__name__, key))
        return self.datetimedict[sk]

    def getitem_by_exact_key(self, key):
        return self.datetimedict[key]

    def setitem(self, key, val, subkey=None):
        sk = self._get_similar_key(key, subkey)
        if sk is None:
            sk = key
        for single_date in daterange(key - self.tolerance, key + self.tolerance):
            doy = single_date.strftime('%Y%j')
            if subkey not in self.doydict:
                self.doydict[subkey] = {}
            if doy not in self.doydict[subkey]:
                self.doydict[subkey][doy] = {}
            if sk not in self.doydict[subkey][doy]:
                self.doydict[subkey][doy][sk] = 1
            else:
                self.doydict[subkey][doy][sk] += 1
        if sk not in self.datetimedict:
            self.datetimedict[sk] = []
        self.datetimedict[sk].append(val)

    def __repr__(self):
        return '%s(doydict=%s dtdict=%s)' % (type(self).__name__, self.doydict, self.datetimedict.keys())

    def __len__(self):
        return len(self.datetimedict)

    def __iter__(self):
        for k in self.datetimedict:
            yield k

    def iteritems(self):
        '''Yield (overpass datetime, subkey) pairs.'''
        for subkey in self.doydict:
            for doy in self.doydict[subkey]:
                for k in self.doydict[subkey][doy]:
                    yield k, subkey

    def _get_similar_key(self, dtval, subkey):
        if type(dtval) is not datetime.datetime:
            raise ValueError("%s: Dictionary key must be a datetime.datetime value." % type(self).__name__)
        if subkey not in self.doydict:
            return None
        similar_key = None
        for single_date in daterange(dtval - self.tolerance, dtval + self.tolerance):
            doy = single_date.strftime('%Y%j')
            if doy not in self.doydict[subkey]:
                continue
            for k in self.doydict[subkey][doy]:
                if k >= dtval:
                    delta = k - dtval
                else:
                    delta = dtval - k
                if delta <= self.tolerance:
                    similar_key = k
                    break
        #logger.debug("input=%s output=%s" % (dtval, similar_key))
        return similar_key
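
# A sketch of how DatetimeDict groups detections into overpasses (hypothetical
# values): two rows 20 minutes apart, well within the default 1-hour
# tolerance, end up under the same key.
#   d = DatetimeDict()
#   d.setitem(datetime.datetime(2018, 4, 10, 2, 30), row1, subkey='T')
#   d.setitem(datetime.datetime(2018, 4, 10, 2, 50), row2, subkey='T')
#   len(d)   # -> 1 overpass
#   d.getitem_by_exact_key(datetime.datetime(2018, 4, 10, 2, 30))  # -> [row1, row2]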

#! Main
#! ----
def main(argv=None):
    if argv is None:
        argv = sys.argv
    debuglevelD = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'critical': logging.CRITICAL,
    }
    defvals = {
        'port': 5432,
    }
    from optparse import OptionParser
    parser = OptionParser(usage=help_text)
    parser.add_option("--host", dest="host", type="string", help="Database hostname")
    parser.add_option("--port", dest="port", type="int", help="Database port (%d)" % defvals['port'])
    parser.add_option("-u", "--user", dest="user", type="string", help="Database username")
    parser.add_option("-d", "--dbname", dest="dbname", type="string", help="Database name")
    parser.add_option("-p", "--pwd", dest="pwd", type="string", help="Database password", metavar='PASSWORD')
    parser.add_option("--analyze-only", dest="analyze_only", action="store_true", help="Count overpasses")
    parser.add_option("--tablename", dest="tablename", type="string", help="Insert fire pixels to database table")
    parser.add_option("--loglevel", dest="debug", type="string", help="Verbosity %s" % debuglevelD.keys(), metavar='LOGLEVEL')
    parser.set_defaults(**defvals)
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.error("Requires exactly one MODIS fire CSV file downloaded from FIRMS.")
    csvfile = args[0]
    if options.debug:
        if options.debug not in debuglevelD:
            raise AssertionError("Verbosity level must be one of: %s" % debuglevelD.keys())
        dbglvl = debuglevelD[options.debug]
    else:
        dbglvl = logging.WARNING
    global logger
    logger.setLevel(dbglvl)
    ch = logging.StreamHandler()
    ch.setFormatter(logging.Formatter('%(asctime)s %(lineno)d %(name)s %(funcName)s %(message)s'))
    ch.setLevel(dbglvl)
    logger.addHandler(ch)
    if not options.analyze_only:
        modis = ModisFireDb(host=options.host, port=options.port, dbname=options.dbname, user=options.user, pwd=options.pwd)
        logger.info("Connected to %s@%s" % (options.dbname, options.host))
    #
    # First, aggregate the fire pixels by ordinal date: pixels from the same
    # overpass (same satellite, within the tolerance window) are saved in the
    # same list.  Next, fetch existing fire pixels for each overpass from the
    # database.  If a CSV fire pixel matches an existing record at the same
    # location within the tolerance interval, it is counted as a duplicate
    # and skipped.
    #
    # passdic is the special hash table of overpasses
    passdic = DatetimeDict()
    csvreader = csv.reader(open(csvfile), delimiter=',')
    header = None
    logger.debug("Loading input from %s" % csvfile)
    for row in csvreader:
        #print csvreader.line_num
        if csvreader.line_num == 1:  # skip header
            header = row
            continue
        # row[5] = yyyy-mm-dd, row[6] = hhmm, row[7] = satellite
        passdt = datetime.datetime.strptime('%s %04d' % (row[5], int(row[6])), "%Y-%m-%d %H%M")
        passdic.setitem(passdt, row, subkey=row[7])
    logger.debug("Loaded %d overpasses from input" % (len(passdic)))
    if options.analyze_only:
        print "Found %d overpasses in %s" % (len(passdic), csvfile)
        return 0
    dedup_count = 0
    for passdt, satellite in passdic.iteritems():
        startdt = passdt - datetime.timedelta(minutes=30)
        enddt = passdt + datetime.timedelta(minutes=30)
        logger.info("Query for existing MODIS %s fire from %s till %s" % (satellite, startdt, enddt))
        count, existing_pixels = modis.fetch_daterange(startdt, enddt, satellite=satellite)
        logger.info("Found %d existing records" % (count))
        paramlist = []
        for row in passdic.getitem_by_exact_key(passdt):
            # row[0] = latitude, row[1] = longitude; WKT wants POINT(x y), i.e. (lon lat)
            geomwkt = 'POINT(%s %s)' % (str(float(row[1])), str(float(row[0])))
            if geomwkt in existing_pixels:
                #logger.info("Skip existing pixel: %s %s" % (geomwkt, passdt))
                dedup_count += 1
            else:
                if not options.tablename:
                    if header:
                        print ",".join(header)
                        header = None
                    print ",".join(row)
                ModisFireDb.convert_to_params(paramlist, row)
        existing_pixels = None
        if options.tablename and paramlist:
            modis.executemany(options.tablename, paramlist)
    logger.info("Removed %d duplicates" % dedup_count)
    return 0

if __name__ == "__main__":
    sys.exit(main())