Skip to content

Instantly share code, notes, and snippets.

@vhaasteren
Last active April 27, 2023 07:48
Show Gist options
  • Save vhaasteren/c7d02ba1dd7d48c89e12358243d6fdec to your computer and use it in GitHub Desktop.
Save vhaasteren/c7d02ba1dd7d48c89e12358243d6fdec to your computer and use it in GitHub Desktop.
Create Enterprise mock pulsars
"""
For simulations one often needs to read in par/tim files. But on some machines
it is hard to install PINT or tempo2/libstempo. And reading in par/tim files is
a bit slow, especially when reading in lots of them. Here we allow use of
Enterprise with mock pulsar.
Generate some mock par files like so:
def example_usage():
import mockpulsar, os
npsrs = 100
output_dir = './fakepsrs/'
example_tim = 'data/tim/timfile.tim'
psrs = []
for pp in range(npsrs):
parfile, filename = mockpulsar.generate_parfile(mockpulsar.parameters_outline)
with open(os.path.join(output_dir, filename), 'w+') as fp:
fp.write(parfile)
# Read some example timfile (actual TOAs don't matter)
ssbfreqs, toas_mjds = mockpulsar.read_timfile_toas(example_tim)
psrs.append(mockpulsar.MockPulsar(os.path.join(output_dir, filename), toas_mjds, ssbfreqs, toaerrs=1e-7, telescope='GBT'))
"""
import copy
import numpy as np
from enterprise.pulsar import BasePulsar
import ephem
def gen_parameter_func(forward=lambda x: x, backward=lambda x: x):
def gen_parameter(pd):
trans_min = forward(pd['min'])
trans_max = forward(pd['max'])
trans_par = trans_min + np.random.rand() * (trans_max - trans_min)
return backward(trans_par)
return gen_parameter
parameters_outline = {
"ELONG": {'min': 0.0, 'max': 360.0, 'fit': 1, 'unc': 0.1},
"ELAT": {'min': -90.0, 'max': 90.0, 'fit': 1, 'unc': 0.1,
'dist': gen_parameter_func(
forward=lambda x: np.sin(np.radians(x)),
backward=lambda x: np.degrees(np.arcsin(x)))
},
"PMELONG": {'min': -20.0, 'max': 20.0, 'fit': 1, 'unc': 0.1},
'PMELAT': {'min': -20.0, 'max': 20.0, 'fit': 1, 'unc': 0.1},
'F0': {'min': 100.0, 'max': 900.0, 'fit': 1, 'unc': 1e-5},
'F1': {'min': 1.0e-16, 'max': 1.0e-15, 'fit': 1, 'unc': 1e-18},
'PX': {'min': -1.0, 'max': 5.0, 'fit': 1, 'unc': 0.1},
'DM': {'min': 1.0, 'max': 50.0, 'fit': 0, 'unc': 0.1},
'PEPOCH': {'value': 55000},
'POSEPOCH': {'value': 55000},
'DMEPOCH': {'value': 55000},
'START': {'value': 45000},
'FINISH': {'value': 60000},
'TZRMJD': {'value': 55000},
'TZRFRQ': {'value': 1440},
'TZRSITE': {'value': 3},
'TRES': {'value': 5.8},
'NE_SW': {'value': 4},
'CLK': {'value': 'TT(BIPM)'},
'UNITS': {'value': 'TCB'},
'TIMEEPH': {'value': 'FB90'},
'DILATEFREQ': {'value': 'N'},
'PLANET_SHAPIRO': {'value': 'N'},
'T2CMETHOD': {'value': 'TEMPO'},
'CORRECT_TROPOSPHERE': {'value': 'N'},
'EPHEM': {'value': 'DE436'},
'NITS': {'value': 1},
'NTOA': {'value': 0},
}
def get_psrname_from_elong_elat(elong, elat):
"""ELONG and ELAT should be in degrees"""
ec = ephem.Ecliptic(elong*np.pi/180.0, elat*np.pi/180.0)
eq = ephem.Equatorial(ec, epoch=ephem.J2000)
raj, decj = float(eq.ra), float(eq.dec)
raj_fhr = raj * 12 / np.pi
raj_hr = int(raj_fhr)
raj_min = int((raj_fhr-raj_hr)*60)
decj_fdeg = decj * 180 / np.pi
decj_deg = int(decj_fdeg)
decj_min = int(np.abs((decj_fdeg-decj_deg)*60))
sign = "+" if np.sign(decj) > 0 else "-"
pos_str = f"J{raj_hr:02}{raj_min:02}{sign}{int(np.abs(decj_deg)):02}{decj_min:02}"
return pos_str
def get_psrname(parameters):
if not 'ELONG' in parameters or not 'ELAT' in parameters:
raise ValueError("Positions not set")
elif not 'value' in parameters['ELONG'] or not 'value' in parameters['ELAT']:
raise ValueError("Positions not set")
elong, elat = parameters['ELONG']['value'], parameters['ELAT']['value']
return get_psrname_from_elong_elat(elong, elat)
def get_new_pulsar(parameters):
"""Generate a new pulsar"""
pulsarpars = copy.deepcopy(parameters)
for key, par in pulsarpars.items():
if 'value' not in par:
pgf = par.get('genfunc', gen_parameter_func())
par['value'] = pgf(par)
rv = {'PSRJ': {'value': get_psrname(pulsarpars)}}
rv.update(pulsarpars)
return rv
def generate_parfile(parameters=parameters_outline):
parstring, filename = "", ""
# Generate values
for key, par in get_new_pulsar(parameters).items():
if 'value' in par and 'fit' in par and 'unc' in par:
parstring += f"{key:13} {par['value']:20} {par['fit']} {par['unc']}\n"
elif 'value' in par and 'fit' in par:
parstring += f"{key:13} {par['value']:20} {par['fit']}\n"
elif 'value' in par:
parstring += f"{key:13} {par['value']}\n"
if key == 'PSRJ':
filename = par['value'] + '.par'
return parstring, filename
def read_parfile_values(parfile):
with open(parfile, 'r') as fp:
pfd = {line.split()[0]:line.split()[1] for line in fp.readlines() }
return pfd
def read_timfile_toas(timfile):
toas_mjds = []
ssbfreqs = []
with open(timfile, 'r') as fp:
for line in fp.readlines():
line_cols = line.strip().split()
if len(line_cols) > 2 and not line_cols[0].startswith(('#', 'C')):
toas_mjds.append(float(line_cols[2]))
ssbfreqs.append(float(line_cols[1]))
return np.array(ssbfreqs), np.array(toas_mjds)
def designmatrix_qsd(toas):
ntoas, avetoas = len(toas), (toas-np.mean(toas)) / np.mean(toas)
return np.vstack([avetoas**0, avetoas**1, avetoas**2]).T, ['Offset', 'F0', 'F1']
class MockPulsar(BasePulsar):
"""Class to allow mock pulsars to be used with Enterprise"""
def __init__(self, parfile, obs_times_mjd, ssbfreqs=1440.0, residuals=None, toaerrs=1e-6, sort=True, telescope='GBT'):
self._parfile_values = read_parfile_values(parfile)
if not ("ELONG" and "ELAT" in self._parfile_values):
raise ValueError("Mock pulsar only works with ELONG/ELAT for now")
self._elong = float(self._parfile_values['ELONG'])
self._elat = float(self._parfile_values['ELAT'])
self.name = get_psrname_from_elong_elat(self._elong, self._elat)
self._raj, self._decj = self._get_radec_from_ecliptic(self._elong, self._elat)
self._sort = sort
self.planets = False
self._toas = np.double(obs_times_mjd) * 86400
self._stoas = np.double(obs_times_mjd) * 86400
self._residuals = residuals if residuals else np.zeros_like(obs_times_mjd)
self._toaerrs = np.ones_like(obs_times_mjd) * toaerrs
self._designmatrix, self.fitpars = designmatrix_qsd(self._toas)
self._ssbfreqs = np.ones_like(self._toas) * ssbfreqs / 1e6
self._telescope = telescope
# set parameters
self.setpars = [fp for fp in self.fitpars]
flags = {}
# new-style storage of flags as a numpy record array (previously, psr._flags = flags)
self._flags = np.zeros(len(self._toas), dtype=[(key, val.dtype) for key, val in flags.items()])
for key, val in flags.items():
self._flags[key] = val
self._pdist = self._get_pdist()
self._pos = self._get_pos()
self._planetssb = None
self._sunssb = None
self._pos_t = np.tile(self._pos, (len(self._toas), 1))
self.sort_data()
@AaronDJohnson
Copy link

Hey @vhaasteren, your example usage might need a little tuning. I got it to work with this:

import mockpulsar, os
npsrs = 100
output_dir = './fakepsrs/'
example_tim = 'data/tim/timfile.tim'

psrs = []
for pp in range(npsrs):
    parfile, filename = mockpulsar.generate_parfile(mockpulsar.parameters_outline)

    with open(os.path.join(output_dir, filename), 'w+') as fp:
        fp.write(parfile)

    # Read some example timfile (actual TOAs don't matter)
    ssbfreqs, toas_mjds = mockpulsar.read_timfile_toas(example_tim)

    psrs.append(mockpulsar.MockPulsar(os.path.join(output_dir, filename), toas_mjds, ssbfreqs, toaerrs=1e-7, telescope='GBT'))

parfile is the full parfile contents. Using the outdir + filename instead gives the expected behavior. Very cool! I'm going to use it to do some benchmarking. 😄

@vhaasteren
Copy link
Author

@AaronDJohnson, thanks for pointing that out, I must have been asleep when uploading it. It was actually the wrong version of the code as well, since the generation of ELAT did not include the transformation as it should. Otherwise you get too many pulsars around the poles.

I have updated the code to include that.

And I agree, this is good for benchmarking and stuff, it's super fast in comparison.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment