Created
September 1, 2016 23:44
-
-
Save evertrol/d13aef538eec0c633ce5b7a2acd6811d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import numpy as np | |
import healpy as hp | |
from astropy.io import fits | |
from astropy.wcs import WCS | |
from astropy.coordinates import SkyCoord | |
from astropy import units | |
from spherical_geometry.polygon import SphericalPolygon | |
LIMIT = 1 | |
def main(expmap, skymap=None): | |
with fits.open(expmap) as hdulist: | |
data = hdulist[0].data | |
header = hdulist[0].header.copy() | |
wcs = WCS(header) | |
# clean up | |
data[data < LIMIT] = 0 | |
ny, nx = data.shape | |
xp, yp = np.meshgrid(np.arange(nx), np.arange(ny)) | |
xc, yc = wcs.wcs_pix2world(xp, yp, 0) | |
step = 1 | |
x = np.arange(nx + 1.) - 0.5 | |
y = np.arange(ny + 1.) - 0.5 | |
# pixel coordinates of corners | |
xp, yp = np.meshgrid(x, y) | |
# wcs coordinates of corners | |
xw, yw = wcs.wcs_pix2world(xp[::step,::step], yp[::step,::step], 0) | |
polygons = [] | |
for ix in range(ny//step): | |
for iy in range(nx//step): | |
if data[ix*step, iy*step] < LIMIT: | |
continue | |
if data[ix*step+1, iy*step] < LIMIT: | |
continue | |
if data[ix*step, iy*step+1] < LIMIT: | |
continue | |
if data[ix*step+1, iy*step+1] < LIMIT: | |
continue | |
ra = [xw[ix, iy], xw[ix, iy+1], xw[ix, iy+1], xw[ix+1, iy]] | |
decl = [yw[ix, iy], yw[ix, iy+1], yw[ix+1, iy+1], yw[ix+1, iy]] | |
polygon = SphericalPolygon.from_lonlat(lon=ra[::-1], | |
lat=decl[::-1], | |
degrees=True, | |
auto_close=True) | |
polygons.append(polygon) | |
area = 0 | |
for polygon in polygons: | |
area += polygon.area() | |
print('Total area covered:', area, np.rad2deg(np.rad2deg(area))) | |
if skymap: | |
centers = SkyCoord(ra=xc, dec=yc, unit=units.degree) | |
info = hp.read_map(skymap, h=True, field=None, | |
verbose=False, nest=None) | |
try: | |
# new-style skymaps | |
skymap, distmu, distsigma, distnorm, header = info | |
except ValueError as exc: | |
if "not enough values to unpack" in str(exc): | |
# assume an older map without distance information | |
skymap, header = info | |
else: | |
raise | |
header = dict([(key.lower(), value) for key, value in header]) | |
nested = header['ordering'] == 'NESTED' | |
nside = 2048 # 2048: ~8 GB needed | |
skymap = hp.pixelfunc.ud_grade(skymap, nside_out=nside, power=-2) | |
ipix = np.arange(len(skymap)) | |
theta, phi = hp.pix2ang(nside, ipix, nest=nested) | |
skycoords = SkyCoord(ra=phi*units.rad, | |
dec=(0.5*np.pi - theta)*units.rad) | |
xp, yp = wcs.wcs_world2pix(skycoords.ra.value, skycoords.dec.value, 0) | |
mask = (xp >= 0) & (xp <= nx) & (yp >= 0) & (yp <= ny) | |
xp = xp[mask] | |
yp = yp[mask] | |
xp = xp.astype(np.int) | |
yp = yp.astype(np.int) | |
img = np.zeros((ny, nx), dtype=np.float) | |
skymap = skymap[mask] | |
skycoords = skycoords[mask] | |
for i in range(len(skymap)): | |
img[yp[i], xp[i]] += skymap[i] | |
fits.PrimaryHDU(img, header=wcs.to_header()).writeto( | |
'tmp.fits', clobber=True) | |
weights = np.ones(data.shape) | |
weights[data < LIMIT] = 0 | |
coverage = img * weights | |
print('Total probability covered:', np.sum(coverage)) | |
if __name__ == '__main__': | |
try: | |
skymap = sys.argv[2] | |
except: | |
skymap = None | |
main(sys.argv[1], skymap=skymap) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment