Last active
April 27, 2023 07:48
-
-
Save vhaasteren/c7d02ba1dd7d48c89e12358243d6fdec to your computer and use it in GitHub Desktop.
Create Enterprise mock pulsars
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
For simulations one often needs to read in par/tim files. But on some machines | |
it is hard to install PINT or tempo2/libstempo. And reading in par/tim files is | |
a bit slow, especially when reading in lots of them. Here we allow use of | |
Enterprise with mock pulsar. | |
Generate some mock par files like so: | |
def example_usage(): | |
import mockpulsar, os | |
npsrs = 100 | |
output_dir = './fakepsrs/' | |
example_tim = 'data/tim/timfile.tim' | |
psrs = [] | |
for pp in range(npsrs): | |
parfile, filename = mockpulsar.generate_parfile(mockpulsar.parameters_outline) | |
with open(os.path.join(output_dir, filename), 'w+') as fp: | |
fp.write(parfile) | |
# Read some example timfile (actual TOAs don't matter) | |
ssbfreqs, toas_mjds = mockpulsar.read_timfile_toas(example_tim) | |
psrs.append(mockpulsar.MockPulsar(os.path.join(output_dir, filename), toas_mjds, ssbfreqs, toaerrs=1e-7, telescope='GBT')) | |
""" | |
import copy | |
import numpy as np | |
from enterprise.pulsar import BasePulsar | |
import ephem | |
def gen_parameter_func(forward=lambda x: x, backward=lambda x: x): | |
def gen_parameter(pd): | |
trans_min = forward(pd['min']) | |
trans_max = forward(pd['max']) | |
trans_par = trans_min + np.random.rand() * (trans_max - trans_min) | |
return backward(trans_par) | |
return gen_parameter | |
parameters_outline = { | |
"ELONG": {'min': 0.0, 'max': 360.0, 'fit': 1, 'unc': 0.1}, | |
"ELAT": {'min': -90.0, 'max': 90.0, 'fit': 1, 'unc': 0.1, | |
'dist': gen_parameter_func( | |
forward=lambda x: np.sin(np.radians(x)), | |
backward=lambda x: np.degrees(np.arcsin(x))) | |
}, | |
"PMELONG": {'min': -20.0, 'max': 20.0, 'fit': 1, 'unc': 0.1}, | |
'PMELAT': {'min': -20.0, 'max': 20.0, 'fit': 1, 'unc': 0.1}, | |
'F0': {'min': 100.0, 'max': 900.0, 'fit': 1, 'unc': 1e-5}, | |
'F1': {'min': 1.0e-16, 'max': 1.0e-15, 'fit': 1, 'unc': 1e-18}, | |
'PX': {'min': -1.0, 'max': 5.0, 'fit': 1, 'unc': 0.1}, | |
'DM': {'min': 1.0, 'max': 50.0, 'fit': 0, 'unc': 0.1}, | |
'PEPOCH': {'value': 55000}, | |
'POSEPOCH': {'value': 55000}, | |
'DMEPOCH': {'value': 55000}, | |
'START': {'value': 45000}, | |
'FINISH': {'value': 60000}, | |
'TZRMJD': {'value': 55000}, | |
'TZRFRQ': {'value': 1440}, | |
'TZRSITE': {'value': 3}, | |
'TRES': {'value': 5.8}, | |
'NE_SW': {'value': 4}, | |
'CLK': {'value': 'TT(BIPM)'}, | |
'UNITS': {'value': 'TCB'}, | |
'TIMEEPH': {'value': 'FB90'}, | |
'DILATEFREQ': {'value': 'N'}, | |
'PLANET_SHAPIRO': {'value': 'N'}, | |
'T2CMETHOD': {'value': 'TEMPO'}, | |
'CORRECT_TROPOSPHERE': {'value': 'N'}, | |
'EPHEM': {'value': 'DE436'}, | |
'NITS': {'value': 1}, | |
'NTOA': {'value': 0}, | |
} | |
def get_psrname_from_elong_elat(elong, elat): | |
"""ELONG and ELAT should be in degrees""" | |
ec = ephem.Ecliptic(elong*np.pi/180.0, elat*np.pi/180.0) | |
eq = ephem.Equatorial(ec, epoch=ephem.J2000) | |
raj, decj = float(eq.ra), float(eq.dec) | |
raj_fhr = raj * 12 / np.pi | |
raj_hr = int(raj_fhr) | |
raj_min = int((raj_fhr-raj_hr)*60) | |
decj_fdeg = decj * 180 / np.pi | |
decj_deg = int(decj_fdeg) | |
decj_min = int(np.abs((decj_fdeg-decj_deg)*60)) | |
sign = "+" if np.sign(decj) > 0 else "-" | |
pos_str = f"J{raj_hr:02}{raj_min:02}{sign}{int(np.abs(decj_deg)):02}{decj_min:02}" | |
return pos_str | |
def get_psrname(parameters): | |
if not 'ELONG' in parameters or not 'ELAT' in parameters: | |
raise ValueError("Positions not set") | |
elif not 'value' in parameters['ELONG'] or not 'value' in parameters['ELAT']: | |
raise ValueError("Positions not set") | |
elong, elat = parameters['ELONG']['value'], parameters['ELAT']['value'] | |
return get_psrname_from_elong_elat(elong, elat) | |
def get_new_pulsar(parameters): | |
"""Generate a new pulsar""" | |
pulsarpars = copy.deepcopy(parameters) | |
for key, par in pulsarpars.items(): | |
if 'value' not in par: | |
pgf = par.get('genfunc', gen_parameter_func()) | |
par['value'] = pgf(par) | |
rv = {'PSRJ': {'value': get_psrname(pulsarpars)}} | |
rv.update(pulsarpars) | |
return rv | |
def generate_parfile(parameters=parameters_outline): | |
parstring, filename = "", "" | |
# Generate values | |
for key, par in get_new_pulsar(parameters).items(): | |
if 'value' in par and 'fit' in par and 'unc' in par: | |
parstring += f"{key:13} {par['value']:20} {par['fit']} {par['unc']}\n" | |
elif 'value' in par and 'fit' in par: | |
parstring += f"{key:13} {par['value']:20} {par['fit']}\n" | |
elif 'value' in par: | |
parstring += f"{key:13} {par['value']}\n" | |
if key == 'PSRJ': | |
filename = par['value'] + '.par' | |
return parstring, filename | |
def read_parfile_values(parfile): | |
with open(parfile, 'r') as fp: | |
pfd = {line.split()[0]:line.split()[1] for line in fp.readlines() } | |
return pfd | |
def read_timfile_toas(timfile): | |
toas_mjds = [] | |
ssbfreqs = [] | |
with open(timfile, 'r') as fp: | |
for line in fp.readlines(): | |
line_cols = line.strip().split() | |
if len(line_cols) > 2 and not line_cols[0].startswith(('#', 'C')): | |
toas_mjds.append(float(line_cols[2])) | |
ssbfreqs.append(float(line_cols[1])) | |
return np.array(ssbfreqs), np.array(toas_mjds) | |
def designmatrix_qsd(toas): | |
ntoas, avetoas = len(toas), (toas-np.mean(toas)) / np.mean(toas) | |
return np.vstack([avetoas**0, avetoas**1, avetoas**2]).T, ['Offset', 'F0', 'F1'] | |
class MockPulsar(BasePulsar): | |
"""Class to allow mock pulsars to be used with Enterprise""" | |
def __init__(self, parfile, obs_times_mjd, ssbfreqs=1440.0, residuals=None, toaerrs=1e-6, sort=True, telescope='GBT'): | |
self._parfile_values = read_parfile_values(parfile) | |
if not ("ELONG" and "ELAT" in self._parfile_values): | |
raise ValueError("Mock pulsar only works with ELONG/ELAT for now") | |
self._elong = float(self._parfile_values['ELONG']) | |
self._elat = float(self._parfile_values['ELAT']) | |
self.name = get_psrname_from_elong_elat(self._elong, self._elat) | |
self._raj, self._decj = self._get_radec_from_ecliptic(self._elong, self._elat) | |
self._sort = sort | |
self.planets = False | |
self._toas = np.double(obs_times_mjd) * 86400 | |
self._stoas = np.double(obs_times_mjd) * 86400 | |
self._residuals = residuals if residuals else np.zeros_like(obs_times_mjd) | |
self._toaerrs = np.ones_like(obs_times_mjd) * toaerrs | |
self._designmatrix, self.fitpars = designmatrix_qsd(self._toas) | |
self._ssbfreqs = np.ones_like(self._toas) * ssbfreqs / 1e6 | |
self._telescope = telescope | |
# set parameters | |
self.setpars = [fp for fp in self.fitpars] | |
flags = {} | |
# new-style storage of flags as a numpy record array (previously, psr._flags = flags) | |
self._flags = np.zeros(len(self._toas), dtype=[(key, val.dtype) for key, val in flags.items()]) | |
for key, val in flags.items(): | |
self._flags[key] = val | |
self._pdist = self._get_pdist() | |
self._pos = self._get_pos() | |
self._planetssb = None | |
self._sunssb = None | |
self._pos_t = np.tile(self._pos, (len(self._toas), 1)) | |
self.sort_data() |
@AaronDJohnson, thanks for pointing that out, I must have been asleep when uploading it. It was actually the wrong version of the code as well, since the generation of ELAT did not include the transformation as it should. Otherwise you get too many pulsars around the poles.
I have updated the code to include that.
And I agree, this is good for benchmarking and stuff, it's super fast in comparison.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hey @vhaasteren, your example usage might need a little tuning. I got it to work with this:
parfile
is the full parfile contents. Using theoutdir + filename
instead gives the expected behavior. Very cool! I'm going to use it to do some benchmarking. 😄