|
#!/usr/bin/env python |
|
from __future__ import print_function |
|
import struct |
|
import sys |
|
import json |
|
|
|
import s3reader |
|
|
|
def tags_to_bbox(header_vals, to_wgs84=True): |
|
tiepoint = header_vals["ModelTiePointTag"] |
|
scalex, scaley, _ = header_vals["ModelPixelScaleTag"] |
|
width = header_vals["ImageWidth"] |
|
height = header_vals["ImageLength"] |
|
try: |
|
epsg = header_vals["epsg"] |
|
except KeyError: |
|
epsg = None |
|
|
|
if tiepoint[:3] == (0, 0, 0): # corner of UL |
|
ulx, uly = tiepoint[3:5] |
|
elif tiepoint[:3] == (0.5, 0.5, 0): # center of UL |
|
culx, culy = tiepoint[3:5] |
|
ulx = culx - (0.5 * scalex) |
|
uly = culy + (0.5 * scaley) |
|
else: |
|
raise Exception("not tied to UL") |
|
|
|
lrx = ulx + (scalex * width) |
|
lry = uly + (-1 * scaley * height) |
|
|
|
bounds = [ulx, lry, lrx, uly] |
|
|
|
if to_wgs84 and epsg and epsg != 4326: |
|
import rasterio.warp |
|
xs = bounds[::2] |
|
ys = bounds[1::2] |
|
xs, ys = rasterio.warp.transform({'init': 'EPSG:{}'.format(epsg)}, |
|
{'init': 'EPSG:4326'}, |
|
xs, ys) |
|
result = [0]*len(bounds) |
|
result[::2] = xs |
|
result[1::2] = ys |
|
return result |
|
else: |
|
return bounds |
|
|
|
|
|
|
|
def read_tags(path, tags): |
|
struct_fmt = { |
|
2: ('c', 1), # char |
|
3: ('H', 2), # unsigned short int |
|
4: ('I', 4), # unsigned long |
|
12: ('d', 8), # 8 byte double |
|
} |
|
|
|
try: |
|
s3reader.parse_path(path) |
|
# s3 URI, use s3reader |
|
_open = s3reader.open |
|
except ValueError: |
|
# normal file |
|
_open = open |
|
|
|
with _open(path, 'rb') as src: |
|
header = src.read(8) |
|
|
|
endian_tuple = struct.unpack('2c', header[0:2]) |
|
|
|
if endian_tuple == ('I', 'I'): |
|
endian = '<' # little |
|
elif endian_tuple == ('M', 'M'): |
|
endian = '>' |
|
else: |
|
raise Exception |
|
|
|
meaning_of_life = struct.unpack(endian + 'h', header[2:4])[0] |
|
if meaning_of_life != 42: |
|
raise Exception("Not a GeoTiff") |
|
|
|
ifd_offset = struct.unpack(endian + "i", header[4:8])[0] |
|
|
|
header_vals = {} |
|
|
|
src.seek(ifd_offset) |
|
nde = src.read(2) |
|
n_dir_entries = struct.unpack(endian + 'h', nde)[0] |
|
|
|
tags_bytes = src.read(12 * n_dir_entries) |
|
|
|
for d in range(n_dir_entries): |
|
start = d * 12 |
|
end = (d + 1) * 12 |
|
data = tags_bytes[start:end] |
|
tag = struct.unpack(endian + 'H', data[0:2])[0] |
|
|
|
tag, typ, count, val_offset = struct.unpack(endian + 'HHII', data) |
|
|
|
try: |
|
tagname = tags[tag] |
|
except KeyError: |
|
# special case |
|
tagname = 'GeoTIFF-34735' |
|
if tag != 34735: |
|
continue |
|
|
|
fmtchar, bytesper = struct_fmt[typ] |
|
total_bytes = bytesper * count |
|
if total_bytes <= 4: |
|
values = val_offset # not an offset but an actual value |
|
else: |
|
src.seek(val_offset) |
|
values_packed = src.read(total_bytes) |
|
fmt = "{}{}".format(count, fmtchar) |
|
values = struct.unpack(endian + fmt, values_packed) |
|
|
|
if fmtchar == 'c': |
|
values = ''.join(values) |
|
|
|
if tag == 34735: |
|
# geotiff, lets get the epsg code |
|
keyids = values[0::4] |
|
ttagloc = values[1::4] |
|
keyvals = values[3::4] |
|
try: |
|
if dict(zip(keyids, ttagloc))[3072] == 0: |
|
# it's a value not and offset |
|
epsg = dict(zip(keyids, keyvals))[3072] |
|
header_vals['epsg'] = epsg |
|
except KeyError: |
|
continue |
|
|
|
header_vals[tagname] = values |
|
|
|
return header_vals |
|
|
|
|
|
def main(path): |
|
|
|
tags = { |
|
256: 'ImageWidth', |
|
257: 'ImageLength', |
|
33922: "ModelTiePointTag", |
|
33550: "ModelPixelScaleTag"} |
|
|
|
print(json.dumps(tags_to_bbox(read_tags(path, tags)))) |
|
|
|
|
|
if __name__ == "__main__": |
|
path = sys.argv[1] |
|
main(path) |